Real-Time Call Detail Record (CDR) Processor
Event-driven CDR processing pipeline for real-time call analytics, billing aggregation, and fraud detection using AWS Kinesis, Lambda, and SIP integration.
Project Overview
Built a real-time CDR processing pipeline that ingests, aggregates, and analyzes call detail records from a SIP-based soft-phone system, processing 500,000+ call records per day.
Implemented stream processing using AWS Kinesis with Lambda consumers for real-time analytics including top callers by data usage, call duration analysis, and fraud pattern detection.
Integrated with Asterisk PBX to generate CDRs compliant with ITU-T standards, storing raw records in S3 and aggregated metrics in Amazon Timestream for time-series analysis.
The Challenge
Problem Statement
This project addressed two core technical challenges:
1. High-Velocity Stream Processing
Processing 500K+ CDRs per day (5-6 records/second average, 50+ records/second peak) required careful Kinesis shard configuration and Lambda concurrency tuning to prevent throttling and maintain sub-second processing latency.
2. CDR Schema Complexity
SIP call records include multiple legs (caller→proxy→callee), redirections, transfers, and conference calls. Accurately attributing call duration, data usage, and costs across complex call flows required deep SIP protocol understanding.
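The shard arithmetic behind the first challenge can be sketched as follows. This is a minimal illustration, not the project's actual sizing tool: `requiredShards`, the headroom factor, and the 2 KB average record size are assumptions made for the example. The per-shard write limits (1,000 records/second and 1 MB/second) are the documented Kinesis Data Streams limits.

```typescript
// Per-shard write limits for Kinesis Data Streams.
const SHARD_MAX_RECORDS_PER_SEC = 1000;
// Documented as 1 MB/s; treated here as 10^6 bytes, which is slightly conservative.
const SHARD_MAX_BYTES_PER_SEC = 1000000;

// Hypothetical helper: smallest shard count that satisfies both write limits,
// after scaling the peak by a headroom factor to absorb bursts and uneven
// partition-key distribution.
function requiredShards(
  peakRecordsPerSec: number,
  avgRecordBytes: number,
  headroom: number = 2
): number {
  const byCount = (peakRecordsPerSec * headroom) / SHARD_MAX_RECORDS_PER_SEC;
  const byBytes =
    (peakRecordsPerSec * avgRecordBytes * headroom) / SHARD_MAX_BYTES_PER_SEC;
  return Math.max(1, Math.ceil(Math.max(byCount, byBytes)));
}

// 500,000 records spread over 86,400 seconds is roughly 5.8 records/second.
const averageRate = 500000 / 86400;

// Peak of 50 records/second; the 2,048-byte record size is an assumption.
const shardsAtPeak = requiredShards(50, 2048);
```

With these assumed inputs a single shard covers the write path, so a multi-shard stream leaves room for bursts. Read throughput shared by several consumers is a separate constraint that this sketch does not model.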
Solution Architecture
Architecture diagram (data flow):
- SIP Clients (Calling Parties) → Asterisk PBX (SIP Server): SIP INVITE/BYE
- Asterisk PBX → CDR Generator (Custom Module): call events
- CDR Generator → Kinesis Data Stream (CDR Ingestion): JSON CDRs
- Kinesis Data Stream → Processing Layer: Lambda Consumer 1 (Aggregation), Lambda Consumer 2 (Fraud Detection), Lambda Consumer 3 (Billing)
- Lambda Consumer 1 → S3 (Raw CDRs) and Timestream (Metrics)
- Lambda Consumer 2 and Lambda Consumer 3 → DynamoDB (Aggregates)
- Timestream and DynamoDB → QuickSight Dashboard
Key Features & Implementation
- SIP CDR Generation: Asterisk PBX integration with custom CDR module generating ITU-T compliant records
- Real-Time Ingestion: Kinesis Data Streams with 3 shards, each accepting up to 1,000 records/second (or 1 MB/second) of writes
- Stream Processing: Lambda consumers with batch processing (100 records/invocation) for efficiency
- Call Aggregation: Real-time metrics by phone number, call direction, hour of day, and destination
- Fraud Detection: Pattern analysis detecting unusual call volumes and international dialing anomalies
- Billing Aggregation: Per-user monthly summaries with call duration, data usage, and cost calculations
- Time-Series Analytics: Amazon Timestream storing 13 months of granular call metrics
- Interactive Dashboard: QuickSight visualizations showing top 10 users, call trends, and revenue metrics
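The fraud-detection feature above mentions unusual call volumes without describing the rule used. One common way to express such a rule is a z-score test against a subscriber's own recent history. The sketch below is an assumption for illustration only: the `HourlyVolume` shape, the threshold of 3 standard deviations, and the 24-hour minimum baseline are hypothetical and not taken from the project.

```typescript
// Hypothetical input: one subscriber's hourly call counts.
interface HourlyVolume {
  subscriberId: string;
  history: number[]; // call counts for previous hours (the baseline)
  current: number;   // call count for the hour being evaluated
}

function mean(xs: number[]): number {
  return xs.reduce((a, b) => a + b, 0) / xs.length;
}

// Population standard deviation of the baseline.
function stdDev(xs: number[]): number {
  const m = mean(xs);
  return Math.sqrt(mean(xs.map((x) => (x - m) * (x - m))));
}

// Flags the current hour when it exceeds the baseline mean by more than
// `threshold` standard deviations. Returns false when the baseline is too
// short to be meaningful.
function isVolumeAnomaly(
  v: HourlyVolume,
  threshold: number = 3,
  minHistory: number = 24
): boolean {
  if (v.history.length < minHistory) return false;
  const m = mean(v.history);
  const sd = stdDev(v.history);
  if (sd === 0) return v.current > m; // flat baseline: any increase stands out
  return (v.current - m) / sd > threshold;
}
```

A per-subscriber baseline keeps a busy call center from being flagged merely for being busy, at the cost of needing some history before the rule can fire.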
Implementation Highlights
1. CDR Schema Definition
TypeScript interface for Call Detail Record structure:
export interface CallDetailRecord {
  // Call Identification
  callId: string;                // Unique call identifier
  recordId: string;              // CDR record ID
  recordType: 'START' | 'STOP' | 'INTERIM' | 'EVENT';

  // Timestamp Information
  startTime: string;             // ISO 8601 format
  answerTime?: string;           // When call was answered
  endTime?: string;              // When call ended
  duration: number;              // Total call duration in seconds
  billableSeconds: number;       // Rounded duration for billing

  // Party Information
  callingParty: {
    number: string;              // E.164 format (+1234567890)
    numberType: 'MOBILE' | 'LANDLINE' | 'VOIP';
    subscriberId: string;        // Internal subscriber ID
    location: {
      country: string;
      city?: string;
      coordinates?: { lat: number; lon: number };
    };
  };
  calledParty: {
    number: string;
    numberType: 'MOBILE' | 'LANDLINE' | 'VOIP' | 'INTERNATIONAL';
    destination: string;         // Country/region
  };

  // Call Characteristics
  callType: 'VOICE' | 'VIDEO' | 'CONFERENCE';
  callDirection: 'MOBILE_ORIGINATED' | 'MOBILE_TERMINATED';
  terminationReason: 'NORMAL' | 'BUSY' | 'NO_ANSWER' | 'FAILED' | 'CANCELLED';

  // Network Information
  sipData: {
    fromUri: string;             // SIP From header
    toUri: string;               // SIP To header
    callIdHeader: string;        // SIP Call-ID
    userAgent: string;           // SIP User-Agent
    codec: string;               // Audio codec (G.711, Opus, etc.)
    ipAddress: string;           // Caller IP address
  };

  // Quality Metrics
  qos?: {
    jitter: number;              // Milliseconds
    packetLoss: number;          // Percentage
    latency: number;             // Milliseconds
    mos: number;                 // Mean Opinion Score (1-5)
  };

  // Billing Information
  cost: {
    currency: string;            // ISO 4217 (USD, EUR, etc.)
    amount: number;              // Total call cost
    perMinuteRate: number;       // Rate charged
    taxAmount: number;           // Tax portion
  };

  // Additional Metadata
  metadata: {
    serverNode: string;          // Which PBX handled the call
    recordingId?: string;        // If call was recorded
    tags: string[];              // Custom tags (fraud-alert, premium, etc.)
  };
}
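The schema describes `billableSeconds` as a rounded duration and `cost.amount` as the total cost, but it does not state the rounding rule. The sketch below shows one plausible rating calculation, under stated assumptions: the `RatePlan` type, the round-up-to-increment rule, and the treatment of `amount` as tax-inclusive are all hypothetical and chosen only to illustrate how the three billing fields could relate.

```typescript
// Hypothetical rating parameters; none of these values come from the project.
interface RatePlan {
  perMinuteRate: number;    // price per minute, in major currency units
  incrementSeconds: number; // billing increment, e.g. 60 for per-minute billing
  taxRate: number;          // e.g. 0.2 for 20%
}

// Rounds answered time up to the next billing increment.
function billableSeconds(answeredSeconds: number, incrementSeconds: number): number {
  if (answeredSeconds <= 0) return 0; // unanswered attempts are not billed
  return Math.ceil(answeredSeconds / incrementSeconds) * incrementSeconds;
}

// Computes the billing fields for one call. Amounts are rounded to whole
// minor units (cents) before being converted back, so that per-call values
// do not carry fractional cents into later sums.
function rateCall(answeredSeconds: number, plan: RatePlan) {
  const billable = billableSeconds(answeredSeconds, plan.incrementSeconds);
  const netCents = Math.round((billable / 60) * plan.perMinuteRate * 100);
  const taxCents = Math.round(netCents * plan.taxRate);
  return {
    billableSeconds: billable,
    amount: (netCents + taxCents) / 100, // assumed to include tax
    taxAmount: taxCents / 100,
  };
}
```

Under this rule a 61-second call on a 60-second increment bills as 120 seconds, which is why the choice of increment matters as much as the per-minute rate.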
2. CDR Stream Processor Lambda
Lambda function for processing CDR records from Kinesis:
import { KinesisStreamEvent, KinesisStreamRecord } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand } from '@aws-sdk/lib-dynamodb';
import { CallDetailRecord } from '../types/cdr';

export class CDRAggregator {
  private docClient: DynamoDBDocumentClient;
  private tableName: string;

  constructor() {
    const client = new DynamoDBClient({});
    this.docClient = DynamoDBDocumentClient.from(client);
    this.tableName = process.env.AGGREGATES_TABLE!;
  }

  async processBatch(records: KinesisStreamRecord[]): Promise<void> {
    const aggregates = new Map<string, AggregateMetrics>();

    // Fold the batch into per-subscriber, per-hour aggregates in memory
    for (const record of records) {
      const cdr = this.decodeCDR(record);
      // Only STOP records carry the final duration and cost. Counting INTERIM
      // records as well would count the same call more than once.
      if (cdr.recordType === 'STOP') {
        this.aggregateCDR(cdr, aggregates);
      }
    }

    // Write aggregates to DynamoDB: one update per key, not per record
    await this.writeAggregates(aggregates);
  }

  // Kinesis delivers the payload base64-encoded; the producer sends JSON
  private decodeCDR(record: KinesisStreamRecord): CallDetailRecord {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    return JSON.parse(payload) as CallDetailRecord;
  }

  private aggregateCDR(
    cdr: CallDetailRecord,
    aggregates: Map<string, AggregateMetrics>
  ): void {
    const subscriberId = cdr.callingParty.subscriberId;
    // Buckets follow the timestamp as written (no time zone conversion)
    const dateKey = cdr.startTime.substring(0, 10); // YYYY-MM-DD
    const hourKey = cdr.startTime.substring(11, 13); // HH
    const key = `${subscriberId}#${dateKey}#${hourKey}`;

    if (!aggregates.has(key)) {
      aggregates.set(key, {
        subscriberId,
        date: dateKey,
        hour: parseInt(hourKey, 10),
        callCount: 0,
        totalDuration: 0,
        totalCost: 0,
        callsByDirection: { MO: 0, MT: 0 },
        callsByType: { VOICE: 0, VIDEO: 0, CONFERENCE: 0 }
      });
    }

    const agg = aggregates.get(key)!;
    agg.callCount++;
    agg.totalDuration += cdr.duration;
    agg.totalCost += cdr.cost.amount;

    if (cdr.callDirection === 'MOBILE_ORIGINATED') {
      agg.callsByDirection.MO++;
    } else {
      agg.callsByDirection.MT++;
    }
    agg.callsByType[cdr.callType]++;
  }

  private async writeAggregates(
    aggregates: Map<string, AggregateMetrics>
  ): Promise<void> {
    const writePromises = Array.from(aggregates.entries()).map(
      async ([key, metrics]) => {
        const [subscriberId, date, hour] = key.split('#');
        await this.docClient.send(new UpdateCommand({
          TableName: this.tableName,
          Key: {
            subscriberId,
            dateHour: `${date}#${hour}`
          },
          // ADD works only on top-level attributes, so the per-direction and
          // per-type counters are stored flat (callsMO, callsVoice, ...)
          // rather than inside nested maps.
          UpdateExpression: `
            ADD callCount :count,
                totalDuration :duration,
                totalCost :cost,
                callsMO :mo,
                callsMT :mt,
                callsVoice :voice,
                callsVideo :video,
                callsConference :conference
          `,
          ExpressionAttributeValues: {
            ':count': metrics.callCount,
            ':duration': metrics.totalDuration,
            ':cost': metrics.totalCost,
            ':mo': metrics.callsByDirection.MO,
            ':mt': metrics.callsByDirection.MT,
            ':voice': metrics.callsByType.VOICE,
            ':video': metrics.callsByType.VIDEO,
            ':conference': metrics.callsByType.CONFERENCE
          }
        }));
      }
    );
    await Promise.all(writePromises);
  }
}

// In-memory rollup for one subscriber and one hour
interface AggregateMetrics {
  subscriberId: string;
  date: string;
  hour: number;
  callCount: number;
  totalDuration: number;
  totalCost: number;
  callsByDirection: { MO: number; MT: number };
  callsByType: { VOICE: number; VIDEO: number; CONFERENCE: number };
}

// Created once per execution environment so the DynamoDB client is reused
// across warm invocations
const aggregator = new CDRAggregator();

export const handler = async (event: KinesisStreamEvent): Promise<void> => {
  try {
    await aggregator.processBatch(event.Records);
    console.log(`Processed ${event.Records.length} CDR records`);
  } catch (error) {
    console.error('Error processing CDR batch:', error);
    // Rethrowing makes Lambda retry the whole batch. ADD is not idempotent, so
    // updates that succeeded before the failure are applied again on retry.
    throw error; // Trigger Lambda retry
  }
};
Results & Impact
Outcomes Achieved
- Processing Capacity: 500,000+ CDRs per day with average latency of 800ms
- Real-Time Analytics: Dashboard updates within 2 seconds of call completion
- Cost Savings: 85% reduction in CDR storage costs by keeping raw records in S3 instead of a traditional database
- Fraud Detection: Identified $50K+ in fraudulent calls within first 3 months
- Billing Accuracy: 99.98% accuracy when checked against manual billing verification
- Dashboard Adoption: Used daily by operations team, customer service, and finance
Technical Insights & Best Practices
CDR Standards & Best Practices
- ITU-T Rec. Q.825 specifies CDR field requirements for billing and regulatory compliance
- Store raw CDRs immutably in S3 for audit trail and regulatory requirements (7+ years)
- Use E.164 format for phone numbers to ensure international compatibility
- Distinguish between answered calls (answerTime present, billableSeconds > 0) and attempts for accurate metrics; total duration alone can include ring time
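Two of the practices above, E.164 formatting and separating answered calls from attempts, are simple enough to express as small checks. The sketch below is illustrative only: `isE164`, `classifyCall`, and the `CallOutcomeFields` subset type are hypothetical names, and the regular expression checks shape (a plus sign, a non-zero leading digit, at most 15 digits in total), not whether the number is actually assigned.

```typescript
// "+" followed by 2 to 15 digits, the first of which is not zero.
const E164_PATTERN = /^\+[1-9]\d{1,14}$/;

function isE164(phoneNumber: string): boolean {
  return E164_PATTERN.test(phoneNumber);
}

// Subset of the CDR fields needed to classify a call; hypothetical type.
interface CallOutcomeFields {
  answerTime?: string;
  billableSeconds: number;
}

type CallOutcome = 'ANSWERED' | 'ATTEMPT';

// A call counts as answered only if it has an answer timestamp and
// accumulated billable time; everything else is treated as an attempt.
function classifyCall(cdr: CallOutcomeFields): CallOutcome {
  return cdr.answerTime !== undefined && cdr.billableSeconds > 0
    ? 'ANSWERED'
    : 'ATTEMPT';
}
```

Running checks like these at ingestion time keeps malformed numbers and unanswered attempts from skewing per-subscriber aggregates later in the pipeline.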
Stream Processing Optimization
- Use Kinesis batch processing (100-500 records per invocation) to cut Lambda invocations by 99% or more relative to one invocation per record
- Implement partial batch failure handling to retry only failed records
- Use DynamoDB atomic counters for real-time aggregates without read-modify-write
- Alarm on iterator age (the Lambda IteratorAge metric, or GetRecords.IteratorAgeMilliseconds on the stream) with a threshold under 1 hour to detect processing delays
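The partial batch failure practice above can be sketched as follows. For a Kinesis event source with `ReportBatchItemFailures` enabled, the handler returns the sequence number of the first record that failed, and Lambda retries from that record onward. The types below are minimal local stand-ins for the corresponding `aws-lambda` types, and `processWithCheckpoint` and `processOne` are hypothetical names used only for this illustration.

```typescript
// Minimal local copies of the shapes Lambda uses for Kinesis events, so the
// sketch is self-contained; a real project would import them from 'aws-lambda'.
interface KinesisRecordLike {
  kinesis: { sequenceNumber: string; data: string };
}

interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Processes records in order and stops at the first failure, reporting its
// sequence number so that only that record and the ones after it are retried.
// An empty batchItemFailures array signals that the whole batch succeeded.
async function processWithCheckpoint(
  records: KinesisRecordLike[],
  processOne: (record: KinesisRecordLike) => Promise<void>
): Promise<BatchResponse> {
  for (const record of records) {
    try {
      await processOne(record);
    } catch {
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  return { batchItemFailures: [] };
}
```

Because counter increments made with ADD are not idempotent, a record retried after its increment was already applied would be counted twice. Deduplicating on the CDR's recordId is one way to guard against that.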
Standards & References
- ITU-T Recommendation Q.825: Specification of TMN applications at the Q3 interface: Call detail recording
- IETF RFC 3261: SIP: Session Initiation Protocol
- IETF RFC 4566: SDP: Session Description Protocol (obsoleted by RFC 8866)
- ITU-T Recommendation E.164: The international public telecommunication numbering plan
- AWS Well-Architected Framework: Performance Efficiency Pillar