
Build production audio segmentation systems that detect boundaries in real time using interval merging and temporal processing, the same principles behind merge intervals and event stream processing.

Problem Statement

Design a Real-time Audio Segmentation System that detects and merges speech segments, speaker boundaries, and audio events in streaming audio with minimal latency.

Functional Requirements

  1. Voice Activity Detection: Detect speech vs silence boundaries
  2. Speaker change detection: Identify speaker turn boundaries
  3. Segment merging: Merge adjacent segments intelligently
  4. Real-time processing: <100ms latency for streaming audio
  5. Boundary refinement: Smooth and optimize segment boundaries
  6. Multi-channel support: Handle stereo/multi-mic audio
  7. Quality metrics: Calculate segmentation accuracy
  8. Format support: Handle various audio formats and sample rates

Non-Functional Requirements

  1. Latency: p95 < 100ms for boundary detection
  2. Accuracy: >95% F1-score for segment detection
  3. Throughput: Process 1000+ audio streams concurrently
  4. Real-time factor: <0.1x (process 10min audio in 1min)
  5. Memory: <100MB per audio stream
  6. CPU efficiency: <5% CPU per stream
  7. Robustness: Handle noise, varying quality

Understanding the Problem

Audio segmentation is critical for speech applications:

Real-World Use Cases

| Company | Use Case | Latency Requirement | Scale |
|---------|----------|---------------------|-------|
| Zoom | Meeting segmentation | Real-time (<100ms) | 300M+ meetings/day |
| Google Meet | Speaker turn detection | Real-time (<50ms) | Billions of minutes |
| Otter.ai | Transcript segmentation | Near real-time | 10M+ hours |
| Amazon Alexa | Wake word detection | Real-time (<50ms) | 100M+ devices |
| Microsoft Teams | Audio preprocessing | Real-time | Enterprise scale |
| Apple Siri | Voice command boundaries | Real-time (<30ms) | Billions of requests |

Why Segmentation Matters

  1. Speech recognition: Better boundaries → better transcription
  2. Speaker diarization: Prerequisite for “who spoke when”
  3. Audio indexing: Enable search within audio
  4. Compression: Skip silence to reduce data
  5. User experience: Show real-time captions with proper breaks
  6. Quality of service: Detect issues (silence, noise)

The Interval Processing Connection

Just like Merge Intervals and Event Stream Processing:

| Merge Intervals | Event Streams | Audio Segmentation |
|-----------------|---------------|--------------------|
| Merge overlapping ranges | Merge event windows | Merge audio segments |
| Sort by start time | Event ordering | Temporal ordering |
| Greedy merging | Window aggregation | Boundary merging |
| Overlap detection | Event correlation | Segment alignment |
| O(N log N) | Buffer + process | Sliding window |

All three deal with temporal data requiring efficient interval/boundary processing.
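
For concreteness, here is a minimal sketch of the classic merge-intervals step that the table maps onto, using plain (start, end) tuples; this is the baseline that the audio version below specializes:

from typing import List, Tuple

def merge_intervals(intervals: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """Sort by start, then greedily merge overlapping (or touching) intervals."""
    if not intervals:
        return []
    intervals = sorted(intervals, key=lambda iv: iv[0])
    merged = [intervals[0]]
    for start, end in intervals[1:]:
        last_start, last_end = merged[-1]
        if start <= last_end:
            # Overlap: extend the last merged interval
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))
    return merged

# merge_intervals([(0.0, 1.2), (1.0, 2.0), (3.5, 4.0)]) -> [(0.0, 2.0), (3.5, 4.0)]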

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│              Real-time Audio Segmentation System                 │
└─────────────────────────────────────────────────────────────────┘

        Audio Input (Streaming)
        16kHz PCM, Real-time
                ↓
    ┌───────────────────────┐
    │   Audio Buffering     │
    │   - Ring buffer       │
    │   - Overlap handling  │
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │   Feature Extraction  │
    │   - MFCCs            │
    │   - Energy           │
    │   - Zero crossings   │
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │   VAD (Voice Activity)│
    │   - WebRTC VAD       │
    │   - ML-based VAD     │
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │   Boundary Detection  │
    │   - Energy changes   │
    │   - Spectral changes │
    │   - ML classifier    │
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │   Segment Merging     │
    │   (Like Merge         │
    │    Intervals!)        │
    │   - Min duration     │
    │   - Max gap          │
    └───────────┬───────────┘
                │
    ┌───────────▼───────────┐
    │   Boundary Refinement │
    │   - Smooth edges     │
    │   - Snap to zero     │
    │     crossings        │
    └───────────┬───────────┘
                │
        Segmented Audio
        [(start, end, label)]

Key Components

  1. Audio Buffering: Manage streaming audio with overlaps
  2. VAD: Detect speech vs non-speech
  3. Boundary Detection: Find segment boundaries
  4. Segment Merging: Merge intervals (same algorithm!)
  5. Refinement: Optimize boundaries

Component Deep-Dives

1. Audio Segmentation with Interval Merging

The core algorithm is exactly merge intervals:

import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
import librosa

@dataclass
class AudioSegment:
    """
    Audio segment with time boundaries.
    
    Exactly like intervals in merge intervals problem:
    - start: segment start time (seconds)
    - end: segment end time (seconds)
    - label: segment type ("speech", "silence", "speaker_A", etc.)
    """
    start: float
    end: float
    label: str = "speech"
    confidence: float = 1.0
    
    @property
    def duration(self) -> float:
        return self.end - self.start
    
    def overlaps(self, other: 'AudioSegment') -> bool:
        """
        Check if this segment overlaps with another.
        
        Same as interval overlap check:
        max(start1, start2) <= min(end1, end2)
        """
        return max(self.start, other.start) <= min(self.end, other.end)
    
    def merge(self, other: 'AudioSegment') -> 'AudioSegment':
        """
        Merge this segment with another.
        
        Same as merging intervals:
        - New start = min of starts
        - New end = max of ends
        """
        return AudioSegment(
            start=min(self.start, other.start),
            end=max(self.end, other.end),
            label=self.label,
            confidence=min(self.confidence, other.confidence)
        )
    
    def to_samples(self, sample_rate: int) -> Tuple[int, int]:
        """Convert time to sample indices."""
        start_sample = int(self.start * sample_rate)
        end_sample = int(self.end * sample_rate)
        return start_sample, end_sample


class AudioSegmenter:
    """
    Audio segmentation using interval merging.
    
    This is the merge intervals algorithm applied to audio!
    """
    
    def __init__(
        self,
        min_segment_duration: float = 0.3,
        max_gap: float = 0.2,
        sample_rate: int = 16000
    ):
        """
        Initialize segmenter.
        
        Args:
            min_segment_duration: Minimum segment length (seconds)
            max_gap: Maximum gap to merge over (seconds)
            sample_rate: Audio sample rate
        """
        self.min_segment_duration = min_segment_duration
        self.max_gap = max_gap
        self.sample_rate = sample_rate
    
    def merge_segments(self, segments: List[AudioSegment]) -> List[AudioSegment]:
        """
        Merge audio segments.
        
        This is EXACTLY the merge intervals algorithm!
        
        Steps:
        1. Sort segments by start time
        2. Merge overlapping/close segments
        3. Filter short segments
        
        Args:
            segments: List of audio segments
            
        Returns:
            Merged segments
        """
        if not segments:
            return []
        
        # Step 1: Sort by start time (like merge intervals)
        sorted_segments = sorted(segments, key=lambda s: s.start)
        
        # Step 2: Merge overlapping or close segments
        merged = [sorted_segments[0]]
        
        for current in sorted_segments[1:]:
            last = merged[-1]
            
            # Check if should merge
            # Overlap OR gap <= max_gap
            gap = current.start - last.end
            
            if gap <= self.max_gap and current.label == last.label:
                # Merge (like merging intervals)
                merged[-1] = last.merge(current)
            else:
                # No merge - add new segment
                merged.append(current)
        
        # Step 3: Filter short segments
        filtered = [
            seg for seg in merged
            if seg.duration >= self.min_segment_duration
        ]
        
        return filtered
    
    def segment_by_vad(
        self,
        audio: np.ndarray,
        vad_probs: np.ndarray,
        frame_duration_ms: float = 30.0
    ) -> List[AudioSegment]:
        """
        Create segments from VAD probabilities.
        
        Args:
            audio: Audio waveform
            vad_probs: VAD probabilities per frame (0=silence, 1=speech)
            frame_duration_ms: Duration of each VAD frame
            
        Returns:
            List of speech segments
        """
        frame_duration_sec = frame_duration_ms / 1000.0
        
        # Find speech frames (threshold at 0.5)
        speech_frames = vad_probs > 0.5
        
        # Convert to segments
        segments = []
        in_speech = False
        segment_start = 0.0
        
        for i, is_speech in enumerate(speech_frames):
            current_time = i * frame_duration_sec
            
            if is_speech and not in_speech:
                # Speech started
                segment_start = current_time
                in_speech = True
            
            elif not is_speech and in_speech:
                # Speech ended
                segment_end = current_time
                segments.append(AudioSegment(
                    start=segment_start,
                    end=segment_end,
                    label="speech"
                ))
                in_speech = False
        
        # Handle case where speech continues to end
        if in_speech:
            segment_end = len(speech_frames) * frame_duration_sec
            segments.append(AudioSegment(
                start=segment_start,
                end=segment_end,
                label="speech"
            ))
        
        # Merge segments (interval merging!)
        return self.merge_segments(segments)
    
    def find_gaps(self, segments: List[AudioSegment]) -> List[AudioSegment]:
        """
        Find silence gaps between speech segments.
        
        Similar to finding gaps in merge intervals problem.
        """
        if len(segments) < 2:
            return []
        
        # Sort segments
        sorted_segments = sorted(segments, key=lambda s: s.start)
        
        gaps = []
        
        for i in range(len(sorted_segments) - 1):
            current_end = sorted_segments[i].end
            next_start = sorted_segments[i + 1].start
            
            gap_duration = next_start - current_end
            
            if gap_duration > 0:
                gaps.append(AudioSegment(
                    start=current_end,
                    end=next_start,
                    label="silence"
                ))
        
        return gaps
    
    def refine_boundaries(
        self,
        audio: np.ndarray,
        segments: List[AudioSegment]
    ) -> List[AudioSegment]:
        """
        Refine segment boundaries by snapping to zero crossings.
        
        This reduces audio artifacts at boundaries.
        """
        refined = []
        
        for segment in segments:
            # Convert to samples
            start_sample, end_sample = segment.to_samples(self.sample_rate)
            
            # Find nearest zero crossing for start
            start_refined = self._find_nearest_zero_crossing(
                audio,
                start_sample,
                search_window=int(0.01 * self.sample_rate)  # 10ms
            )
            
            # Find nearest zero crossing for end
            end_refined = self._find_nearest_zero_crossing(
                audio,
                end_sample,
                search_window=int(0.01 * self.sample_rate)
            )
            
            # Convert back to time
            refined_segment = AudioSegment(
                start=start_refined / self.sample_rate,
                end=end_refined / self.sample_rate,
                label=segment.label,
                confidence=segment.confidence
            )
            
            refined.append(refined_segment)
        
        return refined
    
    def _find_nearest_zero_crossing(
        self,
        audio: np.ndarray,
        sample_idx: int,
        search_window: int = 160
    ) -> int:
        """Find nearest zero crossing to given sample."""
        start = max(0, sample_idx - search_window)
        end = min(len(audio), sample_idx + search_window)
        
        # Find zero crossings
        window = audio[start:end]
        zero_crossings = np.where(np.diff(np.sign(window)))[0]
        
        if len(zero_crossings) == 0:
            return sample_idx
        
        # Find closest to target
        target_pos = sample_idx - start
        closest_zc = zero_crossings[
            np.argmin(np.abs(zero_crossings - target_pos))
        ]
        
        return start + closest_zc

2. Real-time VAD with WebRTC

import webrtcvad
from collections import deque

class RealtimeVAD:
    """
    Real-time Voice Activity Detection.
    
    Uses WebRTC VAD for low-latency detection.
    """
    
    def __init__(
        self,
        sample_rate: int = 16000,
        frame_duration_ms: int = 30,
        aggressiveness: int = 2
    ):
        """
        Initialize VAD.
        
        Args:
            sample_rate: Audio sample rate (8000, 16000, 32000, 48000)
            frame_duration_ms: Frame duration (10, 20, 30 ms)
            aggressiveness: VAD aggressiveness (0-3, higher = more aggressive)
        """
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_length = int(sample_rate * frame_duration_ms / 1000)
        
        # Buffer for incomplete frames
        self.buffer = bytearray()
        
        # Smoothing buffer
        self.smoothing_window = 5
        self.recent_results = deque(maxlen=self.smoothing_window)
    
    def process_chunk(self, audio_chunk: np.ndarray) -> List[bool]:
        """
        Process audio chunk and return VAD decisions.
        
        Args:
            audio_chunk: Audio samples as float32 values in [-1.0, 1.0]
            
        Returns:
            List of VAD decisions (True = speech, False = silence)
        """
        # Convert float samples to 16-bit PCM bytes (WebRTC VAD expects int16)
        audio_bytes = (audio_chunk * 32767).astype(np.int16).tobytes()
        self.buffer.extend(audio_bytes)
        
        results = []
        
        # Process complete frames
        frame_bytes = self.frame_length * 2  # 2 bytes per sample (int16)
        
        while len(self.buffer) >= frame_bytes:
            # Extract frame
            frame = bytes(self.buffer[:frame_bytes])
            self.buffer = self.buffer[frame_bytes:]
            
            # Run VAD
            is_speech = self.vad.is_speech(frame, self.sample_rate)
            
            # Apply smoothing
            self.recent_results.append(is_speech)
            smoothed = sum(self.recent_results) > len(self.recent_results) // 2
            
            results.append(smoothed)
        
        return results


class StreamingSegmenter:
    """
    Streaming audio segmenter.
    
    Processes audio in real-time, emitting segments as they complete.
    """
    
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self.vad = RealtimeVAD(sample_rate=sample_rate)
        self.segmenter = AudioSegmenter(sample_rate=sample_rate)
        
        # Streaming state
        self.current_segment: Optional[AudioSegment] = None
        self.completed_segments: List[AudioSegment] = []
        self.current_time = 0.0
        
        # Buffering for boundary refinement
        self.audio_buffer = deque(maxlen=sample_rate * 5)  # 5 seconds
    
    def process_audio_chunk(
        self,
        audio_chunk: np.ndarray,
        chunk_duration_ms: float = 100.0
    ) -> List[AudioSegment]:
        """
        Process audio chunk and return completed segments.
        
        Similar to processing events in stream processing:
        - Buffer incoming data
        - Detect boundaries
        - Emit completed segments
        
        Args:
            audio_chunk: Audio samples
            chunk_duration_ms: Chunk duration
            
        Returns:
            List of newly completed segments
        """
        # Add to buffer
        self.audio_buffer.extend(audio_chunk)
        
        # Run VAD
        vad_results = self.vad.process_chunk(audio_chunk)
        
        # Update segments
        frame_duration = self.vad.frame_duration_ms / 1000.0
        completed = []
        
        for is_speech in vad_results:
            if is_speech:
                if self.current_segment is None:
                    # Start new segment
                    self.current_segment = AudioSegment(
                        start=self.current_time,
                        end=self.current_time + frame_duration,
                        label="speech"
                    )
                else:
                    # Extend current segment
                    self.current_segment.end = self.current_time + frame_duration
            else:
                if self.current_segment is not None:
                    # End current segment
                    # Check if meets minimum duration
                    if self.current_segment.duration >= self.segmenter.min_segment_duration:
                        completed.append(self.current_segment)
                    
                    self.current_segment = None
            
            self.current_time += frame_duration
        
        return completed
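
A usage sketch for the streaming path (assumes webrtcvad is installed; the noise waveform is a stand-in for real audio): feed fixed-size chunks and collect segments as they complete.

audio = (np.random.randn(16000 * 5) * 0.1).astype(np.float32)  # 5s stand-in waveform
streamer = StreamingSegmenter(sample_rate=16000)
chunk_size = 1600  # 100ms at 16kHz

finished = []
for i in range(0, len(audio), chunk_size):
    finished.extend(streamer.process_audio_chunk(audio[i:i + chunk_size]))

# Note: a segment still open when the stream ends is never emitted above;
# a production version would add an explicit flush step.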

3. Speaker Change Detection

from scipy.signal import find_peaks

class SpeakerChangeDetector:
    """
    Detect speaker change boundaries in audio.
    
    Uses spectral change detection + embedding similarity.
    """
    
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
    
    def detect_speaker_changes(
        self,
        audio: np.ndarray,
        frame_size: int = 1024,
        hop_length: int = 512
    ) -> List[float]:
        """
        Detect speaker change points.
        
        Algorithm:
        1. Compute spectral features per frame
        2. Calculate frame-to-frame distance
        3. Find peaks in distance (speaker changes)
        4. Return change point times
        
        Returns:
            List of change point times (seconds)
        """
        # Compute MFCC features
        mfccs = librosa.feature.mfcc(
            y=audio,
            sr=self.sample_rate,
            n_mfcc=13,
            n_fft=frame_size,
            hop_length=hop_length
        )
        
        # Compute frame-to-frame distance
        distances = np.zeros(mfccs.shape[1] - 1)
        
        for i in range(len(distances)):
            distances[i] = np.linalg.norm(mfccs[:, i+1] - mfccs[:, i])
        
        # Smooth distances
        from scipy.ndimage import gaussian_filter1d
        distances_smooth = gaussian_filter1d(distances, sigma=2)
        
        # Find peaks (speaker changes)
        peaks, _ = find_peaks(
            distances_smooth,
            height=np.percentile(distances_smooth, 75),
            distance=int(1.0 * self.sample_rate / hop_length)  # Min 1 second apart
        )
        
        # Convert to times
        change_times = [
            peak * hop_length / self.sample_rate
            for peak in peaks
        ]
        
        return change_times
    
    def segment_by_speaker(
        self,
        audio: np.ndarray,
        change_points: List[float]
    ) -> List[AudioSegment]:
        """
        Create segments based on speaker changes.
        
        Args:
            audio: Audio waveform
            change_points: Speaker change times
            
        Returns:
            List of speaker segments
        """
        if not change_points:
            # Single speaker
            return [AudioSegment(
                start=0.0,
                end=len(audio) / self.sample_rate,
                label="speaker_0"
            )]
        
        segments = []
        
        # First segment
        segments.append(AudioSegment(
            start=0.0,
            end=change_points[0],
            label="speaker_0"
        ))
        
        # Middle segments
        for i in range(len(change_points) - 1):
            speaker_id = (i + 1) % 2  # Alternate speakers after each change (simplified)
            segments.append(AudioSegment(
                start=change_points[i],
                end=change_points[i + 1],
                label=f"speaker_{speaker_id}"
            ))
        
        # Last segment
        last_speaker = len(change_points) % 2  # Continue the alternation
        segments.append(AudioSegment(
            start=change_points[-1],
            end=len(audio) / self.sample_rate,
            label=f"speaker_{last_speaker}"
        ))
        
        return segments
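
A usage sketch for the detector (assumes librosa is installed; real speech would be used in practice, and the alternating labels are the simplification noted in the code):

detector = SpeakerChangeDetector(sample_rate=16000)
audio = (np.random.randn(16000 * 20) * 0.1).astype(np.float32)  # stand-in for real audio

change_points = detector.detect_speaker_changes(audio)   # change times in seconds
speaker_segments = detector.segment_by_speaker(audio, change_points)

for seg in speaker_segments:
    print(f"{seg.label}: {seg.start:.2f}s - {seg.end:.2f}s")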

4. Production Pipeline

import logging
from typing import Callable

class ProductionAudioSegmenter:
    """
    Production-ready audio segmentation system.
    
    Features:
    - Real-time processing
    - Multiple detection methods
    - Segment merging (interval merging!)
    - Boundary refinement
    - Monitoring
    """
    
    def __init__(
        self,
        sample_rate: int = 16000,
        enable_vad: bool = True,
        enable_speaker_detection: bool = False
    ):
        self.sample_rate = sample_rate
        self.enable_vad = enable_vad
        self.enable_speaker_detection = enable_speaker_detection
        
        # Components
        self.segmenter = AudioSegmenter(sample_rate=sample_rate)
        self.streaming_segmenter = StreamingSegmenter(sample_rate=sample_rate)
        self.speaker_detector = SpeakerChangeDetector(sample_rate=sample_rate)
        
        self.logger = logging.getLogger(__name__)
        
        # Metrics
        self.segments_created = 0
        self.total_audio_processed_sec = 0.0
    
    def segment_audio(
        self,
        audio: np.ndarray,
        mode: str = "batch"
    ) -> List[AudioSegment]:
        """
        Segment audio.
        
        Args:
            audio: Audio waveform
            mode: "batch" or "streaming"
            
        Returns:
            List of audio segments
        """
        audio_duration = len(audio) / self.sample_rate
        self.total_audio_processed_sec += audio_duration
        
        if mode == "batch":
            return self._segment_batch(audio)
        else:
            return self._segment_streaming(audio)
    
    def _segment_batch(self, audio: np.ndarray) -> List[AudioSegment]:
        """Batch segmentation."""
        segments = []
        
        # VAD segmentation
        if self.enable_vad:
            vad = RealtimeVAD(sample_rate=self.sample_rate)
            
            # Process audio in chunks
            chunk_size = int(0.03 * self.sample_rate)  # 30ms
            vad_probs = []
            
            for i in range(0, len(audio), chunk_size):
                chunk = audio[i:i + chunk_size]
                if len(chunk) < chunk_size:
                    # Pad last chunk
                    chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
                
                results = vad.process_chunk(chunk)
                vad_probs.extend(results)
            
            vad_probs = np.array(vad_probs)
            
            # Create segments from VAD
            segments = self.segmenter.segment_by_vad(
                audio,
                vad_probs,
                frame_duration_ms=30.0
            )
        
        # Speaker change detection
        if self.enable_speaker_detection:
            change_points = self.speaker_detector.detect_speaker_changes(audio)
            speaker_segments = self.speaker_detector.segment_by_speaker(
                audio,
                change_points
            )
            
            # Merge with VAD segments
            segments = self._merge_vad_and_speaker_segments(
                segments,
                speaker_segments
            )
        
        # Refine boundaries
        segments = self.segmenter.refine_boundaries(audio, segments)
        
        self.segments_created += len(segments)
        
        self.logger.info(
            f"Created {len(segments)} segments from "
            f"{len(audio)/self.sample_rate:.1f}s audio"
        )
        
        return segments
    
    def _segment_streaming(self, audio: np.ndarray) -> List[AudioSegment]:
        """Streaming segmentation."""
        # Process in chunks
        chunk_duration_ms = 100  # 100ms chunks
        chunk_size = int(chunk_duration_ms * self.sample_rate / 1000)
        
        all_segments = []
        
        for i in range(0, len(audio), chunk_size):
            chunk = audio[i:i + chunk_size]
            
            # Process chunk
            segments = self.streaming_segmenter.process_audio_chunk(
                chunk,
                chunk_duration_ms
            )
            
            all_segments.extend(segments)
        
        return all_segments
    
    def _merge_vad_and_speaker_segments(
        self,
        vad_segments: List[AudioSegment],
        speaker_segments: List[AudioSegment]
    ) -> List[AudioSegment]:
        """
        Merge VAD and speaker segments.
        
        Strategy: Split VAD segments at speaker boundaries.
        """
        merged = []
        
        for vad_seg in vad_segments:
            # Find speaker segments that overlap with VAD segment
            current_start = vad_seg.start
            
            for spk_seg in speaker_segments:
                if spk_seg.overlaps(vad_seg):
                    # Create segment for overlap
                    overlap_start = max(vad_seg.start, spk_seg.start)
                    overlap_end = min(vad_seg.end, spk_seg.end)
                    
                    if overlap_end > current_start:
                        merged.append(AudioSegment(
                            start=current_start,
                            end=overlap_end,
                            label=spk_seg.label
                        ))
                        current_start = overlap_end
            
            # Handle remaining part
            if current_start < vad_seg.end:
                merged.append(AudioSegment(
                    start=current_start,
                    end=vad_seg.end,
                    label="speech"
                ))
        
        return self.segmenter.merge_segments(merged)
    
    def export_segments(
        self,
        segments: List[AudioSegment],
        format: str = "rttm"
    ) -> str:
        """Export segments to standard format."""
        if format == "rttm":
            lines = []
            for seg in segments:
                line = (
                    f"SPEAKER file 1 {seg.start:.2f} {seg.duration:.2f} "
                    f"<NA> <NA> {seg.label} <NA> <NA>"
                )
                lines.append(line)
            return '\n'.join(lines)
        
        elif format == "json":
            import json
            return json.dumps([
                {
                    "start": seg.start,
                    "end": seg.end,
                    "duration": seg.duration,
                    "label": seg.label
                }
                for seg in segments
            ], indent=2)
        
        else:
            raise ValueError(f"Unknown format: {format}")
    
    def get_metrics(self) -> dict:
        """Get processing metrics."""
        return {
            "segments_created": self.segments_created,
            "audio_processed_sec": self.total_audio_processed_sec,
            "segments_per_second": (
                self.segments_created / self.total_audio_processed_sec
                if self.total_audio_processed_sec > 0 else 0
            )
        }


# Example usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    # Generate sample audio (or load real audio)
    sample_rate = 16000
    duration = 10  # seconds
    audio = np.random.randn(sample_rate * duration) * 0.1
    
    # Create segmenter
    segmenter = ProductionAudioSegmenter(
        sample_rate=sample_rate,
        enable_vad=True,
        enable_speaker_detection=False
    )
    
    # Segment audio
    segments = segmenter.segment_audio(audio, mode="batch")
    
    print(f"\nSegmentation Results:")
    print(f"Audio duration: {duration}s")
    print(f"Segments created: {len(segments)}")
    print(f"\nSegments:")
    for i, seg in enumerate(segments):
        print(f"  {i+1}. [{seg.start:.2f}s - {seg.end:.2f}s] {seg.label} ({seg.duration:.2f}s)")
    
    # Export
    rttm = segmenter.export_segments(segments, format="rttm")
    print(f"\nRTTM format:\n{rttm}")
    
    # Metrics
    print(f"\nMetrics: {segmenter.get_metrics()}")

Evaluation Metrics

def calculate_segmentation_metrics(
    reference: List[AudioSegment],
    hypothesis: List[AudioSegment],
    collar: float = 0.2
) -> dict:
    """
    Calculate segmentation accuracy metrics.
    
    Metrics:
    - Precision: How many detected boundaries are correct?
    - Recall: How many true boundaries were detected?
    - F1-score: Harmonic mean of precision and recall
    
    Args:
        reference: Ground truth segments
        hypothesis: Detected segments
        collar: Forgiveness window around boundaries (seconds)
    """
    # Extract boundary points
    ref_boundaries = set()
    for seg in reference:
        ref_boundaries.add(seg.start)
        ref_boundaries.add(seg.end)
    
    hyp_boundaries = set()
    for seg in hypothesis:
        hyp_boundaries.add(seg.start)
        hyp_boundaries.add(seg.end)
    
    # Calculate matches
    true_positives = 0
    
    for hyp_bound in hyp_boundaries:
        # Check if within collar of any reference boundary
        for ref_bound in ref_boundaries:
            if abs(hyp_bound - ref_bound) <= collar:
                true_positives += 1
                break
    
    # Calculate metrics
    precision = true_positives / len(hyp_boundaries) if hyp_boundaries else 0
    recall = true_positives / len(ref_boundaries) if ref_boundaries else 0
    f1 = (
        2 * precision * recall / (precision + recall)
        if precision + recall > 0 else 0
    )
    
    return {
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "true_positives": true_positives,
        "false_positives": len(hyp_boundaries) - true_positives,
        "false_negatives": len(ref_boundaries) - true_positives
    }
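
A toy example of the boundary metric (using the AudioSegment class from earlier): one hypothesis boundary falls outside the 0.2s collar, so three of four boundaries count as true positives.

reference = [AudioSegment(0.0, 2.0), AudioSegment(3.0, 5.0)]
hypothesis = [AudioSegment(0.1, 2.1), AudioSegment(3.5, 5.0)]

metrics = calculate_segmentation_metrics(reference, hypothesis, collar=0.2)
# 0.1, 2.1, and 5.0 match reference boundaries within the collar; 3.5 does not.
# precision = recall = 0.75, f1_score = 0.75
print(metrics)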

Real-World Case Study: Zoom’s Audio Segmentation

Zoom’s Approach

Zoom processes 300M+ meetings daily with real-time segmentation:

Architecture:

  1. Client-side VAD: WebRTC VAD for initial detection
  2. Server-side refinement: ML-based boundary refinement
  3. Speaker tracking: Incremental speaker change detection
  4. Adaptive thresholds: Adjust based on audio quality (a minimal sketch of the idea follows)
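
As an illustration of the adaptive-threshold idea only (a generic sketch, not Zoom's implementation), an energy-based gate can track a running noise-floor estimate and flag frames that exceed it by a margin:

import numpy as np

class AdaptiveEnergyGate:
    """Toy adaptive threshold: illustrates the general idea, nothing vendor-specific."""

    def __init__(self, margin_db: float = 6.0, alpha: float = 0.05):
        self.noise_floor_db = -60.0   # running noise-floor estimate (dB)
        self.margin_db = margin_db    # how far above the floor counts as speech
        self.alpha = alpha            # smoothing factor for floor updates

    def is_speech(self, frame: np.ndarray) -> bool:
        """Return True if the frame's RMS level exceeds the adaptive threshold."""
        rms = np.sqrt(np.mean(frame ** 2) + 1e-12)
        level_db = 20 * np.log10(rms)
        speech = level_db > self.noise_floor_db + self.margin_db
        if not speech:
            # Adapt the floor only on non-speech frames so speech doesn't raise it
            self.noise_floor_db = (1 - self.alpha) * self.noise_floor_db + self.alpha * level_db
        return speech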

Results:

  • <50ms latency for boundary detection
  • >95% F1-score on internal benchmarks
  • Real-time factor < 0.05x
  • <2% CPU per stream

Key Lessons

  1. Client-side processing reduces server load
  2. Hybrid approach (WebRTC + ML) balances speed and accuracy
  3. Adaptive thresholds handle varying audio quality
  4. Interval merging critical for clean segments
  5. Boundary refinement improves downstream tasks

Cost Analysis

Processing Costs (1000 concurrent streams)

| Component | CPU | Memory | Cost/Month |
|-----------|-----|--------|------------|
| VAD | 5% per stream | 10MB | $500 |
| Boundary detection | 3% per stream | 20MB | $300 |
| Speaker detection | 10% per stream | 50MB | $1000 |
| Total (VAD only) | 50 cores | 10GB | $800/month |

Optimization:

  • Client-side VAD: 80% cost reduction
  • Batch processing: 50% cost reduction
  • Model quantization: 40% faster

Key Takeaways

  • Segmentation is interval merging: the same algorithm applies
  • WebRTC VAD is the industry standard for real-time detection
  • Boundary refinement is critical for quality
  • Streaming requires buffering and incremental processing
  • Speaker detection adds significant value
  • The same patterns appear in merge intervals and event streams
  • A real-time factor <0.1x is achievable with optimization
  • Client-side processing dramatically reduces costs
  • Adaptive thresholds handle varying conditions
  • Monitor F1-score as the key quality metric

All three topics use the same interval processing pattern:

DSA (Merge Intervals):

# Sort + merge overlapping intervals
intervals.sort(key=lambda x: x[0])
for current in intervals:
    if current.overlaps(last):
        last = merge(last, current)

ML System Design (Event Streams):

# Sort events + merge event windows
events.sort(key=lambda e: e.timestamp)
for event in events:
    if event.in_window(last_window):
        last_window.extend(event)

Speech Tech (Audio Segmentation):

# Sort segments + merge audio boundaries
segments.sort(key=lambda s: s.start)
for segment in segments:
    if segment.gap(last) <= max_gap:
        last = merge(last, segment)

Universal pattern across all three:

  1. Sort by temporal position
  2. Check overlap/proximity
  3. Merge if conditions met
  4. Output consolidated ranges

Originally published at: arunbaby.com/speech-tech/0016-real-time-audio-segmentation

If you found this helpful, consider sharing it with others who might benefit.