Event Stream Processing
Build production event stream processing systems that handle millions of events per second using windowing and temporal aggregation—applying the same interval merging principles from algorithm design.
Problem Statement
Design an Event Stream Processing System that ingests, processes, and analyzes millions of events per second in real-time, supporting windowed aggregations, pattern detection, and low-latency analytics.
Functional Requirements
- Event ingestion: Ingest millions of events/second from multiple sources
- Stream processing: Real-time transformations, filtering, enrichment
- Windowed aggregations: Tumbling, sliding, session windows
- Pattern detection: Complex event processing (CEP)
- State management: Maintain state across events
- Exactly-once semantics: No duplicate or lost events
- Late data handling: Handle out-of-order events
- Multiple outputs: Write to databases, caches, dashboards
Non-Functional Requirements
- Throughput: 1M+ events/second sustained
- Latency: p99 < 100ms for event processing
- Availability: 99.99% uptime
- Scalability: Horizontal scaling to 1000+ nodes
- Fault tolerance: Automatic recovery from failures
- Backpressure: Handle traffic spikes gracefully
- Cost efficiency: Optimize resource utilization
Understanding the Requirements
Event stream processing is the backbone of real-time analytics at scale:
Real-World Use Cases
| Company | Use Case | Scale | Technology |
|---|---|---|---|
| Netflix | Real-time recommendation updates | 10M+ events/sec | Kafka + Flink |
| Uber | Surge pricing, driver matching | 5M+ events/sec | Kafka + custom |
| LinkedIn | News feed ranking | 1M+ events/sec | Kafka + Samza |
| Airbnb | Pricing optimization | 500K+ events/sec | Kafka + Spark |
| Twitter | Trending topics | 5M+ tweets/sec | Kafka + custom |
| Spotify | Real-time playlist updates | 1M+ events/sec | Kafka + Flink |
Why Event Streams Matter
- Real-time analytics: Instant insights from data
- ML feature computation: Real-time feature updates
- Fraud detection: Immediate anomaly detection
- User engagement: Real-time personalization
- Monitoring: Live system health tracking
- Business intelligence: Instant KPI updates
The Interval Processing Connection
Just like the Merge Intervals problem:
| Merge Intervals | Event Stream Processing | Audio Segmentation |
|---|---|---|
| Merge overlapping time ranges | Merge event windows | Merge audio segments |
| Sort by start time | Event time ordering | Temporal ordering |
| Greedy merging | Window aggregation | Boundary merging |
| O(N log N) complexity | Stream buffering | Segment processing |
| Overlap detection | Event correlation | Segment alignment |
All three deal with temporal data requiring efficient interval/window processing.
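For reference, the greedy merge pass that the window manager and the audio pipeline both echo looks like this; a minimal sketch of the classic algorithm:

```python
from typing import List

def merge_intervals(intervals: List[List[int]]) -> List[List[int]]:
    """Sort by start time, then greedily extend the last merged range."""
    merged: List[List[int]] = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)   # overlap: extend
        else:
            merged.append([start, end])               # gap: start a new range
    return merged

print(merge_intervals([[0, 60], [30, 90], [200, 260]]))  # [[0, 90], [200, 260]]
```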
High-Level Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Event Stream Processing System │
└─────────────────────────────────────────────────────────────────┘
Event Sources
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Apps │ │ Services│ │ IoT │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────┼───────────┘
│
┌──────▼──────┐
│ Kafka │
│ (Message │
│ Broker) │
└──────┬──────┘
│
┌──────────────────┼──────────────────┐
│ │ │
┌───────▼────────┐ ┌──────▼──────┐ ┌────────▼────────┐
│ Stream │ │ Windowing │ │ Aggregation │
│ Processing │ │ Engine │ │ Engine │
│ │ │ │ │ │
│ - Filter │ │ - Tumbling │ │ - Count │
│ - Transform │ │ - Sliding │ │ - Sum │
│ - Enrich │ │ - Session │ │ - Average │
└───────┬────────┘ └──────┬──────┘ └────────┬────────┘
│ │ │
└──────────────────┼──────────────────┘
│
┌──────▼──────┐
│ State │
│ Store │
│ (RocksDB) │
└──────┬──────┘
│
┌──────────────────┼──────────────────┐
│ │ │
┌───────▼────────┐ ┌──────▼──────┐ ┌────────▼────────┐
│ Database │ │ Cache │ │ Dashboard │
│ (Cassandra) │ │ (Redis) │ │ (Grafana) │
└────────────────┘ └─────────────┘ └─────────────────┘
Key Components
- Message Broker: Kafka for event ingestion and buffering
- Stream Processor: Flink/Spark for real-time computation
- Windowing Engine: Time-based and session-based windows
- State Store: RocksDB for stateful processing
- Output Sinks: Multiple destinations for processed events
Component Deep-Dives
1. Event Windowing - Similar to Interval Merging
Windows group events by time, just like merging intervals:
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
import time
@dataclass
class Event:
"""A single event in the stream."""
event_id: str
event_type: str
timestamp: int # Unix timestamp in milliseconds
user_id: str
data: Dict[str, Any]
@property
def event_time(self) -> datetime:
"""Get event time as datetime."""
return datetime.fromtimestamp(self.timestamp / 1000.0)
@dataclass
class Window:
"""
A time window containing events.
Similar to intervals in merge intervals problem:
- start: interval start
- end: interval end
- events: data within interval
"""
start: int # Window start (ms)
end: int # Window end (ms)
events: List[Event]
def overlaps(self, other: 'Window') -> bool:
"""
Check if this window overlaps with another.
Same logic as interval overlap:
max(start1, start2) <= min(end1, end2)
"""
return max(self.start, other.start) <= min(self.end, other.end)
def merge(self, other: 'Window') -> 'Window':
"""
Merge this window with another.
Same as merging intervals:
- New start = min of starts
- New end = max of ends
- Combine events
"""
return Window(
start=min(self.start, other.start),
end=max(self.end, other.end),
events=self.events + other.events
)
@property
def duration_ms(self) -> int:
return self.end - self.start
@property
def event_count(self) -> int:
return len(self.events)
class WindowManager:
"""
Manage event windows for stream processing.
Similar to merge intervals:
- Group events into time windows
- Merge overlapping windows
- Maintain sorted window list
"""
def __init__(self, window_type: str = "tumbling", window_size_ms: int = 60000):
"""
Initialize window manager.
Args:
window_type: "tumbling", "sliding", or "session"
window_size_ms: Window size in milliseconds
"""
self.window_type = window_type
self.window_size_ms = window_size_ms
self.windows: List[Window] = []
def assign_to_window(self, event: Event) -> List[Window]:
"""
Assign event to window(s).
Returns:
List of windows this event belongs to
"""
if self.window_type == "tumbling":
return self._assign_tumbling(event)
elif self.window_type == "sliding":
return self._assign_sliding(event)
elif self.window_type == "session":
return self._assign_session(event)
else:
raise ValueError(f"Unknown window type: {self.window_type}")
def _assign_tumbling(self, event: Event) -> List[Window]:
"""
Tumbling windows: Fixed-size, non-overlapping.
Example: 1-minute windows
[0-60s], [60-120s], [120-180s], ...
Each event belongs to exactly one window.
"""
# Calculate which window this event belongs to
window_id = event.timestamp // self.window_size_ms
window_start = window_id * self.window_size_ms
window_end = window_start + self.window_size_ms
# Find or create window
window = self._find_or_create_window(window_start, window_end)
window.events.append(event)
return [window]
def _assign_sliding(self, event: Event) -> List[Window]:
"""
Sliding windows: Fixed-size, overlapping.
Example: 1-minute windows, sliding every 30 seconds
[0-60s], [30-90s], [60-120s], ...
Each event can belong to multiple windows.
"""
slide_interval = self.window_size_ms // 2 # 50% overlap
# Find all windows this event falls into
windows = []
        # Calculate the earliest window that could contain this event,
        # clamped to 0 so early events do not create negative-start windows
        first_window_id = (event.timestamp - self.window_size_ms) // slide_interval
        first_window_start = max(0, first_window_id * slide_interval)
# Check windows until event is past window end
current_start = first_window_start
while current_start <= event.timestamp:
current_end = current_start + self.window_size_ms
if current_start <= event.timestamp < current_end:
window = self._find_or_create_window(current_start, current_end)
window.events.append(event)
windows.append(window)
current_start += slide_interval
return windows
def _assign_session(self, event: Event) -> List[Window]:
"""
Session windows: Dynamic windows based on activity gaps.
A session ends when there's a gap > session_timeout between events.
This is like merging intervals with a max gap tolerance!
"""
session_timeout = 5 * 60 * 1000 # 5 minutes
# Find window that could be extended
for window in self.windows:
            # Check if the event falls within the session timeout of this window
            if window.start - session_timeout <= event.timestamp <= window.end + session_timeout:
                # Extend the session boundaries (never shrink them for late events)
                window.start = min(window.start, event.timestamp)
                window.end = max(window.end, event.timestamp)
                window.events.append(event)
                return [window]
# Start new session
window = Window(
start=event.timestamp,
end=event.timestamp,
events=[event]
)
self.windows.append(window)
return [window]
def _find_or_create_window(self, start: int, end: int) -> Window:
"""Find existing window or create new one."""
for window in self.windows:
if window.start == start and window.end == end:
return window
# Create new window
new_window = Window(start=start, end=end, events=[])
self.windows.append(new_window)
return new_window
def get_completed_windows(self, watermark: int) -> List[Window]:
"""
Get windows that are complete (past watermark).
Watermark = latest timestamp we're confident we've seen all events for.
Similar to merge intervals: return all intervals before a certain time.
"""
completed = []
remaining = []
for window in self.windows:
if window.end < watermark:
completed.append(window)
else:
remaining.append(window)
self.windows = remaining
return completed
def merge_overlapping_windows(self) -> List[Window]:
"""
Merge overlapping windows.
This is exactly the merge intervals algorithm!
"""
if not self.windows:
return []
# Sort by start time
sorted_windows = sorted(self.windows, key=lambda w: w.start)
merged = [sorted_windows[0]]
for current in sorted_windows[1:]:
last = merged[-1]
if current.overlaps(last):
# Merge
merged[-1] = last.merge(current)
else:
# Add new window
merged.append(current)
return merged
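A quick usage sketch of the `WindowManager` above, using session windows followed by the merge pass; the timestamps and window size are illustrative:

```python
manager = WindowManager(window_type="session", window_size_ms=60_000)

for event in [
    Event("1", "click", 1_000, "user1", {}),
    Event("2", "click", 90_000, "user1", {}),    # within the 5-minute session gap
    Event("3", "click", 600_000, "user1", {}),   # gap > 5 minutes -> new session
]:
    manager.assign_to_window(event)

for window in manager.merge_overlapping_windows():
    print(window.start, window.end, window.event_count)
# 1000 90000 2
# 600000 600000 1
```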
2. Stream Processing Engine
from typing import Callable, List
from queue import Queue
import threading
class StreamProcessor:
"""
Event stream processing engine.
Features:
- Real-time event processing
- Windowed aggregations
- Stateful operations
- Exactly-once semantics
"""
def __init__(self):
self.window_manager = WindowManager(window_type="tumbling", window_size_ms=60000)
self.aggregators: Dict[str, Callable] = {}
self.state_store: Dict[str, Any] = {}
# Processing queue
self.event_queue = Queue(maxsize=10000)
self.running = False
# Metrics
self.events_processed = 0
self.windows_created = 0
def register_aggregator(self, name: str, func: Callable):
"""Register an aggregation function."""
self.aggregators[name] = func
def process_event(self, event: Event):
"""
Process a single event.
Steps:
1. Assign to window(s)
2. Update state
3. Apply aggregations
4. Emit results
"""
# Assign to windows
windows = self.window_manager.assign_to_window(event)
# Update state for each window
for window in windows:
window_key = f"{window.start}-{window.end}"
# Initialize state if needed
if window_key not in self.state_store:
self.state_store[window_key] = {
'count': 0,
'sum': 0,
'events': []
}
# Update state
state = self.state_store[window_key]
state['count'] += 1
state['events'].append(event)
# Apply aggregations
for name, aggregator in self.aggregators.items():
result = aggregator(window.events)
state[name] = result
self.events_processed += 1
def get_window_aggregates(self, window_start: int, window_end: int) -> Dict:
"""Get aggregates for a specific window."""
window_key = f"{window_start}-{window_end}"
return self.state_store.get(window_key, {})
def flush_completed_windows(self, watermark: int) -> List[Dict]:
"""
Flush completed windows to output.
Similar to returning merged intervals after processing.
"""
completed = self.window_manager.get_completed_windows(watermark)
results = []
for window in completed:
window_key = f"{window.start}-{window.end}"
if window_key in self.state_store:
result = {
'window_start': window.start,
'window_end': window.end,
'aggregates': self.state_store[window_key]
}
results.append(result)
# Clean up state
del self.state_store[window_key]
return results
# Example usage
def example_stream_processing():
"""Example: Count events per user in 1-minute windows."""
processor = StreamProcessor()
# Register aggregator
def count_by_user(events: List[Event]) -> Dict[str, int]:
"""Count events per user."""
counts = defaultdict(int)
for event in events:
counts[event.user_id] += 1
return dict(counts)
processor.register_aggregator('user_counts', count_by_user)
# Process events
events = [
Event("1", "click", 1000, "user1", {}),
Event("2", "click", 2000, "user1", {}),
Event("3", "click", 3000, "user2", {}),
Event("4", "view", 65000, "user1", {}), # Next window
]
for event in events:
processor.process_event(event)
# Flush completed windows (watermark = 70000ms)
results = processor.flush_completed_windows(70000)
for result in results:
print(f"Window {result['window_start']}-{result['window_end']}:")
print(f" User counts: {result['aggregates']['user_counts']}")
3. Complex Event Processing (CEP)
from typing import List, Callable
from dataclasses import dataclass
@dataclass
class Pattern:
"""Event pattern for detection."""
name: str
conditions: List[Callable[[Event], bool]]
window_ms: int
class CEPEngine:
"""
Complex Event Processing engine.
Detect patterns in event streams:
- Sequences: A followed by B within time window
- Conditions: Events matching criteria
- Aggregations: Count, sum over window
"""
def __init__(self):
self.patterns: List[Pattern] = []
self.matches: List[Dict] = []
def register_pattern(self, pattern: Pattern):
"""Register a pattern to detect."""
self.patterns.append(pattern)
def detect_patterns(self, events: List[Event]) -> List[Dict]:
"""
Detect registered patterns in event stream.
Uses interval-style processing:
- Sort events by time
- Sliding window over events
- Check pattern conditions
"""
matches = []
# Sort events by timestamp
sorted_events = sorted(events, key=lambda e: e.timestamp)
for pattern in self.patterns:
# Find sequences matching pattern
pattern_matches = self._find_pattern_matches(sorted_events, pattern)
matches.extend(pattern_matches)
return matches
def _find_pattern_matches(
self,
events: List[Event],
pattern: Pattern
) -> List[Dict]:
"""Find all matches of pattern in events."""
matches = []
for i in range(len(events)):
# Try to match pattern starting at event i
match_events = [events[i]]
# Check if first condition matches
if not pattern.conditions[0](events[i]):
continue
# Look for subsequent events matching remaining conditions
j = i + 1
condition_idx = 1
while j < len(events) and condition_idx < len(pattern.conditions):
# Check if within time window
if events[j].timestamp - events[i].timestamp > pattern.window_ms:
break
# Check if condition matches
if pattern.conditions[condition_idx](events[j]):
match_events.append(events[j])
condition_idx += 1
j += 1
# Check if full pattern matched
if condition_idx == len(pattern.conditions):
matches.append({
'pattern': pattern.name,
'events': match_events,
'start_time': events[i].timestamp,
'end_time': match_events[-1].timestamp
})
return matches
# Example: Fraud detection pattern
def fraud_detection_example():
"""Detect potential fraud: multiple failed logins followed by success."""
cep = CEPEngine()
# Define pattern
pattern = Pattern(
name="suspicious_login",
conditions=[
lambda e: e.event_type == "login_failed",
lambda e: e.event_type == "login_failed",
lambda e: e.event_type == "login_failed",
lambda e: e.event_type == "login_success"
],
window_ms=60000 # Within 1 minute
)
cep.register_pattern(pattern)
# Test events
events = [
Event("1", "login_failed", 1000, "user1", {}),
Event("2", "login_failed", 2000, "user1", {}),
Event("3", "login_failed", 3000, "user1", {}),
Event("4", "login_success", 4000, "user1", {}),
]
matches = cep.detect_patterns(events)
for match in matches:
print(f"Pattern '{match['pattern']}' detected:")
print(f" Time window: {match['start_time']}-{match['end_time']}")
print(f" Events: {[e.event_id for e in match['events']]}")
4. State Management with Checkpointing
import pickle
import os
class StateManager:
"""
Manage stateful stream processing with checkpointing.
Features:
- Fault tolerance through checkpoints
- Exactly-once semantics
- State recovery
"""
def __init__(self, checkpoint_dir: str = "/tmp/checkpoints"):
self.checkpoint_dir = checkpoint_dir
self.state: Dict[str, Any] = {}
self.checkpoint_interval_ms = 60000
self.last_checkpoint_time = 0
os.makedirs(checkpoint_dir, exist_ok=True)
def update_state(self, key: str, value: Any):
"""Update state."""
self.state[key] = value
def get_state(self, key: str, default: Any = None) -> Any:
"""Get state value."""
return self.state.get(key, default)
def checkpoint(self, watermark: int):
"""
Create state checkpoint.
Similar to saving merged intervals periodically.
"""
checkpoint_path = os.path.join(
self.checkpoint_dir,
f"checkpoint_{watermark}.pkl"
)
with open(checkpoint_path, 'wb') as f:
pickle.dump({
'watermark': watermark,
'state': self.state
}, f)
self.last_checkpoint_time = watermark
# Clean old checkpoints
self._cleanup_old_checkpoints(watermark)
def restore_from_checkpoint(self, watermark: Optional[int] = None):
"""Restore state from checkpoint."""
if watermark is None:
# Find latest checkpoint
checkpoints = [
f for f in os.listdir(self.checkpoint_dir)
if f.startswith("checkpoint_")
]
if not checkpoints:
return
latest = max(checkpoints, key=lambda f: int(f.split('_')[1].split('.')[0]))
checkpoint_path = os.path.join(self.checkpoint_dir, latest)
else:
checkpoint_path = os.path.join(
self.checkpoint_dir,
f"checkpoint_{watermark}.pkl"
)
if os.path.exists(checkpoint_path):
with open(checkpoint_path, 'rb') as f:
data = pickle.load(f)
self.state = data['state']
return data['watermark']
return None
def _cleanup_old_checkpoints(self, current_watermark: int, keep_last: int = 3):
"""Keep only recent checkpoints."""
checkpoints = [
(f, int(f.split('_')[1].split('.')[0]))
for f in os.listdir(self.checkpoint_dir)
if f.startswith("checkpoint_")
]
# Sort by watermark
checkpoints.sort(key=lambda x: x[1], reverse=True)
# Delete old ones
for checkpoint_file, watermark in checkpoints[keep_last:]:
os.remove(os.path.join(self.checkpoint_dir, checkpoint_file))
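A small round trip with the `StateManager` above; the checkpoint directory and state key are illustrative:

```python
manager = StateManager(checkpoint_dir="/tmp/checkpoints-demo")
manager.update_state("clicks:user1", 42)
manager.checkpoint(watermark=60_000)

recovered = StateManager(checkpoint_dir="/tmp/checkpoints-demo")
restored_watermark = recovered.restore_from_checkpoint()
print(restored_watermark, recovered.get_state("clicks:user1"))  # 60000 42
```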
Production Deployment
Apache Kafka + Flink Architecture
# docker-compose.yml for stream processing stack
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flink-jobmanager:
image: flink:latest
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
flink-taskmanager:
image: flink:latest
depends_on:
- flink-jobmanager
command: taskmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
deploy:
replicas: 3
Kafka Producer
from kafka import KafkaProducer
import json
class EventProducer:
"""Produce events to Kafka."""
def __init__(self, bootstrap_servers: List[str], topic: str):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
self.topic = topic
def send_event(self, event: Event):
"""Send event to Kafka."""
event_dict = {
'event_id': event.event_id,
'event_type': event.event_type,
'timestamp': event.timestamp,
'user_id': event.user_id,
'data': event.data
}
self.producer.send(
self.topic,
value=event_dict,
key=event.user_id.encode('utf-8') # Partition by user
)
def flush(self):
"""Flush pending messages."""
self.producer.flush()
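On the consuming side, a minimal kafka-python consumer sketch that mirrors the producer and feeds events into the `StreamProcessor`; the topic and consumer-group names are assumptions:

```python
from typing import List
from kafka import KafkaConsumer
import json

class EventConsumer:
    """Consume events from Kafka and feed them into the stream processor."""

    def __init__(self, bootstrap_servers: List[str], topic: str, processor: StreamProcessor):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id="stream-processor",              # assumed consumer group name
            value_deserializer=lambda v: json.loads(v.decode("utf-8")),
            auto_offset_reset="earliest",
            enable_auto_commit=False,                 # commit only after processing
        )
        self.processor = processor

    def run(self):
        for record in self.consumer:
            payload = record.value
            event = Event(
                event_id=payload["event_id"],
                event_type=payload["event_type"],
                timestamp=payload["timestamp"],
                user_id=payload["user_id"],
                data=payload.get("data", {}),
            )
            self.processor.process_event(event)
            self.consumer.commit()                    # at-least-once; dedupe downstream
```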
Scaling Strategies
Horizontal Scaling
# Kafka topics with multiple partitions for parallelism
def create_kafka_topic(admin_client, topic_name: str, num_partitions: int = 10):
"""Create Kafka topic with partitions."""
from kafka.admin import NewTopic
topic = NewTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=3
)
admin_client.create_topics([topic])
Auto-scaling Based on Lag
class StreamProcessorAutoScaler:
"""Auto-scale stream processors based on consumer lag."""
def __init__(self, max_lag_threshold: int = 10000):
self.max_lag_threshold = max_lag_threshold
def should_scale_up(self, consumer_lag: int) -> bool:
"""Check if should add more processors."""
return consumer_lag > self.max_lag_threshold
def should_scale_down(self, consumer_lag: int) -> bool:
"""Check if can reduce processors."""
return consumer_lag < self.max_lag_threshold * 0.5
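To drive the autoscaler, consumer lag can be measured directly from the broker; a sketch using kafka-python (in production this is more commonly scraped from a metrics system such as Burrow or Prometheus):

```python
from typing import List
from kafka import KafkaConsumer, TopicPartition

def total_consumer_lag(bootstrap_servers: List[str], group_id: str, topic: str) -> int:
    """Sum of (log-end offset - committed offset) across the topic's partitions."""
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id)
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)

    lag = 0
    for tp in partitions:
        committed = consumer.committed(tp) or 0      # None if the group never committed
        lag += max(0, end_offsets[tp] - committed)
    consumer.close()
    return lag

# scaler = StreamProcessorAutoScaler()
# if scaler.should_scale_up(total_consumer_lag(["kafka:9092"], "stream-processor", "events")):
#     ...  # add task-manager replicas
```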
Real-World Case Study: Netflix Event Processing
Netflix’s Approach
Netflix processes 10M+ events/second for real-time recommendations:
Architecture:
- Kafka: 36+ clusters, 4000+ brokers
- Flink: Real-time stream processing
- Keystone: Real-time data pipeline
- Mantis: Reactive stream processing
Use Cases:
- Real-time viewing analytics
- Recommendation updates
- A/B test metric computation
- Anomaly detection
Results:
- 10M events/sec throughput
- <100ms p99 latency
- 99.99% availability
- Petabytes/day processed
Key Lessons
- Partition strategically - by user ID for locality
- Use watermarks for late data handling (a minimal generator is sketched after this list)
- Checkpoint frequently for fault tolerance
- Monitor lag closely - key metric for health
- Test backpressure - must handle traffic spikes
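A minimal watermark generator that pairs with `WindowManager.get_completed_windows`; the 10-second lateness bound is an assumption, and this is a sketch rather than Flink's API:

```python
class BoundedOutOfOrdernessWatermark:
    """Watermark that trails the maximum observed event time by a fixed bound."""

    def __init__(self, max_lateness_ms: int = 10_000):
        self.max_lateness_ms = max_lateness_ms
        self.max_event_time = 0

    def observe(self, event: Event) -> int:
        """Update with an event and return the current watermark (ms)."""
        self.max_event_time = max(self.max_event_time, event.timestamp)
        return self.current()

    def current(self) -> int:
        return self.max_event_time - self.max_lateness_ms

# Typical wiring with the WindowManager defined earlier:
# watermark = watermark_gen.observe(event)
# for window in window_manager.get_completed_windows(watermark):
#     emit(window)    # events arriving more than 10s late miss their window
```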
Cost Analysis
Infrastructure Costs (1M events/sec)
| Component | Nodes | Cost/Month | Notes |
|---|---|---|---|
| Kafka brokers | 10 | $5,000 | r5.2xlarge |
| Flink workers | 20 | $8,000 | c5.4xlarge |
| State storage | - | $500 | S3 for checkpoints |
| Monitoring | - | $200 | Prometheus + Grafana |
| Total | 30 | $13,700 | ≈ $0.005 per million events |
Optimization Strategies
- Batch processing: Micro-batches reduce per-event overhead (see the producer tuning sketch after this list)
- Compression: Can reduce network/storage costs by up to ~70%
- State backends: RocksDB vs in-memory trade-offs
- Spot instances: 70% cost reduction for stateless workers
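As a concrete example of the batching and compression knobs, a kafka-python producer configuration sketch; the numbers are illustrative starting points, not tuned values:

```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    compression_type="gzip",   # lz4/zstd are typically faster if their libraries are installed
    batch_size=64 * 1024,      # accumulate up to 64 KB per partition before sending
    linger_ms=20,              # wait up to 20 ms to fill a batch (latency vs throughput)
    acks="all",                # favour durability over raw throughput
)
```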
Key Takeaways
✅ Windows are intervals - same merging logic applies
✅ Event time vs processing time - critical distinction
✅ Watermarks enable late data handling
✅ State management requires checkpointing for fault tolerance
✅ Exactly-once semantics possible with careful design
✅ Kafka + Flink is industry standard stack
✅ Partition for parallelism - key to horizontal scaling
✅ Monitor consumer lag - critical health metric
✅ Backpressure handling essential for reliability (see the bounded-queue sketch below)
✅ Same interval processing as merge intervals problem
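A minimal backpressure sketch along the lines of the `event_queue` already declared in `StreamProcessor`: a bounded queue between ingestion and processing, so a slow processor pushes back on the source instead of exhausting memory.

```python
import threading
from queue import Queue, Full

# When the processor falls behind, put() blocks (or times out) and the source
# slows down or sheds load instead of growing an unbounded buffer.
event_queue: "Queue[Event]" = Queue(maxsize=10_000)

def ingest(event: Event) -> bool:
    """Return False when the pipeline is saturated so the caller can retry or shed load."""
    try:
        event_queue.put(event, block=True, timeout=0.1)
        return True
    except Full:
        return False

def drain(processor: StreamProcessor):
    while True:
        processor.process_event(event_queue.get())
        event_queue.task_done()

threading.Thread(target=drain, args=(StreamProcessor(),), daemon=True).start()
```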
Connection to Thematic Link: Interval Processing and Temporal Reasoning
All three topics share interval/window processing:
DSA (Merge Intervals):
- Sort intervals by start time
- Merge overlapping ranges
- O(N log N) greedy algorithm
ML System Design (Event Stream Processing):
- Sort events by timestamp
- Merge event windows
- Windowed aggregations
Speech Tech (Audio Segmentation):
- Sort audio segments temporally
- Merge adjacent segments
- Boundary detection
Universal Pattern
# Pattern used across all three:
1. Sort items by time/position
2. Process in temporal order
3. Merge adjacent/overlapping ranges
4. Apply aggregations within ranges
This pattern is fundamental to temporal data processing!
Originally published at: arunbaby.com/ml-system-design/0016-event-stream-processing
If you found this helpful, consider sharing it with others who might benefit.