What is Supabase?


Supabase enables developers to build production-ready products efficiently. It uses Postgres as the underlying OLTP database for transactional business logic and offers comprehensive toolkits that speed up the delivery of common application features. Supabase Realtime extends these toolkits to real-time scenarios such as online chat rooms and real-time monitoring dashboards, providing a convenient framework for building real-time, event-driven applications.


How does RisingWave Empower Stream Processing?


Using data directly from the OLTP database is not sufficient for many use cases. Take the real-time monitoring dashboard as an example: we cannot directly query the database for the total number of users who have changed their username within the last 5 minutes. Metrics like this are usually not readily available in OLTP databases, because they are aggregates derived from raw data within sliding time windows.

To do this seamlessly, we need a real-time data pipeline that monitors the change data capture (CDC) stream of the users table and continuously aggregates the number of username updates in the last 5 minutes. The results also need to be stored in a data service from which the dashboard application can fetch them.
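To make the dashboard metric concrete, here is a minimal in-memory Python sketch of the state such a pipeline maintains: a count of username-update events over a 5-minute sliding window. It is a hypothetical illustration and not how RisingWave implements windows. The class name `SlidingWindowCounter` is ours, and the sketch assumes events arrive in timestamp order and that `now` never moves backwards.

```python
from collections import deque
from datetime import datetime, timedelta


class SlidingWindowCounter:
    """Counts events whose timestamps fall in the window (now - window, now]."""

    def __init__(self, window: timedelta) -> None:
        self.window = window
        self.events = deque()  # timestamps of username updates, oldest first

    def add(self, ts: datetime) -> None:
        # Assumes CDC events arrive in timestamp order.
        self.events.append(ts)

    def count(self, now: datetime) -> int:
        # Evict events that have slid out of the window, then count the rest.
        # Assumes `now` is non-decreasing across calls.
        cutoff = now - self.window
        while self.events and self.events[0] <= cutoff:
            self.events.popleft()
        return len(self.events)


# Usage: four username updates at 10:00, 10:02, 10:04 and 10:06.
t0 = datetime(2024, 1, 1, 10, 0)
counter = SlidingWindowCounter(timedelta(minutes=5))
for minutes in (0, 2, 4, 6):
    counter.add(t0 + timedelta(minutes=minutes))
print(counter.count(t0 + timedelta(minutes=6)))  # 3: the 10:00 update has expired
```

A deque keeps both appends and evictions O(1), which mirrors the incremental style of computation a streaming database relies on.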

[Figure: An example of the real-time monitoring dashboard.]

Sounds quite cumbersome! What if we had something like Supabase for stream processing, so that we could focus on the business logic? Such a product would automatically build a data pipeline from SQL statements that describe the desired results. It would be elastic, scaling compute resources with the amount of data ingested, and it could serve the results directly without an external data service. It would also be cost-efficient, easy to maintain, reliable (with a guaranteed SLA), and user-friendly. Introducing RisingWave, the streaming database that can fulfill all these needs.

RisingWave was built in early 2021 by RisingWave Labs (formerly known as Singularity Data). It was designed to fulfill all these requirements from day one. After being integrated with various companies' data stacks and used in multiple scenarios, RisingWave is now prepared to democratize stream processing.

In this blog, we explore how to improve advertisement performance in an e-commerce scenario using Supabase and RisingWave Cloud. The example is inspired by those in the book "Designing Data-Intensive Applications", an excellent guide to data engineering.


Use Supabase Tables as Sources


Let’s first create four tables in Supabase: “products”, “customers”, “promotions”, and “sale_events”. The first three are entity tables that model the participants in the e-commerce business logic. The fourth, “sale_events”, is the "fact table" that records every transaction. The relations are visualized in the Supabase schema visualizer:

[Figure: The relations of the tables are visualized in the Supabase schema visualizer.]

As an OLTP database, Supabase excels at handling customer orders and sales. Now we want to use stream processing to determine the most effective promotion approach in real time and adjust the promotion portfolio dynamically. This involves taking the data generated by the business, performing real-time calculations on it, and feeding the computed results back into the business.

Unlike batch processing, stream processing automatically updates the result with incremental computation algorithms whenever new data is ingested. Take calculating an average as an example. Instead of summing up all values in a list and dividing by their count, an incremental approach updates the result in O(1) time when a new value $x$ arrives:

$$\text{avg}' = \frac{\text{sum} + x}{\text{count} + 1}$$

where the values of “sum” and “count” are themselves maintained incrementally. RisingWave also optimizes top-N, sliding window, and other complex calculations in stream processing. By running the SQL statement EXPLAIN CREATE MATERIALIZED VIEW ... you can view the whole graph of the calculation process.
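The incremental average can be sketched in a few lines of Python. This is a minimal illustration of the idea, not RisingWave's internal aggregate state. The class name `RunningAverage` is hypothetical, and the `remove` method (retracting a value, as a deleted row in a CDC stream would require) is our own addition beyond the text.

```python
class RunningAverage:
    """Maintains an average incrementally, so each update costs O(1)."""

    def __init__(self) -> None:
        self.total = 0.0  # the running "sum"
        self.count = 0    # the running "count"

    def add(self, x: float) -> float:
        # avg' = (sum + x) / (count + 1): no rescan of earlier values.
        self.total += x
        self.count += 1
        return self.total / self.count

    def remove(self, x: float) -> float:
        # Retract a previously added value, e.g. for a deleted row in a CDC stream.
        # Simplification: refuses to retract the last remaining value.
        if self.count <= 1:
            raise ValueError("cannot retract below one remaining value")
        self.total -= x
        self.count -= 1
        return self.total / self.count


avg = RunningAverage()
for value in (2, 4, 9):
    current = avg.add(value)
print(current)        # 5.0
print(avg.remove(9))  # 3.0
```

Only "sum" and "count" are stored, so the cost of an update does not grow with the number of values seen so far.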

To simplify things, we use RisingWave Cloud to set up a free-tier RisingWave cluster. RisingWave Cloud helps us manage the cluster and provides useful features such as a web SQL editor, data pipeline visualizer, GUI for source/sink management, database user management, and metrics dashboards.

Once the RisingWave cluster is set up, we need to create database replications of the sale_events and promotions tables in Supabase so that they can serve as data sources for the RisingWave cluster. We can then use the SQL editor in RisingWave Cloud to create the source tables. Instead of the "CREATE SOURCE" statement, we connect to the source with the "CREATE TABLE" statement, which persists a copy of the data in the RisingWave cluster.

CREATE TABLE sale_events (
  -- The same columns in `sale_events` table in Supabase
  id int8,
  created_at TIMESTAMPTZ, 
  customer_id string,
  product_id int8,
  promotion_id int8,
  quantity int8,
  net_price_cents int8,
  discount_price_cents int8,
  PRIMARY KEY (id)
) 
WITH (
  connector='postgres-cdc',
  hostname = 'db.xxxxxx.supabase.co',
  port = '5432',
  username = 'postgres',
  password = 'xxxxxx',
  database.name = 'postgres',
  schema.name = 'public',
  table.name = 'sale_events',
  publication.name = 'rw_publication' -- Database Replications name in Supabase
);

CREATE TABLE promotions (
  id int8,
  created_at TIMESTAMPTZ, 
  ad_type string,
  product_id int8,
  weight_percent int8,
  PRIMARY KEY (id)
) 
WITH (
  ......  -- Same connection options as the sale_events table above
  table.name = 'promotions'
);


Design the Stream Processing Pipelines


Before we write the calculation logic, let's examine the data flows of the final design. First, the e-commerce system records sales events in the sale_events table (step 1). Changes to the table are captured and ingested into the RisingWave cluster through CDC streams. To calculate the ROI (Return on Investment) of each promotion, the promotions table is also replicated to the RisingWave cluster (step 2). The data pipelines then calculate the ROI and volatility as intermediate results, which are stored in a materialized view. Based on these intermediate results, new promotion weights are calculated using SQL-defined strategies (step 3). The final result is stored in another materialized view with the same structure as the promotions table in Supabase. This allows the RisingWave cluster to use JDBC to update the weight values in Supabase (step 4). Supabase Realtime then captures the update events in the promotions table, enabling the application to immediately call the APIs and adjust the weights in the e-commerce system (steps 5 & 6).

[Figure: The data flows of the final design.]

Let's go back to our use case. To keep things simple, we make two assumptions:

  1. The product vendor has a fixed budget for promoting their product. This means we can treat the weights as the amount invested, so the “ROI” we compute is actually a relative ROI indicator (ROII).
  2. Each product has only two promotion types (we will talk about this later).

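The return is the total sales amount, which can be calculated from the sale_events table. Consistent with the SQL used below, this indicator can be written as

$$\text{ROII} = \frac{\sum_{i=1}^{n} S_i}{\sum_{i=1}^{n} w} = \frac{\sum_{i=1}^{n} S_i}{n \, w}$$

where $S_i$ is the promotion's sales amount in the $i$-th time window, $w$ is its weight, and $n$ is the number of time windows in the lookback period.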
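The indicator is what promotions_stat_60mins later computes as SUM(sales_amount) / SUM(weight_percent): each window row is joined with the promotion's constant weight, so the denominator is the weight times the number of window rows. Below is a minimal Python sketch of that ratio. The function name `roii` is hypothetical. As in the SQL, only windows that actually had sales contribute rows.

```python
def roii(window_sales: list, weight_percent: float) -> float:
    """Relative ROI indicator of one promotion over n time windows.

    Mirrors SUM(sales_amount) / SUM(weight_percent) after each of the n window
    rows has been joined with the promotion's constant weight, so the
    denominator is n * weight_percent.
    """
    n = len(window_sales)
    if n == 0 or weight_percent == 0:
        raise ValueError("need at least one window and a non-zero weight")
    return sum(window_sales) / (n * weight_percent)


# Three 5-minute windows of sales (in cents) for a promotion holding 50% of the budget.
print(roii([6600, 6600, 13200], 50))  # 176.0
```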


Preprocess Data to Get the Intermediate Results


We use the standard deviation of the time-windowed sales amounts as the volatility. With the return rate and the volatility, we can construct a simple optimized portfolio that makes the return rate as large as possible while diversifying idiosyncratic risk.

The sale_events table looks like this:

                              sale_events (table)
-------------------------------------------------------------------------------
 product_id | promotion_id     | created_at | discount_price_cents | quantity
------------+------------------+------------+----------------------+----------
 headphones | Search Result    |      10:03 |                 6600 |        1
 headphones | Youtube Video Ad |      10:08 |                 6600 |        2
 guitar     | Search Result    |      10:09 |                19900 |        1
 guitar     | Search Result    |      10:09 |                19900 |        1
 headphones | Youtube Video Ad |      10:09 |                 7299 |        2

Let’s use a 5-minute tumbling window and use product_id and promotion_id as the keys for grouping the rows within each window. The materialized view keeps only the windows from the last 60 minutes.

CREATE MATERIALIZED VIEW promotions_sequences_60mins AS (
  SELECT
    product_id,
    promotion_id,
    SUM(total_price) AS sales_amount,
    window_start,
    window_end
  FROM (
    -- Assign each sale event to a 5-minute tumbling window
    SELECT
      product_id,
      promotion_id,
      discount_price_cents * quantity AS total_price,
      window_start,
      window_end
    FROM TUMBLE (sale_events, created_at, INTERVAL '5 MINUTES')
  ) AS windowed_sales
  -- Temporal filter: keep only windows from the last 60 minutes
  WHERE window_start > NOW() - INTERVAL '60 minutes'
  GROUP BY window_start, window_end, product_id, promotion_id
  ORDER BY window_start DESC
);

The resulting materialized view looks something like this:

                 promotions_sequences_60mins (materialized view)
------------------------------------------------------------------------
 product_id | promotion_id     | sales_amount | window_start | window_end
------------+------------------+--------------+--------------+-----------
 headphones | Search Result    |         6600 |        10:05 |      10:10
 headphones | Youtube Video Ad |        13200 |        10:05 |      10:10
     guitar | Search Result    |        19900 |        10:05 |      10:10
     guitar | Youtube Video Ad |        19900 |        10:05 |      10:10
========================================================================
 headphones | Search Result    |         6600 |        10:00 |      10:05
 headphones | Youtube Video Ad |         6600 |        10:00 |      10:05
     guitar | Search Result    |        19900 |        10:00 |      10:05
     guitar | Youtube Video Ad |        19900 |        10:00 |      10:05
========================================================================
        ... |              ... |          ... |          ... |        ...
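Here is a minimal Python sketch of what TUMBLE plus the GROUP BY compute. It runs on the five sample sale_events rows shown earlier, placed on an assumed date of 2024-01-01. The sketch omits the 60-minute temporal filter. The function names `tumble_start` and `sales_per_window` are hypothetical. Windows are aligned to the Unix epoch, which we assume matches how fixed-size tumbling windows are usually aligned.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def tumble_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    # Align a timestamp to the start of its fixed-size, non-overlapping window.
    return ts - (ts - EPOCH) % size


def sales_per_window(rows):
    """Sum discount_price_cents * quantity per (window, product_id, promotion_id)."""
    sales = defaultdict(int)
    for product_id, promotion_id, created_at, price_cents, quantity in rows:
        start = tumble_start(created_at)
        sales[(start, start + WINDOW, product_id, promotion_id)] += price_cents * quantity
    return dict(sales)


# The sample sale_events rows, placed on an assumed date.
day = datetime(2024, 1, 1)
rows = [
    ("headphones", "Search Result", day.replace(hour=10, minute=3), 6600, 1),
    ("headphones", "Youtube Video Ad", day.replace(hour=10, minute=8), 6600, 2),
    ("guitar", "Search Result", day.replace(hour=10, minute=9), 19900, 1),
    ("guitar", "Search Result", day.replace(hour=10, minute=9), 19900, 1),
    ("headphones", "Youtube Video Ad", day.replace(hour=10, minute=9), 7299, 2),
]
for (start, end, product, promotion), amount in sorted(sales_per_window(rows).items()):
    print(f"{start:%H:%M}-{end:%H:%M} {product:<10} {promotion:<16} {amount}")
# 10:00-10:05 headphones Search Result    6600
# 10:05-10:10 guitar     Search Result    39800
# 10:05-10:10 headphones Youtube Video Ad 27798
```

In RisingWave, only the rows affected by a new event are updated. This batch loop recomputes everything and is meant only to show which rows land in which window.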


Calculate Volatility and ROI Indicator


With this materialized view representing the time series, we can easily calculate the volatility and ROII of each promotion over the last 60 minutes:

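CREATE MATERIALIZED VIEW promotions_stat_60mins AS (
  SELECT
    promotions.product_id AS product_id,
    seq.promotion_id AS promotion_id,
    -- Relative ROI: total sales divided by total weight across the joined windows
    SUM(seq.sales_amount) / SUM(promotions.weight_percent) AS roii,
    -- Volatility: sample standard deviation of the per-window sales amounts
    STDDEV_SAMP(seq.sales_amount) AS vol
  FROM promotions_sequences_60mins AS seq
  JOIN promotions ON seq.promotion_id = promotions.id AND seq.product_id = promotions.product_id
  GROUP BY promotions.product_id, seq.promotion_id
);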

The new materialized view looks like:

        promotions_stat_60mins (materialized view)
------------------------------------------------------------
 product_id | promotion_id     | roii         |         vol 
------------+------------------+--------------+-------------
 headphones | Search Result    |         89.3 |       0.532 
 headphones | Youtube Video Ad |         67.8 |       0.110 
     guitar | Search Result    |        125.7 |       0.239 
     guitar | Youtube Video Ad |        293.3 |       0.614 
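The vol column comes from STDDEV_SAMP, the sample standard deviation, which divides by n - 1. Below is a minimal Python sketch of the same statistic using the standard library. The function name `volatility` is hypothetical. In PostgreSQL, STDDEV_SAMP returns NULL for fewer than two rows, so this sketch returns None in that case, assuming RisingWave behaves the same way.

```python
import statistics
from typing import Optional


def volatility(window_sales: list) -> Optional[float]:
    """Sample standard deviation of per-window sales amounts, like STDDEV_SAMP.

    Returns None for fewer than two windows, mirroring STDDEV_SAMP's NULL.
    """
    if len(window_sales) < 2:
        return None
    return statistics.stdev(window_sales)  # divides by n - 1, not n


print(volatility([6600, 13200]))  # about 4666.9
print(volatility([6600]))         # None
```

A promotion with sales in only one window therefore gets a NULL volatility, which downstream weight formulas need to tolerate.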

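Suppose the correlation between different promotion types is zero. Then the new weights for a product such as headphones can be determined by constructing the portfolio with the maximum Sharpe ratio. Writing $r_i$ for the ROII and $\sigma_i$ for the volatility of promotion $i$, maximizing the Sharpe ratio means solving:

$$\max_{w_1, w_2} \frac{w_1 r_1 + w_2 r_2}{\sqrt{w_1^2 \sigma_1^2 + w_2^2 \sigma_2^2}} \quad \text{subject to} \quad w_1 + w_2 = 1$$

With the help of the Lagrange multiplier method, we find the weights are:

$$w_1 = \frac{r_1 \sigma_2^2}{r_1 \sigma_2^2 + r_2 \sigma_1^2}, \qquad w_2 = \frac{r_2 \sigma_1^2}{r_1 \sigma_2^2 + r_2 \sigma_1^2}$$


Sink the New Values to Supabase


Now it is time to calculate the new weights and write them to Supabase using a JDBC sink: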

CREATE SINK promotion_update AS (
  SELECT
    -- Column names match the promotions table in Supabase
    o1.promotion_id AS id,
    o1.product_id AS product_id,
    -- Max-Sharpe weight of promotion o1 against the product's other promotion o2, as a percentage
    CAST(ROUND(100 * o2.vol^2 * o1.roii / (o1.roii * o2.vol^2 + o2.roii * o1.vol^2)) AS int8) AS weight_percent
  FROM promotions_stat_60mins AS o1
  JOIN promotions_stat_60mins AS o2
    ON o1.product_id = o2.product_id AND o1.promotion_id <> o2.promotion_id
) WITH (
  connector = 'jdbc',
  jdbc.url = 'jdbc:postgresql://xxxx.supabase.co:5432/postgres?user=postgres&password=xxx',
  table.name = 'promotions',
  type = 'upsert',
  primary_key = 'id'
);

This sink uses JDBC to update the promotions table in Supabase whenever the computed weights change. Supabase Realtime then detects the changes in the promotions table so the application can act on them in real time, which completes the real-time control loop of our dynamic advertisement system.
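The closed-form weight in the sink, o2.vol^2 * o1.roii / (o1.roii * o2.vol^2 + o2.roii * o1.vol^2), can be sanity-checked numerically. This Python sketch evaluates it for the headphones rows of the illustrative promotions_stat_60mins table. It then confirms by grid search that no other split of the fixed budget gives a higher Sharpe ratio. The function names are hypothetical. The check assumes zero correlation between promotions and no risk-free rate, matching the assumptions above.

```python
def two_promotion_weights(r1, vol1, r2, vol2):
    """Max-Sharpe weights for two uncorrelated promotions (same formula as the sink)."""
    denom = r1 * vol2 ** 2 + r2 * vol1 ** 2
    return r1 * vol2 ** 2 / denom, r2 * vol1 ** 2 / denom


def sharpe(w1, r1, vol1, r2, vol2):
    """Sharpe ratio of the budget split (w1, 1 - w1): zero correlation, no risk-free rate."""
    w2 = 1.0 - w1
    return (w1 * r1 + w2 * r2) / ((w1 * vol1) ** 2 + (w2 * vol2) ** 2) ** 0.5


# Headphones rows from the illustrative promotions_stat_60mins table.
r_search, vol_search = 89.3, 0.532
r_video, vol_video = 67.8, 0.110
w_search, w_video = two_promotion_weights(r_search, vol_search, r_video, vol_video)
print(round(w_search, 4), round(w_video, 4))  # 0.0533 0.9467

# Grid search over w1 in [0, 1]: no split should beat the closed-form weights.
best = sharpe(w_search, r_search, vol_search, r_video, vol_video)
assert all(
    sharpe(k / 1000, r_search, vol_search, r_video, vol_video) <= best + 1e-9
    for k in range(1001)
)
```

The low-volatility video ad receives most of the budget even though its ROII is lower. Dividing each return by its variance rewards stability as well as return.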

We assumed that each product has only two promotions because computing a Sharpe-ratio-based optimal portfolio in general is too difficult to express in pure SQL. However, RisingWave supports UDFs (User Defined Functions) in Python, Java, and Rust, which can easily solve the convex quadratic optimization problem. It also supports User Defined Aggregate Functions (UDAFs) and User Defined Table Generating Functions (UDTFs) to adapt to a wide array of needs. You can find more resources about UDFs on our documentation website.
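As a hedged sketch of the math such a UDF might run, consider any number of promotions with a covariance matrix C and returns r, and no risk-free rate. If negative weights are allowed, the fully invested maximum-Sharpe portfolio has the closed form w = C⁻¹r / sum(C⁻¹r), valid when that sum is positive. This is plain Python only. RisingWave's UDF registration API is not shown, and the function names are hypothetical. Adding a long-only constraint (no negative budgets) turns the problem into the convex quadratic program mentioned above, which this sketch does not solve.

```python
def solve(a, b):
    """Solve a @ x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    m = [list(map(float, row)) + [float(rhs)] for row, rhs in zip(a, b)]  # augmented matrix
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        if abs(m[pivot][col]) < 1e-12:
            raise ValueError("covariance matrix is singular")
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for r in range(n - 1, -1, -1):  # back substitution
        tail = sum(m[r][c] * x[c] for c in range(r + 1, n))
        x[r] = (m[r][n] - tail) / m[r][r]
    return x


def max_sharpe_weights(returns, cov):
    """Max-Sharpe weights w = C^-1 r / sum(C^-1 r); negative weights are allowed."""
    z = solve(cov, returns)
    total = sum(z)
    if total <= 0:
        raise ValueError("no fully invested portfolio with a positive Sharpe ratio")
    return [zi / total for zi in z]


# Three promotions with correlated sales (hypothetical numbers).
returns = [0.10, 0.20, 0.15]
cov = [
    [0.04, 0.01, 0.00],
    [0.01, 0.09, 0.02],
    [0.00, 0.02, 0.16],
]
weights = max_sharpe_weights(returns, cov)
print([round(w, 3) for w in weights])
```

With a diagonal covariance matrix and two promotions, this reduces to the weight formula used in the sink above.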


Conclusion


This example demonstrates how RisingWave can offer a seamless development experience for Supabase developers when building real-time applications. RisingWave is also Postgres-compatible, which means that experienced Supabase developers do not need to frequently check syntax during development. RisingWave also supports many sources and sinks, allowing developers to integrate with their current architecture.

The example highlights how user-friendly RisingWave is, making streaming data pipelines easier to build. However, real-world cases are often more complex than this example. Fortunately, RisingWave supports advanced features such as exactly-once semantics, snapshot reads, second-level recovery and scaling, and complex join operations to accommodate various scenarios in production environments. Access our source code on GitHub or start a free-tier cluster in RisingWave Cloud to begin your journey in stream processing with RisingWave!
