How to Process Call Detail Records in Real Time

How to Process Call Detail Records in Real Time

Call Detail Records are the financial source of truth for every telecom operator — but traditional CDR processing batches data in hourly or daily files, delaying revenue assurance and fraud detection by hours. RisingWave, a PostgreSQL-compatible streaming database, processes CDRs as they flow from the mediation layer, giving revenue assurance, fraud, and product teams live analytics without rewriting their SQL skills.

Why Real-Time CDR Processing Matters

Every minute a CDR sits in a batch file is a minute a fraudulent call goes undetected and a minute a revenue assurance exception goes unresolved. In voice interconnect fraud, where fraudulent traffic peaks and disappears within minutes, batch processing is useless.

Beyond fraud, real-time CDR analytics enable:

  • Live usage dashboards for prepaid balance management
  • Interconnect cost monitoring — catch unexpected peering partner traffic spikes
  • Regulatory compliance — some jurisdictions require near-real-time lawful intercept correlation
  • Product analytics — understand calling patterns for bundle design

CDR fields that matter:

  • A-number (calling party) and B-number (called party)
  • Call duration (seconds)
  • Bytes transferred (for data CDRs)
  • Call type (voice, SMS, data, roaming)
  • Start time / end time
  • Cell tower ID at call start
  • SIM ID (IMSI) and device (IMEI)
  • Termination reason (normal, dropped, busy, etc.)

How Streaming SQL Solves This

RisingWave ingests CDRs from Kafka as they are emitted by the mediation platform. Materialized views aggregate usage by subscriber, destination, and call type in real time. Fraud detection rules — such as rapid call velocity or premium-rate number dialing — are expressed as SQL conditions and evaluated continuously without a separate rules engine.

Step-by-Step Tutorial

Step 1: Data Source

Create a Kafka source for voice and data CDRs. Both types share a common schema with type-specific nullable fields.

CREATE SOURCE cdr_stream (
    cdr_id              VARCHAR,
    cdr_type            VARCHAR,    -- VOICE, SMS, DATA, ROAMING
    imsi                VARCHAR,    -- subscriber identifier
    imei                VARCHAR,    -- device identifier
    a_number            VARCHAR,    -- calling number (E.164)
    b_number            VARCHAR,    -- called number (E.164)
    call_start          TIMESTAMPTZ,
    call_end            TIMESTAMPTZ,
    duration_sec        INT,
    bytes_uploaded      BIGINT,
    bytes_downloaded    BIGINT,
    cell_id_start       VARCHAR,    -- cell tower at call start
    call_type_detail    VARCHAR,    -- MO, MT, roaming_out, etc.
    termination_reason  VARCHAR,    -- NORMAL, DROPPED, BUSY, etc.
    destination_country VARCHAR,
    event_time          TIMESTAMPTZ  -- CDR generation time
)
WITH (
    connector      = 'kafka',
    topic          = 'telecom.cdr.stream',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON
( TIMESTAMP COLUMN = event_time );

Step 2: Core View

Aggregate CDRs by subscriber and destination over 15-minute windows for usage monitoring, and compute per-subscriber call velocity for fraud detection.

-- Usage summary per subscriber per 15 minutes
CREATE MATERIALIZED VIEW subscriber_usage_15min AS
SELECT
    imsi,
    cdr_type,
    window_start,
    window_end,
    COUNT(*)                                            AS call_count,
    SUM(duration_sec)                                  AS total_duration_sec,
    SUM(bytes_downloaded + bytes_uploaded)             AS total_bytes,
    COUNT(*) FILTER (WHERE termination_reason = 'DROPPED') AS dropped_calls,
    COUNT(DISTINCT b_number)                           AS distinct_destinations
FROM TUMBLE(cdr_stream, event_time, INTERVAL '15 MINUTES')
GROUP BY imsi, cdr_type, window_start, window_end;

-- International call aggregation (cost monitoring)
CREATE MATERIALIZED VIEW intl_calls_15min AS
SELECT
    imsi,
    destination_country,
    window_start,
    window_end,
    COUNT(*)          AS call_count,
    SUM(duration_sec) AS total_duration_sec
FROM TUMBLE(cdr_stream, event_time, INTERVAL '15 MINUTES')
WHERE cdr_type = 'VOICE'
  AND destination_country <> 'HOME'  -- replace with your country code
GROUP BY imsi, destination_country, window_start, window_end;

-- Call velocity per subscriber (last 5 minutes) for fraud detection
CREATE MATERIALIZED VIEW call_velocity_5min AS
SELECT
    imsi,
    window_start,
    window_end,
    COUNT(*)                                            AS calls_initiated,
    COUNT(DISTINCT b_number)                           AS unique_destinations,
    SUM(duration_sec)                                  AS total_duration_sec,
    COUNT(*) FILTER (WHERE b_number LIKE '09%' OR b_number LIKE '+449%') AS premium_calls
FROM TUMBLE(cdr_stream, event_time, INTERVAL '5 MINUTES')
WHERE cdr_type = 'VOICE'
GROUP BY imsi, window_start, window_end;

Step 3: Alerts and Sinks

Emit fraud signals and revenue assurance exceptions.

-- Fraud velocity alert: >20 calls or >5 premium-rate calls in 5 minutes
CREATE MATERIALIZED VIEW cdr_fraud_alerts AS
SELECT
    imsi,
    window_end        AS alert_time,
    calls_initiated,
    unique_destinations,
    premium_calls,
    total_duration_sec,
    CASE
        WHEN premium_calls > 5              THEN 'IRSF_SUSPECTED'
        WHEN calls_initiated > 50           THEN 'CALL_STORM'
        WHEN unique_destinations > 30       THEN 'WANGIRI_SUSPECTED'
        ELSE 'VELOCITY_ANOMALY'
    END AS fraud_type,
    CASE
        WHEN premium_calls > 5 OR calls_initiated > 50 THEN 'CRITICAL'
        ELSE 'WARNING'
    END AS severity
FROM call_velocity_5min
WHERE calls_initiated > 20 OR premium_calls > 2;

CREATE SINK cdr_fraud_alerts_sink
FROM cdr_fraud_alerts
WITH (
    connector  = 'kafka',
    topic      = 'telecom.fraud.alerts',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

-- Usage aggregation sink for real-time billing system
CREATE SINK subscriber_usage_sink
FROM subscriber_usage_15min
WITH (
    connector  = 'kafka',
    topic      = 'telecom.billing.usage',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON;

Comparison Table

ApproachCDR LatencyFraud DetectionSQL-QueryableBilling Integration
Nightly batch files12–24 hoursNext-dayVia exportBatch
Hourly mediation batch1 hour1 hourVia BI toolDelayed
Custom streaming jobSecondsSecondsNo (API)Complex
RisingWave streaming SQLSecondsSecondsYesKafka sink

FAQ

Q: How do I handle CDRs that arrive out of order from different mediation nodes? Configure a watermark on event_time with a tolerance of 2–5 minutes. RisingWave will wait for late CDRs before closing a window, ensuring aggregations are complete even when records arrive from geographically distributed mediation platforms.

Q: Can I correlate voice CDRs with SS7 signaling events to detect CLI spoofing? Yes. Create a second source for SS7 ISUP events and join it to the CDR stream on the call reference number. If the A-number in the CDR does not match the originating address in ISUP, flag the record for CLI spoofing investigation.

Q: How do I feed real-time CDR aggregations into a prepaid charging system? Create a materialized view that computes cumulative usage per IMSI within the current billing day. Write this view to a PostgreSQL sink or expose it via the PostgreSQL protocol for the charging system to poll. Update the view as each new CDR arrives.

Key Takeaways

  • RisingWave processes CDRs as they leave the mediation layer, collapsing fraud detection latency from hours to seconds.
  • Velocity-based fraud detection for IRSF, Wangiri, and call-storm patterns is expressible in a few lines of SQL — no separate rules engine required.
  • Usage aggregation views feed billing and balance management systems in near real time, enabling prepaid balance protection and interconnect cost monitoring.
  • The PostgreSQL-compatible interface means revenue assurance teams can query live CDR data with the SQL skills they already have.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.