Driver Score IoT Platform

Real-time vehicle telemetry processing with distributed scheduling and quantized event analysis

Building a system that processes vehicle telemetry data from thousands of cars, scores driver behavior in real-time, and predicts maintenance needs requires solving several interesting engineering problems: distributed scheduling, time-series data optimization, and real-time event quantization.

This is a technical deep-dive into the architecture and algorithms behind a driver scoring platform that handled 5-6K concurrent vehicle bookings, processing millions of OBD data points daily.

System Architecture

The system processes two distinct data streams: high-frequency vehicle telemetry (OBD data) and low-frequency booking lifecycle events. The key insight was separating these concerns - telemetry data flows through optimized time-series storage, while booking events trigger scoring workflows through a distributed scheduling system.

Telemetry path:
  OBD Device (CAN bus) -> Server (REST API) -> Binary Decoder -> TimescaleDB (time-series storage)

Booking/scoring path:
  Booking Lifecycle Events -(Debezium CDC)-> Kafka (events) -> Consumer -> Service Pickup -> SQS (scheduler)
  SQS (intervals) -> Distance Calculator -> SQS (events) -> Event Scoring Algorithm -> Final Score & Publish

Key Design Decisions

TimescaleDB for telemetry storage: Migrated from MemSQL to TimescaleDB for superior time-series data compression and query performance. TimescaleDB's automatic partitioning by time intervals enabled efficient queries for specific booking windows without scanning entire datasets.
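For illustration, a minimal sketch of how the telemetry table might be declared as a hypertable; the exact schema, connection details, and chunk interval are assumptions rather than the production setup.

# Minimal sketch (assumed schema / chunk interval): declare the telemetry
# table as a TimescaleDB hypertable so booking-window queries only touch
# the relevant time chunks.
import psycopg2

conn = psycopg2.connect("dbname=telemetry user=ingest")  # illustrative DSN
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS vehicle_telemetry (
            booking_id     TEXT        NOT NULL,
            timestamp      TIMESTAMPTZ NOT NULL,
            speed          DOUBLE PRECISION,
            acceleration_x DOUBLE PRECISION,
            acceleration_y DOUBLE PRECISION,
            gps_latitude   DOUBLE PRECISION,
            gps_longitude  DOUBLE PRECISION
        );
    """)
    # create_hypertable() partitions the table into time-based chunks
    cur.execute("""
        SELECT create_hypertable('vehicle_telemetry', 'timestamp',
                                 chunk_time_interval => INTERVAL '1 day',
                                 if_not_exists => TRUE);
    """)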

Separation of data streams: OBD telemetry (high-frequency, high-volume) flows through optimized storage, while booking events (low-frequency, critical) trigger scheduling workflows. This prevents telemetry volume from overwhelming the scoring system.

Change Data Capture (CDC): Used Debezium to capture database changes and publish to Kafka, enabling real-time data pipeline without impacting the primary telemetry ingestion performance.
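To make that pipeline concrete, here is a minimal sketch of a consumer for the Debezium-produced booking events. The topic name, brokers, and handler are assumptions; the payload layout follows Debezium's default change-event envelope.

# Minimal sketch (assumed topic/brokers): consume Debezium change events for
# the bookings table and react to newly started bookings.
import json
from kafka import KafkaConsumer  # kafka-python

consumer = KafkaConsumer(
    "booking_db.public.bookings",           # assumed Debezium topic name
    bootstrap_servers=["kafka:9092"],
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
    group_id="booking-lifecycle-consumer",
)

for record in consumer:
    change = record.value
    # Debezium wraps row changes in a payload with "op" and "after" fields
    payload = change.get("payload", change)
    if payload.get("op") == "c":            # row created -> booking started
        booking = payload["after"]
        seed_first_interval(booking)        # see the SQS seeding sketch below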

Distributed Scheduling with SQS

The core challenge: score driver behavior every 5 minutes for each active booking, handling 5-6K concurrent bookings without external schedulers or distributed timer systems.

The Problem: Traditional cron jobs or timer-based systems become complex when managing thousands of independent schedules with different start times, variable durations, and potential failures.

Self-Perpetuating Queue Pattern

The solution leverages SQS's delayed message delivery (DelaySeconds) to create a self-scheduling system where each message schedules its own successor:

# Message structure carrying all necessary context
{
    "booking_id": "abc123",
    "car_id": "vehicle_456",
    "car_type": "sedan",
    "booking_start": "2023-01-01T10:00:00Z",
    "next_score_time": "2023-01-01T10:05:00Z",
    "interval_count": 1
}

# Processing logic
def process_scoring_message(message):
    booking_data = parse_message(message)

    # Calculate score for this interval
    score = calculate_driver_score(booking_data)

    # Publish score results
    publish_score(score)

    # Schedule next scoring (self-perpetuation)
    if booking_still_active(booking_data.booking_id):
        next_message = create_next_interval_message(booking_data)
        sqs.send_message(
            QueueUrl=SCORING_QUEUE_URL,  # scoring interval queue, configured elsewhere
            MessageBody=next_message,
            DelaySeconds=300             # 5 minutes
        )

Why this works: Each message contains complete context (booking metadata, car details, timing information), eliminating the need for external state storage or database lookups during scheduling. The system is inherently fault-tolerant - if a message fails, only that specific booking's scoring is affected.
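The loop still has to be started once per booking. A minimal sketch of how the first interval message might be seeded when a booking-started event arrives; the queue URL and helper name are illustrative, and the message fields mirror the structure shown above.

# Minimal sketch (assumed queue URL): seed the first self-perpetuating
# scoring message when a booking starts.
import json
from datetime import datetime, timedelta, timezone

import boto3

sqs = boto3.client("sqs")
SCORING_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/scoring"  # illustrative

def seed_first_interval(booking):
    now = datetime.now(timezone.utc)
    message = {
        "booking_id": booking["booking_id"],
        "car_id": booking["car_id"],
        "car_type": booking["car_type"],
        "booking_start": now.isoformat(),
        "next_score_time": (now + timedelta(minutes=5)).isoformat(),
        "interval_count": 1,
    }
    sqs.send_message(
        QueueUrl=SCORING_QUEUE_URL,
        MessageBody=json.dumps(message),
        DelaySeconds=300,  # first score 5 minutes after pickup
    )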

Performance Characteristics

• ~6K concurrent bookings
• ~72K scheduling messages per hour
• 300-second (5-minute) scoring interval
• 0 external schedulers

Idle Detection Optimization: Before processing telemetry data, the system checks vehicle movement status:

def is_vehicle_idle(booking_id, time_window):
    telemetry = fetch_recent_telemetry(booking_id, time_window)

    lat_variance = calculate_position_variance(telemetry.gps_data)
    speed_max = max(telemetry.speed_readings)
    ignition_off = telemetry.ignition_off

    return (lat_variance < POSITION_THRESHOLD
            and speed_max == 0
            and ignition_off)

# Skip expensive scoring computation for idle vehicles
if is_vehicle_idle(booking_id, last_5_minutes):
    schedule_next_check()  # Just reschedule, skip scoring
    return

Event Quantization Algorithm

Driver scoring combines two components: baseline smoothness scoring and discrete event analysis. The event scoring system quantizes continuous telemetry data into discrete behavioral events using statistical distribution analysis.

Baseline Score Calculation

Establishes a driving-smoothness baseline relative to similar vehicles in similar geographic areas:

baseline_score = Σ(smoothness_metrics) / normalization_factor

where smoothness_metrics = [acceleration_variance, jerk_frequency, speed_consistency]

The baseline accounts for vehicle type (sedan vs SUV have different acceleration profiles) and geographic context (city vs highway driving patterns).
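A minimal sketch of the baseline formula above, implemented literally; the metric definitions, thresholds, and normalization factor are illustrative, and the production system presumably rescales the result onto the 0-100 range used by the composite score.

# Minimal sketch (illustrative metrics and normalization) of the baseline formula.
from statistics import pstdev, pvariance

def calculate_baseline_score(speeds, accelerations, normalization_factor=3.0):
    # Variance of longitudinal acceleration over the scoring window
    acceleration_variance = pvariance(accelerations)

    # Fraction of consecutive samples where acceleration changes sharply
    jerk_frequency = sum(
        1 for a, b in zip(accelerations, accelerations[1:])
        if abs(b - a) > 1.0  # illustrative jerk threshold
    ) / max(len(accelerations) - 1, 1)

    # Lower speed spread -> value closer to 1.0
    speed_consistency = 1.0 / (1.0 + pstdev(speeds))

    smoothness_metrics = [acceleration_variance, jerk_frequency, speed_consistency]
    return sum(smoothness_metrics) / normalization_factor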

Event Detection and Quantization

Continuous telemetry streams are processed to extract discrete driving events:

def extract_driving_events(telemetry_window):
    events = []

    # Acceleration events
    for i in range(1, len(telemetry_window)):
        delta_v = telemetry_window[i].speed - telemetry_window[i-1].speed
        delta_t = telemetry_window[i].timestamp - telemetry_window[i-1].timestamp
        acceleration = delta_v / delta_t

        if abs(acceleration) > ACCELERATION_THRESHOLD:
            events.append({
                'type': 'acceleration' if acceleration > 0 else 'braking',
                'magnitude': abs(acceleration),
                'timestamp': telemetry_window[i].timestamp
            })

    # Cornering events
    lateral_accelerations = calculate_lateral_acceleration(telemetry_window)
    for lat_accel in lateral_accelerations:
        if abs(lat_accel.magnitude) > CORNERING_THRESHOLD:
            events.append({
                'type': 'cornering',
                'magnitude': abs(lat_accel.magnitude),
                'timestamp': lat_accel.timestamp
            })

    return events
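The cornering branch relies on calculate_lateral_acceleration, which isn't shown in the original. A minimal sketch, assuming each telemetry point also carries a GPS-derived heading in degrees and that lateral acceleration is approximated as speed times yaw rate (a_lat ≈ v · ω); the field names are assumptions.

# Minimal sketch (assumed 'heading' field): lateral acceleration approximated
# as speed * yaw rate, with yaw rate taken from heading change between fixes.
import math
from collections import namedtuple

LateralSample = namedtuple('LateralSample', ['magnitude', 'timestamp'])

def calculate_lateral_acceleration(telemetry_window):
    samples = []
    for prev, curr in zip(telemetry_window, telemetry_window[1:]):
        dt = curr.timestamp - prev.timestamp
        if dt <= 0:
            continue
        # Heading difference wrapped into [-180, 180) degrees
        d_heading = (curr.heading - prev.heading + 180.0) % 360.0 - 180.0
        yaw_rate = math.radians(d_heading) / dt       # rad/s
        samples.append(LateralSample(
            magnitude=curr.speed * yaw_rate,          # a_lat ≈ v * ω
            timestamp=curr.timestamp
        ))
    return samples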

Statistical Distribution and Quantile Bucketing

Events are scored against statistical distributions built from historical data of similar vehicle types and geographic regions:

# Build distribution from historical data
def build_event_distributions(car_type, geographic_region):
    historical_events = query_historical_events(car_type, geographic_region)

    acceleration_dist = build_distribution(historical_events, 'acceleration')
    braking_dist = build_distribution(historical_events, 'braking')
    cornering_dist = build_distribution(historical_events, 'cornering')

    return {
        'acceleration': create_quantile_buckets(acceleration_dist),
        'braking': create_quantile_buckets(braking_dist),
        'cornering': create_quantile_buckets(cornering_dist)
    }

def create_quantile_buckets(distribution):
    return {
        'gentle': distribution.percentile(0, 25),       # Bottom 25%
        'normal': distribution.percentile(25, 75),      # Middle 50%
        'aggressive': distribution.percentile(75, 95),  # Next 20%
        'extreme': distribution.percentile(95, 100)     # Top 5%
    }

Event Scoring: Each detected event is scored based on its quantile bucket (a code sketch follows the weight list below):

event_score = bucket_weight × frequency_penalty × magnitude_factor

where:
• gentle events: weight = 1.0
• normal events: weight = 1.2
• aggressive events: weight = 2.0
• extreme events: weight = 4.0
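A minimal sketch of how these weights could be applied to a single detected event, assuming the cached quantile buckets are available as (low, high) magnitude ranges per event type; the frequency penalty and magnitude factor definitions here are illustrative, not the production formulas.

# Minimal sketch (illustrative frequency penalty / magnitude factor).
BUCKET_WEIGHTS = {'gentle': 1.0, 'normal': 1.2, 'aggressive': 2.0, 'extreme': 4.0}

def classify_event_severity(event, quantile_buckets):
    # quantile_buckets: {'acceleration': {'gentle': (low, high), ...}, ...}
    for bucket, (low, high) in quantile_buckets[event['type']].items():
        if low <= event['magnitude'] < high:
            return bucket
    return 'extreme'  # magnitudes beyond the 95th percentile

def score_event(event, quantile_buckets, events_in_window):
    bucket = classify_event_severity(event, quantile_buckets)

    # Repeated events of the same type within one window are penalized harder
    same_type = sum(1 for e in events_in_window if e['type'] == event['type'])
    frequency_penalty = 1.0 + 0.1 * max(same_type - 1, 0)  # illustrative

    magnitude_factor = event['magnitude']
    return BUCKET_WEIGHTS[bucket] * frequency_penalty * magnitude_factor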

Composite Score Calculation

The final driver score combines baseline smoothness with event scoring:

def calculate_composite_score(baseline_score, events, time_window):
    # Weighted event scoring
    event_penalties = 0
    for event in events:
        bucket = classify_event_severity(event)
        penalty = BUCKET_WEIGHTS[bucket] * event.magnitude
        event_penalties += penalty

    # Normalize by time window and expected event frequency
    normalized_events = event_penalties / (time_window / BASELINE_INTERVAL)

    # Composite scoring with configurable weights
    composite_score = (
        baseline_score * BASELINE_WEIGHT +        # 60%
        (100 - normalized_events) * EVENT_WEIGHT  # 40%
    )

    return max(0, min(100, composite_score))  # Clamp to 0-100 range

Performance Optimizations

Time-Series Query Optimization

TimescaleDB's hypertable partitioning enables efficient querying of specific time windows:

-- Optimized query leveraging time partitioning
SELECT timestamp, speed, acceleration_x, acceleration_y,
       gps_latitude, gps_longitude
FROM vehicle_telemetry
WHERE booking_id = $1
  AND timestamp BETWEEN $2 AND $3
ORDER BY timestamp ASC;

-- Automatic partition pruning reduces the scan from the entire dataset
-- to a single 5-minute partition

Computation Caching Strategy

Statistical distributions and quantile buckets are pre-computed and cached:

Cache Strategy: Event distributions are calculated daily for each (car_type, geographic_region) combination and cached in Redis. This reduces scoring latency from ~200ms to ~15ms by eliminating real-time statistical computations.
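A minimal sketch of that caching step, assuming redis-py and one key per (car_type, geographic_region) pair; the key format and TTL are illustrative.

# Minimal sketch (assumed key format / TTL) of the distribution cache.
import json

import redis

cache = redis.Redis(host="localhost", port=6379, db=0)

def get_event_distributions(car_type, geographic_region):
    key = f"event_dist:{car_type}:{geographic_region}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)

    # Cache miss: rebuild from historical data and keep it for a day
    distributions = build_event_distributions(car_type, geographic_region)
    cache.setex(key, 24 * 3600, json.dumps(distributions))
    return distributions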

Real-World Challenges and Solutions

Variable OBD Data Frequency

Problem: OBD devices prioritize vehicle operation over data transmission. When battery load is high or processing power is needed for critical vehicle functions, telemetry frequency can drop significantly.

Solution: Adaptive scoring windows and intelligent interpolation:

def adaptive_scoring_window(booking_id, target_interval):
    available_data = count_telemetry_points(booking_id, target_interval)

    if available_data < MINIMUM_POINTS_THRESHOLD:
        # Extend window to gather sufficient data
        extended_window = target_interval * DATA_SUFFICIENCY_MULTIPLIER
        return extended_window

    return target_interval

def interpolate_missing_data(telemetry_points):
    # Use GPS trajectory and last known speed for interpolation
    # More sophisticated than linear interpolation due to vehicle physics
    for gap in detect_data_gaps(telemetry_points):
        interpolated = physics_based_interpolation(
            gap.start_point,
            gap.end_point,
            gap.duration
        )
        telemetry_points.insert_interpolated(interpolated)

Geographic Context Normalization

Challenge: Driver behavior varies significantly between geographic contexts - highway driving vs city traffic vs rural roads require different scoring baselines.

Solution: Dynamic geographic classification using GPS data and map matching:

def classify_geographic_context(gps_trajectory):
    road_types = map_match_to_road_network(gps_trajectory)

    context_scores = {
        'highway': calculate_highway_percentage(road_types),
        'urban': calculate_urban_percentage(road_types),
        'rural': calculate_rural_percentage(road_types)
    }

    primary_context = max(context_scores.items(), key=lambda x: x[1])
    return primary_context[0]

# Different scoring parameters for different contexts
CONTEXT_PARAMETERS = {
    'highway': {
        'speed_tolerance': 15,         # Higher speed variance acceptable
        'acceleration_threshold': 2.5  # Gentler thresholds
    },
    'urban': {
        'speed_tolerance': 5,          # Tight speed control expected
        'acceleration_threshold': 1.8  # More sensitive to events
    }
}

Key Engineering Insights

Self-scheduling systems scale better than external schedulers: The SQS pattern eliminated the complexity of distributed timer management while providing natural fault isolation - failed bookings don't affect others.

Statistical normalization is crucial for fairness: Raw event detection without geographic and vehicle-type normalization led to biased scoring. A defensive driver in heavy traffic would score worse than an aggressive driver on empty highways.

Time-series databases transform IoT performance: Moving from traditional RDBMS to TimescaleDB reduced query latencies from seconds to milliseconds for time-window operations, enabling real-time scoring.

Production Learnings

Monitoring and Observability: Real-time scoring systems require comprehensive monitoring of both data quality and algorithm performance. We instrumented scoring latency, data completeness percentages, and score distribution changes to detect system degradation.
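As an illustration (not the production setup), scoring latency and data completeness could be instrumented with prometheus_client; the metric names and the expected-point estimate are assumptions.

# Minimal sketch (assumed metric names) of scoring instrumentation.
import time

from prometheus_client import Gauge, Histogram

SCORING_LATENCY = Histogram("driver_scoring_latency_seconds",
                            "Time spent computing one interval score")
DATA_COMPLETENESS = Gauge("telemetry_data_completeness_ratio",
                          "Observed vs expected telemetry points per window")

def score_with_metrics(booking_data):
    start = time.monotonic()

    expected_points = 300  # assumed: ~1 Hz telemetry over a 5-minute window
    observed_points = count_telemetry_points(booking_data.booking_id, 300)
    DATA_COMPLETENESS.set(observed_points / expected_points)

    score = calculate_driver_score(booking_data)
    SCORING_LATENCY.observe(time.monotonic() - start)
    return score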

Algorithm Validation: Scoring algorithms require continuous validation against ground truth. We implemented A/B testing infrastructure to compare algorithm versions and validate scoring accuracy against manual expert assessment.

Graceful Degradation: When telemetry data quality drops below thresholds, the system falls back to simplified scoring models rather than failing completely. This maintains service availability during network issues or device malfunctions.
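A minimal sketch of that fallback idea; the completeness threshold, the speed-only degraded model, and the helper for the baseline are illustrative assumptions.

# Minimal sketch (illustrative threshold and fallback model) of graceful degradation.
MIN_COMPLETENESS = 0.6      # assumed: below this, event quantization is unreliable
FALLBACK_SPEED_LIMIT = 80   # km/h, illustrative ceiling for the degraded model

def score_with_fallback(booking_data, telemetry_window, expected_points):
    completeness = len(telemetry_window) / expected_points if expected_points else 0.0

    if completeness >= MIN_COMPLETENESS:
        # Full pipeline: event quantization + baseline, as described above
        events = extract_driving_events(telemetry_window)
        baseline = calculate_baseline_for(booking_data, telemetry_window)  # hypothetical helper
        return calculate_composite_score(baseline, events, time_window=300)

    # Degraded mode: coarse speed-only score so the service keeps responding
    speeds = [point.speed for point in telemetry_window]
    if not speeds:
        return None  # nothing usable; caller keeps the booking's last known score
    overspeed_ratio = sum(1 for s in speeds if s > FALLBACK_SPEED_LIMIT) / len(speeds)
    return max(0, 100 - 100 * overspeed_ratio)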