Clickstream Processing Pipeline with Kafka and Streaming SQL

Every e-commerce team wants to know what users are doing right now: which pages they're viewing, where they're dropping off, and which sessions are on the verge of converting. Traditional analytics pipelines run hourly or nightly, so by the time the report lands, the user who abandoned their cart is long gone. A real-time clickstream processing pipeline closes that gap.

This tutorial builds a complete clickstream processing pipeline with Kafka and streaming SQL. You will ingest raw browser events from Apache Kafka, store them in RisingWave, and define materialized views that continuously compute session summaries, funnel stages, page traffic, and drop-off classifications. The views update within milliseconds of each new event, with no scheduled jobs, no re-processing, and no separate serving layer required.

All SQL in this article has been verified against RisingWave 2.8.0.

Architecture: Kafka to Materialized Views

A production clickstream processing pipeline with Kafka typically follows this shape:

flowchart LR
    A[Browser / Mobile App] -->|Raw events| B[Apache Kafka]
    B -->|Streaming source| C[RisingWave]
    C -->|Materialized views| D[Dashboards / APIs]
    C -->|Sinks| E[Alerting / CRM]

The browser emits a JSON event for every user action: page views, clicks, add-to-cart events, and purchases. Those events land in a Kafka topic. RisingWave connects to the topic as a source, ingests the stream, and maintains a set of materialized views that continuously compute analytics. Downstream dashboards and services query RisingWave directly through its PostgreSQL-compatible interface.
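For concreteness, a single raw event on the clickstream-events topic might look like this. The field names mirror the source definition in Step 1; the exact payload shape is an illustrative assumption, since event collectors vary:

```json
{
  "event_id": "ev001",
  "user_id": "u1",
  "session_id": "s1001",
  "event_type": "page_view",
  "page_url": "/home",
  "referrer_url": "https://google.com",
  "device_type": "desktop",
  "country": "US",
  "event_ts": "2026-04-01T09:00:00+00:00"
}
```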

The key architectural advantage is that RisingWave serves as both the stream processor and the query engine. You do not need to write Flink jobs, export results to a separate database, and then query that database. One system handles the full pipeline.

Step 1: Define the Kafka Source

In a production deployment, you create a RisingWave source that connects directly to your Kafka topic. The source definition maps the JSON schema of your clickstream events to SQL columns:

CREATE SOURCE click_kafka_source (
    event_id     VARCHAR,
    user_id      VARCHAR,
    session_id   VARCHAR,
    event_type   VARCHAR,
    page_url     VARCHAR,
    referrer_url VARCHAR,
    device_type  VARCHAR,
    country      VARCHAR,
    event_ts     TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'clickstream-events',
    properties.bootstrap.server = 'your-kafka-broker:9092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;

RisingWave will begin consuming events from the earliest available offset and keep the source continuously updated. For full details on Kafka source options such as SASL authentication, SSL, and consumer group configuration, see the RisingWave Kafka source documentation.

For the hands-on examples below, we use a regular table with the same schema. The materialized views you build on top are identical regardless of whether the underlying source is a table or a Kafka topic.

Step 2: Create the Events Table

CREATE TABLE click_events (
    event_id     VARCHAR,
    user_id      VARCHAR,
    session_id   VARCHAR,
    event_type   VARCHAR,
    page_url     VARCHAR,
    referrer_url VARCHAR,
    device_type  VARCHAR,
    country      VARCHAR,
    event_ts     TIMESTAMPTZ
);

In production, replace this CREATE TABLE with the CREATE SOURCE from Step 1. Everything else in this tutorial stays exactly the same.

Step 3: Load Sample Clickstream Data

The sample data represents six user sessions moving through an e-commerce funnel. Some sessions complete a purchase, some abandon mid-funnel, and some browse without converting.

INSERT INTO click_events VALUES
('ev001','u1','s1001','page_view',   '/home',              'https://google.com',   'desktop','US','2026-04-01 09:00:00+00'),
('ev002','u1','s1001','page_view',   '/products',          '/home',                'desktop','US','2026-04-01 09:01:10+00'),
('ev003','u1','s1001','page_view',   '/products/pro-plan', '/products',            'desktop','US','2026-04-01 09:02:30+00'),
('ev004','u1','s1001','add_to_cart', '/products/pro-plan', '/products/pro-plan',   'desktop','US','2026-04-01 09:03:00+00'),
('ev005','u1','s1001','page_view',   '/checkout',          '/products/pro-plan',   'desktop','US','2026-04-01 09:03:45+00'),
('ev006','u1','s1001','purchase',    '/checkout/confirm',  '/checkout',            'desktop','US','2026-04-01 09:05:20+00'),

('ev007','u2','s1002','page_view',   '/home',              'https://twitter.com',  'mobile', 'UK','2026-04-01 09:00:30+00'),
('ev008','u2','s1002','page_view',   '/products',          '/home',                'mobile', 'UK','2026-04-01 09:02:00+00'),
('ev009','u2','s1002','page_view',   '/products/pro-plan', '/products',            'mobile', 'UK','2026-04-01 09:03:30+00'),
('ev010','u2','s1002','add_to_cart', '/products/pro-plan', '/products/pro-plan',   'mobile', 'UK','2026-04-01 09:04:00+00'),

('ev011','u3','s1003','page_view',   '/home',              'https://google.com',   'desktop','DE','2026-04-01 09:01:00+00'),
('ev012','u3','s1003','page_view',   '/pricing',           '/home',                'desktop','DE','2026-04-01 09:02:30+00'),
('ev013','u3','s1003','page_view',   '/docs',              '/pricing',             'desktop','DE','2026-04-01 09:05:00+00'),

('ev014','u4','s1004','page_view',   '/home',              'https://linkedin.com', 'tablet', 'US','2026-04-01 09:00:00+00'),
('ev015','u4','s1004','page_view',   '/products',          '/home',                'tablet', 'US','2026-04-01 09:01:00+00'),
('ev016','u4','s1004','page_view',   '/products/pro-plan', '/products',            'tablet', 'US','2026-04-01 09:02:00+00'),
('ev017','u4','s1004','page_view',   '/checkout',          '/products/pro-plan',   'tablet', 'US','2026-04-01 09:03:00+00'),

('ev018','u5','s1005','page_view',   '/home',              'https://google.com',   'mobile', 'JP','2026-04-01 09:02:00+00'),
('ev019','u5','s1005','page_view',   '/products',          '/home',                'mobile', 'JP','2026-04-01 09:03:30+00'),

('ev020','u6','s1006','page_view',   '/home',              'https://facebook.com', 'desktop','US','2026-04-01 09:05:00+00'),
('ev021','u6','s1006','page_view',   '/products',          '/home',                'desktop','US','2026-04-01 09:06:00+00'),
('ev022','u6','s1006','page_view',   '/products/pro-plan', '/products',            'desktop','US','2026-04-01 09:07:00+00'),
('ev023','u6','s1006','add_to_cart', '/products/pro-plan', '/products/pro-plan',   'desktop','US','2026-04-01 09:07:30+00'),
('ev024','u6','s1006','page_view',   '/checkout',          '/products/pro-plan',   'desktop','US','2026-04-01 09:08:00+00'),
('ev025','u6','s1006','purchase',    '/checkout/confirm',  '/checkout',            'desktop','US','2026-04-01 09:09:00+00');

Step 4: Build Session Tracking with a Materialized View

The first thing most product teams want from a clickstream pipeline is per-session metrics: how long did the session last, how many pages did the user view, did they add anything to their cart?

This materialized view computes all of that continuously:

CREATE MATERIALIZED VIEW click_mv_session_summary AS
SELECT
    session_id,
    user_id,
    device_type,
    country,
    MIN(event_ts)                                          AS session_start,
    MAX(event_ts)                                          AS session_end,
    EXTRACT(EPOCH FROM MAX(event_ts) - MIN(event_ts))      AS duration_seconds,
    COUNT(*)                                               AS total_events,
    COUNT(*) FILTER (WHERE event_type = 'page_view')       AS page_views,
    COUNT(*) FILTER (WHERE event_type = 'add_to_cart')     AS add_to_carts,
    COUNT(*) FILTER (WHERE event_type = 'purchase')        AS purchases
FROM click_events
GROUP BY session_id, user_id, device_type, country;

A few things are worth noting in this query. The FILTER (WHERE ...) clause is standard SQL conditional-aggregation syntax, supported by both PostgreSQL and RisingWave. It is more readable than SUM(CASE WHEN ...) and performs equally well. The EXTRACT(EPOCH FROM ...) call converts the interval between the first and last event into a plain number of seconds, which is easier to work with downstream.
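For comparison, the same conditional count written without FILTER would use the CASE form below; the two are equivalent, the FILTER version is simply easier to read:

```sql
-- Equivalent to COUNT(*) FILTER (WHERE event_type = 'page_view')
SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) AS page_views
```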

SELECT * FROM click_mv_session_summary ORDER BY session_id;
 session_id | user_id | device_type | country |       session_start       |        session_end        | duration_seconds | total_events | page_views | add_to_carts | purchases
------------+---------+-------------+---------+---------------------------+---------------------------+------------------+--------------+------------+--------------+-----------
 s1001      | u1      | desktop     | US      | 2026-04-01 09:00:00+00:00 | 2026-04-01 09:05:20+00:00 |       320.000000 |            6 |          4 |            1 |         1
 s1002      | u2      | mobile      | UK      | 2026-04-01 09:00:30+00:00 | 2026-04-01 09:04:00+00:00 |       210.000000 |            4 |          3 |            1 |         0
 s1003      | u3      | desktop     | DE      | 2026-04-01 09:01:00+00:00 | 2026-04-01 09:05:00+00:00 |       240.000000 |            3 |          3 |            0 |         0
 s1004      | u4      | tablet      | US      | 2026-04-01 09:00:00+00:00 | 2026-04-01 09:03:00+00:00 |       180.000000 |            4 |          4 |            0 |         0
 s1005      | u5      | mobile      | JP      | 2026-04-01 09:02:00+00:00 | 2026-04-01 09:03:30+00:00 |        90.000000 |            2 |          2 |            0 |         0
 s1006      | u6      | desktop     | US      | 2026-04-01 09:05:00+00:00 | 2026-04-01 09:09:00+00:00 |       240.000000 |            6 |          4 |            1 |         1

Sessions s1001 and s1006 completed purchases. Session s1002 added to cart but did not buy. Sessions s1003, s1004, and s1005 never reached the cart. Because this is a materialized view, RisingWave recomputes only the affected rows when new events arrive. A high-throughput Kafka topic with thousands of events per second does not cause a full table scan on each update.

You can query this view to break down conversion rates by device:

SELECT
    device_type,
    COUNT(*)                                             AS total_sessions,
    COUNT(*) FILTER (WHERE purchases > 0)               AS converted_sessions,
    ROUND(
        COUNT(*) FILTER (WHERE purchases > 0) * 100.0
        / COUNT(*),
        1
    )                                                    AS conversion_rate_pct
FROM click_mv_session_summary
GROUP BY device_type
ORDER BY total_sessions DESC;
 device_type | total_sessions | converted_sessions | conversion_rate_pct
-------------+----------------+--------------------+---------------------
 desktop     |              3 |                  2 |                66.7
 mobile      |              2 |                  0 |                 0.0
 tablet      |              1 |                  0 |                 0.0

Desktop users are converting at 66.7% in this sample. Mobile and tablet sessions browse but do not buy, which is a common pattern that points to checkout UX issues on smaller screens.

Step 5: Build Funnel Analysis

Funnel analysis answers the most important conversion question: at each step of the purchase path, how many sessions make it to the next step?

The view below computes a six-stage e-commerce funnel. The inner CTE flags each session as having reached a given stage using bool_or. The outer query counts how many sessions achieved each flag.

CREATE MATERIALIZED VIEW click_mv_funnel_analysis AS
WITH session_flags AS (
    SELECT
        session_id,
        bool_or(page_url = '/home')                                          AS saw_home,
        bool_or(page_url = '/products')                                      AS saw_products,
        bool_or(page_url LIKE '/products/%' AND page_url <> '/products')     AS saw_product_detail,
        bool_or(event_type = 'add_to_cart')                                  AS added_to_cart,
        bool_or(page_url = '/checkout')                                      AS reached_checkout,
        bool_or(event_type = 'purchase')                                     AS converted
    FROM click_events
    GROUP BY session_id
)
SELECT
    COUNT(*)                                       AS total_sessions,
    COUNT(*) FILTER (WHERE saw_home)               AS step_home,
    COUNT(*) FILTER (WHERE saw_products)           AS step_products,
    COUNT(*) FILTER (WHERE saw_product_detail)     AS step_product_detail,
    COUNT(*) FILTER (WHERE added_to_cart)          AS step_add_to_cart,
    COUNT(*) FILTER (WHERE reached_checkout)       AS step_checkout,
    COUNT(*) FILTER (WHERE converted)              AS step_purchase
FROM session_flags;

bool_or is a Boolean aggregate that returns true if any row in the group satisfies the condition. It is the cleanest way to flag whether a session ever reached a particular state, regardless of how many events that session produced.

SELECT * FROM click_mv_funnel_analysis;
 total_sessions | step_home | step_products | step_product_detail | step_add_to_cart | step_checkout | step_purchase
----------------+-----------+---------------+---------------------+------------------+---------------+---------------
              6 |         6 |             5 |                   4 |                3 |             3 |             2

Reading the funnel: all 6 sessions started at home. Five progressed to the products listing (83%). Four clicked into a product detail page (67%). Three added to cart (50%). Three reached checkout (50%). Two completed a purchase (33%).

In relative terms, the steepest drop is at the final step: three sessions reached checkout but only two purchased, a 33% loss. There is also a notable upstream leak between products listing and product detail: five sessions viewed the products page but only four clicked into a specific product, a 20% drop that points to a catalog browsing problem rather than a checkout problem. Teams that only look at cart abandonment miss this upstream signal entirely.
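A regular query on top of the funnel view turns the absolute counts into step-to-step conversion rates. This is an ad-hoc query, not another materialized view; the NULLIF guards keep it from dividing by zero on an empty funnel:

```sql
SELECT
    ROUND(step_products * 100.0 / NULLIF(step_home, 0), 1)              AS home_to_products_pct,
    ROUND(step_product_detail * 100.0 / NULLIF(step_products, 0), 1)    AS products_to_detail_pct,
    ROUND(step_add_to_cart * 100.0 / NULLIF(step_product_detail, 0), 1) AS detail_to_cart_pct,
    ROUND(step_checkout * 100.0 / NULLIF(step_add_to_cart, 0), 1)       AS cart_to_checkout_pct,
    ROUND(step_purchase * 100.0 / NULLIF(step_checkout, 0), 1)          AS checkout_to_purchase_pct
FROM click_mv_funnel_analysis;
```

On the sample data so far, this yields 83.3, 80.0, 75.0, 100.0, and 66.7 percent respectively.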

Watching the Funnel Update in Real Time

To see incremental maintenance in action, insert two more events: the mobile user from session s1002 proceeds to checkout and completes a purchase.

INSERT INTO click_events VALUES
('ev026','u2','s1002','page_view','/checkout','/products/pro-plan','mobile','UK','2026-04-01 09:05:00+00'),
('ev027','u2','s1002','purchase', '/checkout/confirm','/checkout',  'mobile','UK','2026-04-01 09:06:30+00');

Query the funnel again immediately:

SELECT * FROM click_mv_funnel_analysis;
 total_sessions | step_home | step_products | step_product_detail | step_add_to_cart | step_checkout | step_purchase
----------------+-----------+---------------+---------------------+------------------+---------------+---------------
              6 |         6 |             5 |                   4 |                3 |             4 |             3

step_checkout moved from 3 to 4, and step_purchase moved from 2 to 3. RisingWave updated only the rows affected by the two new events. No full scan, no recomputation of unrelated sessions.

Step 6: Track Page Traffic in Real Time

Understanding which pages attract the most traffic across all sessions helps prioritize engineering and content investment.

CREATE MATERIALIZED VIEW click_mv_page_traffic AS
SELECT
    page_url,
    COUNT(*)                    AS total_views,
    COUNT(DISTINCT user_id)     AS unique_users,
    COUNT(DISTINCT session_id)  AS unique_sessions
FROM click_events
WHERE event_type = 'page_view'
GROUP BY page_url;

SELECT * FROM click_mv_page_traffic ORDER BY total_views DESC;
      page_url      | total_views | unique_users | unique_sessions
--------------------+-------------+--------------+-----------------
 /home              |           6 |            6 |               6
 /products          |           5 |            5 |               5
 /products/pro-plan |           4 |            4 |               4
 /checkout          |           3 |            3 |               3
 /docs              |           1 |            1 |               1
 /pricing           |           1 |            1 |               1

In a real clickstream pipeline, this view would update continuously as Kafka delivers new events. A product or engineering team monitoring this view during a launch would see page traffic fluctuate in real time, without running a query against the raw event table.
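The same raw events also support an entry-source breakdown. External referrers carry a URL scheme while internal navigation uses bare paths, so filtering on the prefix isolates each session's first touch. This is an illustrative ad-hoc query, not part of the pipeline above:

```sql
SELECT
    referrer_url               AS entry_source,
    COUNT(DISTINCT session_id) AS sessions
FROM click_events
WHERE event_type = 'page_view'
  AND referrer_url LIKE 'https://%'
GROUP BY referrer_url
ORDER BY sessions DESC;
```

On the sample data, google.com accounts for three sessions, while twitter.com, linkedin.com, and facebook.com account for one each.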

Step 7: Classify Drop-Offs and Track Conversion Outcomes

The final view identifies what happened to each non-converting session. Did the user add to cart but not buy? Did they reach checkout and bail? Did they browse without engaging at all?

CREATE MATERIALIZED VIEW click_mv_conversion_metrics AS
WITH base AS (
    SELECT
        session_id,
        user_id,
        device_type,
        country,
        bool_or(event_type = 'add_to_cart')  AS added_to_cart,
        bool_or(event_type = 'purchase')     AS purchased,
        bool_or(page_url = '/checkout')      AS reached_checkout
    FROM click_events
    GROUP BY session_id, user_id, device_type, country
)
SELECT
    session_id,
    user_id,
    device_type,
    country,
    CASE
        WHEN added_to_cart        THEN 'cart_abandonment'
        WHEN reached_checkout     THEN 'checkout_abandonment'
        ELSE                           'browse_abandonment'
    END AS outcome
FROM base
WHERE NOT purchased;

The cart and checkout flags reuse the bool_or pattern from the funnel view. Flagging whether a session ever hit /checkout is more reliable than trying to derive a "last page viewed" with MAX, because MAX on a VARCHAR column returns the lexicographically greatest URL, not the chronologically last one.

SELECT session_id, user_id, device_type, country, outcome
FROM click_mv_conversion_metrics
ORDER BY session_id;
 session_id | user_id | device_type | country |       outcome
------------+---------+-------------+---------+----------------------
 s1003      | u3      | desktop     | DE      | browse_abandonment
 s1004      | u4      | tablet      | US      | checkout_abandonment
 s1005      | u5      | mobile      | JP      | browse_abandonment

After the additional events we inserted in Step 5, session s1002 no longer appears here because it converted; the view removed the row automatically when the purchase event arrived. Sessions s1003 and s1005 explored pages but never engaged with the cart, so they are classified as browse abandonment. Session s1004 reached the checkout page without ever adding to cart, which the view flags as checkout abandonment.

In a production system, you would push changes from this view downstream using a RisingWave sink. When a new cart_abandonment row appears, a Kafka sink can deliver a message to a notification service within seconds. That service triggers a re-engagement email or push notification while the user is still warm, not 24 hours later when a batch job finally surfaces the event.
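A sketch of such a sink follows. The topic name and broker address are placeholders; note that because an abandonment row is retracted upstream when the session later converts, emitting plain JSON typically requires force_append_only, which drops those retractions (use FORMAT DEBEZIUM instead if the consumer needs full change events):

```sql
CREATE SINK cart_abandonment_sink
FROM click_mv_conversion_metrics
WITH (
    connector = 'kafka',
    topic = 'cart-abandonment-alerts',
    properties.bootstrap.server = 'your-kafka-broker:9092'
) FORMAT PLAIN ENCODE JSON (
    force_append_only = 'true'  -- emit inserts only; suppress deletes
);
```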

Connecting the Pipeline to Kafka: Production Setup

The CREATE TABLE approach above is suitable for development and testing. In production, swap it for a source connected to your Kafka cluster. The full DDL for a production source with a schema registry would look like this:

-- Using Confluent Schema Registry with Avro encoding
CREATE SOURCE click_events_source (
    event_id     VARCHAR,
    user_id      VARCHAR,
    session_id   VARCHAR,
    event_type   VARCHAR,
    page_url     VARCHAR,
    referrer_url VARCHAR,
    device_type  VARCHAR,
    country      VARCHAR,
    event_ts     TIMESTAMPTZ
) WITH (
    connector = 'kafka',
    topic = 'clickstream-events',
    properties.bootstrap.server = 'broker1:9092,broker2:9092',
    scan.startup.mode = 'latest',
    properties.sasl.mechanism = 'PLAIN',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = 'your-api-key',
    properties.sasl.password = 'your-api-secret'
) FORMAT PLAIN ENCODE JSON;

Once this source exists, create your materialized views on top of click_events_source instead of click_events. Everything else in this tutorial applies unchanged.

RisingWave also supports Avro and Protobuf encoding if your Kafka topics use a schema registry. See the Ingest from Kafka documentation for the full list of options.

How This Compares to Flink and Spark Streaming

Clickstream processing pipelines built with Apache Flink or Spark Structured Streaming work well, but they introduce operational costs that compound as the pipeline grows.

 Dimension                  | Spark Streaming          | Apache Flink              | RisingWave
----------------------------+--------------------------+---------------------------+-------------------------
 Query language             | Spark SQL + Python/Scala | Flink SQL + Java          | Standard PostgreSQL SQL
 State management           | External (checkpoints)   | Built-in, manual tuning   | Built-in, automatic
 Separate serving layer     | Required                 | Required                  | Not required
 JVM operational overhead   | High                     | High                      | None
 Funnel analysis complexity | Medium (manual state)    | Medium (custom operators) | Low (SQL aggregation)

With Flink, you write a Flink SQL job or a Java/Scala application, manage checkpoints, tune parallelism, and then export results to a PostgreSQL database or a data warehouse so dashboards can query them. That is three systems to operate: Kafka, Flink, and a serving database.

With RisingWave, you write SQL against a PostgreSQL-compatible interface. Materialized views handle the incremental computation. Grafana, Metabase, Superset, and any other PostgreSQL-compatible BI tool can connect directly to query results. One system handles ingestion, processing, and serving.

For a deeper comparison of the two approaches, see Apache Flink vs RisingWave: a streaming SQL comparison.

FAQ

How does RisingWave handle late-arriving clickstream events from mobile devices?

Mobile users frequently lose connectivity and re-establish it minutes later, which means events can arrive out of order. RisingWave handles this through its incremental computation model: when a late event arrives and is inserted into the source table or Kafka topic, all affected materialized views update automatically. If a mobile user's add_to_cart event arrives two minutes late, the session summary view will update the add_to_carts count, and the conversion metrics view will reclassify that session from browse abandonment to cart abandonment.

For use cases that require strict watermark-based windowing, such as tumbling or sliding window aggregations where late data should be discarded after a cutoff, RisingWave supports temporal filters and watermarks. You can define how long the system waits for late data before finalizing a window.
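A sketch of what that looks like, reusing a trimmed version of this tutorial's schema (treat the names as illustrative). The WATERMARK clause tells RisingWave to tolerate events up to five minutes late, and the tumble() windowing produces per-minute page-view counts that finalize once the watermark passes the window's end:

```sql
-- Watermarks require an append-only table or source
CREATE TABLE click_events_watermarked (
    event_id   VARCHAR,
    session_id VARCHAR,
    page_url   VARCHAR,
    event_ts   TIMESTAMPTZ,
    WATERMARK FOR event_ts AS event_ts - INTERVAL '5 minutes'
) APPEND ONLY;

CREATE MATERIALIZED VIEW page_views_per_minute AS
SELECT
    window_start,
    page_url,
    COUNT(*) AS views
FROM TUMBLE(click_events_watermarked, event_ts, INTERVAL '1 minute')
GROUP BY window_start, page_url;
```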

Can I sessionize users without a pre-defined session_id field?

Yes. If your raw clickstream events do not include a session identifier, you can derive sessions using gap-based sessionization: group consecutive events from the same user where no two adjacent events are more than 30 minutes apart. In RisingWave, this requires a window function approach. The standard pattern is to assign a new session boundary whenever the gap between event_ts and the previous event from the same user exceeds your inactivity threshold, then generate a session key from the user ID and the session boundary timestamp.
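A sketch of that pattern with a 30-minute inactivity threshold. The LAG call marks a session boundary whenever the gap to the previous event exceeds the threshold (or there is no previous event), and the running SUM numbers each user's sessions; whether RisingWave can maintain this incrementally as a materialized view depends on your version's over-window support, but it also works as an ad-hoc query:

```sql
WITH flagged AS (
    SELECT
        user_id,
        event_ts,
        CASE
            WHEN LAG(event_ts) OVER (PARTITION BY user_id ORDER BY event_ts) IS NULL
              OR event_ts - LAG(event_ts) OVER (PARTITION BY user_id ORDER BY event_ts)
                 > INTERVAL '30 minutes'
            THEN 1 ELSE 0
        END AS new_session
    FROM click_events
)
SELECT
    user_id,
    event_ts,
    user_id || '-' ||
    (SUM(new_session) OVER (PARTITION BY user_id ORDER BY event_ts))::varchar
        AS derived_session_id
FROM flagged;
```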

The session_id approach used in this tutorial is simpler and works well when your event collection layer already assigns session tokens (which is the case for most modern analytics SDKs).

How does this pipeline scale to billions of events per day?

RisingWave is designed for production-scale workloads. Its compute nodes process stream operators in parallel, and its storage layer uses a shared-storage architecture (backed by S3-compatible object storage) that separates compute from storage. You can scale compute nodes independently of storage, and the Kafka connector partitions consumption across available compute nodes.

For clickstream analytics specifically, the materialized views partition state by group key: each session_id lives on a specific node, so adding compute nodes expands both throughput and session capacity proportionally. Teams processing billions of events per day run RisingWave in production. RisingWave Cloud handles scaling automatically if you prefer a managed deployment.

Can I connect this pipeline directly to a BI tool like Grafana or Metabase?

Yes, directly. RisingWave exposes a PostgreSQL-compatible wire protocol on port 4566. Any tool that can connect to PostgreSQL, including Grafana, Metabase, Superset, Redash, Tableau, and custom applications using psycopg2 or the node-postgres client, can query your materialized views. There is no export step, no ETL to a separate database, and no caching layer needed. The materialized views are the serving layer.

For Grafana specifically, you add RisingWave as a PostgreSQL data source using the hostname, port 4566, username, and database name. From there, you build dashboards that query click_mv_funnel_analysis, click_mv_session_summary, and click_mv_page_traffic like ordinary database tables.

Conclusion

A clickstream processing pipeline with Kafka and streaming SQL gives you the real-time visibility that batch pipelines cannot provide. The approach covered in this tutorial replaces a multi-system architecture (Kafka + Flink + PostgreSQL + BI cache) with a single streaming database that handles ingestion, incremental computation, and query serving in one place.

The four materialized views you built cover the core analytics primitives:

  • Session tracking (click_mv_session_summary): continuously aggregates per-session metrics including duration, page views, and purchase events
  • Funnel analysis (click_mv_funnel_analysis): flags each session by funnel stage and counts how many sessions reach each step
  • Page traffic (click_mv_page_traffic): tracks page-level view counts and unique visitor metrics
  • Conversion outcomes (click_mv_conversion_metrics): classifies non-converting sessions by drop-off type for re-engagement targeting

The SQL patterns here (bool_or for session flags, FILTER for conditional aggregation, and EXTRACT(EPOCH FROM ...) for durations) are composable building blocks. You can extend them to attribution modeling, cohort analysis, A/B test evaluation, and any other analytics your team needs. For a broader look at how streaming SQL handles time-based windowing patterns, see the windowing in stream processing guide.


Ready to build your own clickstream pipeline? Start with RisingWave Cloud, no credit card required, or install locally with brew install risingwave.

Join the RisingWave Slack community to ask questions and connect with other data engineers building real-time pipelines.
