Building Event-Driven Services with RisingWave Subscriptions

Whether you're working with source data ingested into tables or the continuously updated results in RisingWave’s materialized views, a common requirement is to stream these data changes out to drive downstream systems. This could be a notification service, a monitoring dashboard, or any other data-driven application. While this is often handled by an intermediary message queue like Kafka, this approach adds architectural complexity and operational overhead.

A more direct, native solution for this use case is RisingWave Subscriptions. This powerful feature provides a persistent way to stream data changes directly from any table or materialized view to a client application, removing the need for an external message broker. In this post, we’ll dive deep into how subscriptions work and how you can use them to build robust, event-driven applications.

SELECT vs. Subscriptions: From Polling Snapshots to Streaming Changes

To understand why subscriptions are a more effective tool for this task, it's useful to compare them directly with traditional SELECT statements.

| Aspect | SELECT Statement | RisingWave Subscription |
| --- | --- | --- |
| Execution Model | Pull / Request-Response: a stateless, one-time query for data. | Persistent Stream: a stateful, durable stream that a client can connect to and consume from over time. |
| Data Scope | Point-in-Time Snapshot: returns the entire state of the data. | Continuous Incremental Changes: delivers a stream of individual change events (inserts, updates, deletes) as they happen. |
| Efficiency for Detecting Changes | Inefficient: requires re-querying the full dataset. | Highly Efficient: the database tracks and pushes only the delta, minimizing data transfer and processing. |
| Primary Use Case | Ad-hoc querying and reporting. | Building event-driven applications. |

In short, using SELECT to poll for changes is inefficient and complex. A subscription provides a purpose-built, efficient, and reliable mechanism for consuming a continuous stream of updates.
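
To make the contrast concrete, here is a minimal Python sketch of the work a polling client has to do on its own: keep the previous snapshot and diff it against the new one on every poll. The `diff_snapshots` helper and the sample data are hypothetical and for illustration only; with a subscription, the client receives change events directly instead of computing them.

```python
def diff_snapshots(old, new):
    """Compare two {order_id: status} snapshots and return change events.

    A polling client must redo this on every poll; the cost grows with the
    size of the full dataset, not with the number of changes.
    """
    changes = []
    for key, value in new.items():
        if key not in old:
            changes.append(("Insert", key, value))
        elif old[key] != value:
            changes.append(("Update", key, old[key], value))
    for key, value in old.items():
        if key not in new:
            changes.append(("Delete", key, value))
    return changes


# Two hypothetical results of polling `SELECT order_id, status FROM orders`
previous = {"ORD123": "Processing", "ORD124": "Confirmed"}
current = {"ORD123": "Shipped", "ORD125": "Processing"}

for change in diff_snapshots(previous, current):
    print(change)
```

Even in this small case the client has to hold a full copy of the previous result just to learn that one order changed status.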

Building a Real-Time Order Tracker with Subscriptions

Let's walk through a common use case: streaming the results of a materialized view that tracks e-commerce order status updates.

Step 1: Set Up a Materialized View for Live Order Status

First, let's create a materialized view in RisingWave. This view will always contain the latest status for every single order.

-- Create a materialized view to track the latest order status
CREATE MATERIALIZED VIEW order_status_updates AS
SELECT order_id, status
FROM orders;

Step 2: Create a Subscription to Watch for Changes

Now, we can create a subscription on this view. This tells RisingWave we want to be notified of any changes to any order's status.

-- Create a subscription on our view
CREATE SUBSCRIPTION order_sub 
FROM order_status_updates
WITH ( retention = '24h' );

Step 3: Fetch the Updates

With our subscription ready, we can start listening for changes. We use a cursor to read the stream of updates. When an order's status is updated, our subscription catches it immediately.

For example, let's say an order's status changes from 'Processing' to 'Shipped'.

-- A packer updates the order status in the 'orders' table
UPDATE orders SET status = 'Shipped' WHERE order_id = 'ORD123';
-- Now, let's fetch this change from our subscription
-- First, we declare a cursor to read from our position
DECLARE cur SUBSCRIPTION CURSOR FOR order_sub;
-- Then, we fetch the change; an update arrives as two rows, so we fetch two
FETCH 2 FROM cur;
---- RESULT
 order_id |   status   |      op      | rw_timestamp
----------+------------+--------------+---------------
 ORD123   | Processing | UpdateDelete | 1678886400000
 ORD123   | Shipped    | UpdateInsert | 1678886400000
(2 rows)

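As you can see, we didn't get a list of all orders. We got only the change itself, delivered as a pair of rows: an UpdateDelete row carrying the old status and an UpdateInsert row carrying the new one. This is exactly the information our application needs to send a "Your order has shipped!" notification. (A cursor declared without an explicit starting point first replays the existing rows as a snapshot, as noted under Key Benefits below; the rows shown here are what the cursor returns once it reaches the update.)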
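
An application usually wants that pair of rows collapsed into a single "old status to new status" event. The following is a minimal sketch of one way to do that in Python. It assumes each row has the shape (order_id, status, op, rw_timestamp) shown in the result above, and that the UpdateDelete row for an order arrives before its UpdateInsert row. The `pair_updates` helper is a hypothetical name, not part of RisingWave or any driver.

```python
def pair_updates(rows):
    """Turn subscription rows into (order_id, old_status, new_status) tuples.

    Assumes rows shaped like (order_id, status, op, rw_timestamp) and that an
    UpdateDelete row precedes its matching UpdateInsert row. Rows with other
    op values are ignored in this sketch.
    """
    pending = {}   # order_id -> old status seen in an UpdateDelete row
    updates = []
    for order_id, status, op, _ts in rows:
        if op == "UpdateDelete":
            pending[order_id] = status
        elif op == "UpdateInsert":
            # old status is None if no matching UpdateDelete was seen
            updates.append((order_id, pending.pop(order_id, None), status))
    return updates


# The two rows from the result above
rows = [
    ("ORD123", "Processing", "UpdateDelete", 1678886400000),
    ("ORD123", "Shipped", "UpdateInsert", 1678886400000),
]
for order_id, old, new in pair_updates(rows):
    print(f"{order_id}: {old} -> {new}")
```

Keeping the old value around lets the notification logic react only to specific transitions, for example from 'Processing' to 'Shipped'.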

Powering an Application: A Python Example

Because subscriptions use the standard Postgres protocol, connecting them to a backend service is easy. Here’s how a Python application could listen for these order updates to power a notification service.

import psycopg2
import time

def order_notification_service():
    # Connect to RisingWave
    conn = psycopg2.connect(
        host="localhost", port="4566", user="root", database="dev"
    )
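    # Create the client-side cursor before entering try, so the finally
    # block below never refers to an undefined name
    cur = conn.cursor()
    try: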

        # Create a cursor to read from our order subscription
        cur.execute("DECLARE cur SUBSCRIPTION CURSOR FOR order_sub;")

        print("Listening for order updates...")
        while True:
            cur.execute("FETCH NEXT FROM cur;")
            change = cur.fetchone()

            if change:
                # Example change: ('ORD123', 'Shipped', 'UpdateInsert', ...)
                order_id, status, operation = change[0], change[1], change[2]

                # We only care about the new status
                if operation == 'UpdateInsert':
                    print(f"Update detected for {order_id}: status is now {status}. Triggering notification!")
                    # TODO: Add logic here to send an email or push notification
            else:
                # No new updates, wait a second
                time.sleep(1)

    finally:
        print("Closing connection.")
        cur.close()
        conn.close()

if __name__ == "__main__":
    order_notification_service()

Save this script as order_service.py and run it from your terminal: python order_service.py. It prints "Listening for order updates..." and then waits for new changes. Try running more UPDATE commands in your SQL client and watch them appear in the service's output within about a second (the loop sleeps for one second whenever no row is available). Because the script reacts only to UpdateInsert rows, plain INSERTs and the initial snapshot rows are read but not printed.

# example output in the terminal after ORD123 is updated to 'Shipped'
Listening for order updates...
Update detected for ORD123: status is now Shipped. Triggering notification!
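
The loop in the script above fetches one row per round trip. A hedged variant pulls several rows at once, assuming the cursor accepts a `FETCH <n> FROM <cursor>` form (check the RisingWave documentation for your version). In the sketch below, `fetch_batch` and `FakeCursor` are hypothetical names; the fake stands in for a psycopg2 cursor so the example runs without a database.

```python
class FakeCursor:
    """Stand-in for a DB-API cursor: records SQL and returns canned rows."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._batch = []
        self.executed = []

    def execute(self, sql):
        self.executed.append(sql)
        # Parse the batch size out of "FETCH <n> FROM <name>;"
        n = int(sql.split()[1])
        self._batch, self._rows = self._rows[:n], self._rows[n:]

    def fetchall(self):
        return self._batch


def fetch_batch(cur, cursor_name, batch_size):
    """Fetch up to batch_size rows from a subscription cursor in one call."""
    # Assumed syntax: FETCH <n> FROM <cursor_name>
    cur.execute(f"FETCH {int(batch_size)} FROM {cursor_name};")
    return cur.fetchall()


fake = FakeCursor([
    ("ORD123", "Processing", "UpdateDelete", 1678886400000),
    ("ORD123", "Shipped", "UpdateInsert", 1678886400000),
    ("ORD124", "Processing", "Insert", 1678886401000),
])
first = fetch_batch(fake, "cur", 2)
second = fetch_batch(fake, "cur", 2)
print(len(first), len(second))
```

In the real service you would pass the psycopg2 cursor instead of `FakeCursor` and sleep only when a batch comes back empty.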

Key Benefits of Using Subscriptions

  • Reduced Architectural Complexity: Stream data directly from tables or materialized views without needing a separate message broker.

  • Configurable Data Retention: Changes are retained persistently by the subscription object, with configurable time- or size-based retention policies, so clients that were offline can catch up on any changes still within the retention window.

  • Built-in State Initialization: The default cursor behavior provides a full data snapshot, allowing new applications to reliably initialize their state.

  • Client-Controlled Batching: Efficiently fetch data in batches of any size with the FETCH command, giving you full control over throughput.

  • Broad Compatibility: Integrates with your existing applications using any language with a standard PostgreSQL driver.
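
As an illustration of the state-initialization point, here is a minimal sketch of a client that builds a local copy of the view from the snapshot rows and then keeps it current with the changes that follow. It assumes rows shaped like (order_id, status, op, rw_timestamp) and the op values Insert, UpdateInsert, UpdateDelete, and Delete; the `apply_change` helper and the sample rows are hypothetical.

```python
def apply_change(state, row):
    """Apply one subscription row to a local {order_id: status} mirror.

    Assumes rows shaped like (order_id, status, op, rw_timestamp) with op in
    Insert / UpdateInsert / UpdateDelete / Delete.
    """
    order_id, status, op = row[0], row[1], row[2]
    if op in ("Insert", "UpdateInsert"):
        state[order_id] = status
    elif op in ("Delete", "UpdateDelete"):
        # Only drop the key if it still holds the value being retracted
        if state.get(order_id) == status:
            del state[order_id]
    return state


# Hypothetical stream: two snapshot rows followed by later changes.
# The timestamp column is not used in this sketch.
rows = [
    ("ORD123", "Processing", "Insert", None),
    ("ORD124", "Confirmed", "Insert", None),
    ("ORD123", "Processing", "UpdateDelete", 1678886400000),
    ("ORD123", "Shipped", "UpdateInsert", 1678886400000),
    ("ORD124", "Confirmed", "Delete", 1678886401000),
]
state = {}
for row in rows:
    apply_change(state, row)
print(state)
```

The same function handles both phases, so a new service needs no separate bootstrap query before it starts consuming changes.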

Get Started with RisingWave

RisingWave Subscriptions are a game-changer for building modern, event-driven applications. By efficiently streaming only what has changed, you can easily build features like live order tracking, financial data feeds, or real-time monitoring dashboards.

If you’d like to see a personalized demo or discuss how this could work for your use case, please contact our sales team.
