23 minute read

Master the pattern behind online algorithms, streaming analytics, and dynamic programming, a single elegant idea powering countless production systems.

Problem

Given an integer array nums, find the subarray with the largest sum, and return its sum.

A subarray is a contiguous non-empty sequence of elements within an array.

Example 1:

Input: nums = [-2,1,-3,4,-1,2,1,-5,4]
Output: 6
Explanation: The subarray [4,-1,2,1] has the largest sum 6.

Example 2:

Input: nums = [1]
Output: 1
Explanation: The subarray [1] has the largest sum 1.

Example 3:

Input: nums = [5,4,-1,7,8]
Output: 23
Explanation: The subarray [5,4,-1,7,8] has the largest sum 23.

Constraints:

  • 1 <= nums.length <= 10^5
  • -10^4 <= nums[i] <= 10^4

Intuition

Key Insight: At each position, decide whether to:

  1. Extend the current subarray by including this element
  2. Start fresh from this element

Why this works: If the sum up to the previous element is negative, it can only hurt the sum, better to start fresh.

This is Kadane’s Algorithm: a classic example of greedy + dynamic programming.


Approach 1: Brute Force (Not Optimal)

Try all possible subarrays.

Implementation

from typing import List

def maxSubArrayBruteForce(nums: List[int]) -> int:
    """
    Try all subarrays
    
    Time: O(n²)
    Space: O(1)
    """
    n = len(nums)
    max_sum = float('-inf')
    
    for i in range(n):
        current_sum = 0
        for j in range(i, n):
            current_sum += nums[j]
            max_sum = max(max_sum, current_sum)
    
    return max_sum

# Example
print(maxSubArrayBruteForce([-2,1,-3,4,-1,2,1,-5,4]))  # 6

Time Complexity: O(n²)
Space Complexity: O(1)

Why it’s bad: For n = 100,000, this requires 10 billion operations, too slow for production.


Approach 2: Kadane’s Algorithm (Optimal)

Track the maximum sum ending at each position.

Implementation

from typing import List

def maxSubArray(nums: List[int]) -> int:
    """
    Kadane's Algorithm
    
    Time: O(n) - single pass
    Space: O(1) - two variables
    
    Algorithm:
    1. Track current_sum (max sum ending here)
    2. At each element: extend OR start fresh
    3. Track global max_sum
    """
    if not nums:
        return 0
    
    current_sum = nums[0]
    max_sum = nums[0]
    
    for i in range(1, len(nums)):
        # Key decision: extend OR start fresh
        current_sum = max(nums[i], current_sum + nums[i])
        
        # Update global maximum
        max_sum = max(max_sum, current_sum)
    
    return max_sum

Detailed Walkthrough

nums = [-2, 1, -3, 4, -1, 2, 1, -5, 4]

Initial:
  current_sum = -2
  max_sum = -2

i=1, nums[i]=1:
  current_sum = max(1, -2+1) = max(1, -1) = 1  (start fresh!)
  max_sum = max(-2, 1) = 1

i=2, nums[i]=-3:
  current_sum = max(-3, 1-3) = max(-3, -2) = -2  (extend, even though negative)
  max_sum = max(1, -2) = 1

i=3, nums[i]=4:
  current_sum = max(4, -2+4) = max(4, 2) = 4  (start fresh!)
  max_sum = max(1, 4) = 4

i=4, nums[i]=-1:
  current_sum = max(-1, 4-1) = max(-1, 3) = 3  (extend)
  max_sum = max(4, 3) = 4

i=5, nums[i]=2:
  current_sum = max(2, 3+2) = max(2, 5) = 5  (extend)
  max_sum = max(4, 5) = 5

i=6, nums[i]=1:
  current_sum = max(1, 5+1) = max(1, 6) = 6  (extend)
  max_sum = max(5, 6) = 6

i=7, nums[i]=-5:
  current_sum = max(-5, 6-5) = max(-5, 1) = 1  (extend)
  max_sum = max(6, 1) = 6

i=8, nums[i]=4:
  current_sum = max(4, 1+4) = max(4, 5) = 5  (extend)
  max_sum = max(6, 5) = 6

Final: max_sum = 6
Subarray: [4, -1, 2, 1]

Why This Works

Invariant: current_sum always holds the maximum sum of a subarray ending at position i.

Correctness:

  • If current_sum < 0, it can’t help future elements → start fresh
  • We consider every possible ending position
  • max_sum tracks the best across all positions

Greedy Choice: At each step, make locally optimal decision (extend or start fresh).


Approach 3: Dynamic Programming Formulation

View as a DP problem for deeper understanding.

Formulation

State: dp[i] = maximum sum of subarray ending at index i

Recurrence:

dp[i] = max(nums[i], dp[i-1] + nums[i])

Base case: dp[0] = nums[0]

Answer: max(dp[0], dp[1], ..., dp[n-1])

Implementation

def maxSubArrayDP(nums: List[int]) -> int:
    """
    Explicit DP formulation
    
    Time: O(n)
    Space: O(n) → can optimize to O(1)
    """
    n = len(nums)
    if n == 0:
        return 0
    
    # DP table
    dp = [0] * n
    dp[0] = nums[0]
    
    # Fill table
    for i in range(1, n):
        dp[i] = max(nums[i], dp[i-1] + nums[i])
    
    # Answer is max of all dp values
    return max(dp)

Optimization: Since dp[i] only depends on dp[i-1], we can use O(1) space → this becomes identical to Kadane’s algorithm!


Returning the Actual Subarray

Modify algorithm to track indices.

def maxSubArrayWithIndices(nums: List[int]) -> tuple[int, int, int]:
    """
    Return (max_sum, start_index, end_index)
    """
    if not nums:
        return (0, -1, -1)
    
    current_sum = nums[0]
    max_sum = nums[0]
    
    # Track indices
    start = 0
    end = 0
    temp_start = 0
    
    for i in range(1, len(nums)):
        # If starting fresh, update temp_start
        if nums[i] > current_sum + nums[i]:
            current_sum = nums[i]
            temp_start = i
        else:
            current_sum = current_sum + nums[i]
        
        # Update global max
        if current_sum > max_sum:
            max_sum = current_sum
            start = temp_start
            end = i
    
    return (max_sum, start, end)

# Usage
nums = [-2,1,-3,4,-1,2,1,-5,4]
max_sum, start, end = maxSubArrayWithIndices(nums)
print(f"Max sum: {max_sum}")
print(f"Subarray: nums[{start}:{end+1}] = {nums[start:end+1]}")
# Output:
# Max sum: 6
# Subarray: nums[3:7] = [4, -1, 2, 1]

Edge Cases & Testing

Edge Cases

def test_edge_cases():
    # Single element
    assert maxSubArray([1]) == 1
    assert maxSubArray([-1]) == -1
    
    # All negative
    assert maxSubArray([-2, -3, -1, -4]) == -1
    
    # All positive
    assert maxSubArray([1, 2, 3, 4]) == 10
    
    # Mixed
    assert maxSubArray([-2, 1, -3, 4, -1, 2, 1, -5, 4]) == 6
    
    # Alternating signs
    assert maxSubArray([5, -3, 5]) == 7
    
    # Zero in array
    assert maxSubArray([0, -3, 1, 1]) == 2
    
    # Large numbers
    assert maxSubArray([10000, -1, 10000]) == 19999

Comprehensive Test Suite

import unittest
from typing import List

class TestMaxSubArray(unittest.TestCase):
    
    def test_example1(self):
        self.assertEqual(maxSubArray([-2,1,-3,4,-1,2,1,-5,4]), 6)
    
    def test_example2(self):
        self.assertEqual(maxSubArray([1]), 1)
    
    def test_example3(self):
        self.assertEqual(maxSubArray([5,4,-1,7,8]), 23)
    
    def test_all_negative(self):
        # When all negative, return the largest (least negative) element
        self.assertEqual(maxSubArray([-3, -2, -5, -1]), -1)
    
    def test_all_positive(self):
        # When all positive, sum is the entire array
        self.assertEqual(maxSubArray([1, 2, 3, 4, 5]), 15)
    
    def test_alternating(self):
        self.assertEqual(maxSubArray([1, -1, 1, -1, 1]), 1)
    
    def test_zeros(self):
        self.assertEqual(maxSubArray([0, 0, 0]), 0)
    
    def test_large_array(self):
        # Performance test: 100k elements
        import random
        large = [random.randint(-100, 100) for _ in range(100000)]
        result = maxSubArray(large)  # Should complete quickly
        self.assertIsInstance(result, int)

if __name__ == '__main__':
    unittest.main()

Variations

Variation 1: Circular Array

Array is circular (can wrap around).

def maxSubarraySumCircular(nums: List[int]) -> int:
    """
    Maximum sum in circular array
    
    Strategy:
    1. Max subarray not wrapping = standard Kadane's
    2. Max subarray wrapping = total_sum - min_subarray
    3. Return max of both
    
    Time: O(n)
    Space: O(1)
    """
    def kadane_max(arr):
        current = arr[0]
        maximum = arr[0]
        for i in range(1, len(arr)):
            current = max(arr[i], current + arr[i])
            maximum = max(maximum, current)
        return maximum
    
    def kadane_min(arr):
        current = arr[0]
        minimum = arr[0]
        for i in range(1, len(arr)):
            current = min(arr[i], current + arr[i])
            minimum = min(minimum, current)
        return minimum
    
    total_sum = sum(nums)
    
    # Case 1: Max subarray not wrapping
    max_kadane = kadane_max(nums)
    
    # Case 2: Max subarray wrapping
    # = total_sum - min_subarray
    min_kadane = kadane_min(nums)
    max_wrap = total_sum - min_kadane
    
    # Edge case: all elements negative
    if max_wrap == 0:
        return max_kadane
    
    return max(max_kadane, max_wrap)

# Example
print(maxSubarraySumCircular([5, -3, 5]))  # 10 (5 + 5, wrapping)
print(maxSubarraySumCircular([1, -2, 3, -2]))  # 3 (just [3])

Variation 2: Maximum Product Subarray

Find subarray with maximum product instead of sum.

def maxProduct(nums: List[int]) -> int:
    """
    Maximum product subarray
    
    Track both max and min (for handling negatives)
    
    Time: O(n)
    Space: O(1)
    """
    if not nums:
        return 0
    
    max_so_far = nums[0]
    min_so_far = nums[0]
    result = nums[0]
    
    for i in range(1, len(nums)):
        # If current number is negative, swap max and min
        if nums[i] < 0:
            max_so_far, min_so_far = min_so_far, max_so_far
        
        # Update max and min
        max_so_far = max(nums[i], max_so_far * nums[i])
        min_so_far = min(nums[i], min_so_far * nums[i])
        
        # Update result
        result = max(result, max_so_far)
    
    return result

# Example
print(maxProduct([2, 3, -2, 4]))  # 6 (subarray [2,3])
print(maxProduct([-2, 0, -1]))  # 0

Connection to ML Systems

Kadane’s algorithm pattern appears everywhere in ML:

1. Streaming Metrics

class StreamingMetrics:
    """
    Track running statistics using Kadane-like pattern
    
    Use case: Monitor model performance in real-time
    """
    
    def __init__(self):
        self.current_window_sum = 0
        self.best_window_sum = float('-inf')
        self.window_start = 0
        self.best_window_start = 0
        self.best_window_end = 0
        self.position = 0
    
    def add_metric(self, value):
        """
        Add new metric value
        
        Tracks best performing window
        """
        # Kadane's pattern: extend or start fresh
        if self.current_window_sum < 0:
            self.current_window_sum = value
            self.window_start = self.position
        else:
            self.current_window_sum += value
        
        # Update best window
        if self.current_window_sum > self.best_window_sum:
            self.best_window_sum = self.current_window_sum
            self.best_window_start = self.window_start
            self.best_window_end = self.position
        
        self.position += 1
    
    def get_best_window(self):
        """Get indices of best performing window"""
        return {
            'sum': self.best_window_sum,
            'start': self.best_window_start,
            'end': self.best_window_end,
            'length': self.best_window_end - self.best_window_start + 1
        }

# Usage: Track model accuracy improvements
metrics = StreamingMetrics()

# Simulate daily accuracy changes
accuracy_deltas = [0.02, 0.01, -0.03, 0.05, 0.03, 0.01, -0.02, 0.04]

for delta in accuracy_deltas:
    metrics.add_metric(delta)

best = metrics.get_best_window()
print(f"Best improvement window: days {best['start']} to {best['end']}")
print(f"Total improvement: {best['sum']:.2f}")

2. A/B Test Analysis

from typing import List

class ABTestWindowAnalyzer:
    """
    Find best time window for A/B test metric
    
    Use Kadane's to find period with max lift
    """
    
    def find_best_test_period(self, daily_lifts: List[float]) -> dict:
        """
        Find consecutive days with maximum cumulative lift
        
        Args:
            daily_lifts: Daily lift (treatment - control) metrics
        
        Returns:
            Best testing period details
        """
        if not daily_lifts:
            return None
        
        current_sum = daily_lifts[0]
        max_sum = daily_lifts[0]
        
        start = 0
        end = 0
        temp_start = 0
        
        for i in range(1, len(daily_lifts)):
            # Kadane's pattern
            if daily_lifts[i] > current_sum + daily_lifts[i]:
                current_sum = daily_lifts[i]
                temp_start = i
            else:
                current_sum += daily_lifts[i]
            
            if current_sum > max_sum:
                max_sum = current_sum
                start = temp_start
                end = i
        
        return {
            'max_cumulative_lift': max_sum,
            'start_day': start,
            'end_day': end,
            'duration_days': end - start + 1,
            'average_daily_lift': max_sum / (end - start + 1)
        }

# Usage
analyzer = ABTestWindowAnalyzer()

# Daily conversion rate lift (treatment - control)
daily_lifts = [0.002, 0.001, -0.001, 0.005, 0.003, 0.002, -0.002, 0.004]

result = analyzer.find_best_test_period(daily_lifts)
print(f"Best test period: days {result['start_day']} to {result['end_day']}")
print(f"Cumulative lift: {result['max_cumulative_lift']:.4f}")
print(f"Average daily lift: {result['average_daily_lift']:.4f}")

3. Batch Processing Optimization

from typing import List

class BatchSizeOptimizer:
    """
    Find optimal batch size range for processing
    
    Use Kadane's pattern to optimize throughput
    """
    
    def find_optimal_batching(self, processing_gains: List[float]) -> dict:
        """
        Find range of batch sizes with max throughput gain
        
        Args:
            processing_gains: Throughput gain per batch size increment
        
        Returns:
            Optimal batch size range
        """
        # Kadane's to find best subarray
        current_gain = 0
        max_gain = float('-inf')
        start_size = 0
        end_size = 0
        temp_start = 0
        
        for i, gain in enumerate(processing_gains):
            if current_gain < 0:
                current_gain = gain
                temp_start = i
            else:
                current_gain += gain
            
            if current_gain > max_gain:
                max_gain = current_gain
                start_size = temp_start
                end_size = i
        
        return {
            'optimal_min_batch': start_size,
            'optimal_max_batch': end_size,
            'total_gain': max_gain
        }

# Usage
optimizer = BatchSizeOptimizer()

# Throughput gains for batch sizes 1-10
# (e.g., batch size 1→2 gains 0.1, 2→3 gains 0.2, etc.)
gains = [0.1, 0.2, 0.15, -0.05, -0.1, 0.3, 0.2, 0.1, -0.15, -0.2]

result = optimizer.find_optimal_batching(gains)
print(f"Optimal batch size range: {result['optimal_min_batch']}-{result['optimal_max_batch']}")
print(f"Expected throughput gain: {result['total_gain']:.2f}")

Advanced Applications in ML Systems

Time-Series Analysis

Kadane’s algorithm for finding anomalies in time-series data.

class TimeSeriesAnomalyDetector:
    """
    Detect anomalous periods in time-series
    
    Uses modified Kadane's to find sustained deviations
    """
    
    def __init__(self, baseline_mean=0.0):
        self.baseline = baseline_mean
    
    def detect_anomalous_period(
        self,
        values: List[float],
        threshold: float = 2.0
    ) -> Dict:
        """
        Find period with maximum cumulative deviation from baseline
        
        Args:
            values: Time-series values
            threshold: Deviation threshold to report
        
        Returns:
            {
                'max_deviation': float,
                'start_idx': int,
                'end_idx': int,
                'is_anomalous': bool
            }
        """
        # Convert to deviations from baseline
        deviations = [v - self.baseline for v in values]
        
        # Apply Kadane's
        current_sum = deviations[0]
        max_sum = deviations[0]
        start = 0
        end = 0
        temp_start = 0
        
        for i in range(1, len(deviations)):
            if deviations[i] > current_sum + deviations[i]:
                current_sum = deviations[i]
                temp_start = i
            else:
                current_sum += deviations[i]
            
            if current_sum > max_sum:
                max_sum = current_sum
                start = temp_start
                end = i
        
        return {
            'max_deviation': max_sum,
            'start_idx': start,
            'end_idx': end,
            'duration': end - start + 1,
            'is_anomalous': max_sum > threshold,
            'values_in_period': values[start:end+1]
        }

# Usage: Detect CPU spike periods
cpu_usage = [45, 50, 48, 75, 80, 85, 90, 78, 52, 48, 50]
detector = TimeSeriesAnomalyDetector(baseline_mean=50)

result = detector.detect_anomalous_period(cpu_usage, threshold=100)
if result['is_anomalous']:
    print(f"Anomaly detected: indices {result['start_idx']}-{result['end_idx']}")
    print(f"Max deviation: {result['max_deviation']:.1f}")
    print(f"Duration: {result['duration']} time steps")

Feature Importance over Time

Track feature contribution windows in ML models.

class FeatureContributionTracker:
    """
    Track windows where features contribute most to predictions
    
    Use Kadane's pattern to find impactful periods
    """
    
    def __init__(self):
        self.feature_impacts = {}
    
    def track_feature_impact(
        self,
        feature_name: str,
        daily_impacts: List[float]
    ) -> Dict:
        """
        Find period where feature had maximum cumulative impact
        
        Args:
            feature_name: Name of feature
            daily_impacts: Daily SHAP values or feature importance
        
        Returns:
            Analysis of most impactful period
        """
        if not daily_impacts:
            return None
        
        # Kadane's algorithm
        current_sum = daily_impacts[0]
        max_sum = daily_impacts[0]
        start = 0
        end = 0
        temp_start = 0
        
        for i in range(1, len(daily_impacts)):
            if daily_impacts[i] > current_sum + daily_impacts[i]:
                current_sum = daily_impacts[i]
                temp_start = i
            else:
                current_sum += daily_impacts[i]
            
            if current_sum > max_sum:
                max_sum = current_sum
                start = temp_start
                end = i
        
        # Also track minimum impact period (negative contribution)
        current_min = daily_impacts[0]
        min_sum = daily_impacts[0]
        min_start = 0
        min_end = 0
        temp_min_start = 0
        
        for i in range(1, len(daily_impacts)):
            if daily_impacts[i] < current_min + daily_impacts[i]:
                current_min = daily_impacts[i]
                temp_min_start = i
            else:
                current_min += daily_impacts[i]
            
            if current_min < min_sum:
                min_sum = current_min
                min_start = temp_min_start
                min_end = i
        
        return {
            'feature_name': feature_name,
            'max_positive_impact': {
                'cumulative': max_sum,
                'start_day': start,
                'end_day': end,
                'duration': end - start + 1,
                'avg_daily': max_sum / (end - start + 1)
            },
            'max_negative_impact': {
                'cumulative': min_sum,
                'start_day': min_start,
                'end_day': min_end,
                'duration': min_end - min_start + 1,
                'avg_daily': min_sum / (min_end - min_start + 1)
            }
        }

# Usage
tracker = FeatureContributionTracker()

# SHAP values for a feature over 30 days
shap_values = [0.1, 0.2, 0.15, -0.05, 0.3, 0.25, 0.2, -0.1, -0.15, 0.05,
               0.1, 0.12, 0.18, 0.22, 0.19, -0.08, 0.1, 0.15, 0.2, 0.25,
               0.3, 0.28, -0.12, -0.2, 0.05, 0.1, 0.15, 0.12, 0.08, 0.1]

analysis = tracker.track_feature_impact('user_engagement_score', shap_values)

print(f"Feature: {analysis['feature_name']}")
print(f"Most impactful period: days {analysis['max_positive_impact']['start_day']}"
      f" to {analysis['max_positive_impact']['end_day']}")
print(f"Cumulative impact: {analysis['max_positive_impact']['cumulative']:.3f}")

Sliding Window with Constraints

Maximum subarray with length constraints.

def maxSubArrayWithConstraints(
    nums: List[int],
    min_length: int = 1,
    max_length: int = None
) -> tuple[int, int, int]:
    """
    Find maximum subarray with length constraints
    
    Args:
        nums: Input array
        min_length: Minimum subarray length
        max_length: Maximum subarray length (None = no limit)
    
    Returns:
        (max_sum, start_idx, end_idx)
    """
    n = len(nums)
    if n < min_length:
        return (float('-inf'), -1, -1)
    
    max_sum = float('-inf')
    best_start = 0
    best_end = 0
    
    # For each starting position
    for start in range(n):
        current_sum = 0
        
        # Try different ending positions
        for end in range(start, n):
            current_sum += nums[end]
            length = end - start + 1
            
            # Check constraints
            if max_length and length > max_length:
                break
            
            if length >= min_length and current_sum > max_sum:
                max_sum = current_sum
                best_start = start
                best_end = end
    
    return (max_sum, best_start, best_end)

# Usage: Find best 3-5 day trading window
prices_changes = [5, -2, 8, -3, 4, -1, 7, -2, 3]
max_profit, start, end = maxSubArrayWithConstraints(
    prices_changes,
    min_length=3,
    max_length=5
)

print(f"Best window: days {start} to {end}")
print(f"Total gain: {max_profit}")
print(f"Window length: {end - start + 1} days")

Divide and Conquer Solution

O(n log n) approach for understanding recursion.

def maxSubArrayDivideConquer(nums: List[int]) -> int:
    """
    Divide and conquer approach
    
    Time: O(n log n)
    Space: O(log n) for recursion stack
    
    Educational value: Shows different algorithmic paradigm
    """
    
    def maxCrossingSum(nums, left, mid, right):
        """
        Find max sum crossing the midpoint
        """
        # Left side of mid
        left_sum = float('-inf')
        current_sum = 0
        for i in range(mid, left - 1, -1):
            current_sum += nums[i]
            left_sum = max(left_sum, current_sum)
        
        # Right side of mid
        right_sum = float('-inf')
        current_sum = 0
        for i in range(mid + 1, right + 1):
            current_sum += nums[i]
            right_sum = max(right_sum, current_sum)
        
        return left_sum + right_sum
    
    def maxSubArrayRecursive(nums, left, right):
        """
        Recursive divide and conquer
        """
        # Base case
        if left == right:
            return nums[left]
        
        # Divide
        mid = (left + right) // 2
        
        # Conquer: three cases
        # 1. Max subarray in left half
        left_max = maxSubArrayRecursive(nums, left, mid)
        
        # 2. Max subarray in right half
        right_max = maxSubArrayRecursive(nums, mid + 1, right)
        
        # 3. Max subarray crossing midpoint
        cross_max = maxCrossingSum(nums, left, mid, right)
        
        # Return maximum of three
        return max(left_max, right_max, cross_max)
    
    return maxSubArrayRecursive(nums, 0, len(nums) - 1)

# Example
nums = [-2, 1, -3, 4, -1, 2, 1, -5, 4]
print(f"Max subarray sum: {maxSubArrayDivideConquer(nums)}")  # 6

Interview Tips & Common Patterns

Recognizing Kadane’s Pattern

When to use Kadane’s:

  • “Maximum/minimum subarray sum”
  • “Best consecutive period”
  • “Optimal window with contiguous elements”
  • “Track running optimum with reset option”

Key characteristics:

  • Contiguous subsequence required
  • Looking for optimum (max/min)
  • Can “start fresh” at any point
  • Single pass possible

Follow-up Questions to Expect

Q1: What if array can be empty?

def maxSubArrayEmptyAllowed(nums: List[int]) -> int:
    """
    Allow empty subarray (return 0 if all negative)
    """
    if not nums:
        return 0
    
    max_sum = 0  # Empty subarray
    current_sum = 0
    
    for num in nums:
        current_sum = max(0, current_sum + num)
        max_sum = max(max_sum, current_sum)
    
    return max_sum

Q2: Return all maximum subarrays (in case of ties)?

def findAllMaxSubarrays(nums: List[int]) -> List[tuple[int, int]]:
    """
    Find all subarrays with maximum sum
    """
    # First, find max sum
    max_sum = maxSubArray(nums)
    
    # Find all subarrays with this sum
    result = []
    n = len(nums)
    
    for i in range(n):
        current_sum = 0
        for j in range(i, n):
            current_sum += nums[j]
            if current_sum == max_sum:
                result.append((i, j))
    
    return result

# Example
nums = [1, 2, -3, 4]
print(findAllMaxSubarrays(nums))  # [(0, 1), (3, 3)] - both sum to 4

Q3: 2D version (maximum sum rectangle)?

def maxSumRectangle(matrix: List[List[int]]) -> int:
    """
    Find maximum sum rectangle in 2D matrix
    
    Strategy: Fix left and right columns, apply Kadane's on rows
    
    Time: O(n² * m) where matrix is n x m
    """
    if not matrix or not matrix[0]:
        return 0
    
    rows = len(matrix)
    cols = len(matrix[0])
    max_sum = float('-inf')
    
    # Try all pairs of columns
    for left in range(cols):
        # Temp array to store row sums
        temp = [0] * rows
        
        for right in range(left, cols):
            # Add current column to temp
            for row in range(rows):
                temp[row] += matrix[row][right]
            
            # Apply Kadane's on temp (1D problem)
            current_sum = temp[0]
            current_max = temp[0]
            
            for i in range(1, rows):
                current_sum = max(temp[i], current_sum + temp[i])
                current_max = max(current_max, current_sum)
            
            max_sum = max(max_sum, current_max)
    
    return max_sum

# Example
matrix = [
    [1, 2, -1, -4],
    [-8, -3, 4, 2],
    [3, 8, 10, -8]
]
print(maxSumRectangle(matrix))  # 19 (rectangle from (0,1) to (2,2))

Production Considerations

Handling Real-World Data

import math

class RobustMaxSubArray:
    """
    Production-ready maximum subarray with validation
    """
    
    def max_subarray(self, nums: List[float]) -> float:
        """
        Handle floating point values, NaN, inf
        """
        # Filter out invalid values
        valid_nums = [
            x for x in nums
            if x is not None and not math.isnan(x) and not math.isinf(x)
        ]
        
        if not valid_nums:
            return 0.0
        
        # Standard Kadane's
        current_sum = valid_nums[0]
        max_sum = valid_nums[0]
        
        for i in range(1, len(valid_nums)):
            current_sum = max(valid_nums[i], current_sum + valid_nums[i])
            max_sum = max(max_sum, current_sum)
        
        return round(max_sum, 6)  # Round for float precision
    
    def max_subarray_with_metadata(self, nums: List[float]) -> Dict:
        """
        Return comprehensive analysis
        """
        if not nums:
            return {
                'max_sum': 0,
                'start': -1,
                'end': -1,
                'length': 0,
                'percentage_of_total': 0
            }
        
        current_sum = nums[0]
        max_sum = nums[0]
        start = 0
        end = 0
        temp_start = 0
        
        for i in range(1, len(nums)):
            if nums[i] > current_sum + nums[i]:
                current_sum = nums[i]
                temp_start = i
            else:
                current_sum += nums[i]
            
            if current_sum > max_sum:
                max_sum = current_sum
                start = temp_start
                end = i
        
        total_sum = sum(nums)
        
        return {
            'max_sum': max_sum,
            'start': start,
            'end': end,
            'length': end - start + 1,
            'percentage_of_array': (end - start + 1) / len(nums) * 100,
            'percentage_of_total': (max_sum / total_sum * 100) if total_sum != 0 else 0,
            'subarray': nums[start:end+1]
        }

Performance Monitoring

import time

class PerformanceTracker:
    """
    Track algorithm performance
    """
    
    def benchmark(self, sizes):
        """Benchmark on different input sizes"""
        for size in sizes:
            nums = [(-1) ** i * (i % 100) for i in range(size)]
            
            start = time.perf_counter()
            result = maxSubArray(nums)
            end = time.perf_counter()
            
            elapsed_ms = (end - start) * 1000
            throughput = size / (end - start) / 1_000_000  # M elements/sec
            
            print(f"n={size:>7}: {elapsed_ms:>8.3f}ms, {throughput:>6.2f} M/s")
    
    def compare_approaches(self, nums):
        """Compare different approaches"""
        approaches = {
            "Kadane's O(n)": maxSubArray,
            "Brute Force O(n²)": maxSubArrayBruteForce,
            "Divide & Conquer O(n log n)": maxSubArrayDivideConquer,
        }
        
        print(f"Array size: {len(nums)}")
        print("-" * 50)
        
        for name, func in approaches.items():
            start = time.perf_counter()
            result = func(nums)
            end = time.perf_counter()
            
            elapsed_ms = (end - start) * 1000
            print(f"{name:30} {elapsed_ms:>8.3f}ms  Result: {result}")

# Run benchmark
tracker = PerformanceTracker()
tracker.benchmark([100, 1_000, 10_000, 100_000, 1_000_000])

# Compare on smaller array
small_array = [(-1) ** i * (i % 10) for i in range(1000)]
tracker.compare_approaches(small_array)

Monitoring in Production

class MaxSubarrayMonitor:
    """
    Monitor Kadane's algorithm in production
    
    Track performance, edge cases, and anomalies
    """
    
    def __init__(self):
        self.execution_count = 0
        self.total_time = 0
        self.edge_case_count = 0
        self.all_negative_count = 0
        self.all_positive_count = 0
    
    def monitored_max_subarray(self, nums: List[int]) -> Dict:
        """
        Wrap max_subarray with monitoring
        """
        self.execution_count += 1
        
        start = time.perf_counter()
        
        # Edge case detection
        if not nums:
            self.edge_case_count += 1
            return {'result': 0, 'edge_case': 'empty_array'}
        
        if all(x < 0 for x in nums):
            self.all_negative_count += 1
        
        if all(x > 0 for x in nums):
            self.all_positive_count += 1
        
        # Execute algorithm
        result = maxSubArray(nums)
        
        end = time.perf_counter()
        self.total_time += (end - start)
        
        return {
            'result': result,
            'execution_time_ms': (end - start) * 1000,
            'array_size': len(nums)
        }
    
    def get_metrics(self) -> Dict:
        """Get performance metrics"""
        if self.execution_count == 0:
            return {}
        
        return {
            'total_executions': self.execution_count,
            'avg_time_ms': (self.total_time / self.execution_count) * 1000,
            'edge_cases': self.edge_case_count,
            'all_negative_arrays': self.all_negative_count,
            'all_positive_arrays': self.all_positive_count,
            'edge_case_rate': self.edge_case_count / self.execution_count * 100
        }

Key Takeaways

Kadane’s algorithm is a perfect example of greedy + DP
Single pass O(n) with O(1) space, optimal for streaming data
Local optimality → global optimality when problem has optimal substructure
Pattern extends to circular arrays, max product, and many ML applications
Production systems use this pattern for online metrics, A/B tests, and batch optimization
Connection to DP helps understand state transitions and decision making
Similar to stock problem (Day 4), both track running optimum in single pass


Master these variations:


Originally published at: arunbaby.com/dsa/0005-maximum-subarray

If you found this helpful, consider sharing it with others who might benefit.