Background


Egg is a program optimization library written in Rust. It leverages e-graphs and equality saturation to construct custom languages and optimize programs in them efficiently and flexibly. This presentation guides participants, particularly newcomers, through implementing their first SQL optimizer with Egg. The objective is to implement a range of classic optimization rules in roughly 1,000 lines of code and apply them to real TPC-H queries.

This presentation primarily encompasses the following topics:

1. Project Background and Motivation for Utilizing Egg

2. Fundamental Principles and Utilization of Egg

3. The Key Process of Implementing an SQL Optimizer Using Egg

4. Demonstration of Results and Analysis of the Advantages and Limitations of Egg


Why We Use Egg


The story begins with the RisingLight project. This is an OLAP educational database written in the Rust programming language, initiated and maintained by several students during their internship at RisingWave. The primary purpose of creating RisingLight is to provide an opportunity for interested students, engineers, and community developers to gain a deep understanding of the fundamental principles and implementations of databases. Therefore, RisingLight is designed to be simple, comprehensive, and easy to comprehend.

The query engine is a core component of an OLAP database. The following diagram illustrates RisingLight's overall structure:

[Figure: The overall structure of the RisingLight query engine]

When a user inputs an SQL statement, it first goes through the Parser and Binder to produce a Logical Plan; the plan is then fed to the Optimizer, which generates a Physical Plan; finally, the Physical Plan is handed over to the Executor for execution.

Here, we'll illustrate the relationship between an SQL statement, a query plan, and an optimized query plan through an example:

[Figure: An example showing the relationship between an SQL statement, a query plan, and an optimized query plan]

Database optimization for SQL statements primarily falls into two main categories: Rule-Based Optimization (RBO) and Cost-Based Optimization (CBO). Both of these approaches involve transforming a specific pattern within a query plan into another pattern. The key difference lies in how they make this transformation. In RBO, it is assumed that the rewritten pattern is always better than the previous one; whereas in CBO, we need to estimate the cost of each pattern based on statistical information and ultimately select the one with the lowest cost from multiple equivalent representations.

[Figure: Rule-Based Optimization (RBO) and Cost-Based Optimization (CBO)]

RisingLight initially adopted the approach of developing its optimizer from scratch. However, over time, we gradually identified several issues associated with this approach.

[Figure: The optimizer written in pure Rust]

Firstly, although Rust provides the match syntax for pattern matching, it is not always intuitive or fluent for describing the patterns we need. Secondly, performing local rewrites on tree-like structures composed of structs and enums was not very convenient. Lastly, we only implemented RBO and never got to CBO, because CBO requires relatively complex search strategies, data structures, and dynamic programming algorithms; it needs a comprehensive framework, which we didn't have the resources to build at the time.

It wasn't until one day that I came across the "egg" project within the Rust community. After reading the project's website introduction, I immediately realized that this was the Rust optimization framework I had been searching for.


The basic principles of program optimization in Egg


The project homepage of Egg clearly illustrates its fundamental principles and optimization process with the image below.

[Figure: The fundamental principles and optimization process of Egg]

In Egg, the core data structure is called an "e-graph," where "e" stands for equivalent. The e-graph builds upon traditional graph structures but introduces the concept of "e-classes." An e-class is a collection composed of several nodes called e-nodes, representing that these nodes are logically equivalent to each other. For example, in the diagram below, "a * 2" and "a << 1" are considered equivalent expressions when 'a' is an integer, so they belong to the same e-class.

[Figure: The e-graph, the core data structure in Egg]

The process of rewriting an expression on the e-graph is illustrated in the diagram below. First, we have a rewriting rule that transforms the left-hand pattern (* ?x 2) into the right-hand pattern (<< ?x 1), where ?x represents a match for any subtree. In Egg, the process begins by matching the left-hand pattern ① within the original graph. Then, based on the right-hand pattern, a new subtree is inserted ②, and finally, their root nodes are merged (union) together ③. This completes the rewriting process for a rule.

[Figure: The process of rewriting an expression on the e-graph]

In practical optimization, we typically define a series of rewrite rules in advance and provide an initial expression. Egg then repeatedly matches the rules against the graph and inserts the resulting new expressions until no new nodes can be added; this state is called "Saturation" ①. Once saturated, Egg uses a user-defined cost function to find the lowest-cost representation among all equivalent ones, which becomes the final optimization result. The whole process is known as "Equality Saturation." It's worth noting that this is inherently a cost-based optimization method, making it naturally suited for implementing CBO rules.

[Figure: Equality Saturation]
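
To make this concrete, here is a minimal end-to-end sketch of the saturate-then-extract loop, using egg's built-in SymbolLang and the (* ?x 2) → (<< ?x 1) rule from the figure. AstSize is egg's simplest cost function (it just counts nodes); a real optimizer supplies its own.

use egg::*;

fn main() {
    // The rewrite rule from the example: (* ?x 2) => (<< ?x 1).
    let rules: Vec<Rewrite<SymbolLang, ()>> = vec![
        rewrite!("mul2-to-shift"; "(* ?x 2)" => "(<< ?x 1)"),
    ];

    // Build an e-graph from the initial expression and run equality saturation.
    let expr: RecExpr<SymbolLang> = "(* a 2)".parse().unwrap();
    let runner = Runner::default().with_expr(&expr).run(&rules);

    // Extract the cheapest equivalent expression from the root e-class.
    let extractor = Extractor::new(&runner.egraph, AstSize);
    let (_cost, best) = extractor.find_best(runner.roots[0]);
    // With AstSize the two forms happen to tie in size, so either may be
    // printed; a real cost model would prefer the shift.
    println!("{} is equivalent to {}", expr, best);
}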

Furthermore, another powerful feature in Egg is its support for Program Analysis. It allows us to associate arbitrary data with each e-class to describe some attribute of it. (You can contemplate why it's associating with e-classes rather than e-nodes.) For example, we can define Constant Analysis, where we associate an Option<Value> with each e-class to describe whether each expression within it is a constant. As nodes are dynamically added and merged in the e-graph, the Analysis is also updated accordingly. This capability can be leveraged to implement dynamic constant propagation and constant folding optimizations.

[Figure: Egg's support for Program Analysis]
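
As a concrete sketch of what such an analysis looks like, here is a constant analysis over a small arithmetic language, closely following the constant-folding example in egg's documentation. The SimpleMath language and all names below are illustrative, not RisingLight's.

use egg::*;

define_language! {
    enum SimpleMath {
        Num(i32),
        "+" = Add([Id; 2]),
        "*" = Mul([Id; 2]),
        Symbol(Symbol),
    }
}

#[derive(Default)]
struct ConstantFolding;

impl Analysis<SimpleMath> for ConstantFolding {
    // Each e-class carries Some(value) if every expression in it is a known constant.
    type Data = Option<i32>;

    fn make(egraph: &EGraph<SimpleMath, Self>, enode: &SimpleMath) -> Self::Data {
        let get = |id: &Id| egraph[*id].data;
        match enode {
            SimpleMath::Num(n) => Some(*n),
            SimpleMath::Add([a, b]) => Some(get(a)? + get(b)?),
            SimpleMath::Mul([a, b]) => Some(get(a)? * get(b)?),
            SimpleMath::Symbol(_) => None,
        }
    }

    fn merge(&mut self, a: &mut Self::Data, b: Self::Data) -> DidMerge {
        // Two e-classes being merged must agree on their constant value.
        merge_option(a, b, |x, y| {
            debug_assert_eq!(*x, y, "merged non-equal constants");
            DidMerge(false, false)
        })
    }

    // Constant folding: once a class is known to be constant, add the literal
    // node so that rewrites and extraction can use it directly.
    fn modify(egraph: &mut EGraph<SimpleMath, Self>, id: Id) {
        if let Some(n) = egraph[id].data {
            let added = egraph.add(SimpleMath::Num(n));
            egraph.union(id, added);
        }
    }
}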


Using Egg to implement an SQL optimizer


Here's an overview of the main process:

The first step when using Egg is to define the language. Egg provides the define_language! macro that allows you to specify the different types of nodes with an enum.

[Figure: Defining the language with Egg's define_language! macro]

Here, we have defined four types of nodes:

1. Value nodes: Represent constants or variables.

2. List nodes: Represent ordered lists composed of multiple nodes.

3. Expression operations: Represent operations like addition, subtraction, multiplication, and division.

4. SQL operators: Represent the main nodes of a query plan tree.

The exact form of each node is specified in the comments on the right. It's worth noting that this language directly describes query plans rather than SQL statements. Therefore, you'll notice that column names have already been resolved to column IDs by the binder; for example, $1.1 refers to the first column of the first table.
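
For illustration, a hypothetical, much smaller language in the same spirit could be defined as follows. The node names, arities, and comments are made up for this sketch; RisingLight's actual definition is larger and differs in detail.

use egg::*;

define_language! {
    enum PlanLang {
        // 1. Value nodes: constants and column references such as $1.1.
        Constant(i64),
        Column(Symbol),

        // 2. List nodes: ordered, variable-length lists of children.
        "list" = List(Box<[Id]>),

        // 3. Expression operations.
        "+" = Add([Id; 2]),
        "*" = Mul([Id; 2]),
        ">" = Gt([Id; 2]),
        "=" = Eq([Id; 2]),
        "and" = And([Id; 2]),

        // 4. SQL operators (plan nodes).
        "scan"     = Scan([Id; 2]),        // (scan table columns)
        "proj"     = Proj([Id; 2]),        // (proj exprs child)
        "filter"   = Filter([Id; 2]),      // (filter condition child)
        "order"    = Order([Id; 2]),       // (order sort_keys child)
        "join"     = Join([Id; 3]),        // (join condition left right)
        "hashjoin" = HashJoin([Id; 4]),    // (hashjoin lkey rkey left right)
    }
}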

Once the language is defined, you can succinctly describe an expression using a Lisp-like S-expression format. In Egg, the RecExpr container type is used to store expressions. Internally, it's an array of nodes, where later nodes can reference earlier nodes, with the last one being the root node. This contiguous and compact memory layout is more efficient than recursive Box structures.

[Figure: In Egg, the RecExpr container type is used to store expressions]
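
For example, building on the hypothetical PlanLang above, a plan in the spirit of "select the second column of table 1 where its first column is greater than 2" could be written and parsed like this:

fn example_plan() -> RecExpr<PlanLang> {
    // RecExpr stores this as a flat array of nodes: children always point to
    // earlier entries, and the last entry is the root (here, the projection).
    "(proj (list $1.2) (filter (> $1.1 2) (scan $1 (list $1.1 $1.2))))"
        .parse()
        .unwrap()
}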

Next, we use Egg's pattern-matching language to define rules. We'll start with expression simplification rules, the foundational rules that need to be implemented for almost any language. The official Egg documentation provides many examples, and most of them are intuitive enough to understand at a glance.

[Figure: Expression simplification rules]
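
Over the hypothetical PlanLang sketched earlier, such rules might look like the following; the rule names and the exact set are illustrative.

fn expr_rules() -> Vec<Rewrite<PlanLang, ()>> {
    vec![
        rewrite!("add-comm"; "(+ ?a ?b)" => "(+ ?b ?a)"),
        rewrite!("add-zero"; "(+ ?a 0)" => "?a"),
        rewrite!("mul-one";  "(* ?a 1)" => "?a"),
        rewrite!("mul-zero"; "(* ?a 0)" => "0"),
    ]
}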

Similarly, for SQL operators, there are simple rules like merging two adjacent identical operators, swapping the order of two operators, eliminating redundant operators, and so on. In this context, an (empty) node is introduced to describe an operator that has no output.

[Figure: Plan simplification rules]
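
Sketched in the same style over the hypothetical PlanLang (again with made-up names, and omitting the rules that rewrite to the (empty) node):

fn plan_rules() -> Vec<Rewrite<PlanLang, ()>> {
    vec![
        // Merge two adjacent Filter operators into one.
        rewrite!("merge-filter";
            "(filter ?c1 (filter ?c2 ?child))" => "(filter (and ?c1 ?c2) ?child)"),
        // Swap Filter below Order: filtering first means sorting less data.
        rewrite!("filter-order-swap";
            "(filter ?cond (order ?keys ?child))" => "(order ?keys (filter ?cond ?child))"),
    ]
}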

Next, let's examine a less trivial optimization, a classic RBO rule known as predicate pushdown. As shown in the diagram below, the goal is to push the Filter operator down below the Join operator, so that the Filter is executed earlier and the Join has less data to process.

[Figure: Predicate pushdown rules]

The challenge in this optimization lies in handling different scenarios for predicate expressions on the Filter operator:

1. ↙ Predicates that only involve columns from the left table A, which can be pushed down to the left subtree.

2. ↘ Predicates that only involve columns from the right table B, which can be pushed down to the right subtree.

3. □ Predicates that involve columns from both left and right tables A and B, making it impossible to push down.

To implement this optimization, we define three rules: ① first, we push the predicates on the Filter operator down into the Join conditions; then ② and ③ decide, for each predicate, whether it can be pushed further down to the left or right subtree.

Here, we use conditional rewriting (if) and introduce a columns_is_subset function to make these decisions.

To implement this function, we also introduce an analysis called "Column Analysis." Similar to classic liveness analysis, it associates all expression nodes with the columns they use and associates all operator nodes with the columns included in their output. This way, we can simply check if "the set of expressions is a subset of the set of operators" to determine whether pushing down is possible.

[Figure: Column Analysis]
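
Below is a condensed sketch of how these pieces could fit together in egg, reusing the hypothetical PlanLang: a ColumnAnalysis that attaches a set of columns to every e-class, and rule ② expressed as a conditional rewrite. The real Column Analysis distinguishes the columns an expression uses from the columns an operator produces; this sketch glosses over that by simply unioning everything below a node.

use egg::*;
use std::collections::HashSet;

// Hypothetical Column Analysis: every e-class carries the set of column ids
// that appear anywhere below it.
#[derive(Default)]
struct ColumnAnalysis;

impl Analysis<PlanLang> for ColumnAnalysis {
    type Data = HashSet<Symbol>;

    fn make(egraph: &EGraph<PlanLang, Self>, enode: &PlanLang) -> Self::Data {
        let mut cols = HashSet::new();
        if let PlanLang::Column(c) = enode {
            cols.insert(*c);
        }
        for child in enode.children() {
            cols.extend(egraph[*child].data.iter().copied());
        }
        cols
    }

    fn merge(&mut self, a: &mut Self::Data, b: Self::Data) -> DidMerge {
        let (a_before, b_len) = (a.len(), b.len());
        a.extend(b);
        DidMerge(a.len() > a_before, a.len() > b_len)
    }
}

// Rule ② as a conditional rewrite: push the predicate into the left subtree
// only if every column it uses is available there.
fn pushdown_filter_left() -> Rewrite<PlanLang, ColumnAnalysis> {
    rewrite!("pushdown-filter-left";
        "(filter ?cond (join ?on ?left ?right))" =>
        "(join ?on (filter ?cond ?left) ?right)"
        if columns_is_subset("?cond", "?left"))
}

fn columns_is_subset(
    expr: &str,
    plan: &str,
) -> impl Fn(&mut EGraph<PlanLang, ColumnAnalysis>, Id, &Subst) -> bool {
    let expr: Var = expr.parse().unwrap();
    let plan: Var = plan.parse().unwrap();
    move |egraph, _root, subst| {
        let used = egraph[subst[expr]].data.clone();
        used.is_subset(&egraph[subst[plan]].data)
    }
}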

Finally, let's take a look at CBO. One of the most classic optimizations is Join Reordering. When multiple tables are being joined, the order in which they are joined can have a significant impact on performance. Additionally, most real-world joins involve equijoins on primary keys, which can be optimized using hash tables. This optimization can reduce computational complexity by an order of magnitude compared to the original Nested Loop Join. Therefore, these two optimizations are crucial for multi-table queries.

In the implementation, we use Rule ① to achieve right rotation of the Join subtree. It works on an initial left-deep tree and explores all possible combinations. Then, we define Rules ② and ③ to match patterns of equijoins and transform such Joins into HashJoins. These rules, when combined, can generate numerous ways to join multiple tables effectively.

[Figure: Join Reordering, one of the most classic optimizations]
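
In sketch form over the hypothetical PlanLang, these rules might look as follows. Note that a real rotation rule must also re-derive which join conditions remain legal at each level (using the Column Analysis above), which this sketch ignores.

fn join_rules() -> Vec<Rewrite<PlanLang, ()>> {
    vec![
        // ① Right-rotate a join subtree to explore alternative join orders.
        rewrite!("join-right-rotation";
            "(join ?c2 (join ?c1 ?a ?b) ?c)" =>
            "(join ?c1 ?a (join ?c2 ?b ?c))"),
        // ②③ Recognize an equi-join condition and switch to a hash join.
        rewrite!("hash-join";
            "(join (= ?lkey ?rkey) ?left ?right)" =>
            "(hashjoin ?lkey ?rkey ?left ?right)"),
    ]
}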

Finally, to guide Egg in finding the truly optimal solution among all the possible combinations, we need to define a cost function. The entire code for this function has been provided on the right. However, debugging this function can be a challenging task. Any oversight or mistake can prevent Egg from finding the desired optimal result, and pinpointing the error can be quite difficult.

[Figure: Defining a cost function]
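
The shape of such a cost function, again over the hypothetical PlanLang, is roughly the following. The constants are invented purely so that hash joins rank above nested-loop joins; the real cost model estimates output cardinalities.

struct PlanCost;

impl CostFunction<PlanLang> for PlanCost {
    type Cost = f64;

    fn cost<C>(&mut self, enode: &PlanLang, mut costs: C) -> Self::Cost
    where
        C: FnMut(Id) -> Self::Cost,
    {
        // Total cost = cost of this operator + cost of all its children.
        let children: f64 = enode.children().iter().map(|&id| costs(id)).sum();
        let op = match enode {
            PlanLang::Join(_) => 1000.0,     // nested-loop join: expensive
            PlanLang::HashJoin(_) => 100.0,  // hash join: much cheaper
            PlanLang::Scan(_) => 50.0,
            PlanLang::Filter(_) => 10.0,
            _ => 1.0,
        };
        op + children
    }
}

Extraction then simply asks for the cheapest representative of the root e-class, e.g. Extractor::new(&runner.egraph, PlanCost).find_best(root).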

The primary limitation of Egg in this context is its lack of support for heuristic search. During the rule application process, it does not employ pruning based on the cost function. Instead, it attempts to exhaustively expand all representations before identifying the optimal solution, adhering to the original principles of Equality Saturation. As a consequence, when dealing with a substantial number of rules, Egg encounters the issue of combinatorial explosion, rendering it unable to produce reasonable results within a finite timeframe, even for moderately complex queries. To address this challenge, we have implemented a manual phased optimization approach along with multiple iterative rounds to alleviate the problem.

[Figure: Heuristic search]
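
A sketch of that workaround, reusing the hypothetical types from the earlier sketches: run several bounded passes, each with its own rule set, and extract the best plan between passes.

fn optimize(
    mut plan: RecExpr<PlanLang>,
    phases: &[Vec<Rewrite<PlanLang, ColumnAnalysis>>],
) -> RecExpr<PlanLang> {
    for rules in phases {
        let runner = Runner::default()
            .with_expr(&plan)
            .with_iter_limit(8)        // cap the work done per phase
            .with_node_limit(100_000)  // stop before the e-graph explodes
            .run(rules);
        let extractor = Extractor::new(&runner.egraph, PlanCost);
        plan = extractor.find_best(runner.roots[0]).1;
    }
    plan
}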

The main technical points for implementing an SQL optimizer using Egg have been outlined above. The overall workflow for the refactored query engine using Egg is illustrated in the diagram below:

[Figure: The complete pipeline with Egg]

In addition to the optimizer itself, we also utilize Egg's Analysis functionality for necessary analysis and processing of query plans during the Binding and Executor generation stages, both before and after optimization. The entire expression and query plan heavily rely on Egg's data structures for representation.


Evaluation and Analysis


RisingLight conducts benchmark tests using the classic TPC-H benchmark in the OLAP domain. In this context, we selected Q5 to evaluate the performance of the new optimizer. Q5 involves joining 6 tables, which poses a significant challenge for the optimizer. Our optimizer took 39ms for optimization and 1s for execution. In comparison, DuckDB only took 5ms for optimization and 15ms for execution. This indicates that our prototype system still has a considerable gap to catch up with industry-leading performance.

[Figure: Evaluation on TPC-H Q5]

Next, let's break down the specific effects and contributions of each optimization:

[Figure: Evaluation on TPC-H Q5, per-optimization breakdown]

It's evident that without optimization, this query would be practically infeasible. Predicate pushdown significantly reduces the amount of data, hash join further reduces computational complexity, and finally, projection pushdown and column pruning (not mentioned in this text) eliminate unused data.

In conclusion, let's summarize the benefits and limitations of Egg:

[Figure: The benefits and limitations of Egg]

Egg's biggest advantage is that it introduces a domain-specific language (DSL) for describing optimization rules in a simple, intuitive, and elegant manner, dramatically reducing the complexity of writing rules. Coupled with Rust, it can also express complex logic. Interestingly, even though "Rewrite Everything In Rust!" has been promoted at this conference, this case is more like "Rewrite Rust in Lisp." It shows that while Rust is an excellent language, it is not a one-size-fits-all solution: for programming-language-related tasks, a dedicated language is often more suitable.

Because of its simplicity, Egg is well-suited for rapidly prototyping systems. I went from learning about Egg to fully implementing the optimizer in just one week, with the code totaling just over a thousand lines. I believe it's a great fit for educational projects like RisingLight. However, it's important to note that I spent an additional two weeks integrating it into RisingLight's pipeline. Transitioning an existing project to Egg would require a thorough overhaul.

Now, let's discuss some of the issues with Egg, which is why we are not currently considering using it in RisingWave:

Firstly, Egg is inherently a CBO framework, and it is not particularly friendly to scenarios that primarily require pure RBO. This is evident in the fact that you need to provide a cost function to obtain the final optimization results. Egg does not provide a straightforward method for removing the original expression after applying a rule. While some workarounds are possible, and it is theoretically feasible to implement a dedicated RBO solution independently of its Runner, there is room for improvement in this aspect.

Secondly, Egg lacks heuristic search capabilities. This makes it susceptible to the problem of combinatorial explosion when dealing with complex queries, rendering it unable to provide the optimal results within a limited timeframe. Of course, we have mentioned that multiple iterative rounds can be used to alleviate this problem.

Another somewhat tricky aspect is that Egg has essentially a dynamic type system. Note that all types of nodes are defined within the same enum, which means that, from Egg's perspective, combinations of various node types are considered valid. We cannot, for instance, dictate that children of expression nodes can only be other expressions. This leads to challenges in debugging when rules are written incorrectly, such as when parameter order is reversed.

[Figure: egglog, a follow-up project to Egg]

Lastly, I'd like to share some recent advancements in the field. Egg is a creation of a programming languages research group at the University of Washington. This year, they have developed a follow-up project to Egg called "egglog." As the name suggests, it combines E-graph and Datalog, creating a rather sophisticated fusion. However, Egglog has evolved into a standalone language rather than a Rust library. Consequently, it might be challenging to integrate Egglog into Rust projects. Nonetheless, it remains an area worth further exploration and experimentation.

Conclusion

This article provided insights into the Egg program optimization framework and discussed our experience in using it to implement an SQL optimizer. If you are interested in these topics, you are encouraged to explore further details on RisingLight or even embark on your own journey to implement an SQL optimizer through the sql-optimizer-labs repository.

I stumbled upon an intriguing big data project called Stratosphere a decade ago. What immediately captured my interest was a particular section in its introduction: the ability to initiate a cluster on a single machine and execute MapReduce-based WordCount computations with just three lines of code.

During a time dominated by Hadoop, installing and running a WordCount program would typically require several hours or even days of effort. Therefore, encountering a project that achieved the same functionality in merely three lines of code left an indelible impression on me. Motivated by this concise yet powerful approach, I delved extensively into the project and eventually became a contributor.

In the present day, the project previously known as Stratosphere has transformed and is now recognized as Apache Flink, reigning as the most popular stream processing engine in the realm of big data. However, in contrast to the early days of Stratosphere, Flink has grown into a colossal project with considerable intricacy.

Nevertheless, as someone who contributed to the initial design and development of Flink's stream processing engine, I still yearn for a user experience that embraces simplicity. My aspiration is for users to swiftly embark on their stream processing endeavors and encounter the streamlined efficiency it offers at remarkable speeds.

To uphold this belief, my team and I have crafted RisingWave, a cloud-based streaming database that provides high-performance distributed stream processing with a PostgreSQL-like experience. In this article, I showcase how you can start your journey into stream processing using RisingWave with a mere four lines of code.


What is Stream Processing?


Stream processing and batch processing are the two fundamental modes of data processing. Over the last two decades, stream processing and batch processing systems have iterated rapidly, evolving from single-machine setups to distributed systems, and from the big data era to cloud computing. Substantial architectural enhancements have been made to both kinds of systems.

[Figure: The evolution of batch processing and stream processing systems over the last 20 years]

The two primary distinctions between stream processing and batch processing are as follows:

  • Stream processing systems operate on event-driven computations, whereas batch processing systems rely on user-initiated computations.
  • Stream processing systems adopt an incremental computation model, while batch processing systems employ a full-computation model.


Regardless of whether it's stream processing or batch processing, both approaches are progressively shifting toward real-time capabilities. Batch systems are widely used in interactive analysis scenarios, while stream processing systems are extensively applied in monitoring, alerting, automation, and various other scenarios.

[Figure: Comparison of real-time OLAP systems and real-time stream processing systems]


RisingWave: Stream Processing with PostgreSQL Experience


RisingWave is an open-source distributed SQL streaming database licensed under the Apache 2.0 license. It utilizes a PostgreSQL-compatible interface, allowing users to perform distributed stream processing like operating a PostgreSQL database.

RisingWave is primarily designed for two typical use cases: streaming ETL and streaming analytics.

Streaming ETL refers to the real-time ingestion of various data sources (such as OLTP databases, message queues, and file systems) into destination systems (such as OLAP databases, data warehouses, data lakes, or simply back to OLTP databases, message queues, file systems) after undergoing processing operations like joins, aggregations, groupings, windowing, and more. In this scenario, RisingWave can fully replace Apache Flink.

[Figure: Streaming ETL use cases]


Streaming analytics, on the other hand, refers to the capability of ingesting data from multiple data sources (such as OLTP databases, message queues, and file systems) and performing complex analytics (with operations like joins, aggregations, groupings, windowing, and more) before displaying results in the BI dashboards.

Users may also directly access data inside RisingWave using client libraries in different programming languages. In this scenario, RisingWave can replace a combination of Apache Flink and SQL/NoSQL databases (such as MySQL, PostgreSQL, Cassandra, and Redis).

[Figure: Streaming analytics use cases]


Deploying RisingWave with 4 Lines of Code


To install and run RisingWave on a Mac, run the following three commands in a terminal window (refer to our documentation if you are a Linux user):

$ brew tap risingwavelabs/risingwave
$ brew install risingwave
$ risingwave playground


Next, open a new command line window and execute the following command to establish a connection with RisingWave:

$ psql -h localhost -p 4566 -d dev -U root


For ease of understanding, let's first create a table and use INSERT to add some test data. In real-world scenarios, we typically ingest data from message queues or OLTP databases, which will be introduced later.

Let's create a table for web browsing records:

CREATE TABLE website_visits (
  timestamp TIMESTAMP,
  user_id VARCHAR,
  page_id VARCHAR,
  action VARCHAR
);


Next, we create a materialized view to count the number of visits, visitors, and last access time for each page. It is worth mentioning that materialized views based on streaming data are a core feature of RisingWave.

CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT page_id,
       COUNT(*) AS total_visits,
       COUNT(DISTINCT user_id) AS unique_visitors,
       MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;


We use INSERT to add some data:

INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
  ('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
  ('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
  ('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
  ('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
  ('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');


Take a look at the current results:

SELECT * FROM page_visits_mv;

 page_id | total_visits | unique_visitors |   last_visit_time
---------+--------------+-----------------+---------------------
 page2   |            2 |               2 | 2023-06-13 10:04:00
 page3   |            1 |               1 | 2023-06-13 10:02:00
 page1   |            2 |               2 | 2023-06-13 10:03:00
(3 rows)


Let's insert five more rows of data:

INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
  ('2023-06-13T10:05:00Z', 'user1', 'page1', 'click'),
  ('2023-06-13T10:06:00Z', 'user2', 'page2', 'scroll'),
  ('2023-06-13T10:07:00Z', 'user3', 'page1', 'view'),
  ('2023-06-13T10:08:00Z', 'user4', 'page2', 'view'),
  ('2023-06-13T10:09:00Z', 'user5', 'page3', 'view');


We insert data twice to simulate a continuous influx of data. Let's take another look at the current results:

SELECT * FROM page_visits_mv;

 page_id | total_visits | unique_visitors |   last_visit_time
---------+--------------+-----------------+---------------------
 page1   |            4 |               3 | 2023-06-13 10:07:00
 page2   |            4 |               3 | 2023-06-13 10:08:00
 page3   |            2 |               2 | 2023-06-13 10:09:00
(3 rows)


We can see that the results have been updated, and they stay up to date automatically as real-time streaming data flows in.


Interaction with Kafka


Given that message queues are commonly used in stream data processing, let's look at how to retrieve and process data from Kafka in real-time.

If you haven't installed Kafka yet, first download the appropriate release package from the official website (we use 3.4.0 as an example here), and then extract it:

$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0


Now let's start Kafka.

1. Generate a cluster UUID:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

2. Format the log directories:

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

3. Start the Kafka server:

$ bin/kafka-server-start.sh config/kraft/server.properties


After starting the Kafka server, we can create a topic:

$ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092


Once Kafka is successfully launched, we can directly input messages from the command line.

First, run the following command to start the producer:

$ bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092


Once the '>' symbol appears, we can enter the message. To facilitate data consumption in RisingWave, we input data in JSON format:

{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}


We can start a consumer to view the messages we have inputted:

$ bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092


Now let's take a look at how RisingWave retrieves data from this message queue. In this scenario, RisingWave plays the role of a message consumer. Let's switch back to the psql window and create a data source to establish a connection with the previously created topic. It's important to note that we are only establishing the connection at this stage and have yet to start consuming data.

When creating a data source, we can directly define a schema to map relevant fields from the JSON data in the streaming data. To avoid conflicts with the tables mentioned earlier, we will name the data source website_visits_stream.

CREATE SOURCE IF NOT EXISTS website_visits_stream (
  timestamp TIMESTAMP,
  user_id VARCHAR,
  page_id VARCHAR,
  action VARCHAR
)
WITH (
  connector = 'kafka',
  topic = 'test',
  properties.bootstrap.server = 'localhost:9092',
  scan.startup.mode = 'earliest'
)
ROW FORMAT JSON;


RisingWave only starts ingesting data and performing computations once a materialized view is created. For ease of understanding, we create a materialized view similar to the one in the earlier example.

CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
       COUNT(*) AS total_visits,
       COUNT(DISTINCT user_id) AS unique_visitors,
       MAX(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;


Now we can take a look at the results:

SELECT * FROM visits_stream_mv;

 page_id | total_visits | unique_visitors |   last_visit_time
---------+--------------+-----------------+---------------------
 page1   |            1 |               1 | 2023-06-13 10:07:00
 page2   |            3 |               2 | 2023-06-13 10:08:00
 page3   |            1 |               1 | 2023-06-13 10:09:00
(3 rows)

At this point, we have successfully retrieved data from Kafka and performed processing operations on it.


Advanced: Build a Real-time Monitoring System with RisingWave


Real-time monitoring plays a crucial role in streaming applications. By processing data in real-time, you can visualize and monitor the results as they happen. RisingWave can act as a data source, seamlessly connecting to visualization tools like Superset, Grafana, and more, allowing you to display processed metric data in real-time.

We encourage you to take on the challenge of building your own stream processing and visualization system. For specific steps, you can refer to our use case document. In this document, we showcase how RisingWave can be used to monitor and process system performance metrics, subsequently presenting them in real time through Grafana.

Although our demonstration is relatively simple, we firmly believe that with real data, and in your familiar business scenarios, you can achieve much richer and more impactful display effects.

[Figure: Using RisingWave to monitor and display real-time results in Grafana]

Conclusion

RisingWave distinguishes itself with its remarkable simplicity, enabling users to engage in distributed stream processing effortlessly using SQL. In terms of performance, RisingWave outperforms stream processing platforms of the big data era, such as Apache Flink.

The extent of this performance improvement is quite noteworthy. Brace yourself for the details: stateless computing demonstrates an enhancement of around 10%-30%, while stateful computing showcases an astonishing 10X+ boost!

Keep an eye out for the upcoming performance report to delve deeper into these findings. An efficient stream processing platform should always prioritize simplicity, and RisingWave delivers precisely that and more.


This article was originally published on Medium.

Background


Social media platforms like Twitter process a high volume of messages every second. User behaviors, such as tweets, upvotes, downvotes, and comments, are recorded to serve various data requirements. Companies want to find meaning in data gathered from social media platforms to support their business decisions. To do so, they need to adopt stream processing in their tech stack. However, this can be a struggle as many stream processing frameworks are complicated to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface language. Its design reduces the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs event-driven processing, and updates results dynamically.

Apache Pulsar is an open-source, distributed, cloud-native pub-sub messaging and streaming platform originally developed at Yahoo. It was built from the ground up to support a flexible messaging model, multi-tenancy, geo-replication, and strong durability guarantees.

RisingWave works seamlessly with Pulsar to provide a real-time data streaming and processing solution, making building and maintaining real-time applications much easier.


Overview and data model


This tutorial will use RisingWave to consume Pulsar data streams and perform data analysis. We will use sample Twitter data to query Twitter users and their posted tweets.

Below are the schemas for tweets and Twitter users. In the tweet schema, text contains the content of a tweet, and created_at contains the date and time when a tweet was posted. Hashtags will be extracted from text.

{
    "tweet": {
        "created_at": "2020-02-12T17:09:56.000Z",
        "id": "1227640996038684673",
        "text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
        "lang": "English"
    },
    "author": {
        "created_at": "2013-12-14T04:35:55.000Z",
        "id": "2244994945",
        "name": "Singularity Data",
        "username": "singularitty"
    }
}


Prerequisites

  • Ensure Docker Desktop is installed in your environment and it is running (i.e., have the Docker Desktop app open) before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now, navigate to the twitter-pulsar directory and start the demo cluster from the docker-compose file.

cd twitter-pulsar
docker-compose up -d

A Pulsar instance and necessary RisingWave components, including frontend node, compute node, metadata node, and MinIO, will be started.

A workload generator is also packaged in the docker-compose file. It will generate random data and feed them into Pulsar.


Step 2: Connect RisingWave to the Pulsar stream


Now, connect to RisingWave so you can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Connect to the data stream with the following SQL statement.

CREATE SOURCE twitter (
    data STRUCT < created_at TIMESTAMP,
    id VARCHAR,
    text VARCHAR,
    lang VARCHAR >,
    author STRUCT < created_at TIMESTAMP,
    id VARCHAR,
    name VARCHAR,
    username VARCHAR,
    followers INT >
) WITH (
    connector = 'pulsar',
    pulsar.topic = 'twitter',
    pulsar.admin.url = 'http://message_queue:8080',
    pulsar.service.url = 'pulsar://message_queue:6650'
) ROW FORMAT JSON;

Note that the SQL statement uses the struct data type. It is not a standard Postgres data type but an extension we made to Postgres’s SQL dialect to process more complex schemas. The struct data type can create nested tables. Elements in a nested table need to be enclosed with angle brackets (< and >). For details about the struct data type, please see Data types.


Step 3: Define materialized views


Next, create a materialized view that stores the tweets posted by influencers, here defined as authors with more than 1,000 followers (the threshold used in the statement below). This is a basic materialized view; further pipelines may join with it to maintain a more in-depth view of the influencers.

If you get a metadata error when running the SQL statement, we suggest that you wait one minute for the stream metadata to be fully parsed, and try again.

CREATE MATERIALIZED VIEW influencer_tweets AS
SELECT
    (author).id as author_id, (data).text as tweet
FROM
    twitter
WHERE
    (author).followers > 1000
    AND (data).lang = 'English';

In this demo, we rely on the syntax of Postgres's composite types: (author).id refers to the field "id" in the column "author", which is a struct. The parentheses around "author" are required to avoid ambiguity when a table has the same name as the column.


Step 4: Query the results


Wait for one or two minutes for the workload generator to generate some mock data, and then you can query some tweets from the materialized view:

SELECT * FROM influencer_tweets LIMIT 5;

The results may look like this:

 author_id  | tweet
------------+------------------------------------------------------------------
 2929346146 | #5th generation #Graphic Interface Now lots bread too part usually last did host you just kuban education hail are recently yet already a.
 5369531922 | Graceful my joyously accordingly its yourself graceful hand onto should heat themselves which child off Bangladeshi why.
 4868249670 | Across tonight ginger horror regularly these range their yourself since early why stand what who cloud yourselves listen.
 4179993078 | Stupidly power her it from never which positively photographer weekly woman did cat first while rich.
 7293315540 | #interactive #User-centric Mine were tomorrow does not day these factory cent monthly peacock first to about wisdom regularly cook above frequently you.

When you finish, run the following command to disconnect RisingWave.

quit

Run the following command to remove the Docker images.

docker-compose down

Summary

This tutorial shows how RisingWave can connect to a Pulsar stream to query sample tweets written by influencers. This use case is intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use case scenario, don’t hesitate to let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.

The amount of streaming data has grown explosively over the past few years. A lot of businesses realize that they need to move to stream processing, but they have a hard time figuring out the route to take. Most of the stream processing frameworks out there are too complex to design and implement.

RisingWave is a cloud-native streaming database that uses SQL as the interface. It is designed to reduce the complexity and cost of building real-time applications. RisingWave consumes streaming data, performs continuous queries, and updates results dynamically.

Redpanda is an Apache Kafka®-compatible streaming data platform. It was built from the ground up with performance and simplicity in mind. It requires no Zookeeper®, no JVM, and no code changes.

RisingWave works seamlessly with Redpanda to provide a real-time data streaming and processing solution that makes it so much easier to build and maintain real-time applications.


Overview


In this tutorial, you will learn how to use RisingWave to consume Redpanda data streams and perform data analysis. We will use ad impression and click events as sample data and try to count clicks of an ad within one minute after the ad was shown.

Below is the schema of the ad impression and click events:

{
  "user_id": 2926375,
  "click_timestamp": "2022-05-11 16:04:06.416369",
  "impression_timestamp": "2022-05-11 16:04:06.273401",
  "ad_id": 8184596
}

For users who are not familiar with digital advertising, an impression is counted whenever an ad is displayed within an app or on a website. impression_timestamp is the date and time when the ad was shown to a user. In the schema, impression_timestamp should be smaller (earlier) than click_timestamp to ensure that only clicks subsequent to impressions are counted.

We have set up a demo cluster specifically for the Redpanda and RisingWave stack so that you do not need to install them separately.


Prerequisites

  • Ensure you have Docker and Docker Compose installed in your environment. Note that Docker Compose is included in Docker Desktop for Windows and macOS. If you use Docker Desktop, ensure that it is running before launching the demo cluster.
  • Ensure that the PostgreSQL interactive terminal, psql, is installed in your environment.
    • To install psql on macOS, run this command: brew install postgres
    • To install psql on Ubuntu, run this command: sudo apt-get install postgresql-client


Step 1: Launch the demo cluster


First, let us clone the risingwave-demo repository to your environment.

git clone https://github.com/singularity-data/risingwave-demo.git

Now let us navigate to the ad-click directory and start the demo cluster from the docker compose file.

cd ad-click
docker-compose up -d

A Redpanda instance and necessary RisingWave components, including frontend node, compute node, metadata node, and MinIO, will be started.

A workload generator is also packaged in the docker-compose file. It will generate random data and feed it into Redpanda.


Step 2: Connect RisingWave to the Redpanda stream


Now let us connect to RisingWave so that we can manage data streams and perform data analysis.

psql -h localhost -p 4566 -d dev -U root

Note that RisingWave can be connected via psql on port 4566 by default, while Redpanda listens on port 29092. If you intend to ingest data directly from Redpanda, use port 29092 instead.

We set up the connection with a Redpanda topic with this SQL statement:

create source ad_source (
  user_id bigint,
  ad_id bigint,
  click_timestamp timestamp,
  impression_timestamp timestamp
) with (
  connector = 'kafka',
  kafka.topic = 'ad_clicks',
  kafka.brokers = 'message_queue:29092',
  kafka.scan.startup.mode = 'earliest'
) row format json;

Let us dive a little deeper into the parameters in the WITH clause:

  • connector = 'kafka': As Redpanda is Kafka-compatible, it can be connected to in the same way as Kafka.
  • kafka.topic = 'ad_clicks': The Redpanda topic to consume from.
  • kafka.brokers = 'message_queue:29092': The address of the Redpanda broker.
  • kafka.scan.startup.mode = 'earliest': RisingWave will start consuming data from the earliest entry in the stream. Alternatively, you can set this parameter to 'latest', which means RisingWave will start consuming data from the latest entry.


Step 3: Analyze the data


We’ll define a materialized view to count the clicks on each ad within one minute after the ad was shown.

With materialized views, only incremental calculations are performed each time a new event comes in, and the results are persisted right after calculations for a new event are completed.

create materialized view m_click_statistic as
select
  ad_id,
  count(user_id) as clicks_count
from
  ad_source
where
  click_timestamp is not null
  and impression_timestamp < click_timestamp
  and impression_timestamp + interval '1' minute >= click_timestamp
group by
  ad_id;

We want to make sure that only ads that have been clicked are counted, so we limit the scope with the click_timestamp is not null condition. Any click that occurs more than one minute after the impression is considered irrelevant and is excluded; that is why we include the impression_timestamp + interval '1' minute >= click_timestamp condition.


Step 4: Query the results


RisingWave is designed to achieve both second-level freshness and low query-latency via pre-aggregations on streams. Downstream applications can query results at extremely short intervals if needed.

We query the results with the following statement:

select * from m_click_statistic;

The results may look like this:

ad_id | clicks_count
------+--------------
    1 | 356
    2 | 340
    3 | 319
    4 | 356
    5 | 333
    6 | 368
    7 | 355
    8 | 349
    9 | 359
(9 rows)

If you query multiple times, you will be able to see that the results are changing as new events come in. For example, if you run the query again in 10 seconds, you may get the results as follows.

ad_id | clicks_count
------+--------------
    1 | 362
    2 | 345
    3 | 325
    4 | 359
    5 | 335
    6 | 369
    7 | 360
    8 | 353
    9 | 360
(9 rows)

When you finish, run the following command to remove the Docker containers.

docker-compose down

Summary

In this tutorial, we connected RisingWave to a Redpanda stream and performed basic ad performance analysis. The use case is a bit simple and intended to be inspirational rather than complete. If you want to share your ideas about what RisingWave can do or are interested in a particular use scenario, please let us know in the RisingWave Community workspace on Slack. Please use this invitation link to join the workspace.
