Real-time Audio Segmentation
Build production audio segmentation systems that detect boundaries in real time using interval merging and temporal processing, the same principles behind merge intervals and event stream processing.
Problem Statement
Design a Real-time Audio Segmentation System that detects and merges speech segments, speaker boundaries, and audio events in streaming audio with minimal latency.
Functional Requirements
- Voice Activity Detection: Detect speech vs silence boundaries
- Speaker change detection: Identify speaker turn boundaries
- Segment merging: Merge adjacent segments intelligently
- Real-time processing: <100ms latency for streaming audio
- Boundary refinement: Smooth and optimize segment boundaries
- Multi-channel support: Handle stereo/multi-mic audio
- Quality metrics: Calculate segmentation accuracy
- Format support: Handle various audio formats and sample rates
Non-Functional Requirements
- Latency: p95 < 100ms for boundary detection
- Accuracy: >95% F1-score for segment detection
- Throughput: Process 1000+ audio streams concurrently
- Real-time factor: <0.1x (process 10 min of audio in under 1 min; see the sketch after this list)
- Memory: <100MB per audio stream
- CPU efficiency: <5% CPU per stream
- Robustness: Handle noise, varying quality
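The real-time factor (RTF) target above is worth making concrete: RTF is processing time divided by audio duration, so staying under 0.1x means ten minutes of audio finishes in under a minute. A minimal sketch for measuring RTF against that budget; `measure_rtf` and `segment_fn` are placeholder names, not part of the system below:
import time
import numpy as np

def measure_rtf(segment_fn, audio: np.ndarray, sample_rate: int = 16000) -> float:
    """Real-time factor = wall-clock processing time / audio duration."""
    audio_duration_sec = len(audio) / sample_rate
    start = time.perf_counter()
    segment_fn(audio)
    return (time.perf_counter() - start) / audio_duration_sec

# Usage with the batch segmenter defined later in this post:
# rtf = measure_rtf(lambda a: segmenter.segment_audio(a, mode="batch"), audio)
# assert rtf < 0.1, f"RTF {rtf:.3f} misses the <0.1x budget"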
Understanding the Problem
Audio segmentation is critical for speech applications:
Real-World Use Cases
| Company | Use Case | Latency Requirement | Scale |
|---|---|---|---|
| Zoom | Meeting segmentation | Real-time (<100ms) | 300M+ meetings/day |
| Google Meet | Speaker turn detection | Real-time (<50ms) | Billions of minutes |
| Otter.ai | Transcript segmentation | Near real-time | 10M+ hours |
| Amazon Alexa | Wake word detection | Real-time (<50ms) | 100M+ devices |
| Microsoft Teams | Audio preprocessing | Real-time | Enterprise scale |
| Apple Siri | Voice command boundaries | Real-time (<30ms) | Billions of requests |
Why Segmentation Matters
- Speech recognition: Better boundaries → better transcription
- Speaker diarization: Prerequisite for “who spoke when”
- Audio indexing: Enable search within audio
- Compression: Skip silence to reduce data
- User experience: Show real-time captions with proper breaks
- Quality of service: Detect issues (silence, noise)
The Interval Processing Connection
Just like Merge Intervals and Event Stream Processing:
| Merge Intervals | Event Streams | Audio Segmentation |
|---|---|---|
| Merge overlapping ranges | Merge event windows | Merge audio segments |
| Sort by start time | Event ordering | Temporal ordering |
| Greedy merging | Window aggregation | Boundary merging |
| Overlap detection | Event correlation | Segment alignment |
| O(N log N) | Buffer + process | Sliding window |
All three deal with temporal data requiring efficient interval/boundary processing.
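To make the shared pattern concrete before getting audio-specific, here is a minimal, self-contained sort-and-sweep merge over (start, end) tuples; the AudioSegmenter later in this post applies the same structure to speech segments:
from typing import List, Tuple

def merge_intervals(
    intervals: List[Tuple[float, float]],
    max_gap: float = 0.0
) -> List[Tuple[float, float]]:
    """Sort by start, then greedily merge intervals that overlap or sit within max_gap."""
    if not intervals:
        return []
    intervals = sorted(intervals, key=lambda iv: iv[0])
    merged = [intervals[0]]
    for start, end in intervals[1:]:
        last_start, last_end = merged[-1]
        if start - last_end <= max_gap:  # overlap or small gap
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))
    return merged

# merge_intervals([(0.0, 1.2), (1.3, 2.0), (3.0, 4.0)], max_gap=0.2)
# -> [(0.0, 2.0), (3.0, 4.0)]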
High-Level Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Real-time Audio Segmentation System │
└─────────────────────────────────────────────────────────────────┘
Audio Input (Streaming)
16kHz PCM, Real-time
↓
┌───────────────────────┐
│ Audio Buffering │
│ - Ring buffer │
│ - Overlap handling │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ Feature Extraction │
│ - MFCCs │
│ - Energy │
│ - Zero crossings │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ VAD (Voice Activity)│
│ - WebRTC VAD │
│ - ML-based VAD │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ Boundary Detection │
│ - Energy changes │
│ - Spectral changes │
│ - ML classifier │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ Segment Merging │
│ (Like Merge │
│ Intervals!) │
│ - Min duration │
│ - Max gap │
└───────────┬───────────┘
│
┌───────────▼───────────┐
│ Boundary Refinement │
│ - Smooth edges │
│ - Snap to zero │
│ crossings │
└───────────┬───────────┘
│
Segmented Audio
[(start, end, label)]
Key Components
- Audio Buffering: Manage streaming audio with overlaps (sketched below)
- VAD: Detect speech vs non-speech
- Boundary Detection: Find segment boundaries
- Segment Merging: Merge intervals (same algorithm!)
- Refinement: Optimize boundaries
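Buffering is the one component not covered by the code below, so here is a minimal sketch of an overlap-aware frame buffer; the class name and API are illustrative, not from any specific library:
import numpy as np

class OverlapFrameBuffer:
    """Accumulate streaming audio and emit fixed-size frames that overlap by frame_size - hop samples."""

    def __init__(self, frame_size: int, hop: int):
        self.frame_size = frame_size
        self.hop = hop
        self._pending = np.zeros(0, dtype=np.float32)

    def push(self, chunk: np.ndarray) -> list:
        """Append a chunk; return every complete frame, advancing by `hop` so consecutive frames overlap."""
        self._pending = np.concatenate([self._pending, chunk.astype(np.float32)])
        frames = []
        while len(self._pending) >= self.frame_size:
            frames.append(self._pending[:self.frame_size].copy())
            self._pending = self._pending[self.hop:]
        return frames

# Example: 25 ms frames with a 10 ms hop at 16 kHz
# buf = OverlapFrameBuffer(frame_size=400, hop=160)
# frames = buf.push(next_chunk)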
Component Deep-Dives
1. Audio Segmentation with Interval Merging
The core algorithm is exactly merge intervals:
import numpy as np
from typing import List, Tuple, Optional
from dataclasses import dataclass
import librosa
@dataclass
class AudioSegment:
"""
Audio segment with time boundaries.
Exactly like intervals in merge intervals problem:
- start: segment start time (seconds)
- end: segment end time (seconds)
- label: segment type ("speech", "silence", "speaker_A", etc.)
"""
start: float
end: float
label: str = "speech"
confidence: float = 1.0
@property
def duration(self) -> float:
return self.end - self.start
def overlaps(self, other: 'AudioSegment') -> bool:
"""
Check if this segment overlaps with another.
Same as interval overlap check:
max(start1, start2) <= min(end1, end2)
"""
return max(self.start, other.start) <= min(self.end, other.end)
def merge(self, other: 'AudioSegment') -> 'AudioSegment':
"""
Merge this segment with another.
Same as merging intervals:
- New start = min of starts
- New end = max of ends
"""
return AudioSegment(
start=min(self.start, other.start),
end=max(self.end, other.end),
label=self.label,
confidence=min(self.confidence, other.confidence)
)
def to_samples(self, sample_rate: int) -> Tuple[int, int]:
"""Convert time to sample indices."""
start_sample = int(self.start * sample_rate)
end_sample = int(self.end * sample_rate)
return start_sample, end_sample
class AudioSegmenter:
"""
Audio segmentation using interval merging.
This is the merge intervals algorithm applied to audio!
"""
def __init__(
self,
min_segment_duration: float = 0.3,
max_gap: float = 0.2,
sample_rate: int = 16000
):
"""
Initialize segmenter.
Args:
min_segment_duration: Minimum segment length (seconds)
max_gap: Maximum gap to merge over (seconds)
sample_rate: Audio sample rate
"""
self.min_segment_duration = min_segment_duration
self.max_gap = max_gap
self.sample_rate = sample_rate
def merge_segments(self, segments: List[AudioSegment]) -> List[AudioSegment]:
"""
Merge audio segments.
This is EXACTLY the merge intervals algorithm!
Steps:
1. Sort segments by start time
2. Merge overlapping/close segments
3. Filter short segments
Args:
segments: List of audio segments
Returns:
Merged segments
"""
if not segments:
return []
# Step 1: Sort by start time (like merge intervals)
sorted_segments = sorted(segments, key=lambda s: s.start)
# Step 2: Merge overlapping or close segments
merged = [sorted_segments[0]]
for current in sorted_segments[1:]:
last = merged[-1]
# Check if should merge
# Overlap OR gap <= max_gap
gap = current.start - last.end
if gap <= self.max_gap and current.label == last.label:
# Merge (like merging intervals)
merged[-1] = last.merge(current)
else:
# No merge - add new segment
merged.append(current)
# Step 3: Filter short segments
filtered = [
seg for seg in merged
if seg.duration >= self.min_segment_duration
]
return filtered
def segment_by_vad(
self,
audio: np.ndarray,
vad_probs: np.ndarray,
frame_duration_ms: float = 30.0
) -> List[AudioSegment]:
"""
Create segments from VAD probabilities.
Args:
audio: Audio waveform
vad_probs: VAD probabilities per frame (0=silence, 1=speech)
frame_duration_ms: Duration of each VAD frame
Returns:
List of speech segments
"""
frame_duration_sec = frame_duration_ms / 1000.0
# Find speech frames (threshold at 0.5)
speech_frames = vad_probs > 0.5
# Convert to segments
segments = []
in_speech = False
segment_start = 0.0
for i, is_speech in enumerate(speech_frames):
current_time = i * frame_duration_sec
if is_speech and not in_speech:
# Speech started
segment_start = current_time
in_speech = True
elif not is_speech and in_speech:
# Speech ended
segment_end = current_time
segments.append(AudioSegment(
start=segment_start,
end=segment_end,
label="speech"
))
in_speech = False
# Handle case where speech continues to end
if in_speech:
segment_end = len(speech_frames) * frame_duration_sec
segments.append(AudioSegment(
start=segment_start,
end=segment_end,
label="speech"
))
# Merge segments (interval merging!)
return self.merge_segments(segments)
def find_gaps(self, segments: List[AudioSegment]) -> List[AudioSegment]:
"""
Find silence gaps between speech segments.
Similar to finding gaps in merge intervals problem.
"""
if len(segments) < 2:
return []
# Sort segments
sorted_segments = sorted(segments, key=lambda s: s.start)
gaps = []
for i in range(len(sorted_segments) - 1):
current_end = sorted_segments[i].end
next_start = sorted_segments[i + 1].start
gap_duration = next_start - current_end
if gap_duration > 0:
gaps.append(AudioSegment(
start=current_end,
end=next_start,
label="silence"
))
return gaps
def refine_boundaries(
self,
audio: np.ndarray,
segments: List[AudioSegment]
) -> List[AudioSegment]:
"""
Refine segment boundaries by snapping to zero crossings.
This reduces audio artifacts at boundaries.
"""
refined = []
for segment in segments:
# Convert to samples
start_sample, end_sample = segment.to_samples(self.sample_rate)
# Find nearest zero crossing for start
start_refined = self._find_nearest_zero_crossing(
audio,
start_sample,
search_window=int(0.01 * self.sample_rate) # 10ms
)
# Find nearest zero crossing for end
end_refined = self._find_nearest_zero_crossing(
audio,
end_sample,
search_window=int(0.01 * self.sample_rate)
)
# Convert back to time
refined_segment = AudioSegment(
start=start_refined / self.sample_rate,
end=end_refined / self.sample_rate,
label=segment.label,
confidence=segment.confidence
)
refined.append(refined_segment)
return refined
def _find_nearest_zero_crossing(
self,
audio: np.ndarray,
sample_idx: int,
search_window: int = 160
) -> int:
"""Find nearest zero crossing to given sample."""
start = max(0, sample_idx - search_window)
end = min(len(audio), sample_idx + search_window)
# Find zero crossings
window = audio[start:end]
zero_crossings = np.where(np.diff(np.sign(window)))[0]
if len(zero_crossings) == 0:
return sample_idx
# Find closest to target
target_pos = sample_idx - start
closest_zc = zero_crossings[
np.argmin(np.abs(zero_crossings - target_pos))
]
return start + closest_zc
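A quick usage sketch of merge_segments with hand-made segments (the values are illustrative) to show the merge-then-filter behavior:
segmenter = AudioSegmenter(min_segment_duration=0.3, max_gap=0.2)
raw = [
    AudioSegment(0.00, 1.10, "speech"),
    AudioSegment(1.20, 2.50, "speech"),  # 0.1s gap -> merged with previous
    AudioSegment(3.00, 3.15, "speech"),  # 0.15s long -> filtered out
    AudioSegment(4.00, 5.00, "speech"),
]
for seg in segmenter.merge_segments(raw):
    print(f"{seg.start:.2f}-{seg.end:.2f} {seg.label}")
# Expected: 0.00-2.50 speech, 4.00-5.00 speech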
2. Real-time VAD with WebRTC
import webrtcvad
from collections import deque
class RealtimeVAD:
"""
Real-time Voice Activity Detection.
Uses WebRTC VAD for low-latency detection.
"""
def __init__(
self,
sample_rate: int = 16000,
frame_duration_ms: int = 30,
aggressiveness: int = 2
):
"""
Initialize VAD.
Args:
sample_rate: Audio sample rate (8000, 16000, 32000, 48000)
frame_duration_ms: Frame duration (10, 20, 30 ms)
aggressiveness: VAD aggressiveness (0-3, higher = more aggressive)
"""
self.vad = webrtcvad.Vad(aggressiveness)
self.sample_rate = sample_rate
self.frame_duration_ms = frame_duration_ms
self.frame_length = int(sample_rate * frame_duration_ms / 1000)
# Buffer for incomplete frames
self.buffer = bytearray()
# Smoothing buffer
self.smoothing_window = 5
self.recent_results = deque(maxlen=self.smoothing_window)
def process_chunk(self, audio_chunk: np.ndarray) -> List[bool]:
"""
Process audio chunk and return VAD decisions.
Args:
            audio_chunk: Audio samples as float in [-1.0, 1.0]
Returns:
List of VAD decisions (True = speech, False = silence)
"""
        # Convert float [-1, 1] samples to 16-bit PCM bytes (what webrtcvad expects)
audio_bytes = (audio_chunk * 32767).astype(np.int16).tobytes()
self.buffer.extend(audio_bytes)
results = []
# Process complete frames
frame_bytes = self.frame_length * 2 # 2 bytes per sample (int16)
while len(self.buffer) >= frame_bytes:
# Extract frame
frame = bytes(self.buffer[:frame_bytes])
self.buffer = self.buffer[frame_bytes:]
# Run VAD
is_speech = self.vad.is_speech(frame, self.sample_rate)
# Apply smoothing
self.recent_results.append(is_speech)
smoothed = sum(self.recent_results) > len(self.recent_results) // 2
results.append(smoothed)
return results
class StreamingSegmenter:
"""
Streaming audio segmenter.
Processes audio in real-time, emitting segments as they complete.
"""
def __init__(self, sample_rate: int = 16000):
self.sample_rate = sample_rate
self.vad = RealtimeVAD(sample_rate=sample_rate)
self.segmenter = AudioSegmenter(sample_rate=sample_rate)
# Streaming state
self.current_segment: Optional[AudioSegment] = None
self.completed_segments: List[AudioSegment] = []
self.current_time = 0.0
# Buffering for boundary refinement
self.audio_buffer = deque(maxlen=sample_rate * 5) # 5 seconds
def process_audio_chunk(
self,
audio_chunk: np.ndarray,
chunk_duration_ms: float = 100.0
) -> List[AudioSegment]:
"""
Process audio chunk and return completed segments.
Similar to processing events in stream processing:
- Buffer incoming data
- Detect boundaries
- Emit completed segments
Args:
audio_chunk: Audio samples
chunk_duration_ms: Chunk duration
Returns:
List of newly completed segments
"""
# Add to buffer
self.audio_buffer.extend(audio_chunk)
# Run VAD
vad_results = self.vad.process_chunk(audio_chunk)
# Update segments
frame_duration = self.vad.frame_duration_ms / 1000.0
completed = []
for is_speech in vad_results:
if is_speech:
if self.current_segment is None:
# Start new segment
self.current_segment = AudioSegment(
start=self.current_time,
end=self.current_time + frame_duration,
label="speech"
)
else:
# Extend current segment
self.current_segment.end = self.current_time + frame_duration
else:
if self.current_segment is not None:
# End current segment
# Check if meets minimum duration
if self.current_segment.duration >= self.segmenter.min_segment_duration:
completed.append(self.current_segment)
self.current_segment = None
self.current_time += frame_duration
return completed
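A short driver loop for StreamingSegmenter, assuming webrtcvad is installed and 100 ms float chunks arrive from a capture callback; the random audio here is only a placeholder for a real stream:
sample_rate = 16000
streamer = StreamingSegmenter(sample_rate=sample_rate)
chunk_size = int(0.1 * sample_rate)  # 100 ms per chunk
audio = np.random.randn(10 * sample_rate).astype(np.float32) * 0.1  # placeholder stream

for i in range(0, len(audio), chunk_size):
    chunk = audio[i:i + chunk_size]
    for seg in streamer.process_audio_chunk(chunk):
        # Completed segments are emitted as soon as the VAD sees trailing silence
        print(f"segment closed: {seg.start:.2f}s -> {seg.end:.2f}s")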
3. Speaker Change Detection
from scipy.signal import find_peaks
class SpeakerChangeDetector:
"""
Detect speaker change boundaries in audio.
    Uses spectral (MFCC) change detection; speaker-embedding similarity can be layered on top.
"""
def __init__(self, sample_rate: int = 16000):
self.sample_rate = sample_rate
def detect_speaker_changes(
self,
audio: np.ndarray,
frame_size: int = 1024,
hop_length: int = 512
) -> List[float]:
"""
Detect speaker change points.
Algorithm:
1. Compute spectral features per frame
2. Calculate frame-to-frame distance
3. Find peaks in distance (speaker changes)
4. Return change point times
Returns:
List of change point times (seconds)
"""
# Compute MFCC features
mfccs = librosa.feature.mfcc(
y=audio,
sr=self.sample_rate,
n_mfcc=13,
n_fft=frame_size,
hop_length=hop_length
)
# Compute frame-to-frame distance
distances = np.zeros(mfccs.shape[1] - 1)
for i in range(len(distances)):
distances[i] = np.linalg.norm(mfccs[:, i+1] - mfccs[:, i])
# Smooth distances
from scipy.ndimage import gaussian_filter1d
distances_smooth = gaussian_filter1d(distances, sigma=2)
# Find peaks (speaker changes)
peaks, _ = find_peaks(
distances_smooth,
height=np.percentile(distances_smooth, 75),
distance=int(1.0 * self.sample_rate / hop_length) # Min 1 second apart
)
# Convert to times
change_times = [
peak * hop_length / self.sample_rate
for peak in peaks
]
return change_times
def segment_by_speaker(
self,
audio: np.ndarray,
change_points: List[float]
) -> List[AudioSegment]:
"""
Create segments based on speaker changes.
Args:
audio: Audio waveform
change_points: Speaker change times
Returns:
List of speaker segments
"""
if not change_points:
# Single speaker
return [AudioSegment(
start=0.0,
end=len(audio) / self.sample_rate,
label="speaker_0"
)]
segments = []
# First segment
segments.append(AudioSegment(
start=0.0,
end=change_points[0],
label="speaker_0"
))
# Middle segments
for i in range(len(change_points) - 1):
            speaker_id = (i + 1) % 2  # Alternate speakers after each change point (simplified)
segments.append(AudioSegment(
start=change_points[i],
end=change_points[i + 1],
label=f"speaker_{speaker_id}"
))
# Last segment
        last_speaker = len(change_points) % 2  # Segment index equals the number of change points
segments.append(AudioSegment(
start=change_points[-1],
end=len(audio) / self.sample_rate,
label=f"speaker_{last_speaker}"
))
return segments
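A usage sketch tying change detection and speaker segmentation together; it assumes librosa is installed and `audio` is a mono float waveform loaded elsewhere:
detector = SpeakerChangeDetector(sample_rate=16000)
change_points = detector.detect_speaker_changes(audio)  # e.g. [4.2, 9.8]
speaker_segments = detector.segment_by_speaker(audio, change_points)
for seg in speaker_segments:
    print(f"{seg.label}: {seg.start:.1f}s - {seg.end:.1f}s")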
4. Production Pipeline
import logging
from typing import Callable
class ProductionAudioSegmenter:
"""
Production-ready audio segmentation system.
Features:
- Real-time processing
- Multiple detection methods
- Segment merging (interval merging!)
- Boundary refinement
- Monitoring
"""
def __init__(
self,
sample_rate: int = 16000,
enable_vad: bool = True,
enable_speaker_detection: bool = False
):
self.sample_rate = sample_rate
self.enable_vad = enable_vad
self.enable_speaker_detection = enable_speaker_detection
# Components
self.segmenter = AudioSegmenter(sample_rate=sample_rate)
self.streaming_segmenter = StreamingSegmenter(sample_rate=sample_rate)
self.speaker_detector = SpeakerChangeDetector(sample_rate=sample_rate)
self.logger = logging.getLogger(__name__)
# Metrics
self.segments_created = 0
self.total_audio_processed_sec = 0.0
def segment_audio(
self,
audio: np.ndarray,
mode: str = "batch"
) -> List[AudioSegment]:
"""
Segment audio.
Args:
audio: Audio waveform
mode: "batch" or "streaming"
Returns:
List of audio segments
"""
audio_duration = len(audio) / self.sample_rate
self.total_audio_processed_sec += audio_duration
if mode == "batch":
return self._segment_batch(audio)
else:
return self._segment_streaming(audio)
def _segment_batch(self, audio: np.ndarray) -> List[AudioSegment]:
"""Batch segmentation."""
segments = []
# VAD segmentation
if self.enable_vad:
vad = RealtimeVAD(sample_rate=self.sample_rate)
# Process audio in chunks
chunk_size = int(0.03 * self.sample_rate) # 30ms
vad_probs = []
for i in range(0, len(audio), chunk_size):
chunk = audio[i:i + chunk_size]
if len(chunk) < chunk_size:
# Pad last chunk
chunk = np.pad(chunk, (0, chunk_size - len(chunk)))
results = vad.process_chunk(chunk)
vad_probs.extend(results)
vad_probs = np.array(vad_probs)
# Create segments from VAD
segments = self.segmenter.segment_by_vad(
audio,
vad_probs,
frame_duration_ms=30.0
)
# Speaker change detection
if self.enable_speaker_detection:
change_points = self.speaker_detector.detect_speaker_changes(audio)
speaker_segments = self.speaker_detector.segment_by_speaker(
audio,
change_points
)
# Merge with VAD segments
segments = self._merge_vad_and_speaker_segments(
segments,
speaker_segments
)
# Refine boundaries
segments = self.segmenter.refine_boundaries(audio, segments)
self.segments_created += len(segments)
self.logger.info(
f"Created {len(segments)} segments from "
f"{len(audio)/self.sample_rate:.1f}s audio"
)
return segments
def _segment_streaming(self, audio: np.ndarray) -> List[AudioSegment]:
"""Streaming segmentation."""
# Process in chunks
chunk_duration_ms = 100 # 100ms chunks
chunk_size = int(chunk_duration_ms * self.sample_rate / 1000)
all_segments = []
for i in range(0, len(audio), chunk_size):
chunk = audio[i:i + chunk_size]
# Process chunk
segments = self.streaming_segmenter.process_audio_chunk(
chunk,
chunk_duration_ms
)
all_segments.extend(segments)
return all_segments
def _merge_vad_and_speaker_segments(
self,
vad_segments: List[AudioSegment],
speaker_segments: List[AudioSegment]
) -> List[AudioSegment]:
"""
Merge VAD and speaker segments.
Strategy: Split VAD segments at speaker boundaries.
"""
merged = []
for vad_seg in vad_segments:
# Find speaker segments that overlap with VAD segment
current_start = vad_seg.start
for spk_seg in speaker_segments:
if spk_seg.overlaps(vad_seg):
# Create segment for overlap
overlap_start = max(vad_seg.start, spk_seg.start)
overlap_end = min(vad_seg.end, spk_seg.end)
if overlap_end > current_start:
merged.append(AudioSegment(
start=current_start,
end=overlap_end,
label=spk_seg.label
))
current_start = overlap_end
# Handle remaining part
if current_start < vad_seg.end:
merged.append(AudioSegment(
start=current_start,
end=vad_seg.end,
label="speech"
))
return self.segmenter.merge_segments(merged)
def export_segments(
self,
segments: List[AudioSegment],
format: str = "rttm"
) -> str:
"""Export segments to standard format."""
if format == "rttm":
lines = []
for seg in segments:
line = (
f"SPEAKER file 1 {seg.start:.2f} {seg.duration:.2f} "
f"<NA> <NA> {seg.label} <NA> <NA>"
)
lines.append(line)
return '\n'.join(lines)
elif format == "json":
import json
return json.dumps([
{
"start": seg.start,
"end": seg.end,
"duration": seg.duration,
"label": seg.label
}
for seg in segments
], indent=2)
else:
raise ValueError(f"Unknown format: {format}")
def get_metrics(self) -> dict:
"""Get processing metrics."""
return {
"segments_created": self.segments_created,
"audio_processed_sec": self.total_audio_processed_sec,
"segments_per_second": (
self.segments_created / self.total_audio_processed_sec
if self.total_audio_processed_sec > 0 else 0
)
}
# Example usage
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Generate sample audio (or load real audio)
sample_rate = 16000
duration = 10 # seconds
audio = np.random.randn(sample_rate * duration) * 0.1
# Create segmenter
segmenter = ProductionAudioSegmenter(
sample_rate=sample_rate,
enable_vad=True,
enable_speaker_detection=False
)
# Segment audio
segments = segmenter.segment_audio(audio, mode="batch")
print(f"\nSegmentation Results:")
print(f"Audio duration: {duration}s")
print(f"Segments created: {len(segments)}")
print(f"\nSegments:")
for i, seg in enumerate(segments):
print(f" {i+1}. [{seg.start:.2f}s - {seg.end:.2f}s] {seg.label} ({seg.duration:.2f}s)")
# Export
rttm = segmenter.export_segments(segments, format="rttm")
print(f"\nRTTM format:\n{rttm}")
# Metrics
print(f"\nMetrics: {segmenter.get_metrics()}")
Evaluation Metrics
def calculate_segmentation_metrics(
reference: List[AudioSegment],
hypothesis: List[AudioSegment],
collar: float = 0.2
) -> dict:
"""
Calculate segmentation accuracy metrics.
Metrics:
- Precision: How many detected boundaries are correct?
- Recall: How many true boundaries were detected?
- F1-score: Harmonic mean of precision and recall
Args:
reference: Ground truth segments
hypothesis: Detected segments
collar: Forgiveness window around boundaries (seconds)
"""
# Extract boundary points
ref_boundaries = set()
for seg in reference:
ref_boundaries.add(seg.start)
ref_boundaries.add(seg.end)
hyp_boundaries = set()
for seg in hypothesis:
hyp_boundaries.add(seg.start)
hyp_boundaries.add(seg.end)
# Calculate matches
true_positives = 0
for hyp_bound in hyp_boundaries:
# Check if within collar of any reference boundary
for ref_bound in ref_boundaries:
if abs(hyp_bound - ref_bound) <= collar:
true_positives += 1
break
# Calculate metrics
precision = true_positives / len(hyp_boundaries) if hyp_boundaries else 0
recall = true_positives / len(ref_boundaries) if ref_boundaries else 0
f1 = (
2 * precision * recall / (precision + recall)
if precision + recall > 0 else 0
)
return {
"precision": precision,
"recall": recall,
"f1_score": f1,
"true_positives": true_positives,
"false_positives": len(hyp_boundaries) - true_positives,
"false_negatives": len(ref_boundaries) - true_positives
}
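A small sanity check with hand-made reference and hypothesis segments (numbers are illustrative):
reference = [AudioSegment(0.0, 2.0, "speech"), AudioSegment(3.0, 5.0, "speech")]
hypothesis = [AudioSegment(0.1, 2.1, "speech"), AudioSegment(3.3, 5.0, "speech")]

metrics = calculate_segmentation_metrics(reference, hypothesis, collar=0.2)
print(metrics)
# Boundaries 0.1, 2.1, and 5.0 fall within the 0.2s collar of a reference boundary,
# 3.3 does not, so precision and recall both come out to 0.75 here.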
Real-World Case Study: Zoom’s Audio Segmentation
Zoom’s Approach
Zoom processes 300M+ meetings daily with real-time segmentation:
Architecture:
- Client-side VAD: WebRTC VAD for initial detection
- Server-side refinement: ML-based boundary refinement
- Speaker tracking: Incremental speaker change detection
- Adaptive thresholds: Adjust based on audio quality
Results:
- <50ms latency for boundary detection
- >95% F1-score on internal benchmarks
- Real-time factor < 0.05x
- <2% CPU per stream
Key Lessons
- Client-side processing reduces server load
- Hybrid approach (WebRTC + ML) balances speed and accuracy
- Adaptive thresholds handle varying audio quality (see the sketch below)
- Interval merging critical for clean segments
- Boundary refinement improves downstream tasks
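The adaptive-thresholds lesson can be sketched with a simple energy gate whose threshold tracks a running noise-floor estimate. This is an illustrative approach (the class name and parameters are made up for this sketch), not Zoom's actual implementation:
import numpy as np

class AdaptiveEnergyGate:
    """Speech/non-speech gate whose threshold follows a running noise-floor estimate."""

    def __init__(self, margin_db: float = 6.0, alpha: float = 0.05):
        self.noise_floor_db = -60.0  # initial guess for the noise floor
        self.margin_db = margin_db   # how far above the floor counts as speech
        self.alpha = alpha           # smoothing factor for the floor estimate

    def is_speech(self, frame: np.ndarray) -> bool:
        energy_db = 10 * np.log10(np.mean(frame ** 2) + 1e-10)
        if energy_db < self.noise_floor_db + self.margin_db:
            # Adapt the floor only on frames we believe are non-speech
            self.noise_floor_db = (1 - self.alpha) * self.noise_floor_db + self.alpha * energy_db
            return False
        return True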
Cost Analysis
Processing Costs (1000 concurrent streams)
| Component | CPU | Memory | Cost/Month |
|---|---|---|---|
| VAD | 5% per stream | 10MB | $500 |
| Boundary detection | 3% per stream | 20MB | $300 |
| Speaker detection | 10% per stream | 50MB | $1000 |
| Total (VAD only) | 50 cores | 10GB | $800 |
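The aggregate row follows from straightforward capacity arithmetic; a back-of-envelope sketch using the per-stream figures from the table (`capacity_estimate` is just an illustrative helper, and instance pricing is deployment-specific so it is not modeled):
def capacity_estimate(num_streams: int, cpu_frac_per_stream: float, mem_mb_per_stream: float) -> dict:
    """Aggregate per-stream CPU and memory figures into cluster-level totals."""
    return {
        "cores": num_streams * cpu_frac_per_stream,           # 1000 * 0.05 = 50 cores
        "memory_gb": num_streams * mem_mb_per_stream / 1024,  # 1000 * 10MB ~ 10GB
    }

print(capacity_estimate(1000, 0.05, 10))  # VAD-only footprint from the table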
Optimization:
- Client-side VAD: 80% cost reduction
- Batch processing: 50% cost reduction
- Model quantization: 40% faster
Key Takeaways
✅ Segmentation is interval merging - same algorithm applies
✅ WebRTC VAD is industry standard for real-time detection
✅ Boundary refinement critical for quality
✅ Streaming requires buffering and incremental processing
✅ Speaker detection adds significant value
✅ Same patterns as merge intervals and event streams
✅ Real-time factor <0.1x achievable with optimization
✅ Client-side processing dramatically reduces costs
✅ Adaptive thresholds handle varying conditions
✅ Monitor F1-score as key quality metric
Connection to Thematic Link: Interval Processing and Temporal Reasoning
All three topics use the same interval processing pattern:
DSA (Merge Intervals):
# Sort + merge overlapping intervals
intervals.sort(key=lambda x: x[0])
for current in intervals:
if current.overlaps(last):
last = merge(last, current)
ML System Design (Event Streams):
# Sort events + merge event windows
events.sort(key=lambda e: e.timestamp)
for event in events:
if event.in_window(last_window):
last_window.extend(event)
Speech Tech (Audio Segmentation):
# Sort segments + merge audio boundaries
segments.sort(key=lambda s: s.start)
for segment in segments:
if segment.gap(last) <= max_gap:
last = merge(last, segment)
Universal pattern across all three:
- Sort by temporal position
- Check overlap/proximity
- Merge if conditions met
- Output consolidated ranges
Originally published at: arunbaby.com/speech-tech/0016-real-time-audio-segmentation
If you found this helpful, consider sharing it with others who might benefit.