With the exponential growth in data generation through various events, the significance of real-time analytics has become increasingly evident. However, what exactly does real-time analytics involve? Is it merely faster data analysis? Can fast historical data analysis be considered real-time analytics?

The key point is not to analyze data fast, but to analyze fresh and well-formatted data fast. That means data needs to be ingested and transformed in real time before analytics is performed.

To achieve seamless real-time data ingestion, transformation, and analytics, a powerful combination to explore is RisingWave and ClickHouse. RisingWave is a PostgreSQL-compatible database specifically designed for stream processing. It excels at ingesting real-time data streams, performing diverse transformations, and enabling instant querying of results. ClickHouse® is a high-performance, column-oriented SQL database management system (DBMS) purpose-built for efficient online analytical processing (OLAP) of large data volumes and complex analytical queries.

By utilizing RisingWave's real-time data transformation capabilities, you can preprocess and enhance data prior to ingestion into ClickHouse, ensuring that the data aligns with your precise analytical requirements when it reaches ClickHouse.


Use case: Enrich cart events with product details


In this blog post, I’ll use an online retail example to show you how to build a system that ingests, transforms, and analyzes data in real time.

We have a stream of events that are triggered when customers add merchandise to their carts.

Typical cart events look like this:

(customerId, eventTime, itemId)
--------------------------------
("1234","2023-02-01 10:01:00","P001")
("1234","2023-02-01 10:05:00","P002")

As there is no product information in this stream, it's hard to perform analysis based on this stream alone. There are several ways to make it useful. We can join it with an orders stream to analyze items that have been added to the cart but not purchased, or join it with a product catalog table to form an enriched stream.

In our demo, we’ll join this stream with the product catalog table, and output the enriched stream to ClickHouse for further analysis.

The product_catalog table may look like this:

 itemId, name, price, category
-------------------------------------
 ("P001","Red T-Shirt",9.99,"Apparel")
 ("P002","Blue Jeans",39.95,"Apparel")


Architecture


We use RisingWave to do real-time data ingestion and enrichment. We then output the enriched data to ClickHouse, so that further analysis can be performed.

Figure: Real-time data ingestion and enrichment flow.


Preparation

  • Ensure psql is installed. To see how to install psql without the rest of the PostgreSQL component, see Install psql without PostgreSQL.
  • Ensure a Kafka producer is up and running. I started Kafka with KRaft. Please find the instructions here.
  • Install and connect to RisingWave.
# Install RisingWave
brew tap risingwavelabs/risingwave
brew install risingwave
# Start RisingWave
risingwave playground

# Connect to RisingWave in a new terminal window
psql -h localhost -p 4566 -d dev -U root
  • Install and connect to ClickHouse.
# Download the binaries
curl https://clickhouse.com/ | sh
# Start the server
./clickhouse server
# Start the client in a new terminal window
./clickhouse client


Write events to Kafka


Let’s now create a topic and inject several events:

# Create a topic
bin/kafka-topics.sh --create --topic cart-events --bootstrap-server localhost:9092
# Write three events to the topic
bin/kafka-console-producer.sh --topic cart-events --bootstrap-server localhost:9092

{"cust_id": "1234", "event_time": "2023-02-01 10:01:00", "item_id": "P001"}
{"cust_id": "1232","event_time": "2023-02-01 10:05:00", "item_id": "P002"}
{"cust_id": "1235","event_time": "2023-02-01 10:10:00","item_id": "P003"}

# Keep the producer active, as we'll write more events later


Ingest data into RisingWave


Let’s now create a table in RisingWave to ingest these events. In RisingWave, you can create either a source or a table for ingesting events. The difference is that events will be persisted in RisingWave if you use a table.

CREATE TABLE IF NOT EXISTS cart_event (
cust_id VARCHAR,
event_time TIMESTAMP,
item_id VARCHAR
)
WITH (
   connector='kafka',
   topic='cart-events',
   properties.bootstrap.server='localhost:9092',
   scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;
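
If you don't need the events persisted in RisingWave, a source can be declared instead with essentially the same connector clause. The following is a sketch (the source name cart_event_source is arbitrary):

CREATE SOURCE IF NOT EXISTS cart_event_source (
cust_id VARCHAR,
event_time TIMESTAMP,
item_id VARCHAR
)
WITH (
   connector='kafka',
   topic='cart-events',
   properties.bootstrap.server='localhost:9092',
   scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;

In this demo we stick with the table so that the raw events are persisted and queryable.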

Now create a local table in RisingWave to store the product catalog and insert some data, so that we can enrich cart events with product information.

CREATE TABLE product_catalog (
item_id varchar,
name varchar,
price double precision,
category varchar
);

INSERT INTO product_catalog (item_id, name, price, category)
VALUES 
  ('P001','Red T-Shirt',9.99,'Apparel'),
  ('P002','Blue Jeans',39.95,'Apparel'),
  ('P003','Smart Watch',199.99,'Electronics'),
  ('P004','Yoga Mat',29.95,'Fitness'), 
  ('P005','Wireless Headphones',99.99,'Electronics'),
  ('P006','Coffee Mug',5.99,'Kitchen');


Join the stream with the table


Now let’s join the cart_event stream with the product_catalog table to form an enriched data stream. We can use a materialized view to perform the stream-table join if we also want to do some other transformations. If we just want to enrich the stream with the product catalog, we can simply create a sink to do the join. For this demo, we’ll use a materialized view.

RisingWave takes a new approach to materialized views for streaming data. They continuously reflect real-time results through incremental updates.

CREATE MATERIALIZED VIEW data_enrichment AS SELECT 
  c.cust_id,
  c.event_time,
  p.name,
  p.price,
  p.category
FROM
  cart_event c
JOIN
  product_catalog p 
ON 
  c.item_id = p.item_id;

With this stream-table join, an enriched event will be produced as soon as a raw event comes into RisingWave.
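
Because a materialized view is queryable like a regular table, you can sanity-check the enriched results directly in RisingWave before sinking them anywhere, for example:

SELECT * FROM data_enrichment LIMIT 5;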


Deliver the enriched stream to ClickHouse


We can sink the enriched stream to ClickHouse for further analysis. To make it happen, we need to create a table in ClickHouse with the same schema as the object in RisingWave. As we want to sink data from the data_enrichment materialized view, let’s create a table with the same schema as data_enrichment.

-- Run this query in ClickHouse
CREATE TABLE enriched_cart_events
(
cust_id String,
event_time DateTime64,
name String,
price Float64,
category String
)
ENGINE = ReplacingMergeTree()
ORDER BY (cust_id, event_time);

When the sink destination is available, we can create the sink and start streaming data out of RisingWave to ClickHouse.

-- Run this query in RisingWave
CREATE SINK sink_to_clickhouse
FROM
    data_enrichment WITH (
    connector = 'clickhouse',
    type = 'append-only',
    force_append_only = 'true',
    clickhouse.url = 'http://0.0.0.0:8123',
    clickhouse.user = 'default',
    clickhouse.password = '',
    clickhouse.database = 'default',
    clickhouse.table = 'enriched_cart_events'
);

Let’s now query the ClickHouse table to see if the data has come through.

SELECT * from enriched_cart_events;

------ RESULTS
┌─cust_id─┬──────────────event_time─┬─name────────┬──price─┬─category────┐
│ 1234    │ 2023-02-01 18:01:00.000 │ Red T-Shirt │   9.99 │ Apparel     │
│ 1232    │ 2023-02-01 18:05:00.000 │ Blue Jeans  │  39.95 │ Apparel     │
│ 1235    │ 2023-02-01 18:10:00.000 │ Smart Watch │ 199.99 │ Electronics │
└─────────┴─────────────────────────┴─────────────┴────────┴─────────────┘

We can see that the three events have been enriched and are available for use in ClickHouse.

To mimic a stream, let's write one event at a time and then immediately query the table in ClickHouse.

# Write one event at a time
{"cust_id": "1236","event_time": "2023-02-01 10:15:00","item_id": "P001"}
{"cust_id": "1237","event_time": "2023-02-01 10:20:00","item_id": "P004"}
{"cust_id": "1238", "event_time": "2023-02-01 10:25:00", "item_id": "P002"}
{"cust_id": "1239", "event_time": "2023-02-01 10:30:00", "item_id": "P005"}
{"cust_id": "1240", "event_time": "2023-02-01 10:35:00", "item_id": "P003"}
{"cust_id": "1241", "event_time": "2023-02-01 10:40:00", "item_id": "P006"}
{"cust_id": "1242", "event_time": "2023-02-01 10:45:00", "item_id": "P007"}

The enriched data is available in the ClickHouse table as soon as we write an event to Kafka. Each time you query the table in ClickHouse, you'll see one more row of data. The only exception is the last event: its item P007 does not exist in the product catalog, so the inner join produces no enriched row for it.

After writing all of the above messages to Kafka, you should see something like this:

SELECT * FROM enriched_cart_events;

------ RESULTS

┌─cust_id─┬──────────────event_time─┬─name────────────────┬──price─┬─category────┐
│ 1232    │ 2023-02-01 18:05:00.000 │ Blue Jeans          │  39.95 │ Apparel     │
│ 1234    │ 2023-02-01 18:01:00.000 │ Red T-Shirt         │   9.99 │ Apparel     │
│ 1235    │ 2023-02-01 18:10:00.000 │ Smart Watch         │ 199.99 │ Electronics │
│ 1236    │ 2023-02-01 18:15:00.000 │ Red T-Shirt         │   9.99 │ Apparel     │
│ 1237    │ 2023-02-01 18:20:00.000 │ Yoga Mat            │  29.95 │ Fitness     │
│ 1238    │ 2023-02-01 18:25:00.000 │ Blue Jeans          │  39.95 │ Apparel     │
│ 1239    │ 2023-02-01 18:30:00.000 │ Wireless Headphones │  99.99 │ Electronics │
│ 1240    │ 2023-02-01 18:35:00.000 │ Smart Watch         │ 199.99 │ Electronics │
│ 1241    │ 2023-02-01 18:40:00.000 │ Coffee Mug          │   5.99 │ Kitchen     │
└─────────┴─────────────────────────┴─────────────────────┴────────┴─────────────┘
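
With the enriched rows in ClickHouse, further analysis is just regular OLAP SQL. As a simple illustration (one possible query, not part of the original demo), the following aggregates cart activity by product category:

SELECT
    category,
    count() AS cart_adds,
    sum(price) AS potential_revenue
FROM enriched_cart_events
GROUP BY category
ORDER BY potential_revenue DESC;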

Conclusion

For simplicity, we have demonstrated the case of joining a stream with a table to enrich data. In real-world scenarios, we can join multiple streams and filter and aggregate data in streams before streaming the data to ClickHouse. By leveraging RisingWave's capabilities for real-time data ingestion and transformation, you can ensure that ClickHouse receives higher-quality data that is instantly available for thorough analysis. These two systems make a perfect combo.

In the previous two articles, we learned that the RisingWave stream processing engine performs incremental computation on the change streams derived from the base relational tables to obtain the change streams of the target relational tables. In this model, since the input change streams are unbounded, the stream processing engine can choose to process arbitrary portions of the streams, and the output change streams can also include materialized intermediate results. For example, consider the following case.

CREATE TABLE Votes(user_id int, story_id int);

CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes GROUP BY story_id

/* Txn 1 */
INSERT INTO Votes VALUES (1,1), (2,1), (3,2);
/* Txn 2 */
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
/* Txn 3 */
INSERT INTO Votes VALUES (4,1), (5,2);
Figure: Compute with the different portions.

The output streams in the three cases differ for the same input stream, but all are correct. The final materialized results of the output streams are (1, 2) and (2, 2).


Barrier


In the above example, the most obvious difference among the three cases is the granularity at which the input stream is divided and processed segment by segment. Like some other stream processing systems, RisingWave introduces barriers on the stream. The source operators of the computing DAG insert barriers into the data stream, which divide the input stream into many segments.

After receiving a barrier, each operator performs the following steps:

  1. Alignment: Operators with multiple input streams (such as join and union) need to consume data from other streams until they reach their respective barriers, and finally collect all input streams under the same barrier.
  2. Flush: The operator processes all the inputs received so far and outputs the resulting change stream to the downstream.
  3. Broadcast: Sends the barrier to the downstream operators.

The example mentioned above illustrates the insertion of barriers as follows:

Figure: Barrier example with the different portions.

By the way, barriers also play a crucial role in the fault tolerance and recovery of the RisingWave stream processing. From the perspective of ACID transactions, here we use barriers to constrain the behavior of operators, ensuring that when nodes in the graph receive a barrier, all the change streams they have received are based on the same version of the base table. This achieves snapshot isolation and ensures the consistency of the states of nodes in the graph. In the upcoming detailed articles on fault tolerance and recovery in RisingWave stream processing, we will further explore how RisingWave ensures atomicity and durability.


Barrier injection strategy


Barriers control the computation of each operator in the stream processing graph, keeping them in sync. The strategy for injecting barriers into the data stream at the source nodes is crucial.

One straightforward approach is to inject barriers at fixed time intervals (barrier interval). For the compute engine, batching can eliminate unnecessary intermediate results and provide better performance. However, in practical applications, users have certain freshness requirements for the output results, so the compute engine cannot accumulate batches indefinitely. Here, the barrier interval sets the upper limit for batching. When the barrier arrives, batching must be stopped and the current intermediate results are output to the downstream. This allows users to achieve a trade-off between performance and freshness by simply configuring a parameter.
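
As a rough sketch of what this configuration looks like in practice (assuming the parameter is exposed as barrier_interval_ms, which may differ across RisingWave versions):

-- Inspect the current system parameters
SHOW PARAMETERS;

-- Trade a little freshness for larger batches by emitting a barrier every 2 seconds
ALTER SYSTEM SET barrier_interval_ms = 2000;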

On the other hand, for change streams with transactional semantics, sources can maintain the transactional semantics of the upstream by controlling the insertion of barriers without interrupting the upstream transactions.


Watermark and trigger


Regarding when to trigger the output of stream processing, another classic approach is the Dataflow Model proposed by Google in 2015. It partitions the data based on time windows defined on the event time column and uses watermarks to describe the completeness of events. This allows the compute engine to process the events within each window in a batch-like manner only when the events are complete and will not change. This triggering method is also known as completeness triggering.

Watermark(t) is inserted into the data stream, indicating that events with an event time earlier than t have already arrived in the data stream and will not appear again. In RisingWave, this means that there will be no changes on records with event time before t. Under this model, the computation of operators is triggered by the watermark, and operators output results when the watermark determines that the computation results will not change.

For example, the following graph shows an example of counting within a tumble window of 5 seconds. When the aggregation operator receives a watermark with tm > 12:00:11, it knows that there will no longer be data in the windows starting at 12:00:00 and 12:00:05. At this point, the operator can output the results of these two windows.

Figure: Watermark example.

Where does the watermark come from? In practice, few upstream systems can provide perfect watermark semantics guarantees when producing streams. Typically, watermark injection is specified in the stream computing system through timeouts or other strategies. For example, in RisingWave, you can define a watermark with a timeout of 5 seconds using SQL. More details can be found in the documentation. In future articles, we will discuss other details of watermarks, such as handling late data.
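
As a sketch of what this looks like (the source name and columns are hypothetical), a watermark is declared on the event-time column of a source, here allowing events to arrive up to 5 seconds late:

CREATE SOURCE click_events (
   user_id VARCHAR,
   event_time TIMESTAMP,
   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
   connector='kafka',
   topic='click-events',
   properties.bootstrap.server='localhost:9092'
) FORMAT PLAIN ENCODE JSON;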


EMIT ON WINDOW CLOSE


When it comes to window computations, we have two output strategies. The first is to output partial computation results even if the window is not yet closed when a barrier arrives, known as "emit on update." The second is to ignore the barriers and output the complete computation results only when the window is closed, known as "emit on window close" (EOWC). The first strategy provides better real-time consistency, while the second strategy ensures that the output data is no longer subject to changes and eliminates the intermediate state. RisingWave defaults to the first strategy, but when users require the second, we provide an experimental EOWC syntax (a sketch of the syntax appears after the list below). It is mainly used in the following scenarios:

  1. When there is a need for an append-only output stream. This is often required by downstream systems such as Kafka or S3.
  2. When better performance is desired at the cost of some delay. The EOWC semantics eliminate the intermediate state of certain operators and allow downstream operators to consume an append-only stream. This optimization can significantly improve performance, particularly in queries involving window aggregation and OVER window functions.
  3. In scenarios like monitoring and alerts, sometimes users do not want to see intermediate results but only expect to see the aggregated results after the window is closed.
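
A sketch of the EOWC syntax, reusing the hypothetical click_events source above (EMIT ON WINDOW CLOSE requires a watermark on the event-time column):

CREATE MATERIALIZED VIEW window_counts AS
SELECT
   window_start,
   COUNT(*) AS cnt
FROM TUMBLE(click_events, event_time, INTERVAL '5' SECOND)
GROUP BY window_start
EMIT ON WINDOW CLOSE;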

Conclusion

This article showcases two modes of triggering computation in the RisingWave stream processing engine. The default barrier-triggered computation ensures the consistency of the states between nodes in the computation graph, resulting in data consistency between materialized views. It also allows users to balance cost and freshness by adjusting the barrier interval. The EOWC semantics introduced from the Dataflow Model leverages additional watermark definitions to trigger computation only when the window’s events are complete. This mode plays a unique role in some scenarios such as append-only sinks and window aggregation.

In the previous article, "Deep Dive into the RisingWave Stream Processing Engine - Part 1: Overview," we described the design principles and core features of the RisingWave stream processing engine. In this article, we will focus on the computational model.


SQL and relational algebra


In the previous article, we mentioned that RisingWave is a streaming database that uses SQL to define stream processing tasks declaratively. Let's first look at the computational model of a classic SQL query engine and then think about how to design the stream processing computational model under SQL.

SQL has become a long-lasting database query language due to the underlying relational algebra's significant role. Any SQL statement can be equivalently transformed into an operator tree composed of relational algebra operators. For example, the following SQL query can be represented as the operator tree in the figure below.

CREATE TABLE StoriesVC(story_id int, vote_count int);
CREATE TABLE Stories (id int, title text);

/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count 
FROM Stories JOIN StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;
Figure: The operator tree of the aforementioned SQL query.

Relational algebra, as an algebraic system, satisfies the closure property, meaning that the input and output of any relational algebra operator are relations with specific structures (schemas). This allows relational operators to be flexibly nested without exceeding the scope of relational algebra. Query engines can leverage the characteristics of relational operations and use a unified framework for processing. This flexibility is reflected in SQL through the presence of subqueries everywhere. Users can chain their own defined SQL queries together. For example, the previous query can evolve into the following query.

/* Get stories information with more than 1 vote*/
SELECT id, title, vote_count 
FROM Stories JOIN (
    SELECT count(*) AS vote_count, story_id FROM Votes GROUP BY story_id
) StoriesVC
ON StoriesVC.story_id = Stories.id
WHERE vote_count > 1;

Through the analysis of SQL and relational algebra, we can conclude that to support the computational model of stream processing using SQL, the following two requirements need to be met:

  1. Operators need to be extended from traditional relational algebra operators so that SQL can be transformed into corresponding classic SQL operators and then into corresponding stream operators.
  2. The closure property needs to be satisfied to form a new algebraic system where the input and output have the same properties.

In summary, the key issue in the computational model is the definition of the computational objects.


Computational model based on TVR


Obviously, computational objects must be extended from relations to meet the first requirement. One attempt to solve this problem is the streaming SQL on Time-varying Relations (TVR). This model is explained in detail in Chapter 8 of "Streaming Systems" and "One SQL to Rule Them All – an Efficient and Syntactically Idiomatic Approach to Management of Streams and Tables." In simple terms, TVR is a time-varying multi-version relation. For any classic SQL operator, the output TVR can be calculated from each snapshot version of the input TVR, resulting in the desired output TVR. This satisfies the two requirements mentioned above.

TVR is a theory-oriented model. If we build a query engine that adheres to its original definition, every time an upstream TVR changes, generating a new snapshot version and calculating the output result with full computation is required. In the end, we would obtain an algorithm for fully calculating and refreshing materialized views. For example, if we build a materialized view called StoriesVC and make modifications to the base table Votes, the calculation process would be as shown in the figure below. Each modification generates a new snapshot, and to update the materialized view StoriesVC, the calculation process (full computation) needs to be re-executed on the snapshot.

CREATE TABLE Votes(user_id int, story_id int);

CREATE MATERIALIZED VIEW StoriesVC AS
SELECT story_id, COUNT(*) AS vcount
FROM Votes GROUP BY story_id
HAVING vcount >= 2;

INSERT INTO Votes VALUES (1,1), (2,1), (3,2);
DELETE FROM Votes WHERE user_id = 1 AND story_id = 1;
INSERT INTO Votes VALUES (4,1), (5,2);
Figure: The calculation process of the example mentioned earlier under the incremental TVR model.


Computational model based on change streams


Performing full computation on each snapshot is expensive and involves a significant amount of redundant computation. RisingWave, like other stream processing systems, adopts an incremental computational model. Specifically, the input and output of operators are changes to relations. RisingWave's stream computation engine represents changes using a series of INSERT and DELETE operations. For the example mentioned earlier, the computation under this model is shown in the figure, where "+" represents INSERT and "-" represents DELETE. The changes to the base table Votes go through individual operators to compute the changes to the target materialized view StoriesVC.

Figure: The calculation process of the example mentioned earlier under the incremental computational model.
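
In text form, and assuming each transaction is processed as one batch (a sketch, not a reproduction of the figure), the change streams look roughly like this, using the same +/- notation:

Changes to Votes                  Changes to StoriesVC
Txn 1: +(1,1) +(2,1) +(3,2)  -->  +(1,2)
Txn 2: -(1,1)                -->  -(1,2)
Txn 3: +(4,1) +(5,2)         -->  +(1,2) +(2,2)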

It is worth noting that there can be various ways to represent changes to relations, but they still need to satisfy the closure property, meaning that the representation of changes in the input and output of operators must be the same. Therefore, RisingWave adopts the simplest INSERT and DELETE operations for representing changes, and on top of that, it provides additional desirable properties.


Stream key


A change stream consisting only of INSERT and DELETE operations without any other guarantees is not friendly to storage engines; just imagine finding a value in an unordered list. To support fast insertion and deletion, a unique key is usually required for indexing the data. Therefore, to ensure that there is always a unique key that can be used as a primary key for each materialized view, RisingWave's optimizer performs a bottom-up deduction on the operator tree to ensure that each operator's output has a unique key. This key is also known as a stream key, and operations on the change stream are interpreted with respect to it.

For example, consider the following join materialized view statement. Its output does not have a unique key. The optimizer rewrites it by adding two columns as stream keys in the Scan operator and Join operator, taken from the original primary key UUID of the two tables. These two columns will appear in the result set, and the materialized view can use them as hidden columns for a composite primary key.

CREATE TABLE VisitEvent(
  story_id int,
  user_id int,
  time timestamp,
  uuid varchar primary key);
CREATE TABLE Comments(
  story_id int,
  user_id int,
  created_at timestamp,
  content varchar,
  uuid varchar primary key);

CREATE MATERIALIZED VIEW mv AS SELECT
  Comments.story_id AS story_id,
  Comments.user_id AS user_id,
  Comments.content AS content,
  Comments.created_at AS comment_time,
  VisitEvent.time AS visit_time
FROM Comments JOIN VisitEvent USING(story_id, user_id);
Figure: The stream keys derivation of the example.

Since the stream key is derived from the properties of relational algebra itself, the output change stream naturally satisfies the stream key constraint as long as the input change stream satisfies its own stream key constraint. Here, the constraint means that INSERT and DELETE operations with the same stream key must alternate: an existing stream key cannot be inserted again, and a non-existent stream key cannot be deleted.


Other types of change streams


Readers familiar with stream processing are likely aware that different scenarios may have different representations of change streams, which differ slightly from RisingWave's change streams. In this section, we will briefly introduce append-only streams and upsert streams and describe how RisingWave converts them into its internal representation when these change streams are input from external systems.

An append-only stream is a change stream on a table with specific restrictions. It can only represent changes that INSERT records are appended to the original table. Records that have already been inserted in the table will not be deleted, revoked, or updated in the future. For such a stream, RisingWave uses a Snowflake-like primary key generation algorithm to generate a new column as the stream key. Afterward, this stream becomes an internal RisingWave change stream that only has INSERT operations.

Similarly, an upsert stream is a change stream defined on a table with keys, which includes UPSERT and DELETE operations. The behavior of the UPSERT operation depends on the current state of the key in the table. If the key does not exist in the table, the record is inserted; otherwise, the corresponding record with the key in the table is overwritten. The UPSERT operation loses the state of the record before the change compared to RisingWave's change stream. Not all stream operators efficiently support UPSERT operations. For example, aggregation operations like SUM require knowledge of the old value before the change event to calculate the incremental change in the result, but the upsert stream loses this information. Therefore, RisingWave converts incoming upsert streams into internal change streams. Specifically, RisingWave materializes the corresponding table and when encountering an UPSERT operation, it queries the value of the key in the table and fills in the corresponding DELETE operation as needed.
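
As a concrete sketch of how an upsert stream enters RisingWave (the table and columns are hypothetical), a Kafka topic carrying upsert messages is ingested with a primary key and the UPSERT format; RisingWave then materializes the table and derives the internal INSERT/DELETE change stream as described above:

CREATE TABLE user_profiles (
   user_id VARCHAR,
   name VARCHAR,
   PRIMARY KEY (user_id)
)
WITH (
   connector='kafka',
   topic='user-profiles',
   properties.bootstrap.server='localhost:9092'
) FORMAT UPSERT ENCODE JSON;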

Figure: Convert the upsert stream to RisingWave's change stream.

Conclusion

This article presents the computational model of the RisingWave stream processing engine and describes how to transform declarative SQL queries into a series of stream operators. Starting from the relational algebra behind SQL, it extends to a stream computational model based on TVR and further introduces the change relation stream model of RisingWave. It provides a detailed description of the types and characteristics of change streams in RisingWave and compares them with common change streams in other systems.

About this series


In the "Deep Dive into the RisingWave Stream Processing Engine" series, we aim to share various aspects of the RisingWave stream processing engine from top to bottom through a series of articles, gradually delving into each module's technical principles and implementation details. We'll discuss the design philosophy behind creating this stream processing engine from scratch and dive deeper into the technical intricacies of each module.

Stream processing is still a relatively young field, and many best practices in design and engineering have yet to be standardized. We are eager to engage in deeper discussions with enthusiasts, researchers, and users interested in stream processing. We welcome discussions in our Slack channel and GitHub repo to collectively explore the future of stream processing technology.

So let's get started!


What does RisingWave's stream processing engine do?


In simple terms, RisingWave's stream processing engine runs long-lived distributed computing tasks. Given a result table defined by a SQL query over a set of source tables, it continuously computes the changes to that result table from the changes to the source tables.

This might sound a bit abstract, so let's illustrate it with an example.

In the following SQL, we define a table called "Stories" to maintain detailed information about articles, and users’ like behavior generates an append-only table called "Votes."

CREATE TABLE Stories (id int, author int, title text, url text, PRIMARY KEY (id));
CREATE TABLE Votes (user_id int, story_id int);

Based on these two tables, we can use SQL statements to define a materialized view (MView). For example, the MView StoriesWithVC maintains the number of likes for each article.

CREATE MATERIALIZED VIEW StoriesWithVC AS
SELECT id, author, title, url, vcount
FROM stories
JOIN (
    SELECT story_id, COUNT(DISTINCT user_id) AS vcount
    FROM votes GROUP BY story_id
) VoteCount
ON VoteCount.story_id = stories.id;

Furthermore, we can define new MViews on StoriesWithVC.

CREATE MATERIALIZED VIEW Top10VotedStories AS
SELECT * FROM StoriesWithVC
ORDER BY vcount DESC
LIMIT 10;

In this example, RisingWave needs to efficiently maintain the contents of the MView and, since it supports cascading MViews on MViews, it also needs to calculate the changes in the MView itself. Once the changes are known, maintaining the real-time results of the MView is straightforward, so RisingWave's stream processing engine only needs to calculate the downstream table's corresponding changes based on the changes in the upstream table. As shown in the diagram, when the source tables Stories and Votes receive changes, the stream processing engine calculates the changes in the downstream MView StoriesWithVC and MView Top10VotedStories.

Figure: Example of how RisingWave handles stream processing.

Design philosophy


SQL-native computational model


It is evident that both input and output of RisingWave's stream processing engine are data changes on relational tables, with well-defined schemas. In fact, the internal operators also use relational changes as the input and output. This is a result of RisingWave's positioning as a streaming database using a standard SQL interface to support users in creating stream processing jobs. From day one, RisingWave's stream processing was designed to handle SQL queries and relational data workloads. This is different from some other stream processing systems that emerged from the big data era. They often initially build a generic stream processing framework and then build a SQL engine based on that framework to provide a SQL interface.


From general to specialized design philosophy


The SQL-native computational model allows us to define RisingWave's problem boundaries in stream processing clearly, so we can build the entire stream processing engine with a top-down approach. When designing SQL features, we usually start with the most general SQL operators, and then specialize and optimize for different scenarios to achieve optimal performance for specific workloads. This design philosophy ensures that while RisingWave's stream processing model is general, it can also achieve excellent performance for various specific scenarios.

It is worth mentioning that during development, we found that this specialization can be distributed at different levels:

  1. Runtime adaptation of executors: We implemented an LRU-based state cache in the operators, ensuring that when the state is small and memory is abundant, all states can exist in memory.
  2. Optimizer specialization in generating plans: For example, we specialized operators for append-only streams or streams with watermarks, significantly optimizing performance.
  3. Configuration parameters: For instance, trade-offs converge to a few configuration parameters, such as barrier_interval, so that users can balance data freshness against execution cost.


Core features


Apart from this, some core features run through the design consistently. We consider these features as first-class citizens of the RisingWave stream processing engine and maintain these features consistently in every design and implementation.


Distributed parallel execution engine


RisingWave is a distributed system capable of processing data on a large scale in parallel. The RisingWave stream processing engine can make full use of multi-node and multi-core computing resources. The generated execution plan avoids single-point calculations as much as possible, distributing calculations to multiple nodes to reduce potential single-point bottlenecks in the system.


Flexible and elastic cloud-native scheduling


RisingWave, as a cloud-native stream database, supports dynamic scaling of computing resources on the basis of parallel processing. Since stream processing tasks are long-running, to make dynamic scaling of clusters effective, the stream processing engine must support online scheduling and migration of tasks. This poses significant challenges in terms of design abstractions for state storage, data partitioning, and more. The RisingWave stream processing engine exposes scheduling and migration interfaces to allow external control components to easily migrate and schedule computing tasks.


Fast and efficient fault tolerance and recovery


Fault tolerance is a fundamental capability that modern stream processing systems must have. Stream processing systems typically use a checkpoint mechanism to persist computing states, achieving exactly-once semantics within the system. Simultaneously, it is necessary to minimize or reduce the resource preemption caused by the checkpoint process, which affects foreground stream processing tasks. RisingWave has its own state storage engine and implements asynchronous checkpointing throughout the entire process, making the checkpoint process almost invisible to foreground tasks.


Snapshot isolation of stream computing results


As a stream database, RisingWave's internal objects, such as MViews and tables, need to ensure the consistency of computing results between them. To be more specific, when a user queries different MViews at the same moment, they should be based on the same version of the source table. This requires that changes in the upstream table and corresponding changes in the downstream table be atomically committed in the same write transaction.

This means that a stream operator cannot freely delay data or batch it; it should promptly dispatch changes when it receives upstream data. Readers familiar with stream processing may know that stream operators can provide better performance through batching. We have made two optimizations for this. First, we introduced the concept of an "epoch," which allows multiple changes on the source table within one epoch; this is equivalent to enlarging the size of the aforementioned write transaction, allowing operators to batch within that transaction. Second, for common stream computing use cases, we designed the "EMIT ON WINDOW CLOSE" query semantics for batching with watermarks (see RFC: The Semantics of EMIT ON WINDOW CLOSE by fuyufjh, risingwavelabs/rfcs#30), allowing users to declare the consistency semantics they desire between MViews and the upstream.


Overall architecture


The overall architecture is shown in the diagram below. The leftmost Frontend node connects with users via pgwire and is responsible for converting DDL SQL statements into stream processing execution plans. It optimizes the plans in the optimizer and sends the execution plans to the central Meta node for registration. Meta node persists the execution plans in a globally consistent and highly available MetaStore and sends computation tasks to computing nodes based on the execution plans. Throughout the entire lifecycle of a streaming job, the Meta node can send control instructions to computing nodes through RPC to implement task migration, scheduling, checkpoint, and failover. External systems or users can also directly control and schedule computing workloads on computing nodes through the interfaces exposed by the Meta node.

Figure: The overall architecture.

Conclusion

Starting from practical scenarios, this article introduced the use cases of the RisingWave stream processing engine, described the design philosophy and core features it relies on in architecture design, and provided an overview of the entire stream processing engine’s architecture.

What is Supabase?


Supabase enables developers to efficiently build production-ready products. It utilizes Postgres as the underlying OLTP database for handling transactional business logic and offers comprehensive toolkits to accelerate the delivery of common application features. Additionally, Supabase Realtime enhances the development toolkits by empowering real-time scenarios such as online chat rooms and real-time monitoring dashboards. It provides a convenient framework to develop real-time event-driven applications.


How does RisingWave Empower Stream Processing?


Using the data directly from the OLTP database might not be sufficient for many use cases. Let's consider the real-time monitoring dashboard as an example. We cannot obtain the total number of users who have modified their username within the last 5 minutes directly from the database. Usually, these metrics are not readily available in OLTP databases since they are derived from aggregated results based on raw data within sliding time windows.

To do this seamlessly, we need a real-time data pipeline that monitors the change data capture (CDC) stream of the users table and continuously aggregates the number of username updates in the last 5 minutes. Usually, the results should also be stored in some data service so that the dashboard application can fetch them from it.
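
To make this concrete, here is a rough sketch of such a pipeline in RisingWave, assuming a CDC-backed users table whose updated_at column is bumped whenever the username changes (both names are hypothetical):

CREATE MATERIALIZED VIEW username_updates_5min AS
SELECT COUNT(*) AS updates_last_5_min
FROM users
WHERE updated_at > NOW() - INTERVAL '5 minutes';

The materialized view keeps itself up to date incrementally, so the dashboard can simply query it.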

Figure: An example of the real-time monitoring dashboard.

Sounds quite cumbersome! What if we had something like Supabase for stream processing, so that we could focus on the business logic: a product that automatically builds a data pipeline from SQL statements describing the desired results. It is elastic, meaning it can scale compute resources based on the amount of data ingested. It can also serve the results directly without external data services. Additionally, it is cost-efficient, easy to maintain, reliable (with a guaranteed SLA), and user-friendly. Introducing RisingWave, the streaming database that can fulfill all these needs.

RisingWave was built in early 2021 by RisingWave Labs (formerly known as Singularity Data). It was designed to fulfill all these requirements from day one. After being integrated with various companies' data stacks and used in multiple scenarios, RisingWave is now prepared to democratize stream processing.

In this blog, we will explore an example of how to enhance advertisement performance in an e-commerce scenario using Supabase and RisingWave Cloud. This example is inspired by the examples provided in the book "Designing Data-Intensive Applications," an excellent guide to data engineering.


Use Supabase Tables as Sources


Let’s first create some tables in Supabase: “products”, “customers”, “promotions” and “sale_events”. The first three tables are entities modeling the participants in the e-commerce business logic. The fourth table, “sale_events”, is known as the "fact table", keeping a record of all transactions made. The relations are visualized in the Supabase schema visualizer:

Figure: The relations of the tables are visualized in the Supabase schema visualizer.

Supabase excels at handling customers' orders and sales as it is an OLTP database. Now, we want to perform stream processing to determine the most effective promotion approach in real-time and dynamically adjust the promotion portfolio. This involves utilizing the data generated from the business, performing real-time calculations on it, and then using the computed result back in the business.

Unlike batch processing, stream processing automatically updates the real-time result with an incremental computation algorithm when new data is ingested. Let's take calculating an average value as an example. Instead of summing up all values in a list and dividing by the total count every time, we can use an incremental approach that updates the result in O(1) time.

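In its simplest form (a sketch of the idea), the running average is maintained as

$\mathrm{avg}_n = \mathrm{sum}_n / \mathrm{count}_n$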

where the values of “sum” and “count” are also calculated in an incremental way. RisingWave also optimizes top-N, sliding-window, and other complicated calculations in stream processing. By running the SQL statement EXPLAIN CREATE MATERIALIZED VIEW ..., you can get the whole graph of the calculation process.

To simplify things, we use RisingWave Cloud to set up a free-tier RisingWave cluster. RisingWave Cloud helps us manage the cluster and provides useful features such as a web SQL editor, data pipeline visualizer, GUI for source/sink management, database user management, and metrics dashboards.

Once the RisingWave cluster is set up, we need to create database replications of the sale_events and promotions tables in Supabase. These replications will serve as data sources in the RisingWave cluster. We can then use the SQL editor in RisingWave Cloud to create the source table. Instead of using the "CREATE SOURCE" statement, we can connect to the source by using the "CREATE TABLE" statement, which allows us to persist a copy of the data in the RisingWave cluster.

CREATE TABLE sale_events (
  -- The same columns in `sale_events` table in Supabase
  id int8,
  created_at TIMESTAMPTZ, 
  customer_id string,
  product_id int8,
  promotion_id int8,
  quantity int8,
  net_price_cents int8,
  discount_price_cents int8,
  PRIMARY KEY (id)
) 
WITH (
  connector='postgres-cdc',
  hostname = 'db.xxxxxx.supabase.co',
  port = '5432',
  username = 'postgres',
  password = 'xxxxxx',
  database.name = 'postgres',
  schema.name = 'public',
  table.name = 'sale_events',
  publication.name = 'rw_publication' -- Database Replications name in Supabase
);

CREATE TABLE promotions (
  id int8,
  created_at TIMESTAMPTZ, 
  ad_type string,
  product_id int8,
  weight_percent int8,
  PRIMARY KEY (id)
) 
WITH (
  ......
  table.name = 'promotions'
);


Design the Stream Processing Pipelines


Before we program the calculation part, let's examine the data flows of the final design. Firstly, the E-commerce system records sales events in the sale_events table (step 1). The changes in the table are captured and ingested into the RisingWave cluster through CDC streams. To calculate the ROI (Return on Investment) for each promotion, the promotions table is also replicated to the RisingWave cluster (step 2). The data pipelines then calculate the ROI and volatility as intermediate results, which are stored in a materialized view. Based on the intermediate results, new promotion weights can be calculated using SQL-defined strategies (step 3). The final result is stored in another materialized view with the same structure as the promotions table in Supabase. This allows the RisingWave cluster to use JDBC to update the values of the weight column in Supabase (step 4). Subsequently, Supabase Realtime captures the update events in the promotions table, enabling the application to immediately call the APIs and adjust the weights in the E-commerce system (steps 5 & 6).

Figure: The data flows of the final design.

Let's go back to our use case. To make things easier, let's make two assumptions:

  1. The product vendor has a fixed budget for their product promotion. That means we can treat weights as the amount of investment, so the “ROI” we have is actually a relative ROI indicator.
  2. Each product has only two promotion types (we will talk about this later).

The return is the total sales amount, which can be calculated from the sale_events table.

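One way to form this indicator (a formulation chosen to be consistent with the roii calculation below, where $w$ is the promotion weight, $s_t$ is the windowed sales amount at time $t$, and $T$ is the current time) is

$\mathrm{ROII} = \frac{1}{w} \sum_{t=T-n}^{T} s_t$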

where $n$ is the time window length.


Preprocess Data to Get the Intermediate Results


We use standard deviation of the time-windowed sales amounts as the volatility. With the return rate and the volatility, we can construct a simple optimized portfolio to make the return rate as large as possible while diversifying the idiosyncratic risks.

The sale_events table looks like:

                              sale_events (table)
---------------------------------------------------------------------------
 product_id | promotion_id     | created_at | discount_price | quantity
------------+------------------+------------+----------------+----------
 headphones | Search Result    |      10:03 |           6600 |        1
 headphones | Youtube Video Ad |      10:08 |           6600 |        2
 guitar     | Search Result    |      10:09 |          19900 |        1
 guitar     | Search Result    |      10:09 |          19900 |        1
 headphones | Youtube Video Ad |      10:09 |           7299 |        2

Let’s use 5 minutes as the time window length, and use product_id and promotion_id as the key to group rows together within a time window.

CREATE MATERIALIZED VIEW promotions_sequences_60mins AS(
  SELECT 
    product_id, 
    promotion_id, 
    sum(total_price) AS sales_amount,
    window_start,
    window_end
  FROM (
    SELECT 
      product_id, 
      promotion_id, 
      discount_price_cents * quantity as total_price,
      window_start,
      window_end
    FROM TUMBLE (sale_events, created_at, INTERVAL '5 MINUTES')
  )
  WHERE window_start > NOW() - INTERVAL '60 minutes'
  GROUP BY window_start, window_end, product_id, promotion_id
  ORDER BY window_start DESC
);

Then we can have something like:

               promotions_sequences_60mins (materialized view)
---------------------------------------------------------------------------
 product_id | promotion_id     | sales_amount | created_at | window_start | window_end
------------+------------------+--------------+------------+--------------+-----------
 headphones | Search Result    |         6600 |      10:08 |        10:05 |      10:10
 headphones | Youtube Video Ad |        13200 |      10:08 |        10:05 |      10:10
     guitar | Search Result    |        19900 |      10:07 |        10:05 |      10:10
     guitar | Youtube Video Ad |        19900 |      10:05 |        10:05 |      10:10
===========================================================================
 headphones | Search Result    |         6600 |      10:04 |        10:00 |      10:05
 headphones | Youtube Video Ad |         6600 |      10:03 |        10:00 |      10:05
     guitar | Search Result    |        19900 |      10:03 |        10:00 |      10:05
     guitar | Youtube Video Ad |        19900 |      10:02 |        10:00 |      10:05
===========================================================================
        ... |              ... |          ... |        ... |          ... |        ...


Calculate Volatility and ROI Indicator


With this materialized view representing the time series, we can easily calculate the volatility and the ROI indicator (ROII) over the last 60 minutes:

CREATE MATERIALIZED VIEW promotions_stat_60mins AS (
  SELECT
    promotions.product_id AS product_id,
    promotion_id,
    SUM(sales_amount) / SUM(promotions.weight_percent) AS roii,
    STDDEV_SAMP(sales_amount) AS vol
  FROM promotions_sequences_60mins AS seq
  JOIN promotions ON promotion_id = promotions.id AND seq.product_id = promotions.product_id
  GROUP BY promotions.product_id, promotion_id
);

The new materialized view looks like:

        promotions_stat_60mins (materialized view)
------------------------------------------------------------
 product_id | promotion_id     | roii         |         vol 
------------+------------------+--------------+-------------
 headphones | Search Result    |         89.3 |       0.532 
 headphones | Youtube Video Ad |         67.8 |       0.110 
     guitar | Search Result    |        125.7 |       0.239 
     guitar | Youtube Video Ad |        293.3 |       0.614 

Suppose the correlation between different promotion types is zero. Then the new weights for the product headphones can be determined by constructing a portfolio with the maximum Sharpe ratio. The math representation of maximizing the Sharpe ratio is:


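maximize  $\frac{w_1 r_1 + w_2 r_2}{\sqrt{w_1^2 \sigma_1^2 + w_2^2 \sigma_2^2}}$  subject to  $w_1 + w_2 = 1$

(a reconstruction in the standard two-asset form, with $r_i$ taken from roii, $\sigma_i$ from vol, zero correlation, and a zero risk-free rate)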

With the help of the Lagrange multiplier method, we find that the weights are:

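$w_1 = \frac{r_1 \sigma_2^2}{r_1 \sigma_2^2 + r_2 \sigma_1^2}, \qquad w_2 = \frac{r_2 \sigma_1^2}{r_1 \sigma_2^2 + r_2 \sigma_1^2}$

(a reconstruction that matches the weight expression used in the sink query below, with $r_i$ standing for roii and $\sigma_i$ for vol)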


Sink the New Values to Supabase


Now it is time to calculate the new weights and ingest them into Supabase using a JDBC sink:

CREATE SINK promotion_update AS (
  SELECT 
    o1.product_id AS product_id,
    o1.promotion_id AS promotion_id,
    o2.vol^2 * o1.roii / (o1.roii * o2.vol^2 + o2.roii * o1.vol^2) AS weight
  FROM promotions_stat_60mins AS o1
  JOIN promotions_stat_60mins AS o2
    ON o1.product_id = o2.product_id AND o1.promotion_id <> o2.promotion_id
) WITH (
  connector='jdbc',
  jdbc.url='jdbc:postgresql://xxxx.supabase.co:5432/postgres?user=postgres&password=xxx',
  table.name = 'promotions',
  type = 'upsert'
);

This sink will use JDBC to update the values in the promotions table in Supabase whenever there are changes. Supabase Realtime will then sense the changes in the promotions table and take action in real time, which completes the real-time control loop of our dynamic advertisement system.

We assume each product has only two promotions because calculating a Sharpe-ratio-optimal portfolio over more assets is too difficult to express in pure SQL. However, RisingWave supports Python, Java, and Rust UDFs (user-defined functions), which can easily solve the convex quadratic optimization problem. Additionally, it provides support for user-defined aggregate functions (UDAFs) and user-defined table-generating functions (UDTFs) to accommodate a wide array of needs. You can find more resources about UDFs on our documentation website.

Conclusion

This example demonstrates how RisingWave can offer a seamless development experience for Supabase developers when building real-time applications. RisingWave is also Postgres-compatible, which means that experienced Supabase developers do not need to frequently check syntax during development. RisingWave also supports many sources and sinks, allowing developers to integrate with their current architecture.

The example highlights the user-friendly nature of RisingWave, making it easier to harness streaming data pipelines. However, real-world cases are often more complex than this example. Fortunately, RisingWave supports hard-core features like exactly-once semantics, snapshot reads, second-level recovery and scaling, complicated join operations, and more to accommodate various production scenarios. Access our source code on GitHub or start a free-tier cluster in RisingWave Cloud to begin your new journey in stream processing with RisingWave!

Background


Egg is a program optimization library written in Rust. It leverages e-graphs and equality saturation to efficiently and flexibly construct custom languages and optimize them. This presentation guides participants, particularly developers new to the area, through implementing their first SQL optimizer using Egg. The objective is to implement a range of classic optimization rules in approximately 1,000 lines of code and apply these optimization techniques to real TPC-H queries.

This presentation primarily encompasses the following topics:

1. Project Background and Motivation for Utilizing Egg

2. Fundamental Principles and Utilization of Egg

3. The Key Process of Implementing an SQL Optimizer Using Egg

4. Demonstration of Results and Analysis of the Advantages and Limitations of Egg


Why We Use Egg


The story begins with the RisingLight project. This is an OLAP educational database written in the Rust programming language, initiated and maintained by several students during their internship at RisingWave. The primary purpose of creating RisingLight is to provide an opportunity for interested students, engineers, and community developers to gain a deep understanding of the fundamental principles and implementations of databases. Therefore, RisingLight is designed to be simple, comprehensive, and easy to comprehend.

As an OLAP database, the query engine is a core component of it. The following diagram illustrates its overall structure:

Figure: The overall structure of the RisingLight Query Engine

When a user inputs an SQL statement, it goes through the Parser and Binder to create a Logical Plan, which is then fed to the Optimizer to generate a Physical Plan, and finally handed over to the Executor for execution.

Here, we'll illustrate the relationship between an SQL statement, a query plan, and an optimized query plan through an example:

Figure: An example to explain the relationship between an SQL statement, a query plan, and an optimized query plan

Database optimization for SQL statements primarily falls into two main categories: Rule-Based Optimization (RBO) and Cost-Based Optimization (CBO). Both of these approaches involve transforming a specific pattern within a query plan into another pattern. The key difference lies in how they make this transformation. In RBO, it is assumed that the rewritten pattern is always better than the previous one; whereas in CBO, we need to estimate the cost of each pattern based on statistical information and ultimately select the one with the lowest cost from multiple equivalent representations.

Figure: Rule-Based Optimization (RBO) and Cost-Based Optimization (CBO)

RisingLight initially adopted the approach of developing its optimizer from scratch. However, over time, we gradually identified several issues associated with this approach.

Figure: Optimizer written in pure Rust

Firstly, despite Rust having the pattern matching syntax of "match", it is not always as intuitive and fluent for describing certain patterns. Secondly, performing local rewrites on tree-like structures composed of structs and enums was not very convenient. Lastly, we only implemented RBO and did not implement CBO. This is because CBO requires relatively complex search strategies, data structures, and dynamic programming algorithms. It necessitates a comprehensive framework, which we didn't have the resources to implement at the time.

It wasn't until one day that I came across the "egg" project within the Rust community. After reading the project's website introduction, I immediately realized that this was the Rust optimization framework I had been searching for.


The basic principles of program optimization in Egg


The project homepage of Egg provides a clear illustration of its fundamental principles and optimization process through this image.

Figure: The fundamental principles and optimization process of Egg

In Egg, the core data structure is called an "e-graph," where "e" stands for equivalent. The e-graph builds upon traditional graph structures but introduces the concept of "e-classes." An e-class is a collection composed of several nodes called e-nodes, representing that these nodes are logically equivalent to each other. For example, in the diagram below, "a * 2" and "a << 1" are considered equivalent expressions when 'a' is an integer, so they belong to the same e-class.

Figure: The e-graph, the core data structure in Egg

The process of rewriting an expression on the e-graph is illustrated in the diagram below. First, we have a rewriting rule that transforms the left-hand pattern (* ?x 2) into the right-hand pattern (<< ?x 1), where ?x represents a match for any subtree. In Egg, the process begins by matching the left-hand pattern ① within the original graph. Then, based on the right-hand pattern, a new subtree is inserted ②, and finally, their root nodes are merged (union) together ③. This completes the rewriting process for a rule.

Figure: The process of rewriting an expression on the e-graph

In practical optimization, we typically define a series of rewriting rules in advance and provide an initial expression. Egg's approach is to continually attempt to match various rules within the graph and insert new expressions until it can no longer expand with new nodes. This state is referred to as "Saturation ①", meaning it is saturated. After saturation, Egg uses a user-defined cost function to find the representation with the lowest cost among all possible representations. This becomes the final optimization result. This process is known as "Equality Saturation." It's worth noting that this is a cost-based optimization method, making it naturally suited for implementing CBO rules.

Close
Featured Equality Saturation
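
As a rough, self-contained illustration of this loop, here is a toy example over egg's SymbolLang and its built-in AstSize cost function, not RisingLight's actual rules:

use egg::{rewrite, AstSize, Extractor, RecExpr, Rewrite, Runner, SymbolLang};

fn main() {
    // Rules that the runner keeps applying until the e-graph saturates
    // (or an iteration/node limit is hit).
    let rules: Vec<Rewrite<SymbolLang, ()>> = vec![
        rewrite!("mul2-to-shift"; "(* ?x 2)" => "(<< ?x 1)"),
        rewrite!("mul-comm"; "(* ?x ?y)" => "(* ?y ?x)"),
    ];

    // The initial expression, written as an S-expression.
    let start: RecExpr<SymbolLang> = "(* a 2)".parse().unwrap();

    // Equality saturation: grow the e-graph with all equivalent forms.
    let runner = Runner::default().with_expr(&start).run(&rules);

    // Pick the cheapest equivalent expression under a cost function
    // (AstSize simply counts nodes).
    let extractor = Extractor::new(&runner.egraph, AstSize);
    let (best_cost, best_expr) = extractor.find_best(runner.roots[0]);
    println!("best: {} (cost {})", best_expr, best_cost);
}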

Furthermore, another powerful feature in Egg is its support for Program Analysis. It allows us to associate arbitrary data with each e-class to describe some attribute of it. (You can contemplate why it's associating with e-classes rather than e-nodes.) For example, we can define Constant Analysis, where we associate an Option<Value> with each e-class to describe whether each expression within it is a constant. As nodes are dynamically added and merged in the e-graph, the Analysis is also updated accordingly. This capability can be leveraged to implement dynamic constant propagation and constant folding optimizations.

Close
Featured Egg support for Program Analysis
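
For a flavor of what this looks like in code, below is a hedged sketch of a constant analysis over egg's SymbolLang (egg around version 0.9); RisingLight's real analysis is richer, and the merge semantics here are deliberately simplified.

use egg::{Analysis, DidMerge, EGraph, SymbolLang};

// Each e-class carries an Option<i64>: Some(c) if every expression in the
// class evaluates to the constant c, None otherwise.
#[derive(Default)]
struct ConstAnalysis;

impl Analysis<SymbolLang> for ConstAnalysis {
    type Data = Option<i64>;

    // Compute the data of a freshly inserted e-node from its children's data.
    fn make(egraph: &EGraph<SymbolLang, Self>, enode: &SymbolLang) -> Self::Data {
        let child = |i: usize| egraph[enode.children[i]].data;
        match enode.op.as_str() {
            "+" if enode.children.len() == 2 => Some(child(0)? + child(1)?),
            "*" if enode.children.len() == 2 => Some(child(0)? * child(1)?),
            op => op.parse::<i64>().ok(), // a literal like "2" is a constant
        }
    }

    // When two e-classes are unioned, their data must be combined.
    fn merge(&mut self, to: &mut Self::Data, from: Self::Data) -> DidMerge {
        match (to.is_some(), from.is_some()) {
            (false, true) => { *to = from; DidMerge(true, false) }
            (true, false) => DidMerge(false, true),
            _ => DidMerge(false, false),
        }
    }
}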


Using Egg to implement an SQL optimizer


Here's an overview of the main process:

The first step when using Egg is to define the language. Egg provides the define_language! macro that allows you to specify the different types of nodes with an enum.

Close
Featured Egg provides the define_language

Here, we have defined four types of nodes:

1. Value nodes: Represent constants or variables.

2. List nodes: Represent ordered lists composed of multiple nodes.

3. Expression operations: Represent operations like addition, subtraction, multiplication, and division.

4. SQL operators: Represent the main nodes of a query plan tree.

The specific specifications for each node are defined in the comments on the right. It's worth noting that this language directly describes query plans rather than SQL statements. Therefore, you'll notice that column names have been transformed into Column IDs by the binder, for example, $1.1 represents the first column of the first table.
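
For illustration, here is a hypothetical, heavily trimmed language definition in the same spirit, using egg's define_language! macro; the node names and arities are examples, not RisingLight's exact definition.

use egg::{define_language, Id, Symbol};

define_language! {
    enum PlanLang {
        // 1. Value nodes: constants and column references such as "$1.1".
        Constant(i64),
        Column(Symbol),
        // 2. List nodes: ordered lists of other nodes.
        "list" = List(Box<[Id]>),
        // 3. Expression operations.
        "+" = Add([Id; 2]),
        "=" = Eq([Id; 2]),
        "and" = And([Id; 2]),
        // 4. SQL operators: (scan <columns>), (filter <cond> <child>),
        //    (join <cond> <left> <right>).
        "scan" = Scan(Id),
        "filter" = Filter([Id; 2]),
        "join" = Join([Id; 3]),
    }
}

fn main() {
    // The language can parse S-expressions directly into a RecExpr.
    let plan: egg::RecExpr<PlanLang> = "(filter (= $1.1 1) (scan (list $1.1 $1.2)))"
        .parse()
        .unwrap();
    println!("{}", plan);
}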

Once the language is defined, you can succinctly describe an expression using a Lisp-like S-expression format. In Egg, the RecExpr container type is used to store expressions. Internally, it's an array of nodes, where later nodes can reference earlier nodes, with the last one being the root node. This contiguous and compact memory layout is more efficient than recursive Box structures.

Close
Featured In Egg, the RecExpr container type is used to store expressions.
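
A tiny sketch of that flat layout, assuming egg's SymbolLang: the three nodes of "(* a 2)" end up in one contiguous array, with the root stored last.

use egg::{RecExpr, SymbolLang};

fn main() {
    // "(* a 2)" is stored as a flat array of nodes: [a, 2, (* 0 1)],
    // where the multiplication node refers to its children by index
    // and the last node is the root.
    let expr: RecExpr<SymbolLang> = "(* a 2)".parse().unwrap();
    assert_eq!(expr.as_ref().len(), 3);
    println!("{}", expr);
}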

Next, we continue to use Egg's pattern matching language to define rules. First, we'll start with expression simplification rules, which are foundational rules that need to be implemented in various languages. The Egg official documentation provides many examples, and I believe that most people can understand them at a glance as they are quite intuitive.

Close
Featured Expression simplification rules
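
A few such rules, in the spirit of the examples in the egg documentation (written here over the generic SymbolLang rather than RisingLight's actual language):

use egg::{rewrite, Rewrite, SymbolLang};

fn expr_rules() -> Vec<Rewrite<SymbolLang, ()>> {
    vec![
        // Neutral and absorbing elements.
        rewrite!("add-zero"; "(+ ?x 0)" => "?x"),
        rewrite!("mul-one"; "(* ?x 1)" => "?x"),
        rewrite!("mul-zero"; "(* ?x 0)" => "0"),
        // Boolean simplification.
        rewrite!("and-true"; "(and ?x true)" => "?x"),
        rewrite!("and-false"; "(and ?x false)" => "false"),
    ]
}

fn main() {
    println!("{} expression rules", expr_rules().len());
}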

Similarly, for SQL operators, there are simple rules like merging two adjacent identical operators, swapping the order of two operators, eliminating redundant operators, and so on. In this context, an (empty) node is introduced to describe an operator that has no output.

Close
Featured Plan simplification rules

Next, let's examine a less trivial optimization, a classic RBO known as Predicate pushdown. As shown in the diagram below, the goal is to push the Filter operator down below the Join operator. This allows the Filter to be executed earlier, reducing the amount of data that needs to be processed.

Close
Featured Predicate pushdown rules

The challenge in this optimization lies in handling different scenarios for predicate expressions on the Filter operator:

1. ↙ Predicates that only involve columns from the left table A, which can be pushed down to the left subtree.

2. ↘ Predicates that only involve columns from the right table B, which can be pushed down to the right subtree.

3. □ Predicates that involve columns from both left and right tables A and B, making it impossible to push down.

To implement this optimization, we define three rules: ① First, we push the predicates on the Filter operator down to the Join conditions. Then, ② and ③ respectively determine whether each predicate can be pushed down to the left or right.

Here, we use conditional rewriting (if) and introduce the columns_is_subset function to make these determinations.

To implement this function, we also introduce an analysis called "Column Analysis." Similar to classic liveness analysis, it associates every expression node with the set of columns it uses and every operator node with the set of columns included in its output. This way, we can simply check whether the expression's column set is a subset of the operator's output column set to determine whether pushing down is possible.

Close
Featured Column Analysis
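
The sketch below shows the shape of such a conditional rule in egg, with the column check stubbed out; the plan patterns, the columns_is_subset helper, and the assumption that the join condition is a conjunction are all illustrative rather than RisingLight's exact code.

use egg::{rewrite, EGraph, Id, Rewrite, Subst, SymbolLang, Var};

// The real check consults the column sets maintained by Column Analysis on
// the matched e-classes; here the condition is stubbed so the sketch stays short.
fn columns_is_subset(
    expr: &'static str,
    plan: &'static str,
) -> impl Fn(&mut EGraph<SymbolLang, ()>, Id, &Subst) -> bool {
    let expr: Var = expr.parse().unwrap();
    let plan: Var = plan.parse().unwrap();
    move |egraph, _root, subst| {
        // Real implementation: compare the "used columns" data of
        // egraph[subst[expr]] with the "output columns" data of
        // egraph[subst[plan]].
        let (_e, _p) = (subst[expr], subst[plan]);
        let _ = egraph;
        true // placeholder
    }
}

// ② Push a conjunct of the join condition into the left input when it only
//    references columns produced by the left side.
fn pushdown_left() -> Rewrite<SymbolLang, ()> {
    rewrite!("filter-pushdown-left";
        "(join (and ?cond ?rest) ?left ?right)" =>
        "(join ?rest (filter ?cond ?left) ?right)"
        if columns_is_subset("?cond", "?left"))
}

fn main() {
    let _rule = pushdown_left();
}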

Finally, let's take a look at CBO. One of the most classic optimizations is Join Reordering. When multiple tables are being joined, the order in which they are joined can have a significant impact on performance. Additionally, most real-world joins involve equijoins on primary keys, which can be optimized using hash tables. This optimization can reduce computational complexity by an order of magnitude compared to the original Nested Loop Join. Therefore, these two optimizations are crucial for multi-table queries.

In the implementation, we use Rule ① to achieve right rotation of the Join subtree. It works on an initial left-deep tree and explores all possible combinations. Then, we define Rules ② and ③ to match patterns of equijoins and transform such Joins into HashJoins. These rules, when combined, can generate numerous ways to join multiple tables effectively.

Close
Featured One of the most classic optimizations is Join Reordering

Finally, to guide Egg in finding the truly optimal solution among all the possible combinations, we need to define a cost function. The entire code for this function has been provided on the right. However, debugging this function can be a challenging task. Any oversight or mistake can prevent Egg from finding the desired optimal result, and pinpointing the error can be quite difficult.

Close
Featured Define a cost function
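
As a sketch of the shape such a cost function takes in egg, with made-up weights rather than RisingLight's actual model:

use egg::{CostFunction, Id, Language, SymbolLang};

struct PlanCost;

impl CostFunction<SymbolLang> for PlanCost {
    type Cost = f64;

    fn cost<C>(&mut self, enode: &SymbolLang, mut costs: C) -> Self::Cost
    where
        C: FnMut(Id) -> Self::Cost,
    {
        // Charge each operator a (made-up) base cost...
        let op_cost = match enode.op.as_str() {
            "join" => 100.0,    // nested-loop join: expensive
            "hashjoin" => 10.0, // hash join: much cheaper
            "filter" => 1.0,
            _ => 0.1,
        };
        // ...and add the cost of its children, which the extractor supplies.
        enode.fold(op_cost, |sum, child| sum + costs(child))
    }
}

fn main() {
    let expr: egg::RecExpr<SymbolLang> = "(hashjoin c (filter p a) b)".parse().unwrap();
    let mut cost_fn = PlanCost;
    println!("plan cost = {}", cost_fn.cost_rec(&expr));
}

An Extractor built with this cost function would then pick the cheapest representative, just as AstSize did in the earlier sketch.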

The primary limitation of Egg in this context is its lack of support for heuristic search. During the rule application process, it does not employ pruning based on the cost function. Instead, it attempts to exhaustively expand all representations before identifying the optimal solution, adhering to the original principles of Equality Saturation. As a consequence, when dealing with a substantial number of rules, Egg encounters the issue of combinatorial explosion, rendering it unable to produce reasonable results within a finite timeframe, even for moderately complex queries. To address this challenge, we have implemented a manual phased optimization approach along with multiple iterative rounds to alleviate the problem.

Close
Featured Heuristic search
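
One way to express that workaround, as a hedged sketch: run several bounded saturation rounds, each with its own rule set, and extract the best plan in between so that each phase starts from a small e-graph. The limits, phases, and AstSize cost function below are placeholders.

use egg::{AstSize, Extractor, RecExpr, Rewrite, Runner, SymbolLang};

fn optimize(
    expr: RecExpr<SymbolLang>,
    phases: &[Vec<Rewrite<SymbolLang, ()>>],
) -> RecExpr<SymbolLang> {
    let mut best = expr;
    for rules in phases {
        let runner = Runner::default()
            .with_iter_limit(8)      // cap the number of saturation iterations
            .with_node_limit(50_000) // and the size of the e-graph
            .with_expr(&best)
            .run(rules);
        // Extract the best plan found so far and feed it into the next phase.
        let extractor = Extractor::new(&runner.egraph, AstSize);
        best = extractor.find_best(runner.roots[0]).1;
    }
    best
}

fn main() {
    let start: RecExpr<SymbolLang> = "(* a 2)".parse().unwrap();
    let phases = vec![vec![egg::rewrite!("mul2-to-shift"; "(* ?x 2)" => "(<< ?x 1)")]];
    println!("{}", optimize(start, &phases));
}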

The main technical points for implementing an SQL optimizer using Egg have been outlined above. The overall workflow for the refactored query engine using Egg is illustrated in the diagram below:

Close
Featured The complete pipeline with Egg

In addition to the optimizer itself, we also utilize Egg's Analysis functionality for necessary analysis and processing of query plans during the Binding and Executor generation stages, both before and after optimization. The entire expression and query plan heavily rely on Egg's data structures for representation.


Evaluation and Analysis


RisingLight conducts benchmark tests using the classic TPC-H benchmark in the OLAP domain. In this context, we selected Q5 to evaluate the performance of the new optimizer. Q5 involves joining 6 tables, which poses a significant challenge for the optimizer. Our optimizer took 39ms for optimization and 1s for execution. In comparison, DuckDB only took 5ms for optimization and 15ms for execution. This indicates that our prototype system still has a considerable gap to catch up with industry-leading performance.

Close
Featured Evaluation: TPC-H Q5

Next, let's break down the specific effect and contribution of each optimization:

Close
Featured Evaluation: TPC-H Q5

It's evident that without optimization, this query would be practically infeasible. Predicate pushdown significantly reduces the amount of data, hash join further reduces computational complexity, and finally, projection pushdown and column pruning (not mentioned in this text) eliminate unused data.

In conclusion, let's summarize the benefits and limitations of Egg:

Close
Featured The benefits and limitations of Egg

Egg's biggest advantage lies in introducing a domain-specific language (DSL) for describing optimization rules in a simple, intuitive, and elegant manner, reducing the complexity of rule writing to an incredible extent. Coupled with the Rust language, it also has the capability to describe complex logic. Interestingly, even though "Rewrite Everything In Rust!" has been promoted at this conference, in this case, it's more like "Rewrite Rust in Lisp." This illustrates that while Rust is an excellent language, it's not a one-size-fits-all solution. When working on programming language related tasks, using a dedicated language is often more suitable.

Because of its simplicity, Egg is well-suited for rapidly prototyping systems. I went from learning about Egg to fully implementing the optimizer in just one week, with the code totaling just over a thousand lines. I believe it's a great fit for educational projects like RisingLight. However, it's important to note that I spent an additional two weeks integrating it into RisingLight's pipeline. Transitioning an existing project to Egg would require a thorough overhaul.

Now, let's discuss some of the issues with Egg, which is why we are not currently considering using it in RisingWave:

Firstly, Egg is inherently a CBO framework, and it is not particularly friendly to scenarios that primarily require pure RBO. This is evident in the fact that you need to provide a cost function to obtain the final optimization results. Egg does not provide a straightforward method for removing the original expression after applying a rule. While some workarounds are possible, and it is theoretically feasible to implement a dedicated RBO solution independently of its Runner, there is room for improvement in this aspect.

Secondly, Egg lacks heuristic search capabilities. This makes it susceptible to the problem of combinatorial explosion when dealing with complex queries, rendering it unable to provide the optimal results within a limited timeframe. Of course, we have mentioned that multiple iterative rounds can be used to alleviate this problem.

Another somewhat tricky aspect is that Egg has essentially a dynamic type system. Note that all types of nodes are defined within the same enum, which means that, from Egg's perspective, combinations of various node types are considered valid. We cannot, for instance, dictate that children of expression nodes can only be other expressions. This leads to challenges in debugging when rules are written incorrectly, such as when parameter order is reversed.

Close
Featured A follow-up project to Egg called "egglog."

Lastly, I'd like to share some recent advancements in the field. Egg is a creation of a programming languages research group at the University of Washington. This year, they have developed a follow-up project to Egg called "egglog." As the name suggests, it combines E-graph and Datalog, creating a rather sophisticated fusion. However, Egglog has evolved into a standalone language rather than a Rust library. Consequently, it might be challenging to integrate Egglog into Rust projects. Nonetheless, it remains an area worth further exploration and experimentation.

CONCLUSION

This article provided insights into the Egg program optimization framework and discussed our experience in using it to implement an SQL optimizer. If you are interested in these topics, you are encouraged to explore further details on RisingLight or even embark on your own journey to implement an SQL optimizer through the sql-optimizer-labs repository.

Two weeks ago, Current 2023, the biggest data streaming event in the world, was held in San Jose. It stands out as one of my favorite events in 2023. Not only was the conference venue conveniently located just a 10-minute drive from my home, but it was also a unique gathering where data streaming experts from around the world openly engaged in discussions about technology.

Close
Featured Current 2023, organized by Confluent, is one of the biggest events in the real-time data streaming space.

If you missed the event, fret not. My friend Yaroslav Tkachenko from Goldsky has penned a comprehensive blog detailing the key insights from the event. Among the many insights he shared, one that particularly piqued my interest was his comments on streaming databases:

  • “They [Streaming databases] can cover 50% to 80% of use cases, but they’ll never arrive close to 100%. But they can be surprisingly sticky!”
  • “I approached some of the vendors and asked if they planned to introduce programmatic APIs down the road. Everyone said no - SQL should give them enough market share.”

As the founder of RisingWave, a leading streaming database (of which I am shamelessly proud), I found that Yaroslav's observations prompted a whirlwind of reflections on my end. My reflections are not rooted in disagreement; on the contrary, I wholeheartedly concur with his viewpoints. His blog post has spurred me to contemplate SQL stream processing and streaming databases from various angles. I'm eager to share these musings with everyone in the data streaming community and the wider realm of data engineering.


SQL’s Expressiveness


Streaming databases enable users to process streaming data in the same way as they use databases, with SQL naturally being the primary language. Like most modern databases, RisingWave and several other streaming databases prioritize SQL, and they also offer User Defined Functions (UDFs) in languages like Python and Java. However, these databases do not really provide lower-level programmatic APIs.

So, the question we grapple with is: is the expressiveness of SQL (even with UDF support) sufficient?

In my conversations with hundreds of data streaming practitioners, I've found many who argue that SQL alone doesn't suffice for stream processing. The top three use cases that immediately come to my mind are: (1) (rule-based) fraud detection; (2) financial trading; (3) machine learning.

For fraud detection, many applications continue to rely on a rule-based approach. For these, Java often proves more straightforward for expressing rules and directly integrating with applications. Why? Primarily because numerous system backends are developed in Java. If the streaming data doesn't require persistence in a database, it becomes much easier to articulate the logic using a consistent programming language like Java.

In financial trading, there are concerns about the limitations of SQL, especially when it comes to handling specialized expressions that go beyond standard SQL. While it is possible to embed this logic in User-Defined Functions (UDFs), there are concerns about the increased latency that UDFs can introduce. Traditional UDF implementations, which often involve hosting a UDF server, are known for causing significant delays.

When it comes to machine learning use cases, practitioners have a strong preference for Python. They predominantly develop their applications using Python and rely heavily on popular libraries such as Pandas and Numpy. Using SQL to express their logic is not an instinctive choice for them.

While there are numerous real-world scenarios where SQL, with or without UDF support, adequately meets the requirements (you can refer to some of RisingWave's use cases in fintech, and machine learning), it's important to acknowledge that SQL may not match the expressiveness of Java or Python. However, it's worth discussing whether, if SQL can fulfill their stream processing needs, people would choose SQL-centric interfaces or continue relying on Java-centric frameworks. In my opinion, most individuals would opt for SQL. The key point of my argument lies in the widespread adoption and fundamental nature of SQL. Every data engineer, analyst, and scientist is familiar with it. If basic tools can fulfill their needs, why complicate matters with a more complex solution?


SQL’s Audience


Most data systems that emerged during the big data era, like Hadoop, Spark, Hive, and Flink, were Java-centric. However, newer systems, like ClickHouse, RisingWave, and DuckDB, are fundamentally database systems that prioritize SQL. Who exactly uses these Java-centric systems, and who exactly uses SQL-centric systems?

I often find it challenging to convince established companies founded before 2015, such as LinkedIn, Uber, and Pinterest, to adopt SQL for stream processing. To be sure, many of them passed on my pitch simply because they already have well-established data infrastructures and prefer to focus on more application-level projects (well, I know these days many companies are looking into LLMs!). But a closer examination of their data infrastructure reveals some patterns:

  • These companies began with their own data centers;
  • They initiated with a Java-centric big data ecosystem, including technologies like Hadoop, Spark, and Hive;
  • They maintain extensive engineering teams dedicated to data infrastructure;
  • Even when transitioning to the cloud, they lean towards building custom data infrastructure on platforms like EC2, S3, or EKS, rather than opting for hosted data services.

There are several challenging factors that hinder the adoption of stream processing by these enterprises:

  • The enterprises have a codebase that is entirely in Java. Although SQL databases do provide Java UDFs, it is not always feasible to frequently invoke Java code within these UDFs.
  • Some of the big data systems they utilize are customized to their specific requirements, making migration exceptionally difficult.
  • Integrating with extensive big data ecosystems often requires significant engineering efforts.
  • There is also a human element to consider. Teams that have been maintaining Java-centric systems for a long time may perceive a shift to hosted services as a threat to their job security.

While selling SQL-centric systems to these corporations can be daunting, don’t be disheartened. We do have success stories. There are some promising indicators to watch for:

  • Companies transitioning to data warehouses like Snowflake, Redshift, or Big Query signal a positive shift. The rising endorsement of the "modern data stack" prompts these companies to recenter their data infrastructure around these SQL-centric warehouses. As a result, they are less inclined to manage their infrastructure or stick with Java-centric systems;
  • Another interesting signal to consider is the occurrence of business shifts and leadership changes. Whether driven by changes in business priorities or new leadership taking charge, these transitions often trigger a reevaluation of existing systems. New leaders may not be inclined to simply maintain the status quo (as there may be limited ROI in doing so) and may be more receptive to exploring alternatives, such as upgrading their data infrastructure.

Interestingly, the growing popularity of SQL stream processing owes much to the support and promotion from Java-centric big data technologies like Apache Spark Streaming and Apache Flink. While these platforms initially started with a Java interface, they have increasingly recognized the importance of SQL. The prevailing trend suggests that most newcomers to these platforms begin their journey with SQL, which predisposes them towards the SQL ecosystem rather than the Java ecosystem. Moreover, even if they initially adopted Java-centric systems, transitioning to SQL streaming databases in the future may be a more seamless pivot than anticipated.


Market Size of SQL Stream Processing


Before delving into the market size of SQL stream processing, it's essential to first consider the broader data streaming market. We must recognize that the data streaming market, as it stands, is somewhat niche compared to the batch data processing market. Debating this would be fruitless. A simple examination of today's market value for Confluent (the leading streaming company) compared to Snowflake (the dominant batch company) illustrates this point. Regardless of its current stature, the streaming market is undoubtedly booming. An increasing amount of venture capital is being invested, and major data infrastructure players, including Snowflake, Databricks, and Mongo, are beginning to develop their own modern streaming systems.

Close
Featured Today’s market cap of Confluent, the leading streaming company, is $9.08B.
Close
Featured Today’s market cap of Snowflake, the dominant batch company, is $53.33B.

It's plausible to suggest that the stream processing market will eventually mirror the batch processing market in its patterns and trends. So, within the batch processing market, what's the size of the SQL segment? The revenue figures for SQL-centric products, such as Snowflake, Redshift, and Big Query, speak volumes. Then what about the market for products that primarily offer a Java interface? Well, at least in the data infrastructure space, I don't see any strong cash cow at the moment. Someone may mention Databricks, the rapidly growing pre-IPO company commercializing Spark. While no one can deny that Spark is the most widely used big data system in the world, a closer look at Databricks' offerings and marketing strategies soon leads to the conclusion that the SQL-centric data lakehouse is the thing they are betting on.

Close
Featured The streaming world vs the batching world.

This observation raises a paradox: SQL's expressiveness might be limited compared to Java, yet SQL-centric data systems manage to generate more revenue. Why is that?

Firstly, as highlighted in Yaroslav’s blog, SQL caters to approximately 50-80% of use cases. While the exact figure remains elusive, it's evident that SQL suffices for a significant proportion of organizational needs. Hypothetically, if a company determines that SQL stream processing aligns with its use cases, which system would they likely opt for? A SQL-centric one or a Java-centric one? If you're unsure, consider this analogy: if you aim to cut a beef rib, would you opt for a specialized beef rib knife or a Swiss Army knife? The preference is clear.

Secondly, consider the audience for Java. Individuals with a computer science background might proficiently navigate Java and grasp system-specific Java APIs. However, expecting those without such a background to master Java is unrealistic. Even if they did, wouldn't they prefer managing the service independently? While it's not absolute, companies boasting a robust engineering team seem less inclined to outsource.


More Than Just Stream Processing


While we've extensively discussed SQL stream processing, it's time to pivot our attention to streaming databases. Classic stream processing engines like Spark Streaming and Flink have incorporated SQL. As previously mentioned, these engines have begun to use SQL as an entry-level language. Vendors are primarily building on SQL, with Confluent’s Flink offering standing as a notable example. Given that both Spark Streaming and Flink provide SQL interfaces, why the push for streaming databases?

The distinctions are significant. Big data SQL fundamentally diverges from database SQL. For instance, big data SQL often lacks standard SQL statements common in database systems, such as create, drop, alter, insert, update, and delete, among others. Digging deeper, one discerns that a pivotal difference lies in storage capabilities: streaming databases possess them, while stream processing engines typically do not. This discrepancy influences design, implementation, system efficiency, cost, performance, and various other dimensions. For those intrigued by these distinctions, I'd recommend my QCon San Francisco 2023 talk (slide deck here).

Furthermore, the fundamental idea behind a database is distinct from that of a computation engine. To illustrate, database users often employ BI tools or client libraries for result visualization, while those using computation engines typically depend on an external system for storage and querying.

Some perceive a streaming database as a fusion of a stream processing engine and a traditional database. Technically, you could construct a streaming database by merging, say, Flink (a stream processing engine) and Postgres (a database). However, such an endeavor would present myriad challenges, including maintenance, consistency, failure recovery, and other intricate technical issues. I'll delve into these in an upcoming blog.

CONCLUSION

Engaging in debates about SQL’s expressiveness becomes somewhat irrelevant. While SQL possesses enough expressiveness for numerous scenarios, languages like Java, Python, and others may outperform it in specific use cases. However, the decision to embrace SQL stream processing is not solely driven by its expressiveness. It is often influenced by a company’s current position and its journey with data infrastructure. Similar to trends observed in the batch domain, one can speculate about the future market size of SQL stream processing. Nonetheless, streaming databases provide capabilities that go beyond mere stream processing. It will be intriguing to witness how the landscape evolves in the years to come.

1. What is RisingWave


RisingWave is a streaming database. It can connect to event streaming platforms like Apache Kafka and can be queried with SQL. Unlike traditional relational databases, the primary use case for a streaming database like RisingWave is for analytical purposes. In that sense, RisingWave behaves much like other OLAP (online analytical processing) products like Amazon Redshift or Snowflake.

In modern applications, however, an OLTP (online transaction processing) system, i.e., a relational database, might not be the single source of truth. Think of event-based systems built on Apache Kafka or others. Here, RisingWave can process such streams in real time and offer a Postgres-compatible interface to this data. Luckily, RisingWave can also persist data on its own, which is what we will use in this article.

Django’s ORM is designed for traditional relational databases. Data is expected to be normalized, constraints are enforced, and relations have an integrity guarantee.

So, why on earth would anyone use an analytical database, which by its nature cannot enforce constraints such as NOT NULL or UNIQUE, in a Django application?

The answer to this is simple: Dashboarding. The concept of RisingWave lies in providing (materialized) views on the data for analytical purposes. Given the power and flexibility of Django’s ORM, we could leverage the performance of analytical databases. In this blog post we are not going to use RisingWave as a primary datastore. Rather, we will use a RisingWave database to build a dashboard for the analytical part of our application.

Other products like Amazon Redshift or Snowflake already have connectors for Django. Since RisingWave doesn't have one yet, we will try to implement our own and learn about the internals of Django's ORM along the way. Luckily, RisingWave is mostly Postgres-compatible, so we will start from the original Django Postgres driver. However, PostgreSQL compatibility refers to the DQL (Data Query Language) part of SQL – or in simple terms: SELECT statements. In this example we will only read from RisingWave, not write to it. We will also avoid migrations. This is because RisingWave's concept of materialized views with different connectors (Apache Kafka being just one of them) cannot be expressed with the semantics of Django's Model classes. Don't worry though: we have some sample data to play with.

By the end of this blog post, you will have

  • set up a simple docker-compose setup with a Python webserver, a PostgreSQL instance and a RisingWave instance
  • built a Django ORM connector for RisingWave
  • seen a sample dashboard inside of a Django application built on RisingWave


2. The Setup


docker-compose Setup


For our setup, we need three services: One for the Django application, one for Postgres (to store the transactional data like user accounts in Django), and, of course, one RisingWave service. Here is a simple docker-compose setup that does the trick for us.

version: '3.8'

services:
  app:
    image: python

  postgres:
    image: postgres:latest
    restart: unless-stopped
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: postgres
      POSTGRES_DB: postgres
      POSTGRES_PASSWORD: postgres

  risingwave:
    image: risingwavelabs/risingwave:latest
    restart: unless-stopped

volumes:
  postgres-data:


Devcontainer


To make things easy, we can leverage VSCode's charming devcontainer feature, which happens to support docker-compose setups, too. Within the comfort of our editor, we can access all these services effortlessly. If you check out the GitHub repository and open it in VSCode, it will offer to re-open the project in a container. If you do so, the whole setup will spin up in almost no time. Note that you need Docker and the VSCode Devcontainer extension installed.


In the setup above, we only persist the Postgres database, not RisingWave. You could add a volume mount for the risingwave service as well, but I prefer to have a clean database on every start to explore its features.


3. The Sample Data


Once you have started the three containers from the docker-compose.yaml file, you can access the RisingWave database with the psql command line tool. As for fetching data from the database, RisingWave is protocol-compatible with Postgres.

We use this connection to ingest the test data into RisingWave.

Here is a simple Python script that generates some random order data. We will use this just like in any other relational database to insert data in RisingWave. Note, however, that this is not the primary use case for RisingWave. It really shines when you connect external data sources like Kafka or S3. For the purposes of our blog post, this data is sufficient, however. If you want to learn more about the connectivity features of RisingWave, have a look at their excellent documentation.


Installing The Sample Data


Within our Devcontainer we can use the psql command to connect to the RisingWave service:

psql -h risingwave -p 4566 -d dev -U root

Create table

CREATE TABLE orders (orderid int, itemid int, qty int, price float, dt date, state varchar);

Now, we run gen_data.py and insert the generated fake data into RisingWave:

INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100001, 3, 993.99, '2020-06-25', 'FM');
INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100002, 3, 751.99, '2020-06-25', 'FM');
INSERT INTO orders (orderid, itemid, qty, price, dt, state) VALUES (1000001, 100003, 4, 433.99, '2020-06-25', 'FM');
…


4. The Django Connector


Now that we have the data ingested in our RisingWave instance, we should take care of retrieving this data from within a Django application. Django comes with built-in connectors to different relational databases, including PostgreSQL. The ORM (read: the Model classes in your Django app) communicates with the connector, which in turn communicates with a database driver (e.g., psycopg2 for PostgreSQL). The connector generates vendor-specific SQL and provides functionality specific to that database product.

In the case of an analytical database such as RisingWave, we instead need to disable certain functionality, such as constraint checking or even primary keys. RisingWave purposefully doesn't provide such features (and neither do other streaming databases). The reason is that such analytical databases provide storage optimized for reads (for faster analytics) and writes (for faster ingestion of data). The result is usually a denormalized schema in which constraints don't need to be checked, because the sources of the data (read: the transactional databases or event streaming sources in your application) are assumed to satisfy the integrity needs of the business logic.

We can start by copying Django's PostgreSQL connector into a new package called django-risingwave. Further down the line, we are going to use the new connector only for read operations (i.e., SELECTs). However, at least in theory, we want to implement at least part of a functioning management (for creating models) and write-operations code path in our module. Given the difference in scope between transactional and analytical DB engines, this might not work as a primary datastore for Django, but we will learn some of the internals of RisingWave while doing so.

As an ad-hoc test, we want at least the initial migrations to run through with the django-risingwave connector. This is, however, already more than we will need – since our intention is to use the connector for unmanaged, and read-only models.


base.py


In base.py we find a function that gets the column datatype for varchar columns. RisingWave doesn’t support a constraint on the length, so we just get rid of the parameter:

def getvarchar_column(data): return "varchar"

Also, we need to get rid of the data_types_suffix attribute.


features.py


features.py is one of the most important files for Django ORM connectors. Basically, it holds a configuration of the database capabilities. For any generated code that is not vendor-specific, Django ORM will consult this file to turn specific features on or off. We have to disable quite a lot of them to make RisingWave work with Django. Here are some highlights; you'll find the whole file in the GitHub repo.

First, we need to set the minimum required database version down to 9.5 – that's RisingWave's version, not Postgres's.

minimum_database_version = (9,5)

Next, we disable some features that are mostly needed for constraint checking which RisingWave does not support:

enforces_foreign_key_constraints = False
enforces_unique_constraints = False
allows_multiple_constraints_on_same_fields = False
indexes_foreign_keys = False
supports_column_check_constraints = False
supports_expression_indexes = False
supports_ignore_conflicts = False
supports_indexes = False
supports_index_column_ordering = False
supports_partial_indexes = False
supports_tz_offsets = False
uses_savepoints = False


schema.py


In schema.py we need to override the _iter_column_sql method, which is not found in the Postgres backend but inherited from BaseDatabaseSchemaEditor. In particular, we get rid of the part that is put in place to check NOT NULL constraints.


introspection.py


In introspection.py we need to change the SQL generated by the get_table_list method. We treat all views and tables as just tables for our demo purposes.


operations.py


In operations.py, we get rid of the DEFERRABLE SQL part of our queries.

def deferrable_sql(self): return ""


5. A Simple Django Dashboard


Configure Django to use a second database


Django supports the use of multiple database connections in one project out of the box. This way, we can have the transactional part of our database needs in Postgres, and the analytical part in RisingWave. Exactly what we want here!

DATABASES = {
    "default": {
        "NAME": "postgres",
        "ENGINE": "django.db.backends.postgresql",
        "USER": "postgres",
        "PASSWORD": "postgres",
        "HOST": "postgres"
    },
    "risingwave": {
        "ENGINE": "django_risingwave",
        "NAME": "dev",
        "USER": "root",
        "HOST": "risingwave",
        "PORT": "4566",
    },
}


Creating An Unmanaged Model


Now, let's create a model to represent the data in our RisingWave service. We want this model to be unmanaged, so that it doesn't get picked up by Django's migration framework. Also, we will use it for analytical purposes exclusively, so we disable its ability to save data.

class Order(models.Model):
    class Meta:
        managed = False

    orderid = models.IntegerField()
    itemid = models.IntegerField()
    qty = models.IntegerField()
    price = models.FloatField()
    date = models.DateField()
    state = models.CharField(max_length=2)

    def save(self, *args, **kwargs):
        raise NotImplementedError


Making “Analytical” Queries with Django ORM


As an example for our dashboard, we create four Top-5-rankings:

  1. The Top5 states by total turnover
  2. The Top5 products with the highest average quantity per order
  3. The Top5 overall best-selling items by quantity
  4. The top5 overall best-selling items by turnover

Let's take a moment to think about how the corresponding SQL queries would look. These queries will be very simple, as each contains a simple aggregation such as SUM or AVG, a GROUP BY clause, and an ORDER BY clause.

Here are the queries we come up with:

  1. select sum(qty*price) as turnover, state from orders group by state order by turnover desc;
  2. select avg(qty), itemid from orders group by itemid order by avg(qty) desc;
  3. select sum(qty), itemid from orders group by itemid order by sum(qty) desc;
  4. select sum(qty*price) as turnover, itemid from orders group by itemid order by turnover desc;

How would these queries translate to Django’s ORM?

Django does not have a group_by function on its model but it will automatically add a GROUP BY clause for the values in the values() function. So, the above queries can be written with Django ORM as follows:

  1. Order.objects.values('state').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]
  2. Order.objects.values('itemid').annotate(avg_qty=Avg('qty')).order_by('-avg_qty')[:5]
  3. Order.objects.values('itemid').annotate(total_qty=Sum('qty')).order_by('-total_qty')[:5]
  4. Order.objects.values('itemid').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]

On top of that, we need to instruct Django to read these queries not from the default connection but from the risingwave service. We can do so by adding a call to the using method like so:

Order.objects.using('risingwave').values('state').annotate(turnover=Sum(F('price')*F('qty'))).order_by('-turnover')[:5]


Using A Database Router


The data in our Order model isn’t present in the Postgres database at all, so it would be nice if any query to this model would be routed through the RisingWave backend.


The Router Class


Django offers a solution to that problem. We can create a database routing class that needs to implement db_for_read, db_for_write, allow_relation, and allow_migrate. We only need the db_for_read method here; the other methods can simply pass. Since we could add multiple routers to the settings.py file, the database connection returned isn't enforced but treated as a hint by Django's ORM.

from .models import Order

class OlapOltpRouter():
    def db_for_read(self, model, **hints):
        if model == Order:
            return 'risingwave'
    def db_for_write(self, model, **hints):
        pass
    def allow_relation(self, obj1, obj2, **hints):
        pass
    def allow_migrate(self, db, app_label, model_name=None, **hints):
        pass


settings.py


To enable this router (which lives in dashboard/dbrouter.py), we need to add it to the DATABASE_ROUTERS setting in our settings.py:

DATABASE_ROUTERS = ['dashboard.dbrouter.OlapOltpRouter']

Conclusion

  • RisingWave is a powerful analytical database engine that can ingest streaming data sources and offers a performant way to query data
  • Since RisingWave offers PostgreSQL compatibility over the wire, we can quickly hack a connector for Django’s ORM based upon the original Postgres connector
  • Django’s ORM allows building analytical queries and can be used to create dashboards integrated in a Django application easily
  • The connector is released in public and still needs a lot of polishing. Check out the GitHub repo.


Originally published at
Building A RisingWave Connector for Django ORM


In our previous article, State Management for Cloud Native Streaming: Getting to the Core, we introduced the core storage engine of RisingWave, Hummock, and its storage architecture. This article will focus on some optimizations that Hummock has made for streaming.

Close
Featured The storage architecture of Hummock, RisingWave's core storage engine.


Consistent Snapshots


Similar to RocksDB, Hummock provides consistency in reading (Snapshot Read), which is crucial for incremental data joins. For detailed information on how Hummock achieves this, please refer to Shared Indexes and Joins in Streaming Databases. Here, we will briefly explain how Hummock implements consistency in reading.

Hummock uses an Epoch bound to the Barrier as the MVCC version for all written data. This allows us to specify the version to read in Hummock using the Barrier that the current operator has passed through. For a given query Epoch, any version of a target key with a version number greater than the Epoch is ignored, and the read returns the newest version that is less than or equal to the Epoch.

Similarly, when users query the Materialize View or intermediate state of data, as the queried data may involve multiple ComputeNode nodes, we need a consistent snapshot to ensure the correctness of query results. To achieve this, the Frontend obtains the most recently committed Barrier from the MetaServer at the beginning of each SQL statement or transaction. This Barrier is used as the query Epoch version number. Subsequently, all queries sent from the Frontend to all ComputeNodes use this Epoch to query data.

Suppose a key has multiple versions:

key1: epoch=5, value=v5

key1: epoch=4, value=v4

key1: epoch=3, value=v3

If a user query with epoch=4 is still ongoing, even though the version epoch=4 has been overwritten by epoch=5, we must retain this data during compaction and only remove the epoch=3 version. To determine which data can be reclaimed, RisingWave maintains the epochs of ongoing queries at all Frontend nodes. It periodically reports the minimum epoch of unfinished queries to the MetaServer. The MetaServer collects all reported epoch values and the currently committed barriers, taking the minimum value (safe epoch), and sends it to the Compactor node. The Compactor then follows the rules described earlier to only reclaim historical version data below the safe epoch.
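
Putting the two rules together, here is a minimal sketch (not Hummock's actual code) of version visibility for reads and version retention under a safe epoch:

#[derive(Debug, Clone)]
struct Version {
    epoch: u64,
    value: String,
}

// Read rule: a query at `query_epoch` sees the newest version whose epoch is
// less than or equal to it. `versions` is sorted by epoch, newest first.
fn read_visible(versions: &[Version], query_epoch: u64) -> Option<&Version> {
    versions.iter().find(|v| v.epoch <= query_epoch)
}

// Compaction rule: a version may be dropped only once a newer version of the
// same key exists at or below the safe epoch, because then no ongoing or
// future query can observe it.
fn retain_versions(versions: &[Version], safe_epoch: u64) -> Vec<Version> {
    let mut kept = Vec::new();
    for v in versions {
        kept.push(v.clone());
        if v.epoch <= safe_epoch {
            break; // everything older is shadowed for all epochs >= safe_epoch
        }
    }
    kept
}

fn main() {
    let versions = vec![
        Version { epoch: 5, value: "v5".into() },
        Version { epoch: 4, value: "v4".into() },
        Version { epoch: 3, value: "v3".into() },
    ];
    // A query with epoch 4 reads v4, even though epoch 5 already exists.
    assert_eq!(read_visible(&versions, 4).unwrap().value, "v4");
    // With safe_epoch = 4, epochs 5 and 4 are kept and epoch 3 is reclaimed.
    assert_eq!(retain_versions(&versions, 4).len(), 2);
}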

For Streaming operators, since their queries are always greater than or equal to the committed barrier and the current system's safe epoch, no additional data structures need to be maintained.


Schema-aware Bloom Filter


Storage engines in LSM Tree architecture split data files into multiple layers based on write order or other rules. This means that even when reading a very small range of data, it's still necessary to query multiple files, leading to additional I/O and computation overhead. A popular solution is to create a Bloom Filter for all keys in the same file. When a query arrives, the Bloom Filter is used to filter out files that cannot contain the key, and then only the remaining files are queried.

Close
Featured Read key from SST

Typically, LSM Tree engines create a Bloom Filter for the entire key. However, RisingWave optimizes this by creating a Bloom Filter over the most appropriate part of the key based on the operator's specific requirements. For example, for the SQL query below, RisingWave would create separate State Tables for A and P. When creating the Bloom Filter, it only selects the seller field, allowing the query to probe State Table A for data matching A.seller = P.id whenever State Table P updates a data entry.

By creating Bloom Filters in this way, RisingWave can improve performance in more scenarios, avoiding unnecessary I/O and significantly boosting query performance.

CREATE MATERIALIZED VIEW nexmark_q3
AS
SELECT P.name,
P.city,
P.state,
A.id
FROM auction AS A
INNER JOIN person AS P on A.seller = P.id
WHERE A.category = 10
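
To illustrate only the idea, here is a toy filter, not RisingWave's implementation: when building an SST's Bloom filter, only the join-key prefix of each state-table key (here, the seller column) is hashed, so a later probe that knows only the seller value can still rule out files.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct BloomFilter {
    bits: Vec<bool>,
    probes: u64, // number of bit positions tested per key
}

impl BloomFilter {
    fn new(nbits: usize, probes: u64) -> Self {
        Self { bits: vec![false; nbits], probes }
    }

    fn positions(&self, data: &[u8]) -> Vec<usize> {
        (0..self.probes)
            .map(|i| {
                let mut h = DefaultHasher::new();
                (i, data).hash(&mut h);
                (h.finish() as usize) % self.bits.len()
            })
            .collect()
    }

    fn insert(&mut self, data: &[u8]) {
        for i in self.positions(data) {
            self.bits[i] = true;
        }
    }

    fn may_contain(&self, data: &[u8]) -> bool {
        self.positions(data).into_iter().all(|i| self.bits[i])
    }
}

fn main() {
    let mut filter = BloomFilter::new(1024, 3);
    // Schema-aware: hash only the seller prefix of the state key, not the
    // full composite key (seller, auction id, ...).
    let seller_prefix = 42u64.to_be_bytes();
    filter.insert(&seller_prefix);
    assert!(filter.may_contain(&seller_prefix));
    // A probe for a different seller will usually return false (Bloom filters
    // can report false positives), letting the read skip this file.
    println!("may contain seller 7? {}", filter.may_contain(&7u64.to_be_bytes()));
}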


Sub Level


To improve the compacting speed of L0 files, we took inspiration from the design of the CockroachDB storage engine pebble.

Close
Featured Organization of SST files in Hummock

As shown in the diagram, files committed by a checkpoint bound to the same barrier are placed in the same sub-level, known as an overlapping level. Subsequently, this overlapping level undergoes compaction to become a non-overlapping level, where multiple files within do not overlap.

This allows us to select only a portion of L1 files for compacting when choosing compact tasks from L0 to L1. This avoids selecting a massive and slow task, thereby increasing parallelism and throughput.


Fast Checkpoint


As an efficient streaming database, RisingWave provides sub-second data freshness. This means that user input data can be reflected in query results in as little as one second. On top of this, RisingWave performs a checkpoint every 10 seconds. If a cluster node crashes for any reason and recovers later, RisingWave only needs to reprocess the last ten seconds of historical data to catch up with the latest user input. This significantly reduces the impact of failures on business operations.

To support such a high-frequency checkpoint, we have made various optimizations in the storage engine:

  • The flush task holds a reference to the memory data and serializes it into file format in the background, uploading it to S3. Therefore, frequent checkpoints do not block data flow calculations.
  • RisingWave divides the entire cluster's data into multiple groups (initially only two, one for storing state and the other for Materialized View). All state changes for operators within the same group on the same computing node are written to the same file. For a single-node cluster, one checkpoint generates only two files.
  • In some scenarios, the generated files are still very small. To avoid increasing the write amplification burden, we have added multiple strategies to determine whether we should first merge multiple small Level 0 files into larger files before merging them into lower levels, as explained in L0 Intra Compaction.


L0 Intra Compaction


Different business scenarios have significant differences in write traffic and data distribution. When the base level is in the middle of compaction and there is hot data or other reasons causing slow compaction for a specific range of data, L0 data can accumulate. Since LSM Tree queries essentially involve multi-way merging, having too much L0 data can slow down query performance. RisingWave selects a portion of L0 files for merging based on certain strategies to accelerate queries, and we refer to such tasks as L0 Intra Compaction Tasks.

Since the amount of written data can be small, RisingWave estimates write amplification by considering the proportion of the largest file among the files participating in the merge. For example, if four files participate in the merge and the largest file accounts for 50% of the total input size, we record a write amplification of 2: 100% of the computation and I/O was spent to merge in the other 50% of the data. If three files participate in the merge and the largest file accounts for 66.6% of the total input size, the write amplification is 3. To minimize write amplification, we currently filter out Intra Level Compaction tasks whose write amplification exceeds 3.
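
A small illustration of that heuristic (a sketch, not the actual compactor code):

// Estimated write amplification of a candidate L0 intra-compaction task:
// total input size divided by the portion of the input outside the largest file.
fn write_amplification(input_file_sizes: &[u64]) -> f64 {
    let total: u64 = input_file_sizes.iter().sum();
    let largest = input_file_sizes.iter().copied().max().unwrap_or(0);
    total as f64 / (total - largest) as f64
}

fn main() {
    // The largest file is 50% of the input => write amplification 2.
    assert_eq!(write_amplification(&[50, 30, 10, 10]), 2.0);
    // The largest file is two thirds of the input => write amplification 3.
    assert_eq!(write_amplification(&[200, 50, 50]), 3.0);
    // Tasks whose estimated write amplification exceeds 3 are filtered out.
    assert!(write_amplification(&[80, 10, 10]) > 3.0);
}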

CONCLUSION

Hummock was designed from the ground up to be a cloud-native storage engine for streaming computations. We are constantly evolving to achieve faster calculations and lower costs. In the future, we will gradually introduce features such as local file cache Disk Cache to improve I/O efficiency and a Serverless Compaction service for automatic scaling based on loads, further reducing the cost of cloud-based streaming computing.


Introduction to Hummock Read-Write Path


In RisingWave, we have developed our own cloud-native LSM storage engine called Hummock in Rust. We use it to store the state of stateful operators in stream processing.

Similar to general LSM storage engines, data newly written to Hummock is stored in a mutable mem-table. Under specific conditions, the mutable mem-table is frozen into an immutable mem-table. Ultimately, the immutable mem-table is written to an SST (Sorted String Table) file, which is stored persistently. Simultaneously, SST is added to the overlapping L0 in the LSM's metadata. After compaction, data from the overlapping L0 is moved to the non-overlapping LSM lower levels.

Close
Featured The overall data organization from the top-level mutable mem-table to the bottom-level non-overlapping level.

Stateful operators perform get (point lookup) or iter (range query) operations on the state storage.

When handling get requests, after filtering through min-max and bloom filter, Hummock searches from the top-level mutable mem-table to the bottom-level non-overlapping levels. When it finds the corresponding key, it stops searching and returns the corresponding value.

When handling iter requests, unlike get requests, data within the given range may exist in any layer. Therefore, we need to merge the data from each layer. Each layer of data consists of several sorted data runs. Both the mutable mem-table and the immutable mem-table are single sorted data runs in memory, while in overlapping L0, each SST itself is a single sorted data run. Finally, the SSTs in the non-overlapping levels do not overlap with each other, making each such level a single sorted data run. This allows us to perform multi-way merges on data from the various layers to handle range queries.

In RisingWave, each sorted data run is abstracted as a HummockIterator. HummockIterator is a Rust trait, and each sorted structure implements this trait. The simplified definition of HummockIterator is as follows:

#[async_trait]
pub trait HummockIterator: Send + 'static {
    async fn next(&mut self) -> HummockResult<()>;

    fn key(&self) -> &[u8];
    fn value(&self) -> Option<&[u8]>;
}

#[async_trait]
impl HummockIterator for MemtableIterator {...}

#[async_trait]
impl HummockIterator for SstIterator {...}

We use a MergeIterator to perform multi-way merging of multiple HummockIterators using a heap. Since HummockIterator is a trait, and there are multiple types of sorted structures that implement this trait (such as mem-table iterators, SST iterators), and Rust is a statically-typed language, we cannot directly place multiple types in the heap. Therefore, we use Box<dyn HummockIterator> to unify multiple types of HummockIterator and obtain the following implementation of MergeIterator:

pub struct MergeIterator {
    heap: BinaryHeap<Box<dyn HummockIterator>>,
}

#[async_trait]
impl HummockIterator for MergeIterator {
    async fn next(&mut self) -> HummockResult<()> {
        if let Some(top) = self.heap.peek_mut() {
            top.next().await?
        }
        Ok(())
    }

  ...
}


Dynamic Dispatch in the Code


In the code above, dynamic dispatch is used in two places, i.e., Box<dyn ...>. One is used to unify multiple implementations of the HummockIterator trait by using Box<dyn HummockIterator>, and the other is related to the use of the #[async_trait] macro.

Since the next method may involve IO operations, such as fetching the next block from an SST, in the definition of HummockIterator, next is designed as an asynchronous method, allowing it to be suspended by the user-level scheduler while performing IO operations at the bottom of the call stack. Asynchronous methods in Rust do not immediately return their return values but instead return an anonymous type implementing the Future trait based on the specific implementation code of the method. Therefore, for two asynchronous methods with the same return type, their intermediate returned Future types differ due to the differences in the specific implementation code of the methods. However, trait objects with uncertain return types are not object-safe and cannot be used with Box<dyn ...>. The async_trait macro transforms the return values of asynchronous methods' implementations into BoxFuture using dynamic dispatch, resulting in a unified return type.
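
Roughly speaking, and as a simplified sketch rather than the macro's exact expansion, #[async_trait] turns the trait into something like the following (assuming the futures crate's BoxFuture alias; HummockIteratorDyn and the HummockResult stand-in are hypothetical names used only here). This is what makes Box<dyn HummockIterator> possible, at the price of a heap allocation and a dynamically dispatched call for every next.

use futures::future::BoxFuture; // Pin<Box<dyn Future<Output = T> + Send + 'a>>

// Stand-in for the real result type, for this sketch only.
type HummockResult<T> = Result<T, ()>;

pub trait HummockIteratorDyn: Send + 'static {
    // The async method becomes an ordinary method returning a boxed,
    // type-erased future, so every implementation has the same return type.
    fn next(&mut self) -> BoxFuture<'_, HummockResult<()>>;
    fn key(&self) -> &[u8];
    fn value(&self) -> Option<&[u8]>;
}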

While dynamic dispatch brings convenience to the code, it can incur significant overhead in CPU-intensive scenarios like multi-way merging. Therefore, an attempt is made to replace dynamic dispatch in the code with static dispatch to reduce runtime overhead.


Optimizing Dynamic Dispatch


Initially, we attempted to remove the async_trait macro. Once the macro is removed, each implementation of HummockIterator no longer returns a unified BoxFuture; instead, it returns the concrete type implementing the Future trait that the compiler generates for that implementation's code. We can treat this Future type as an associated type of the HummockIterator trait within each implementation. Thus, we modified the trait as follows, where NextFuture is the associated type produced when implementing the next method.

pub trait HummockIterator: Send + 'static {
    type NextFuture:
        Future<Output = HummockResult<()>> + Send;

    fn next(&mut self) -> Self::NextFuture;
    fn key(&self) -> &[u8];
    fn value(&self) -> Option<&[u8]>;
}

In the implementations of HummockIterator, we can use TAIT (type alias impl trait) to specify the type implementing the Future trait generated when implementing the next method as the associated type NextFuture of HummockIterator.

impl HummockIterator for MergeIterator {
    type NextFuture =
        impl Future<Output = HummockResult<()>>;

    fn next(&mut self) -> Self::NextFuture {
        async move {
            if let Some(top) = self.heap.peek_mut() {
                top.next().await?
            }
            Ok(())
        }
    }

    ...
}

However, this code will encounter an error at compile time:

fn next(&mut self) -> Self::NextFuture {
        |--------- hidden type `[async block@src/lib.rs:87:9: 92:10]` captures the anonymous lifetime defined here

The reason for this issue is that the self variable is used in the next implementation's Future, and therefore, it captures the lifetime of self. The error occurs because the lifetime capture is not specified in the return type. To solve this problem, we need to include lifetimes in NextFuture. At this point, we can use Rust's Generic Associated Types (GAT) to add lifetimes to the associated type.

pub trait HummockIterator: Send + 'static {
    type NextFuture<'a>:
        Future<Output = HummockResult<()>> + Send + 'a
    where Self: 'a;

    fn next(&mut self) -> Self::NextFuture<'_>;
    fn key(&self) -> &[u8];
    fn value(&self) -> Option<&[u8]>;
}

With the above modification, we can define and implement HummockIterator with asynchronous methods without using the async_trait. In our MergeIterator for multi-way merging, we can use the generic type of HummockIterator to replace the previous Box<dyn HummockIterator>.

pub struct MergeIterator<I: HummockIterator> {
    heap: BinaryHeap<I>,
}

Now, MergeIterator can only accept a single type that implements HummockIterator, but in practical applications, MergeIterator needs to accept multiple types of HummockIterator. In this case, we can manually forward different types of HummockIterator using an enum and combine them into one type as the generic parameter for MergeIterator.

pub enum HummockIteratorUnion<
    I1: HummockIterator,
    I2: HummockIterator,
    I3: HummockIterator,
> {
    First(I1),
    Second(I2),
    Third(I3),
}

impl<
    I1: HummockIterator,
    I2: HummockIterator,
    I3: HummockIterator,
> HummockIterator for HummockIteratorUnion<I1, I2, I3>
{
    type NextFuture<'a> = impl Future<Output = HummockResult<()>> + 'a;

    fn next(&mut self) -> Self::NextFuture<'_> {
        async move {
            match self {
                Self::First(iter) => iter.next().await,
                Self::Second(iter) => iter.next().await,
                Self::Third(iter) => iter.next().await,
            }
        }
    }

    ...
}

Finally, the statically dispatched MergeIterator has a concrete type:

type HummockMergeIterator = MergeIterator<
  HummockIteratorUnion<
    // For mem-table
    MemtableIterator,
    // For overlapping level SST
    SstIterator,
    // For non-overlapping level sorted runs
    ConcatIterator<SstIterator>,
  >
>;

With this, we have completed the optimization of dynamic dispatch in the code.

                             Time Taken    Reduction in Time Taken
box dyn                      309.58 ms     0%
Single-type MergeIterator    198.94 ms     35.7%
Multi-type MergeIterator     237.88 ms     23.2%

The optimized code has achieved a significant performance improvement.


Code Simplification


In the code above, both the definition and implementation of HummockIterator require careful handling of associated types, resulting in complex code. In the latest Rust nightly version, Rust provides the impl_trait_in_assoc_type feature, which allows us to define the Future directly in the trait definition without using associated types. Additionally, if we use the async_fn_in_trait feature, we can implement asynchronous methods in a trait without enclosing the code in an async block, treating it like a regular async method. Ultimately, we can simplify the code as follows:

pub trait HummockIterator: Send + 'static {
    fn next(&mut self) ->
        impl Future<Output = HummockResult<()>> + Send + '_;
    fn key(&self) -> &[u8];
    fn value(&self) -> Option<&[u8]>;
}

impl HummockIterator for MergeIterator {
    async fn next(&mut self) -> HummockResult<()> {
        if let Some(top) = self.heap.peek_mut() {
            top.next().await?
        }
        Ok(())
    }
    ...
}

Note: If it weren't for Tokio's requirement for Future to be Send in the trait definition above, you could directly define next as async fn next(&mut self) -> HummockResult<()>;

CONCLUSION

In conclusion, we have successfully optimized the dynamic dispatch in our Rust code for the LSM-tree iterator in RisingWave’s Hummock storage engine. By transitioning from dynamic dispatch using Box<dyn HummockIterator> to static dispatch with generic types, we achieved a significant reduction in execution time.
Overall, our efforts in optimizing dynamic dispatch and simplifying the code have led to substantial performance gains, making Hummock even more efficient for handling stateful operators in stream processing.
