Real-Time Viewer Engagement Scoring with RisingWave and DynamoDB

Every time a viewer opens a streaming app, the recommendation service has a few hundred milliseconds to decide what to show them. It pulls a viewer engagement profile from a low-latency store, passes it to a ranking model, and returns a personalized content list. That profile needs to reflect what the viewer watched yesterday, not last week.

Most teams build this the slow way. A Spark job runs nightly, writes rolling engagement totals to a DynamoDB table, and calls it done. The recommendation service reads stale profiles. Users who binge three episodes Saturday morning get the same recommendations Sunday night as they would have gotten Friday. The engagement signal is there. The pipeline is just too slow to use it.

This post describes a faster path: stream viewer engagement scores continuously from RisingWave into DynamoDB, and serve fresh profiles to your recommendation API with single-digit millisecond reads.

Why Real-Time Engagement Scoring Matters

Viewer engagement is not static. A user who has spent the last two hours watching a documentary series is in a different recommendation context than the same user's 28-day aggregate behavior suggests. Rolling watch time, completion rates, and genre affinity shift as viewing sessions unfold.

A batch pipeline that refreshes once a day discards that intra-day signal entirely. A recommendation service reading from that pipeline will surface content the viewer has already seen or content that no longer matches their current mood.

Real-time engagement scoring closes this gap. As watch events arrive, the engagement profile updates within seconds. When the viewer opens the app four hours into a binge session, the recommendation service sees a profile that reflects those four hours.

Why RisingWave + DynamoDB

RisingWave is a streaming database that ingests event streams, maintains materialized views as watch events arrive, and propagates changes downstream through sink connectors. DynamoDB is a fully managed key-value store with predictable single-digit millisecond read latency.

The two systems handle separate responsibilities:

  • RisingWave handles all stateful computation: rolling aggregates over watch time, completion rate calculations, genre affinity windows, and incremental view maintenance. No Spark clusters. No scheduled jobs.
  • DynamoDB handles low-latency serving. The recommendation API calls GetItem and returns in under 5 milliseconds. It never touches RisingWave at serve time.

The result is sub-second profile freshness with single-digit millisecond read latency, without a separate orchestration layer to maintain.

Architecture Overview

flowchart TD
    A[Kafka\nWatch Event Stream] --> B[RisingWave\nCompute Layer]
    B --> C1[Materialized View\nWatch History Metrics]
    B --> C2[Materialized View\nContent Interactions]
    B --> C3[Materialized View\nRolling Engagement Profile]
    C1 --> C3
    C2 --> C3
    C3 --> D[DynamoDB Sink Connector]
    D --> E[DynamoDB\nViewer Profiles Table]
    E --> F[Recommendation API]

    style A fill:#f5f5f5,stroke:#999
    style B fill:#dbeafe,stroke:#3b82f6
    style C1 fill:#dbeafe,stroke:#3b82f6
    style C2 fill:#dbeafe,stroke:#3b82f6
    style C3 fill:#dbeafe,stroke:#3b82f6
    style D fill:#dcfce7,stroke:#16a34a
    style E fill:#dcfce7,stroke:#16a34a
    style F fill:#f5f5f5,stroke:#999

Watch events arrive in Kafka. RisingWave reads them via source connectors and maintains a set of materialized views, one per engagement domain. The DynamoDB sink connector propagates each row change from the materialized view into DynamoDB automatically. The recommendation API reads from DynamoDB directly.

DynamoDB Modeling: Four Options

How you model viewer profiles in DynamoDB determines write amplification, read ergonomics, item size growth, and how cleanly different teams can own different engagement signals. There are four common approaches. Three of them carry real costs at scale.

| Option | Partition Key | Sort Key | Item Contents |
|---|---|---|---|
| 1. Universal wide row | viewer_id | latest | All engagement metrics across all domains |
| 2. One item per metric | viewer_id | metric#watch_minutes_7d | One scalar value |
| 3. Per-domain wide row | viewer_id | engagement#watch | All metrics for one engagement domain |
| 4. Daily history items | viewer_id | daily#2026-03-15 | All metrics for one calendar day |

Option 1: One Wide Row Per Viewer

Structure: PK = viewer_id, SK = latest

One DynamoDB item holds every engagement metric for a viewer: watch time totals, completion rates, genre affinities, interaction counts, search behavior.

Pros:

  • Simplest possible read. One GetItem call returns everything the recommendation API needs.
  • No client-side merging.

Cons:

  • Every engagement domain writes to the same item. A new watch event triggers a full item update, even if the interaction and preference signals did not change.
  • Item size grows as you add engagement domains. DynamoDB items are capped at 400 KB; a viewer profile with dozens of metrics across multiple engagement domains reaches that limit faster than expected.
  • Multiple teams writing to the same item creates coordination overhead and risks accidental attribute overwrites.

Option 2: One Item Per Metric

Structure: PK = viewer_id, SK = metric#watch_minutes_7d

One DynamoDB item per scalar engagement metric per viewer.

Pros:

  • Surgical writes. Updating watch_minutes_7d touches exactly one item.
  • Maximum flexibility when adding or removing metrics.

Cons:

  • Assembling a full engagement profile for the recommendation API requires dozens of GetItem calls or a Query scan over a key prefix, followed by client-side assembly.
  • At scale, a single recommendation request generates tens of DynamoDB reads.
  • Higher read capacity unit cost per API call.

This structure works for exploratory analysis. It is not viable for a real-time recommendation serving path.

Option 3: Per-Domain Wide Rows

Structure: PK = viewer_id, SK = engagement#watch (or engagement#interactions, engagement#preferences)

One item holds all metrics for a single engagement domain. A viewer with watch history, interaction, and preference domains has three items in DynamoDB.

Pros:

  • One read returns the complete domain metric bundle. The recommendation API fetches watch history engagement in one GetItem.
  • Domain boundaries match team ownership. The team that owns watch history metrics owns the engagement#watch item. No shared item, no coordination required.
  • Items stay bounded in size. Adding a new watch metric updates only the engagement#watch item.

Cons:

  • Cross-domain reads require multiple GetItem calls. A recommendation model that needs metrics from three domains makes three reads.
  • Clients that need a unified engagement profile must merge domain items.

For most recommendation serving patterns, this is the right trade-off. Ranking models typically consume all metrics from one engagement domain together.
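As a concrete sketch of what a domain-scoped read looks like from the application side, the snippet below builds the per-domain key and issues one GetItem against the viewer_profiles table used later in this post. It assumes a boto3 DynamoDB client is passed in; the helper names build_key and get_domain_profile are illustrative, not part of any library.

```python
def build_key(viewer_id: int, domain: str) -> dict:
    """Build the DynamoDB key for one engagement-domain item
    (partition key viewer_id, sort key engagement#<domain>)."""
    return {
        "viewer_id": {"N": str(viewer_id)},
        "sk": {"S": f"engagement#{domain}"},
    }

def get_domain_profile(client, viewer_id: int, domain: str) -> dict:
    """One GetItem returns the full metric bundle for a domain."""
    resp = client.get_item(
        TableName="viewer_profiles",
        Key=build_key(viewer_id, domain),
        ConsistentRead=False,  # eventually consistent reads are fine for serving
    )
    return resp.get("Item", {})
```

A ranking model that scores against watch history would call get_domain_profile(client, viewer_id, "watch") and get every watch metric in a single round trip.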

Option 4: Daily History Items

Structure: PK = viewer_id, SK = daily#2026-03-15

One item per viewer per calendar day.

Pros:

  • Natural shape for daily aggregate windows. A per-day watch total maps cleanly to a daily item.
  • Good for historical analysis and offline model training data generation.

Cons:

  • "What are this viewer's current engagement metrics?" requires knowing today's date and querying a specific sort key. A stale cache hit returns yesterday's item.
  • Reading recent history requires multiple GetItem calls by date or a range query.
  • Not the right serving contract for a real-time recommendation API.

This pattern suits a training data store or an analytics layer, not an online serving path.

Why Per-Domain Wide Rows Are the Right Default

The key insight is that the recommendation API rarely needs all engagement domains simultaneously. A content ranking model that scores against watch history needs watch history metrics. A model that scores against social signals needs interaction metrics. The domain is the natural unit of consumption at serve time.

Per-domain wide rows align with this reality. When the recommendation API calls DynamoDB for watch history context, it fetches engagement#watch and gets every watch metric in one round trip. No client-side assembly, no multi-key batch queries, no prefix scans.

The domain boundary also matches team ownership. The team responsible for watch history signals writes to engagement#watch. The team responsible for interaction signals writes to engagement#interactions. These are independent items with independent update paths. Adding a new interaction metric does not require coordinating with the watch history team.

Item size stays bounded. A domain-scoped item grows only when that domain adds new metrics. A universal wide row grows whenever any domain adds a metric.

Building the Pipeline in RisingWave

The full pipeline uses a two-tier aggregation pattern: raw events aggregate into daily tiles (Tier 1), and rolling windows summarize the daily tiles (Tier 2). The rolling summary is the serving contract that the sink writes to DynamoDB.

Step 1: Connect to the Watch Event Stream

CREATE SOURCE view_events (
    event_id         VARCHAR,
    viewer_id        BIGINT,
    content_id       VARCHAR,
    genre            VARCHAR,
    watch_duration_sec INT,
    completion_pct   DECIMAL,
    watched_at       TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'view_events',
    properties.bootstrap.server = 'broker:9092',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;

FORMAT PLAIN treats each Kafka message as an append-only insert. Each message represents a single viewing session event, not an update to an existing row.
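For reference, a single Kafka message matching this schema might look like the following (all field names come from the source definition above; the values are illustrative):

```json
{
    "event_id": "evt-7f3a91",
    "viewer_id": 8821043,
    "content_id": "series-204-e03",
    "genre": "documentary",
    "watch_duration_sec": 1860,
    "completion_pct": 0.94,
    "watched_at": "2026-03-15T20:41:07"
}
```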

Step 2: Enrich Events and Build Daily Tiles

-- Daily watch metrics per viewer: the stable tile layer
CREATE MATERIALIZED VIEW viewer_watch_metrics_1d AS
SELECT
    viewer_id,
    DATE_TRUNC('day', watched_at)                             AS watch_date,
    SUM(watch_duration_sec) / 60.0                           AS watch_time_minutes,
    COUNT(DISTINCT content_id)                                AS content_started,
    COUNT(CASE WHEN completion_pct >= 0.9 THEN 1 END)        AS completions,
    COUNT(DISTINCT genre)                                     AS genres_watched
FROM view_events
GROUP BY viewer_id, DATE_TRUNC('day', watched_at);

Once a calendar day closes, its tile row is stable. New watch events on subsequent days create new rows without touching past tiles.

Step 3: Rolling Engagement Profile

-- Rolling engagement profile: the serving contract
CREATE MATERIALIZED VIEW viewer_engagement_rolling AS
SELECT
    viewer_id,
    SUM(CASE WHEN watch_date >= CURRENT_DATE - INTERVAL '7 days'
             THEN watch_time_minutes ELSE 0 END)              AS watch_minutes_7d,
    SUM(CASE WHEN watch_date >= CURRENT_DATE - INTERVAL '28 days'
             THEN watch_time_minutes ELSE 0 END)              AS watch_minutes_28d,
    SUM(CASE WHEN watch_date >= CURRENT_DATE - INTERVAL '7 days'
             THEN content_started ELSE 0 END)                 AS titles_7d,
    SUM(CASE WHEN watch_date >= CURRENT_DATE - INTERVAL '28 days'
             THEN completions ELSE 0 END)                     AS completions_28d,
    SUM(CASE WHEN watch_date >= CURRENT_DATE - INTERVAL '28 days'
             THEN genres_watched ELSE 0 END)                  AS genres_watched_28d
FROM viewer_watch_metrics_1d
GROUP BY viewer_id;

This view holds one row per viewer. Every column is a pre-computed rolling engagement metric. RisingWave maintains it incrementally as new watch events arrive and as the calendar date advances.
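To make the Tier-2 logic concrete, here is a plain-Python equivalent of what the rolling view computes from the daily tiles. This is purely illustrative: RisingWave maintains the result incrementally, whereas this sketch recomputes it from scratch over a list of tile rows.

```python
from datetime import date, timedelta

def rolling_profile(tiles: list, today: date) -> dict:
    """Fold daily tile rows for one viewer into a rolling profile,
    mirroring the CASE/SUM logic of viewer_engagement_rolling."""
    cutoff_7d = today - timedelta(days=7)
    cutoff_28d = today - timedelta(days=28)
    profile = {"watch_minutes_7d": 0.0, "watch_minutes_28d": 0.0,
               "titles_7d": 0, "completions_28d": 0}
    for t in tiles:
        if t["watch_date"] >= cutoff_28d:
            profile["watch_minutes_28d"] += t["watch_time_minutes"]
            profile["completions_28d"] += t["completions"]
        if t["watch_date"] >= cutoff_7d:
            profile["watch_minutes_7d"] += t["watch_time_minutes"]
            profile["titles_7d"] += t["content_started"]
    return profile
```

The incremental version is the whole point of using a streaming database: when a new watch event lands, only the affected tile and the affected profile row change.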

Step 4: Content Interactions Domain

-- Separate domain: interaction signals
CREATE MATERIALIZED VIEW content_interactions_rolling AS
SELECT
    viewer_id,
    COUNT(CASE WHEN event_type = 'like'   THEN 1 END) AS likes_28d,
    COUNT(CASE WHEN event_type = 'share'  THEN 1 END) AS shares_28d,
    COUNT(CASE WHEN event_type = 'search' THEN 1 END) AS searches_28d
FROM content_interactions
WHERE event_ts >= CURRENT_TIMESTAMP - INTERVAL '28 days'
GROUP BY viewer_id;

This view assumes a separate content_interactions source carrying like, share, and search events, defined with the same Kafka connector pattern as view_events.

Step 5: DynamoDB Sinks

-- Sink the rolling watch engagement profile to DynamoDB
CREATE SINK viewer_engagement_sink
FROM viewer_engagement_rolling
WITH (
    connector   = 'dynamodb',
    table       = 'viewer_profiles',
    region      = 'us-east-1',
    primary_key = 'viewer_id'
);

-- Sink the interaction domain to the same table, different sort key
CREATE SINK viewer_interactions_sink
FROM content_interactions_rolling
WITH (
    connector   = 'dynamodb',
    table       = 'viewer_profiles',
    region      = 'us-east-1',
    primary_key = 'viewer_id'
);

The sinks write the latest row per viewer_id to DynamoDB. As new watch events update the materialized views, the sinks propagate those changes automatically. No cron job, no Spark job, no Lambda trigger.

The item written to DynamoDB for the watch engagement domain looks like this:

{
    "viewer_id":         { "N": "8821043" },
    "sk":                { "S": "engagement#watch" },
    "watch_minutes_7d":  { "N": "312.5" },
    "watch_minutes_28d": { "N": "980.0" },
    "titles_7d":         { "N": "14" },
    "completions_28d":   { "N": "31" },
    "genres_watched_28d":{ "N": "6" }
}

The Serving Contract Rule

Sink from the rolling summary view, not from the daily tile view.

The daily tile is an internal aggregation layer. It exists to make rolling window computation efficient. Its rows carry a watch_date column as part of the primary key, which means DynamoDB would hold one row per viewer per day instead of a single current-state profile per viewer. That is not useful for a recommendation API that needs to read the viewer's current engagement state in one call.

The rolling summary is the serving contract. It carries one row per viewer with current-state metric values. When you add a new engagement domain, create a separate rolling summary view and a separate sink. The recommendation API fetches by domain sort key.

FAQ

Do I need to configure the sort key manually in the DynamoDB sink?

The RisingWave DynamoDB connector uses the primary_key field to set the partition key. To write per-domain rows with a specific sort key pattern such as engagement#watch, add a computed column to your materialized view and include it in the primary_key specification. Adding 'engagement#watch' AS domain_key to the rolling view and specifying primary_key = 'viewer_id,domain_key' produces the per-domain row structure described above.

What is the engagement profile freshness guarantee?

End-to-end freshness depends on Kafka consumer lag in RisingWave and DynamoDB sink propagation latency. In a well-provisioned setup, a viewer watch event flows from Kafka to an updated DynamoDB item in under five seconds. For a recommendation service polling viewer profiles, this is sufficient to capture intra-session behavior shifts.

Can I sink multiple engagement domains to the same DynamoDB table?

Yes. Use the same table parameter in each sink definition but use distinct sort key values per domain. The viewer_profiles table can hold engagement#watch, engagement#interactions, and engagement#preferences rows for the same viewer. This is the per-domain wide row pattern applied to a single DynamoDB table.

How does this handle late-arriving watch events?

The daily tile layer uses DATE_TRUNC('day', watched_at) as the grouping key, not a watermark boundary. A late-arriving watch event updates the correct daily tile retroactively, which propagates through to the rolling profile and then to DynamoDB. If you need to bound acceptable event lateness, configure watermarks on your source to control how late data is handled.

What if the recommendation API needs metrics from two engagement domains?

Issue two GetItem calls in parallel. DynamoDB GetItem is fast enough that two parallel reads add negligible latency. At P99, two parallel GetItem calls typically complete in under 15 milliseconds. For three or more domains, use BatchGetItem.
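One way to structure the parallel reads is a small fan-out helper like the sketch below. The fetch_item argument stands in for a boto3 GetItem call; keeping it pluggable makes the fan-out pattern the focus rather than the client setup. The function name fetch_domains is illustrative.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_domains(fetch_item, viewer_id, domains):
    """Issue one read per engagement domain in parallel and return
    a dict mapping domain name to its profile item."""
    with ThreadPoolExecutor(max_workers=len(domains)) as pool:
        results = pool.map(lambda d: (d, fetch_item(viewer_id, d)), domains)
        return dict(results)
```

Calling fetch_domains(get_item_fn, viewer_id, ["watch", "interactions"]) returns both domain bundles in roughly the latency of the slower of the two reads.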

Can I use the same data for offline model training?

Yes. Create an additional sink from viewer_watch_metrics_1d (the daily tile view) to an Apache Iceberg table or S3. The daily tile rows accumulate over time and provide the historical engagement values needed for training data generation. The online recommendation API reads from DynamoDB; the offline training pipeline reads from Iceberg. Both derive from the same RisingWave materialized views.

Conclusion

A real-time viewer engagement pipeline built on RisingWave and DynamoDB does not require scheduled jobs or dedicated compute clusters. RisingWave maintains engagement materialized views incrementally as watch events arrive. The DynamoDB sink connector propagates changes continuously. The recommendation API reads from DynamoDB and gets an engagement profile that reflects viewing behavior from seconds ago, not last night's batch run.

The DynamoDB modeling decision shapes everything downstream. Per-domain wide rows are the right default for recommendation serving: one read returns the complete domain engagement bundle, items stay bounded in size, and ownership stays clear across teams. Start with domain-scoped items and handle cross-domain composition at the application layer only when you need it.


Try it on RisingWave Cloud. Free tier available, no credit card required. Sign up here and connect a Kafka watch event stream to a DynamoDB sink in minutes.

Join the RisingWave Slack community to ask questions, share what you are building, and see how other data engineering teams are using streaming SQL for real-time personalization pipelines.
