Real-Time Email Marketing Triggers with Streaming SQL

Real-time email marketing triggers fire automated emails within seconds of a user action—cart abandonment, price drop on a viewed item, inactivity after a trial signup—by using streaming SQL to detect behavioral patterns as they happen. RisingWave maintains continuously updated trigger conditions as materialized views, replacing polling-based automation platforms with an event-driven pipeline.

Why Timing Is the Most Important Variable in Trigger Email

A cart abandonment email sent 5 minutes after abandonment converts at 3–5x the rate of one sent an hour later. A welcome series that starts within 60 seconds of signup outperforms a 4-hour delayed version by a wide margin in both open rates and activation metrics.

Most marketing automation platforms trigger emails through polling loops: check who qualifies for a trigger every 5–15 minutes and send. That's not real-time—it's batched at short intervals. The delay costs conversions.

RisingWave replaces the polling loop with a streaming SQL layer that detects trigger conditions as events arrive. The moment a qualifying event pattern is detected, the trigger fires.

Architecture: Event-Driven Email Triggers

The trigger pipeline has three components:

  1. Event ingestion — behavioral events from web, app, and transaction systems via Kafka
  2. Trigger detection — materialized views that continuously evaluate trigger conditions
  3. Trigger dispatch — sink connectors that push trigger records to your email sending platform

Your email platform (Braze, Iterable, Postmark, SendGrid) receives trigger events from Kafka or a JDBC-connected database and dispatches the email. RisingWave handles the "when to trigger" logic; the email platform handles the "what to send" rendering and delivery.

Ingesting Behavioral Events

CREATE SOURCE user_behavior_events (
    event_id        VARCHAR,
    user_id         VARCHAR,
    session_id      VARCHAR,
    event_type      VARCHAR,
    product_id      VARCHAR,
    cart_value      DECIMAL,
    page_url        VARCHAR,
    event_time      TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'user-behavior-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

CREATE SOURCE email_send_events (
    send_id         VARCHAR,
    user_id         VARCHAR,
    trigger_type    VARCHAR,
    sent_at         TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'email-sends',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

Store email suppression and frequency control rules:

CREATE TABLE email_suppression (
    user_id         VARCHAR PRIMARY KEY,
    reason          VARCHAR,
    suppressed_at   TIMESTAMPTZ
);

CREATE TABLE trigger_cooldowns (
    trigger_type    VARCHAR PRIMARY KEY,
    cooldown_hours  INTEGER
);

Cart Abandonment Trigger

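Detect users who added to cart but did not purchase within the same session. The view below groups each user's events into sessions that close after 30 minutes of inactivity. Session-window support differs across RisingWave releases (the documented form is a SESSION WITH GAP window frame), so confirm that the SESSION(...) call shown here is accepted by your version before deploying: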

CREATE MATERIALIZED VIEW cart_abandonment_triggers AS
SELECT
    user_id,
    window_start    AS session_start,
    window_end      AS session_end,
    MAX(cart_value) FILTER (WHERE event_type = 'add_to_cart')   AS abandoned_cart_value,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')          AS items_added,
    COUNT(*) FILTER (WHERE event_type = 'purchase')             AS purchases_in_session,
    MAX(event_time) FILTER (WHERE event_type = 'add_to_cart')   AS cart_add_time
FROM SESSION(user_behavior_events, event_time, '30 MINUTES', user_id)
GROUP BY user_id, window_start, window_end
HAVING
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart') > 0
    AND COUNT(*) FILTER (WHERE event_type = 'purchase') = 0;
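
The email_suppression table defined earlier can be applied before a trigger ever reaches a sink. The following is a minimal sketch, assuming suppression is a simple per-user flag; the view name cart_abandonment_sendable is hypothetical and not part of the original pipeline:

```sql
-- Sketch: drop suppressed users from the cart abandonment triggers.
-- cart_abandonment_sendable is a hypothetical name chosen for illustration.
CREATE MATERIALIZED VIEW cart_abandonment_sendable AS
SELECT
    t.user_id,
    t.session_start,
    t.session_end,
    t.abandoned_cart_value,
    t.items_added,
    t.cart_add_time
FROM cart_abandonment_triggers AS t
LEFT JOIN email_suppression AS s
    ON s.user_id = t.user_id
WHERE s.user_id IS NULL;   -- keep only users with no suppression row
```

With this in place, a sink could read from cart_abandonment_sendable rather than from cart_abandonment_triggers, so that a row inserted into email_suppression removes the user's pending trigger from the view.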

Browse Abandonment and Re-Engagement Triggers

Identify users who browsed products without adding to cart, enabling top-of-funnel recovery emails:

CREATE MATERIALIZED VIEW browse_abandonment_triggers AS
SELECT
    user_id,
    window_start,
    window_end,
    COUNT(*) FILTER (WHERE event_type = 'product_view')         AS products_viewed,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')          AS items_added,
    COUNT(DISTINCT product_id)                                   AS unique_products,
    MAX(event_time)                                              AS last_activity
FROM TUMBLE(user_behavior_events, event_time, INTERVAL '1 HOUR')
GROUP BY user_id, window_start, window_end
HAVING
    COUNT(*) FILTER (WHERE event_type = 'product_view') >= 3
    AND COUNT(*) FILTER (WHERE event_type = 'add_to_cart') = 0;

Track per-user trigger send history to enforce cooldown periods:

CREATE MATERIALIZED VIEW recent_trigger_sends AS
SELECT
    user_id,
    trigger_type,
    window_start,
    window_end,
    COUNT(*) AS sends_in_window
FROM TUMBLE(email_send_events, sent_at, INTERVAL '24 HOURS')
GROUP BY user_id, trigger_type, window_start, window_end;
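
One way to apply this send history is an anti-join that keeps only triggers with no matching send in the same 24-hour window. This is a sketch under two assumptions: the view name browse_abandonment_eligible is hypothetical, and the email platform is assumed to record sends with trigger_type = 'browse_abandonment'.

```sql
-- Sketch: suppress browse abandonment triggers for users who already
-- received this trigger in the 24-hour window containing their last activity.
-- The 'browse_abandonment' label is an assumed value of trigger_type.
CREATE MATERIALIZED VIEW browse_abandonment_eligible AS
SELECT
    t.user_id,
    t.window_start,
    t.window_end,
    t.products_viewed,
    t.unique_products,
    t.last_activity
FROM browse_abandonment_triggers AS t
LEFT JOIN recent_trigger_sends AS r
    ON  r.user_id = t.user_id
    AND r.trigger_type = 'browse_abandonment'
    AND r.window_start <= t.last_activity
    AND r.window_end   >  t.last_activity
WHERE r.user_id IS NULL;   -- no send recorded in that window
```

Because recent_trigger_sends uses fixed tumbling windows, this enforces a per-calendar-window cap rather than a rolling cooldown; a send at the end of one window does not block a trigger at the start of the next. The per-trigger cooldown_hours values in trigger_cooldowns are not used here.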

Sending Triggers to Your Email Platform

Push detected triggers to Kafka for your email platform to consume:

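-- cart_abandonment_triggers is an aggregating view, so it emits updates as
-- well as inserts. FORMAT PLAIN needs force_append_only to write each change
-- as a plain message; consumers should de-duplicate on user_id and session_start.
CREATE SINK cart_abandonment_email_sink
FROM cart_abandonment_triggers
WITH (
    connector = 'kafka',
    topic = 'email-triggers-cart-abandonment',
    properties.bootstrap.server = 'kafka:9092'
)
FORMAT PLAIN ENCODE JSON (force_append_only = 'true');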

Or write directly to a trigger queue in your marketing database:

CREATE SINK browse_abandonment_to_db
FROM browse_abandonment_triggers
WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://marketing-db:5432/email',
    table.name = 'pending_triggers',
    type = 'append-only',
    -- Needed because the aggregated view emits updates, not only inserts.
    force_append_only = 'true'
);
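
The JDBC sink writes into an existing table, so pending_triggers has to be present in the marketing database first. A minimal sketch of a matching PostgreSQL definition follows; the column list mirrors the output of browse_abandonment_triggers, and the assumption that columns are matched by name should be checked against the connector documentation for your version:

```sql
-- Run in the PostgreSQL marketing database, not in RisingWave.
-- Column names and types are assumed to mirror browse_abandonment_triggers.
CREATE TABLE IF NOT EXISTS pending_triggers (
    user_id          VARCHAR,
    window_start     TIMESTAMPTZ,
    window_end       TIMESTAMPTZ,
    products_viewed  BIGINT,
    items_added      BIGINT,
    unique_products  BIGINT,
    last_activity    TIMESTAMPTZ
);
```

Since the sink is append-only, the same user and window can appear more than once as counts change; the job that drains this queue would need to take the latest row per user_id and window_start.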

Comparison: Email Trigger Platforms

Capability                    | ESP Built-In Automation  | Custom Batch Job    | Streaming SQL (RisingWave)
------------------------------|--------------------------|---------------------|---------------------------
Trigger latency               | 5–15 min (polling)       | 15–60 min           | Seconds
Custom trigger logic          | Limited (UI-based rules) | Flexible (code)     | Flexible (SQL)
Multi-event pattern detection | Basic                    | Manual coding       | Native
Frequency cap enforcement     | Platform-limited         | Custom code         | SQL views
Scalability                   | Platform plan limits     | Infrastructure cost | Horizontally scalable
Real-time suppression updates | Delayed sync             | Manual              | CDC-backed

FAQ

Q: How does RisingWave prevent duplicate trigger emails?
The recent_trigger_sends materialized view tracks recent sends per user per trigger type. Before writing to the trigger queue, LEFT JOIN your trigger output against that view and keep only the rows with no match, which filters out users who have already received the same trigger within the cooldown period. The view updates in real time as new sends are recorded.

Q: Can I use RisingWave to trigger transactional emails (order confirmations, shipping updates)?
Yes. Transactional triggers are simpler than behavioral triggers: a single event (order placed, shipment created) fires the trigger directly. Ingest the transaction event from Kafka or CDC, create a materialized view that passes through the event with enriched data (customer name, order details from a joined table), and sink to your email platform.
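
A transactional pass-through view might look like the following sketch. Everything named here is hypothetical: the order_events source, its topic and columns, and the customers table are assumptions made for illustration.

```sql
-- Hypothetical source of order events; topic and columns are assumptions.
CREATE SOURCE order_events (
    order_id     VARCHAR,
    user_id      VARCHAR,
    order_total  DECIMAL,
    placed_at    TIMESTAMPTZ
)
WITH (
    connector = 'kafka',
    topic = 'order-events',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'latest'
)
FORMAT PLAIN ENCODE JSON;

-- Hypothetical customer lookup table used for enrichment.
CREATE TABLE customers (
    user_id  VARCHAR PRIMARY KEY,
    name     VARCHAR,
    email    VARCHAR
);

-- Each order event passes through once, enriched with customer details.
CREATE MATERIALIZED VIEW order_confirmation_triggers AS
SELECT
    o.order_id,
    o.user_id,
    c.name   AS customer_name,
    c.email  AS customer_email,
    o.order_total,
    o.placed_at
FROM order_events AS o
JOIN customers AS c
    ON c.user_id = o.user_id;
```

The inner join means an order only appears once a matching customers row exists; if an order can arrive before the customer record, a LEFT JOIN would surface it with empty enrichment fields instead.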

Q: How do I handle users who convert between the trigger detection and send time?
Create a real-time suppression check: before dispatching, query a "recent_purchasers" materialized view to verify the user hasn't converted since the trigger was detected. Most email platforms support this as a pre-send API check or via a continuously updated suppression list fed by RisingWave.
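
A recent_purchasers view could be sketched as below, assuming purchases arrive on user_behavior_events with event_type = 'purchase'. The one-day retention and the literal values in the lookup query are placeholders, not recommendations.

```sql
-- Sketch: latest purchase per user, limited to the last day so that
-- state does not grow without bound. The 1-day horizon is an assumption.
CREATE MATERIALIZED VIEW recent_purchasers AS
SELECT
    user_id,
    MAX(event_time) AS last_purchase_time
FROM user_behavior_events
WHERE event_type = 'purchase'
  AND event_time > NOW() - INTERVAL '1 DAY'
GROUP BY user_id;

-- Pre-send check issued by the dispatcher: a returned row means the user
-- purchased at or after the trigger time, so the email should be skipped.
-- 'u_123' and the timestamp are placeholder values.
SELECT 1
FROM recent_purchasers
WHERE user_id = 'u_123'
  AND last_purchase_time >= '2024-01-01 12:00:00+00'::TIMESTAMPTZ;
```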

Q: What is the minimum trigger latency achievable with RisingWave?
RisingWave processes events with sub-second internal latency. End-to-end trigger latency (event arrives → email sent) depends on the downstream email platform. Most modern platforms can dispatch within 5–30 seconds of receiving a trigger from Kafka, giving total end-to-end latency under 60 seconds.

Get Started

Build event-driven email triggers that fire in seconds, not minutes.
