Real-Time Attribution Modeling with Streaming SQL

Real-time attribution modeling assigns conversion credit to marketing touchpoints—ads, emails, organic search—within seconds of a purchase or sign-up. Using RisingWave's streaming SQL, you maintain continuously updated first-touch, last-touch, and multi-touch attribution scores as events arrive, replacing overnight batch jobs with always-current attribution data.

Why Attribution Latency Destroys Campaign Optimization

Attribution is how marketers know which channels deserve credit for a conversion. When it runs as an overnight batch job, you're making today's bidding decisions based on yesterday's data. In programmatic advertising, where bids are adjusted every few minutes based on performance signals, stale attribution can mean days of misallocated budget before the feedback loop corrects.

Real-time attribution closes this loop. When a conversion fires, the streaming system immediately updates credit scores for every touchpoint in the customer's journey. Bid management systems, budget allocation tools, and campaign managers all see accurate attribution within seconds.

How Streaming Attribution Works

The attribution pipeline in RisingWave tracks two event streams: touchpoint events (ad impressions, clicks, email opens) and conversion events (purchases, sign-ups, form fills). A materialized view joins these streams within a configurable lookback window and distributes credit according to the chosen attribution model.

The three most common models implemented in streaming SQL are:

  • Last-touch: 100% credit to the final touchpoint before conversion
  • First-touch: 100% credit to the first touchpoint in the journey
  • Linear: Equal credit split across all touchpoints
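To make the difference concrete, here is a small worked example. It is a one-off query rather than part of the pipeline, and it assumes a hypothetical journey of three touchpoints (the channel names are illustrative) ending in a 90.00 conversion:

```sql
-- Hypothetical journey: three touchpoints, then a 90.00 purchase.
WITH journey AS (
    SELECT *
    FROM (VALUES
        (1, 'organic_search'),
        (2, 'email'),
        (3, 'paid_search')
    ) AS v(touch_order, channel)
)
SELECT
    channel,
    -- First-touch: all credit to the earliest touchpoint
    CASE WHEN touch_order = MIN(touch_order) OVER () THEN 90.00 ELSE 0 END AS first_touch_credit,
    -- Last-touch: all credit to the latest touchpoint
    CASE WHEN touch_order = MAX(touch_order) OVER () THEN 90.00 ELSE 0 END AS last_touch_credit,
    -- Linear: revenue divided evenly across every touchpoint
    ROUND(90.00 / COUNT(*) OVER (), 2)                                     AS linear_credit
FROM journey
ORDER BY touch_order;
```

Under first-touch, organic_search receives the full 90.00; under last-touch, paid_search does; under linear, each of the three touchpoints receives 30.00.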

Setting Up the Attribution Pipeline

Ingest touchpoint and conversion events from Kafka:

CREATE SOURCE touchpoint_events (
    event_id        VARCHAR,
    user_id         VARCHAR,
    channel         VARCHAR,
    campaign_id     VARCHAR,
    ad_id           VARCHAR,
    event_type      VARCHAR,
    event_time      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'touchpoint-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

CREATE SOURCE conversion_events (
    conversion_id   VARCHAR,
    user_id         VARCHAR,
    revenue         DECIMAL,
    conversion_type VARCHAR,
    event_time      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'conversion-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Create a reference table mapping channels to attribution weights:

CREATE TABLE channel_metadata (
    channel         VARCHAR PRIMARY KEY,
    channel_group   VARCHAR,
    is_paid         BOOLEAN,
    default_weight  DECIMAL
);
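As a rough illustration, the reference table might be populated like this. The channel names, groups, and weights below are placeholder values chosen for the example, not recommendations:

```sql
-- Illustrative rows only; replace with the channels your touchpoint events actually emit.
INSERT INTO channel_metadata (channel, channel_group, is_paid, default_weight) VALUES
    ('paid_search',    'search',       TRUE,  1.0),
    ('organic_search', 'search',       FALSE, 1.0),
    ('email',          'owned',        FALSE, 1.0),
    ('display',        'programmatic', TRUE,  1.0);
```

The channel values need to match the channel field in touchpoint_events exactly for later joins against this table to find a match.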

Last-Touch Attribution in Streaming SQL

Last-touch is the most common attribution model: credit goes entirely to the last touchpoint before conversion. The view below uses a 30-day lookback window:

CREATE MATERIALIZED VIEW last_touch_attribution AS
WITH ranked_touchpoints AS (
    SELECT
        c.conversion_id,
        c.user_id,
        c.revenue,
        c.event_time                            AS conversion_time,
        t.campaign_id,
        t.channel,
        t.ad_id,
        t.event_time                            AS touchpoint_time,
        ROW_NUMBER() OVER (
            PARTITION BY c.conversion_id
            ORDER BY t.event_time DESC
        )                                       AS touch_rank
    FROM conversion_events c
    JOIN touchpoint_events t
        ON  c.user_id = t.user_id
        AND t.event_time BETWEEN c.event_time - INTERVAL '30 DAYS'
                             AND c.event_time
)
SELECT
    conversion_id,
    user_id,
    revenue             AS attributed_revenue,
    campaign_id,
    channel,
    ad_id,
    conversion_time,
    touchpoint_time     AS credited_touchpoint_time
FROM ranked_touchpoints
WHERE touch_rank = 1;
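First-touch attribution, listed among the common models earlier, follows the same pattern with the sort order reversed. A minimal sketch, assuming the same sources and 30-day lookback window as last_touch_attribution (the view name first_touch_attribution is illustrative):

```sql
CREATE MATERIALIZED VIEW first_touch_attribution AS
WITH ranked_touchpoints AS (
    SELECT
        c.conversion_id,
        c.user_id,
        c.revenue,
        c.event_time                            AS conversion_time,
        t.campaign_id,
        t.channel,
        t.ad_id,
        t.event_time                            AS touchpoint_time,
        -- Ascending order: rank 1 is the earliest touchpoint in the window
        ROW_NUMBER() OVER (
            PARTITION BY c.conversion_id
            ORDER BY t.event_time ASC
        )                                       AS touch_rank
    FROM conversion_events c
    JOIN touchpoint_events t
        ON  c.user_id = t.user_id
        AND t.event_time BETWEEN c.event_time - INTERVAL '30 DAYS'
                             AND c.event_time
)
SELECT
    conversion_id,
    user_id,
    revenue             AS attributed_revenue,
    campaign_id,
    channel,
    ad_id,
    conversion_time,
    touchpoint_time     AS credited_touchpoint_time
FROM ranked_touchpoints
WHERE touch_rank = 1;
```

Note that "first" here means first within the lookback window. A touchpoint older than 30 days is invisible to this view, so a longer window may be appropriate if first-touch is your primary model.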

Linear Multi-Touch Attribution

Distribute credit equally across all touchpoints in the 30-day lookback window:

CREATE MATERIALIZED VIEW linear_attribution AS
WITH all_touchpoints AS (
    SELECT
        c.conversion_id,
        c.user_id,
        c.revenue,
        c.event_time            AS conversion_time,
        t.campaign_id,
        t.channel,
        t.ad_id,
        t.event_time            AS touchpoint_time,
        COUNT(*) OVER (
            PARTITION BY c.conversion_id
        )                       AS total_touches
    FROM conversion_events c
    JOIN touchpoint_events t
        ON  c.user_id = t.user_id
        AND t.event_time BETWEEN c.event_time - INTERVAL '30 DAYS'
                             AND c.event_time
)
SELECT
    conversion_id,
    user_id,
    campaign_id,
    channel,
    ad_id,
    conversion_time,
    touchpoint_time,
    ROUND(revenue / total_touches, 4)   AS attributed_revenue,
    total_touches
FROM all_touchpoints;

Channel-Level Attribution Rollup

Aggregate attributed revenue by channel in real time for marketing mix reporting:

CREATE MATERIALIZED VIEW channel_attribution_summary AS
SELECT
    channel,
    window_start,
    window_end,
    COUNT(DISTINCT conversion_id)           AS conversions,
    SUM(attributed_revenue)                 AS total_attributed_revenue,
    AVG(attributed_revenue)                 AS avg_credit_per_touchpoint  -- mean credit per touchpoint row, not order value
FROM TUMBLE(linear_attribution, conversion_time, INTERVAL '1 HOUR')
GROUP BY channel, window_start, window_end;
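The channel_metadata reference table can feed a coarser rollup as well. The following is a sketch, assuming channel_metadata has been populated with every channel that appears in touchpoint events (the view name channel_group_attribution is illustrative):

```sql
CREATE MATERIALIZED VIEW channel_group_attribution AS
SELECT
    m.channel_group,
    m.is_paid,
    COUNT(DISTINCT l.conversion_id)     AS conversions,
    SUM(l.attributed_revenue)           AS total_attributed_revenue
FROM linear_attribution l
-- Inner join: touchpoints whose channel is missing from channel_metadata are excluded
JOIN channel_metadata m
    ON l.channel = m.channel
GROUP BY m.channel_group, m.is_paid;
```

Because this is an inner join, any channel without a metadata row drops out of the totals. Switching to a LEFT JOIN would keep those rows with a NULL channel_group instead.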

Comparison: Attribution Model Approaches

Model              Credit Distribution             Best For                        Streaming Complexity
Last-touch         100% to final touchpoint        Direct response, retargeting    Low
First-touch        100% to first touchpoint        Brand awareness measurement     Low
Linear             Equal split                     Full-journey visibility         Medium
Time-decay         More weight to recent touches   Long sales cycles               Medium
Data-driven (ML)   Model-determined weights        Large datasets, precision       High (external ML needed)
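The time-decay model in the comparison above can be expressed in the same style as the linear view. The sketch below is one possible formulation, not a canonical one: it assumes an exponential decay with a 7-day half-life (an arbitrary choice for illustration) and normalizes the weights so that the credit for each conversion sums to its revenue. The view name time_decay_attribution is illustrative.

```sql
CREATE MATERIALIZED VIEW time_decay_attribution AS
WITH weighted_touchpoints AS (
    SELECT
        c.conversion_id,
        c.user_id,
        c.revenue,
        c.event_time            AS conversion_time,
        t.campaign_id,
        t.channel,
        t.ad_id,
        t.event_time            AS touchpoint_time,
        -- Weight halves for every 7 days (604800 seconds) between touchpoint and conversion.
        -- The 7-day half-life is an assumption; tune it to your sales cycle.
        POWER(
            0.5::DOUBLE PRECISION,
            (EXTRACT(EPOCH FROM (c.event_time - t.event_time)) / 604800.0)::DOUBLE PRECISION
        )                       AS decay_weight
    FROM conversion_events c
    JOIN touchpoint_events t
        ON  c.user_id = t.user_id
        AND t.event_time BETWEEN c.event_time - INTERVAL '30 DAYS'
                             AND c.event_time
),
normalized AS (
    SELECT
        *,
        -- Total weight per conversion, used to scale weights so they sum to 1
        SUM(decay_weight) OVER (PARTITION BY conversion_id) AS total_weight
    FROM weighted_touchpoints
)
SELECT
    conversion_id,
    user_id,
    campaign_id,
    channel,
    ad_id,
    conversion_time,
    touchpoint_time,
    ROUND((revenue::DOUBLE PRECISION * decay_weight / total_weight)::DECIMAL, 4) AS attributed_revenue
FROM normalized;
```

A touchpoint at the moment of conversion gets a raw weight of 1, one that is 7 days old gets 0.5, and one that is 14 days old gets 0.25, before normalization.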

Sinking Attribution Data to Downstream Systems

Push attribution results to your CRM or BI layer via JDBC sink:

CREATE SINK attribution_to_crm
FROM channel_attribution_summary
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://crm-db:5432/marketing',
    table.name = 'channel_attribution_live',
    type = 'upsert',
    primary_key = 'channel,window_start'
);

FAQ

Q: How does RisingWave handle users with no prior touchpoints? Conversions without matching touchpoint events in the lookback window produce no rows in the attribution materialized views. To track these unattributed conversions, add a separate view that uses a LEFT JOIN with a WHERE t.event_id IS NULL filter.
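A minimal sketch of such a view, assuming the same sources and 30-day lookback window used by the attribution views (the view name unattributed_conversions is illustrative):

```sql
CREATE MATERIALIZED VIEW unattributed_conversions AS
SELECT
    c.conversion_id,
    c.user_id,
    c.revenue,
    c.conversion_type,
    c.event_time        AS conversion_time
FROM conversion_events c
LEFT JOIN touchpoint_events t
    ON  c.user_id = t.user_id
    AND t.event_time BETWEEN c.event_time - INTERVAL '30 DAYS'
                         AND c.event_time
-- Keep only conversions for which the join found no touchpoint
WHERE t.event_id IS NULL;
```

Because the view is maintained incrementally, a conversion should drop out of it if a matching touchpoint event arrives late.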

Q: Can I switch attribution models without rebuilding the pipeline? Each attribution model is a separate materialized view, so you can run last-touch, first-touch, and linear models simultaneously. Switching which model feeds your reporting system is a query routing change, not a pipeline rebuild.

Q: What is the lookback window limit? There is no hard limit imposed by RisingWave. Longer lookback windows require more join state to be maintained, which increases resource usage. For windows beyond 90 days, consider whether a combination of recent streaming data and historical warehouse data (joined via a CDC-backed table) is more cost-effective.

Q: How accurate is real-time attribution compared to batch? For users who convert quickly (same session or same day), real-time attribution is as accurate as batch. For users with long consideration journeys, both approaches need the same lookback window configuration to be accurate. The difference is timing: streaming gives you results in seconds rather than hours.

Get Started

Build a real-time attribution pipeline that updates the moment conversions happen.
