Stream Processing

Real-Time Call Detail Record (CDR) Processor

Event-driven CDR processing pipeline for real-time call analytics, billing aggregation, and fraud detection using AWS Kinesis, Lambda, and SIP protocol integration.

Client / Context: Voice over IP (VoIP) Service Provider
Timeline: Q2 2023 - Q4 2023
Role: Solutions Architect & Stream Processing Engineer
Technology Stack: AWS Kinesis Data Streams, AWS Lambda, Amazon Timestream, S3, SIP Protocol, Asterisk PBX, Node.js, Python, QuickSight

Project Overview

Built a real-time CDR processing pipeline that ingests, aggregates, and analyzes call detail records from a SIP-based soft-phone system, processing 500,000+ call records per day.

Implemented stream processing using AWS Kinesis with Lambda consumers for real-time analytics including top callers by data usage, call duration analysis, and fraud pattern detection.

Integrated with Asterisk PBX to generate CDRs compliant with ITU-T standards, storing raw records in S3 and aggregated metrics in Amazon Timestream for time-series analysis.

The Challenge

Problem Statement

This project addressed two main technical challenges:

1. High-Velocity Stream Processing

Processing 500K+ CDRs per day (5-6 records/second average, 50+ records/second peak) required careful Kinesis shard configuration and Lambda concurrency tuning to prevent throttling and maintain sub-second processing latency.
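The throughput figures above can be turned into a rough shard estimate. The sketch below is illustrative only: it uses the published per-shard write limits for Kinesis Data Streams (1,000 records/second or 1 MB/second), while the average record size and the headroom factor are assumptions that do not come from the project.

```typescript
// Published per-shard write limits for Kinesis Data Streams (provisioned mode).
const SHARD_MAX_RECORDS_PER_SEC = 1_000;
const SHARD_MAX_BYTES_PER_SEC = 1_000_000; // 1 MB/s, treated as decimal for simplicity

interface StreamLoad {
  peakRecordsPerSec: number;
  avgRecordBytes: number;  // assumed average size of one serialized CDR
  headroomFactor: number;  // assumed safety margin, e.g. 2 = plan for twice the peak
}

// Returns the number of shards needed to absorb the write load,
// whichever of the two limits (record count or bytes) is hit first.
function requiredShards(load: StreamLoad): number {
  const records = load.peakRecordsPerSec * load.headroomFactor;
  const bytes = records * load.avgRecordBytes;
  const byCount = Math.ceil(records / SHARD_MAX_RECORDS_PER_SEC);
  const byBytes = Math.ceil(bytes / SHARD_MAX_BYTES_PER_SEC);
  return Math.max(1, byCount, byBytes);
}

// 500,000 records/day spread over 86,400 seconds.
const avgPerSec = 500_000 / 86_400;

const shards = requiredShards({
  peakRecordsPerSec: 50,
  avgRecordBytes: 2_048,
  headroomFactor: 2,
});

console.log(avgPerSec.toFixed(2), shards); // prints "5.79 1"
```

Under these assumptions a single shard covers the write path, so a higher shard count would mainly buy consumer parallelism, since Lambda by default runs one concurrent invocation per shard for each consumer.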

2. CDR Schema Complexity

SIP call records include multiple legs (caller→proxy→callee), redirections, transfers, and conference calls. Accurately attributing call duration, data usage, and costs across complex call flows required deep SIP protocol understanding.
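One way to avoid double-billing overlapping legs (for example during a transfer) is to bill the union of the answered intervals rather than the sum of per-leg durations. The sketch below assumes a hypothetical `CallLeg` shape with a shared `callId`; it is not the project's actual attribution logic, which the write-up does not show.

```typescript
// Hypothetical leg record: one entry per SIP dialog that belongs to a call.
interface CallLeg {
  callId: string;       // assumed correlation ID shared by all legs of a call
  legId: string;
  answerTime?: string;  // ISO 8601; absent if the leg was never answered
  endTime?: string;     // ISO 8601
}

// Billable seconds = length of the union of answered intervals,
// so time covered by two overlapping legs is counted once.
function billableSecondsForCall(legs: CallLeg[]): number {
  const intervals = legs
    .map((leg): [number, number] => [
      Date.parse(leg.answerTime ?? ''),
      Date.parse(leg.endTime ?? ''),
    ])
    // Drops unanswered legs and unparseable timestamps (NaN comparisons are false).
    .filter(([start, end]) => end > start)
    .sort((a, b) => a[0] - b[0]);

  let totalMs = 0;
  let open = false;
  let curStart = 0;
  let curEnd = 0;

  for (const [start, end] of intervals) {
    if (open && start <= curEnd) {
      // Overlaps the interval being built: extend it.
      curEnd = Math.max(curEnd, end);
    } else {
      if (open) totalMs += curEnd - curStart;
      curStart = start;
      curEnd = end;
      open = true;
    }
  }
  if (open) totalMs += curEnd - curStart;

  return Math.ceil(totalMs / 1000);
}

const transferred: CallLeg[] = [
  { callId: 'c1', legId: 'a', answerTime: '2023-06-01T10:00:00Z', endTime: '2023-06-01T10:05:00Z' },
  { callId: 'c1', legId: 'b', answerTime: '2023-06-01T10:04:00Z', endTime: '2023-06-01T10:10:00Z' },
  { callId: 'c1', legId: 'c', endTime: '2023-06-01T10:00:20Z' }, // rang but never answered
];

console.log(billableSecondsForCall(transferred)); // prints 600
```

Summing the two answered legs would give 660 seconds; the union gives 600 because the one-minute overlap during the transfer is counted once.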

Solution Architecture

graph TB
    SIPClients["SIP Softphones<br/>(Calling Parties)"]
    Asterisk["Asterisk PBX<br/>(SIP Server)"]
    CDRGen["CDR Generator<br/>(Custom Module)"]
    Kinesis["Kinesis Data Stream<br/>(CDR Ingestion)"]

    subgraph Processing["Processing Layer"]
        Lambda1["Lambda Consumer 1<br/>(Aggregation)"]
        Lambda2["Lambda Consumer 2<br/>(Fraud Detection)"]
        Lambda3["Lambda Consumer 3<br/>(Billing)"]
    end

    subgraph Storage["Storage Layer"]
        S3Raw[("S3<br/>Raw CDRs")]
        Timestream[("Timestream<br/>Metrics")]
        DynamoDB[("DynamoDB<br/>Aggregates")]
    end

    QuickSight["QuickSight<br/>Dashboard"]

    SIPClients -->|"SIP INVITE/BYE"| Asterisk
    Asterisk -->|"Call Events"| CDRGen
    CDRGen -->|"JSON CDRs"| Kinesis
    Kinesis --> Lambda1
    Kinesis --> Lambda2
    Kinesis --> Lambda3
    Lambda1 --> S3Raw
    Lambda1 --> Timestream
    Lambda2 --> DynamoDB
    Lambda3 --> DynamoDB
    Timestream --> QuickSight
    DynamoDB --> QuickSight

    style Asterisk fill:#48bb78,stroke:#333,stroke-width:2px,color:#fff
    style Kinesis fill:#ed8936,stroke:#333,stroke-width:2px,color:#fff
    style Lambda1 fill:#4299e1,stroke:#333,stroke-width:2px,color:#fff
    style Lambda2 fill:#4299e1,stroke:#333,stroke-width:2px,color:#fff
    style Lambda3 fill:#4299e1,stroke:#333,stroke-width:2px,color:#fff
    style QuickSight fill:#9f7aea,stroke:#333,stroke-width:2px,color:#fff

Key Features & Implementation

  • SIP CDR Generation: Asterisk PBX integration with custom CDR module generating ITU-T compliant records
  • Real-Time Ingestion: Kinesis Data Streams with 3 shards, each accepting up to 1,000 records/second (or 1 MB/second) of writes, for 3,000 records/second of total capacity
  • Stream Processing: Lambda consumers with batch processing (100 records/invocation) for efficiency
  • Call Aggregation: Real-time metrics by phone number, call direction, hour of day, and destination
  • Fraud Detection: Pattern analysis detecting unusual call volumes, international dialing anomalies
  • Billing Aggregation: Per-user monthly summaries with call duration, data usage, and cost calculations
  • Time-Series Analytics: Amazon Timestream storing 13 months of granular call metrics
  • Interactive Dashboard: QuickSight visualizations showing top 10 users, call trends, and revenue metrics
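The write-up does not describe how unusual call volumes are detected, so the following is only a minimal sketch of one common approach: compare a subscriber's current hourly call count against their own recent history using a z-score. The threshold, the minimum history length, and the standard-deviation floor are all assumed values.

```typescript
// Flags the current hourly call count when it sits far above the subscriber's
// own recent history. All default parameter values are illustrative assumptions.
function isVolumeAnomaly(
  history: number[],   // call counts for previous comparable hours
  current: number,     // call count for the hour being evaluated
  zThreshold = 3,
  minHistory = 5
): boolean {
  // Too little history to judge: do not flag.
  if (history.length < minHistory) return false;

  const mean = history.reduce((sum, n) => sum + n, 0) / history.length;
  const variance =
    history.reduce((sum, n) => sum + (n - mean) ** 2, 0) / history.length;

  // Floor the deviation at 1 call so a very flat history does not turn
  // a one-call difference into an alert.
  const std = Math.max(Math.sqrt(variance), 1);

  return (current - mean) / std > zThreshold;
}

const recentHours = [4, 5, 6, 5, 4, 6];
console.log(isVolumeAnomaly(recentHours, 7));  // prints false
console.log(isVolumeAnomaly(recentHours, 40)); // prints true
```

A per-subscriber baseline like this keeps high-volume business accounts from triggering alerts that would be appropriate for a residential line.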

Implementation Highlights

1. CDR Schema Definition

TypeScript interface for Call Detail Record structure:

src/types/cdr.ts
export interface CallDetailRecord {
  // Call Identification
  callId: string;                    // Unique call identifier
  recordId: string;                  // CDR record ID
  recordType: 'START' | 'STOP' | 'INTERIM' | 'EVENT';
  
  // Timestamp Information
  startTime: string;                 // ISO 8601 format
  answerTime?: string;               // When call was answered
  endTime?: string;                  // When call ended
  duration: number;                  // Total call duration in seconds
  billableSeconds: number;           // Rounded duration for billing
  
  // Party Information
  callingParty: {
    number: string;                  // E.164 format (+1234567890)
    numberType: 'MOBILE' | 'LANDLINE' | 'VOIP';
    subscriberId: string;            // Internal subscriber ID
    location: {
      country: string;
      city?: string;
      coordinates?: { lat: number; lon: number };
    };
  };
  
  calledParty: {
    number: string;
    numberType: 'MOBILE' | 'LANDLINE' | 'VOIP' | 'INTERNATIONAL';
    destination: string;             // Country/region
  };
  
  // Call Characteristics
  callType: 'VOICE' | 'VIDEO' | 'CONFERENCE';
  callDirection: 'MOBILE_ORIGINATED' | 'MOBILE_TERMINATED';
  terminationReason: 'NORMAL' | 'BUSY' | 'NO_ANSWER' | 'FAILED' | 'CANCELLED';
  
  // Network Information
  sipData: {
    fromUri: string;                 // SIP From header
    toUri: string;                   // SIP To header
    callIdHeader: string;            // SIP Call-ID
    userAgent: string;               // SIP User-Agent
    codec: string;                   // Audio codec (G.711, Opus, etc.)
    ipAddress: string;               // Caller IP address
  };
  
  // Quality Metrics
  qos?: {
    jitter: number;                  // Milliseconds
    packetLoss: number;              // Percentage
    latency: number;                 // Milliseconds
    mos: number;                     // Mean Opinion Score (1-5)
  };
  
  // Billing Information
  cost: {
    currency: string;                // ISO 4217 (USD, EUR, etc.)
    amount: number;                  // Total call cost
    perMinuteRate: number;          // Rate charged
    taxAmount: number;               // Tax portion
  };
  
  // Additional Metadata
  metadata: {
    serverNode: string;              // Which PBX handled the call
    recordingId?: string;            // If call was recorded
    tags: string[];                  // Custom tags (fraud-alert, premium, etc.)
  };
}
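A TypeScript interface is erased at runtime, so records arriving from the stream still need a runtime check before they are trusted. The sketch below validates a handful of the fields defined above; the function name, the error messages, and the choice of which fields to check are assumptions made for illustration.

```typescript
// E.164: a leading "+", a non-zero first digit, at most 15 digits in total.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;
const RECORD_TYPES = ['START', 'STOP', 'INTERIM', 'EVENT'];

function isObject(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Returns a list of problems; an empty list means the checked fields look valid.
// Only a subset of the CallDetailRecord fields is covered here.
function validateCdr(input: unknown): string[] {
  if (!isObject(input)) return ['record is not an object'];

  const errors: string[] = [];
  const { callId, recordType, startTime, duration, callingParty } = input;

  if (typeof callId !== 'string' || callId.length === 0) {
    errors.push('callId is missing');
  }
  if (typeof recordType !== 'string' || !RECORD_TYPES.includes(recordType)) {
    errors.push('recordType is invalid');
  }
  if (typeof startTime !== 'string' || Number.isNaN(Date.parse(startTime))) {
    errors.push('startTime is not a parseable timestamp');
  }
  if (typeof duration !== 'number' || !Number.isFinite(duration) || duration < 0) {
    errors.push('duration must be a non-negative number');
  }
  if (!isObject(callingParty)) {
    errors.push('callingParty is missing');
  } else {
    const callingNumber = callingParty.number;
    if (typeof callingNumber !== 'string' || !E164_PATTERN.test(callingNumber)) {
      errors.push('callingParty.number is not in E.164 format');
    }
  }
  return errors;
}

const sample: unknown = JSON.parse(
  '{"callId":"abc","recordType":"STOP","startTime":"2023-06-01T10:00:00Z",' +
  '"duration":42,"callingParty":{"number":"+1234567890"}}'
);
console.log(validateCdr(sample).length); // prints 0
```

Rejected records can be routed to a dead-letter location instead of failing the whole batch, which keeps one malformed CDR from blocking the shard.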

2. CDR Stream Processor Lambda

Lambda function for processing CDR records from Kinesis:

src/processors/cdrAggregator.ts
import { KinesisStreamEvent, KinesisStreamRecord } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { CallDetailRecord } from '../types/cdr';

export class CDRAggregator {
  private docClient: DynamoDBDocumentClient;
  private tableName: string;

  constructor() {
    const client = new DynamoDBClient({});
    this.docClient = DynamoDBDocumentClient.from(client);
    this.tableName = process.env.AGGREGATES_TABLE!;
  }

  async processBatch(records: KinesisStreamRecord[]): Promise<void> {
    const aggregates = new Map<string, AggregateMetrics>();

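    // Process all records in the batch
    for (const record of records) {
      const cdr = this.decodeCDR(record);

      // Aggregate only final (STOP) records. INTERIM records describe calls
      // that are still in progress; counting them as well would inflate
      // callCount for the same call.
      if (cdr.recordType === 'STOP') {
        this.aggregateCDR(cdr, aggregates);
      }
    }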

    // Write aggregates to DynamoDB
    await this.writeAggregates(aggregates);
  }

  private decodeCDR(record: KinesisStreamRecord): CallDetailRecord {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    return JSON.parse(payload) as CallDetailRecord;
  }

  private aggregateCDR(
    cdr: CallDetailRecord,
    aggregates: Map<string, AggregateMetrics>
  ): void {
    const subscriberId = cdr.callingParty.subscriberId;
    const dateKey = cdr.startTime.substring(0, 10); // YYYY-MM-DD
    const hourKey = cdr.startTime.substring(11, 13); // HH
    
    const key = `${subscriberId}#${dateKey}#${hourKey}`;
    
    if (!aggregates.has(key)) {
      aggregates.set(key, {
        subscriberId,
        date: dateKey,
        hour: parseInt(hourKey, 10),
        callCount: 0,
        totalDuration: 0,
        totalCost: 0,
        callsByDirection: { MO: 0, MT: 0 },
        callsByType: { VOICE: 0, VIDEO: 0, CONFERENCE: 0 }
      });
    }

    const agg = aggregates.get(key)!;
    agg.callCount++;
    agg.totalDuration += cdr.duration;
    agg.totalCost += cdr.cost.amount;
    
    if (cdr.callDirection === 'MOBILE_ORIGINATED') {
      agg.callsByDirection.MO++;
    } else {
      agg.callsByDirection.MT++;
    }
    
    agg.callsByType[cdr.callType]++;
  }

  private async writeAggregates(
    aggregates: Map<string, AggregateMetrics>
  ): Promise<void> {
    const writePromises = Array.from(aggregates.values()).map(
      async (metrics) => {
        // Zero-pad the hour so the sort key keeps the HH form taken from startTime
        const hour = String(metrics.hour).padStart(2, '0');

        // DynamoDB's ADD action works only on top-level attributes, so the
        // per-direction and per-type counters are written as flat attributes
        // (callsMO, callsVoice, ...) instead of nested map paths.
        await this.docClient.send(new UpdateCommand({
          TableName: this.tableName,
          Key: {
            subscriberId: metrics.subscriberId,
            dateHour: `${metrics.date}#${hour}`
          },
          UpdateExpression: `
            ADD callCount :count,
                totalDuration :duration,
                totalCost :cost,
                callsMO :mo,
                callsMT :mt,
                callsVoice :voice,
                callsVideo :video,
                callsConference :conference
          `,
          ExpressionAttributeValues: {
            ':count': metrics.callCount,
            ':duration': metrics.totalDuration,
            ':cost': metrics.totalCost,
            ':mo': metrics.callsByDirection.MO,
            ':mt': metrics.callsByDirection.MT,
            ':voice': metrics.callsByType.VOICE,
            ':video': metrics.callsByType.VIDEO,
            ':conference': metrics.callsByType.CONFERENCE
          }
        }));
      }
    );

    await Promise.all(writePromises);
  }
}

interface AggregateMetrics {
  subscriberId: string;
  date: string;
  hour: number;
  callCount: number;
  totalDuration: number;
  totalCost: number;
  callsByDirection: { MO: number; MT: number };
  callsByType: { VOICE: number; VIDEO: number; CONFERENCE: number };
}

// Created once per execution environment so the DynamoDB client is reused
// across warm invocations instead of being rebuilt on every call.
const aggregator = new CDRAggregator();

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  try {
    await aggregator.processBatch(event.Records);
    console.log(`Processed ${event.Records.length} CDR records`);
  } catch (error) {
    console.error('Error processing CDR batch:', error);
    throw error; // Rethrow so Lambda retries the batch
  }
};

Results & Impact

Outcomes Achieved

  • Processing Capacity: 500,000+ CDRs per day with average latency of 800ms
  • Real-Time Analytics: Dashboard updates within 2 seconds of call completion
  • Cost Savings: 85% reduction in CDR storage costs using S3 vs. traditional database
  • Fraud Detection: Identified $50K+ in fraudulent calls within first 3 months
  • Billing Accuracy: 99.98% accuracy vs. manual billing verification
  • Dashboard Adoption: Used daily by operations team, customer service, and finance

Technical Insights & Best Practices

CDR Standards & Best Practices

  • ITU-T Rec. Q.825 specifies CDR field requirements for billing and regulatory compliance
  • Store raw CDRs immutably in S3 for audit trail and regulatory requirements (7+ years)
  • Use E.164 format for phone numbers to ensure international compatibility
  • Distinguish between answered calls (duration > 0) and attempts for accurate metrics
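To make the last point concrete, the sketch below separates answered calls from attempts and derives two standard telephony metrics from them: answer-seizure ratio (ASR) and average call duration (ACD). The `CallOutcome` shape is a hypothetical subset of the CDR, and requiring an `answerTime` in addition to a positive duration is an assumption added here to guard against records whose duration includes ringing time.

```typescript
// Hypothetical subset of a final (STOP) CDR.
interface CallOutcome {
  answerTime?: string; // present only if the call was answered
  duration: number;    // seconds
}

interface CallStats {
  attempts: number;
  answered: number;
  answerSeizureRatio: number;  // answered / attempts
  averageCallDuration: number; // mean duration of answered calls, in seconds
}

function callStats(calls: CallOutcome[]): CallStats {
  const answeredCalls = calls.filter(
    (call) => call.answerTime !== undefined && call.duration > 0
  );
  const totalDuration = answeredCalls.reduce((sum, call) => sum + call.duration, 0);

  return {
    attempts: calls.length,
    answered: answeredCalls.length,
    answerSeizureRatio: calls.length === 0 ? 0 : answeredCalls.length / calls.length,
    averageCallDuration:
      answeredCalls.length === 0 ? 0 : totalDuration / answeredCalls.length,
  };
}

const stats = callStats([
  { answerTime: '2023-06-01T10:00:05Z', duration: 60 },
  { answerTime: '2023-06-01T11:00:03Z', duration: 120 },
  { answerTime: '2023-06-01T12:00:08Z', duration: 30 },
  { duration: 0 }, // busy or unanswered attempt
]);

console.log(stats.answerSeizureRatio, stats.averageCallDuration); // prints "0.75 70"
```

Averaging duration over all attempts instead would report 52.5 seconds here, which understates how long real conversations last.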

Stream Processing Optimization

  • Use Kinesis batch processing (100-500 records per invocation) to cut Lambda invocations by 99% or more compared with invoking once per record
  • Implement partial batch failure handling to retry only failed records
  • Use DynamoDB atomic counters for real-time aggregates without read-modify-write
  • Set alarms on Kinesis iterator age (for example the Lambda IteratorAge metric) with a threshold below 1 hour to detect consumers falling behind
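Partial batch failure handling can be sketched as follows. Lambda accepts a `batchItemFailures` response from a Kinesis consumer when the event source mapping has `ReportBatchItemFailures` enabled, and it resumes from the lowest failed sequence number. The local types below mirror only the fields used, and `processOne` is a hypothetical per-record callback, so this is an outline rather than the project's handler.

```typescript
// Minimal local mirrors of the Lambda Kinesis event and response shapes.
interface KinesisRecordLike {
  kinesis: { sequenceNumber: string; data: string };
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Processes records in order and stops at the first failure. Lambda then
// retries from that sequence number, so records that already succeeded
// earlier in the batch are not processed (or counted) a second time.
async function processWithPartialFailures(
  records: KinesisRecordLike[],
  processOne: (record: KinesisRecordLike) => Promise<void> // hypothetical callback
): Promise<BatchResponse> {
  for (const record of records) {
    try {
      await processOne(record);
    } catch (error) {
      console.error('Failed at sequence', record.kinesis.sequenceNumber, error);
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  // An empty list tells Lambda the whole batch succeeded.
  return { batchItemFailures: [] };
}
```

This matters for additive counters in particular: retrying an entire batch after a late failure would apply the same increments twice, whereas checkpointing at the failed record limits the replay to records that were never applied.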

Standards & References

  • ITU-T Recommendation Q.825: Specification of TMN applications at the Q3 interface: Call detail recording
  • IETF RFC 3261: SIP: Session Initiation Protocol
  • IETF RFC 4566: SDP: Session Description Protocol (obsoleted by RFC 8866)
  • ITU-T Recommendation E.164: The international public telecommunication numbering plan
  • AWS Well-Architected Framework: Performance Efficiency Pillar