To achieve efficient incremental updates, every streaming task related to a materialized view maintains an internal state. RisingWave doesn't treat internal states and external materialized views as distinct. Instead, it perceives them as interconnected relational tables. This cohesive perspective allows the streaming database to access both the internal states and materialized views in a unified manner. This strategy not only improves system observability but also grants users the ability to probe into the internal state for monitoring or maintenance purposes.
Running Example
Let’s use an example to showcase how tables and materialized views work in RisingWave. Suppose we have a customer table customer and an order table orders. We then create materialized views with filter, aggregation, and join on these two tables.
CREATE TABLE customer (
c_custkey INTEGER,
c_name VARCHAR,
c_phone VARCHAR,
PRIMARY KEY (c_custkey)
);
CREATE TABLE orders (
o_orderkey BIGINT,
o_custkey INTEGER,
o_totalprice NUMERIC,
o_orderdate DATE,
PRIMARY KEY (o_orderkey)
);
-- Insert data into customer table
INSERT INTO customer VALUES (1, 'Alice', '1111111111');
INSERT INTO customer VALUES (2, 'Bob', '2222222222');
-- Insert data into orders table
INSERT INTO orders VALUES (1000, 1, 9.9, '2023-01-01');
INSERT INTO orders VALUES (1001, 1, 20, '2023-01-02');
INSERT INTO orders VALUES (1002, 1, 50, '2023-01-03');
INSERT INTO orders VALUES (1003, 2, 100, '2023-01-01');
INSERT INTO orders VALUES (1004, 2, 15, '2023-01-02');
INSERT INTO orders VALUES (1005, 2, 25.5, '2023-01-03');
-- Stateless Streaming Job
CREATE MATERIALIZED VIEW mv1 AS
SELECT *
FROM orders
WHERE o_totalprice > 80;
-- Aggregation State
CREATE MATERIALIZED VIEW mv2 AS
SELECT o_orderdate, COUNT(*) order_count, AVG(o_totalprice) avg_price
FROM orders
GROUP BY o_orderdate;
-- Join State
CREATE MATERIALIZED VIEW mv3 AS
SELECT *
FROM orders
JOIN customer
ON o_custkey = c_custkey;
Peeking into Internal State Tables
You can list all internal state tables with the SHOW INTERNAL TABLES command. The output shows that each state table name follows the pattern: __internal prefix + materialized view name and ID + usage + state ID.
dev=> show internal tables;
Name
---------------------------------------------
__internal_mv2_6_hashaggresult_1005
__internal_mv3_8_hashjoinleft_1007
__internal_mv3_8_hashjoinright_1009
__internal_mv3_8_hashjoindegreeright_1010
__internal_mv3_8_hashjoindegreeleft_1008
(5 rows)
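The naming pattern can be taken apart mechanically. The following is a minimal Python sketch, assuming the four-part pattern described above holds for every internal table; the regular expression and the function name parse_internal_table_name are illustrative and not part of RisingWave.

```python
import re

# Assumed pattern: __internal_<mv name>_<mv id>_<usage>_<state id>
INTERNAL_TABLE_RE = re.compile(
    r"^__internal_(?P<mv_name>.+)_(?P<mv_id>\d+)_(?P<usage>[a-z]+)_(?P<state_id>\d+)$"
)

def parse_internal_table_name(name):
    """Split an internal table name into its four components (hypothetical helper)."""
    match = INTERNAL_TABLE_RE.match(name)
    if match is None:
        raise ValueError(f"not an internal table name: {name!r}")
    parts = match.groupdict()
    parts["mv_id"] = int(parts["mv_id"])
    parts["state_id"] = int(parts["state_id"])
    return parts

agg_state = parse_internal_table_name("__internal_mv2_6_hashaggresult_1005")
join_state = parse_internal_table_name("__internal_mv3_8_hashjoindegreeright_1010")
print(agg_state)
print(join_state)
```

Grouping the parsed names by mv_name gives a quick overview of which materialized view owns which states.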
Demo Time!
Stateless Streaming Job
Let's start by examining a materialized view called mv1, which doesn't have any associated state. We'll check the execution plan of mv1 using the EXPLAIN command and inspect the states for each operator using the EXPLAIN (DISTSQL) command.
The EXPLAIN command breaks down the physical execution plan from the operator tree's perspective. On the other hand, the EXPLAIN (DISTSQL) command unveils a lower-level execution plan from the Fragment perspective, which also includes the number of states and their positioning in operators.
dev=> explain create materialized view mv1 as select * from orders where o_totalprice > 80;
QUERY PLAN
---------------------------------------------------------------------------
StreamMaterialize { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate], pk_columns: [o_orderkey] }
└─StreamFilter { predicate: (orders.o_totalprice > 80:Int32) }
└─StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
(3 rows)
dev=> explain (distsql) create materialized view mv1 as select * from orders where o_totalprice > 80;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0
StreamMaterialize { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate], pk_columns: [o_orderkey] }
materialized table: 4294967294
StreamFilter { predicate: (orders.o_totalprice > 80:Int32) }
Chain { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
Upstream
BatchPlanNode
Table 4294967294 { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate], primary key: [$0 ASC], value indices: [0, 1, 2, 3], distribution key: [0] }
(9 rows)
According to the EXPLAIN result, the plan contains three operators: StreamMaterialize, StreamFilter, and StreamTableScan. Among them, StreamMaterialize represents the materialized result of mv1, StreamFilter represents the filter condition, and StreamTableScan represents the orders table. In the EXPLAIN (DISTSQL) output, only the StreamMaterialize operator carries a table (materialized table: 4294967294), which corresponds to the materialized view mv1 itself. Because the StreamFilter operator doesn't require any state to filter upstream messages, mv1 is considered a stateless streaming job.
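To see why a filter needs no state, here is a minimal Python sketch of a filter over a stream of changes. It is a conceptual model only, not RisingWave's implementation; the (op, row) encoding and the name stream_filter are assumptions made for illustration.

```python
from decimal import Decimal

def stream_filter(changes, predicate):
    """Forward each change whose row satisfies the predicate; nothing is remembered."""
    for op, row in changes:
        if predicate(row):
            yield op, row

# Hypothetical change stream on orders: two inserts followed by one delete.
orders_changes = [
    ("insert", {"o_orderkey": 1000, "o_totalprice": Decimal("9.9")}),
    ("insert", {"o_orderkey": 1003, "o_totalprice": Decimal("100")}),
    ("delete", {"o_orderkey": 1003, "o_totalprice": Decimal("100")}),
]

# Same predicate as mv1: o_totalprice > 80.
mv1_changes = list(stream_filter(orders_changes, lambda row: row["o_totalprice"] > 80))
print(mv1_changes)
```

Each change is judged on its own row, so the insert and the later delete of order 1003 both pass through without the operator having to remember anything in between.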
Aggregation State
Now, let's switch gears to a job called mv2 that involves an aggregation state. This job is responsible for counting the number of orders each day and calculating the average order price. The corresponding state is __internal_mv2_6_hashaggresult_1005. Let's identify the operator associated with this state in the execution plan:
dev=> explain create materialized view mv2 as select o_orderdate, count(*) order_count, avg(o_totalprice) avg_price from orders group by o_orderdate;
QUERY PLAN
---------------------------------------------------------------------------
StreamMaterialize { columns: [o_orderdate, order_count, avg_price], pk_columns: [o_orderdate] }
└─StreamProject { exprs: [orders.o_orderdate, count, (sum(orders.o_totalprice) / count(orders.o_totalprice))] }
└─StreamHashAgg { group_key: [orders.o_orderdate], aggs: [count, count, sum(orders.o_totalprice), count(orders.o_totalprice)] }
└─StreamExchange { dist: HashShard(orders.o_orderdate) }
└─StreamTableScan { table: orders, columns: [o_totalprice, o_orderdate, o_orderkey] }
(5 rows)
dev=> explain (distsql) create materialized view mv2 as select o_orderdate, count(*) order_count, avg(o_totalprice) avg_price from orders group by o_orderdate;
QUERY PLAN
---------------------------------------------------------------------------
Fragment 0
StreamMaterialize { columns: [o_orderdate, order_count, avg_price], pk_columns: [o_orderdate] }
materialized table: 4294967294
StreamProject { exprs: [orders.o_orderdate, count, (sum(orders.o_totalprice) / count(orders.o_totalprice))] }
StreamHashAgg { group_key: [orders.o_orderdate], aggs: [count, count, sum(orders.o_totalprice), count(orders.o_totalprice)] }
result table: 0, state tables: []
StreamExchange Hash([1]) from 1
Fragment 1
Chain { table: orders, columns: [o_totalprice, o_orderdate, o_orderkey] }
Upstream
BatchPlanNode
Table 0 { columns: [orders_o_orderdate, count, count_0, sum(orders_o_totalprice), count(orders_o_totalprice)], primary key: [$0 ASC], value indices: [1, 2, 3, 4], distribution key: [0] }
Table 4294967294 { columns: [o_orderdate, order_count, avg_price], primary key: [$0 ASC], value indices: [0, 1, 2], distribution key: [0] }
(15 rows)
In the StreamHashAgg operator, we can observe a result table (result table: 0), which corresponds to the previously mentioned __internal_mv2_6_hashaggresult_1005.
Next, let’s probe into __internal_mv2_6_hashaggresult_1005. Executing the DESCRIBE statement reveals the schema and primary key of this state table, which turns out to be the orders_o_orderdate column. A SELECT then shows the data it holds.
dev=> describe __internal_mv2_6_hashaggresult_1005;
Name | Type
----------------------------+--------------------
orders_o_orderdate | Date
count | Int64
count_0 | Int64
sum(orders_o_totalprice) | Decimal
count(orders_o_totalprice) | Int64
primary key | orders_o_orderdate
(6 rows)
dev=> select * from __internal_mv2_6_hashaggresult_1005;
orders_o_orderdate | count | count_0 | sum(orders_o_totalprice) | count(orders_o_totalprice)
--------------------+-------+---------+--------------------------+---------
2023-01-03 | 2 | 2 | 75.5 | 2
2023-01-02 | 2 | 2 | 35 | 2
2023-01-01 | 2 | 2 | 109.9 | 2
(3 rows)
By observing the state table's column names and the execution plan, we can infer how the state is maintained. Each row from the orders table is first assigned to a group based on the o_orderdate column, and then the aggregates for that group are updated. To calculate the count, we simply increment the count column for the corresponding group by 1. To calculate avg, the state maintains the sum(orders_o_totalprice) and count(orders_o_totalprice) columns, representing the total price and the number of orders, respectively. Finally, the Project operator divides the sum by the count to obtain the final avg, and the result is written to the mv2 table.
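The maintenance described above can be modeled in a few lines of Python. This is a conceptual sketch under simplifying assumptions (no NULL prices, a plain dictionary instead of a state table), not RisingWave's implementation; apply_change and project are illustrative names.

```python
from decimal import Decimal

def apply_change(state, op, order):
    """Fold one insert or delete on orders into the per-date aggregation state."""
    sign = 1 if op == "insert" else -1
    date = order["o_orderdate"]
    group = state.setdefault(
        date, {"count": 0, "sum_price": Decimal("0"), "count_price": 0}
    )
    group["count"] += sign
    group["sum_price"] += sign * order["o_totalprice"]
    group["count_price"] += sign
    if group["count"] == 0:
        del state[date]  # the group has no rows left

def project(state):
    """Derive the mv2 row per date: (order_count, avg_price = sum / count)."""
    return {
        date: (g["count"], g["sum_price"] / g["count_price"])
        for date, g in state.items()
    }

# The six orders from the running example: (o_orderkey, o_totalprice, o_orderdate).
orders = [
    (1000, "9.9", "2023-01-01"), (1001, "20", "2023-01-02"),
    (1002, "50", "2023-01-03"), (1003, "100", "2023-01-01"),
    (1004, "15", "2023-01-02"), (1005, "25.5", "2023-01-03"),
]
state = {}
for key, price, date in orders:
    apply_change(state, "insert", {
        "o_orderkey": key, "o_totalprice": Decimal(price), "o_orderdate": date,
    })
mv2 = project(state)
print(state)
print(mv2)
```

After the six inserts, the dictionary holds the same counts and sums as the rows shown in __internal_mv2_6_hashaggresult_1005. A delete is applied with the opposite sign, which is why keeping sum and count separately is enough to keep avg correct.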
Join State
Our final example explores the join state. The materialized view mv3 merges the two tables into a single wide table through a join operation. The join states are: __internal_mv3_8_hashjoinleft_1007, __internal_mv3_8_hashjoinright_1009, __internal_mv3_8_hashjoindegreeright_1010, and __internal_mv3_8_hashjoindegreeleft_1008. The ones with "degree" in the name are for boundary handling of null values when processing outer joins. They are not used in this example, so we only need to focus on __internal_mv3_8_hashjoinleft_1007 and __internal_mv3_8_hashjoinright_1009.
dev=> explain create materialized view mv3 as select * from orders join customer on o_custkey = c_custkey;
QUERY PLAN
---------------------------------------------------------------------------
StreamMaterialize { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate, c_custkey, c_name, c_phone], pk_columns: [o_orderkey, c_custkey, o_custkey] }
└─StreamHashJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey }
├─StreamExchange { dist: HashShard(orders.o_custkey) }
| └─StreamTableScan { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
└─StreamExchange { dist: HashShard(customer.c_custkey) }
└─StreamTableScan { table: customer, columns: [c_custkey, c_name, c_phone] }
(6 rows)
dev=> explain (distsql) create materialized view mv3 as select * from orders join customer on o_custkey = c_custkey;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0
StreamMaterialize { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate, c_custkey, c_name, c_phone], pk_columns: [o_orderkey, c_custkey, o_custkey] }
materialized table: 4294967294
StreamHashJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey }
left table: 0, right table 2, left degree table: 1, right degree table: 3,
StreamExchange Hash([1]) from 1
StreamExchange Hash([0]) from 2
Fragment 1
Chain { table: orders, columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate] }
Upstream
BatchPlanNode
Fragment 2
Chain { table: customer, columns: [c_custkey, c_name, c_phone] }
Upstream
BatchPlanNode
Table 0 { columns: [orders_o_orderkey, orders_o_custkey, orders_o_totalprice, orders_o_orderdate], primary key: [$1 ASC, $0 ASC], value indices: [0, 1, 2, 3], distribution key: [1] }
Table 1 { columns: [orders_o_custkey, orders_o_orderkey, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
Table 2 { columns: [customer_c_custkey, customer_c_name, customer_c_phone], primary key: [$0 ASC], value indices: [0, 1, 2], distribution key: [0] }
Table 3 { columns: [customer_c_custkey, _degree], primary key: [$0 ASC], value indices: [1], distribution key: [0] }
Table 4294967294 { columns: [o_orderkey, o_custkey, o_totalprice, o_orderdate, c_custkey, c_name, c_phone], primary key: [$0 ASC, $4 ASC, $1 ASC], value indices: [0, 1, 2, 3, 4, 5, 6], distribution key: [1] }
(23 rows)
From the above execution plan, we can see the states all come from the StreamHashJoin operator. Next, let's query the content of the states.
dev=> describe __internal_mv3_8_hashjoinleft_1007;
Name | Type
---------------------+-------------------------------------
orders_o_orderkey | Int64
orders_o_custkey | Int32
orders_o_totalprice | Decimal
orders_o_orderdate | Date
primary key | orders_o_custkey, orders_o_orderkey
(5 rows)
dev=> select * from __internal_mv3_8_hashjoinleft_1007;
orders_o_orderkey | orders_o_custkey | orders_o_totalprice | orders_o_orderdate
-------------------+------------------+---------------------+--------------------
1003 | 2 | 100 | 2023-01-01
1004 | 2 | 15 | 2023-01-02
1005 | 2 | 25.5 | 2023-01-03
1000 | 1 | 9.9 | 2023-01-01
1001 | 1 | 20 | 2023-01-02
1002 | 1 | 50 | 2023-01-03
(6 rows)
dev=> describe __internal_mv3_8_hashjoinright_1009;
Name | Type
--------------------+--------------------
customer_c_custkey | Int32
customer_c_name | Varchar
customer_c_phone | Varchar
primary key | customer_c_custkey
(4 rows)
dev=> select * from __internal_mv3_8_hashjoinright_1009;
customer_c_custkey | customer_c_name | customer_c_phone
--------------------+-----------------+------------------
2 | Bob | 2222222222
1 | Alice | 1111111111
(2 rows)
If you only look at the table content, you will find it is just the data inserted into the left and right tables. Looking more closely, you will notice that the primary key of __internal_mv3_8_hashjoinleft_1007 has become a composite primary key of two columns, orders_o_custkey and orders_o_orderkey, which differs from the primary key of the input orders table. This is because, during a streaming join, the state is queried constantly, and the access pattern is a lookup on the join key.
Therefore, to speed up access to the state, its primary key uses o_custkey as a prefix. This way, whenever the customer table is updated, the join operator can use c_custkey to look up the matching orders in the state on the orders side and emit the corresponding updates.
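The effect of keying the state by the join key can be illustrated with a small Python model. It is a conceptual sketch only: the class HashJoinState and its methods are hypothetical, it handles inserts only, and it ignores the degree tables.

```python
from collections import defaultdict

class HashJoinState:
    """Toy inner-join state, keyed by the join key on both sides."""

    def __init__(self):
        # Left side: o_custkey -> {o_orderkey: order row}, mirroring the
        # composite key (orders_o_custkey, orders_o_orderkey).
        self.left = defaultdict(dict)
        # Right side: c_custkey -> customer row.
        self.right = {}

    def insert_order(self, order):
        self.left[order["o_custkey"]][order["o_orderkey"]] = order
        customer = self.right.get(order["o_custkey"])
        return [{**order, **customer}] if customer else []

    def insert_customer(self, customer):
        self.right[customer["c_custkey"]] = customer
        # One lookup on the join key finds every matching order.
        matches = self.left.get(customer["c_custkey"], {})
        return [{**order, **customer} for order in matches.values()]

join_state = HashJoinState()
for key, cust in [(1000, 1), (1001, 1), (1003, 2)]:
    join_state.insert_order({"o_orderkey": key, "o_custkey": cust})
joined = join_state.insert_customer({"c_custkey": 1, "c_name": "Alice"})
print(joined)
```

Because the outer key is the join key, the arrival of customer 1 touches only that customer's orders instead of scanning the whole orders-side state.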
More
In the above examples, the demonstration of the queryable state is only a static display after data insertion. Interested readers can try inserting/updating/deleting on the orders and customer tables separately to observe the changes in different streaming job states.
In addition to the agg and join operators, states also exist in TopN, source, and other operators. Even in the same operator, different optimizations can produce different states. For example, SimpleAgg has a two-phase optimization that requires a more complex state to handle aggregates like max/min. We have previously introduced delta join in streaming databases. One of its features is reusing existing indexes as states, so delta join itself does not maintain an additional internal state. This can be verified through the queryable state feature.
In stream processing, state is a very important intermediate result. This article introduces the queryable state feature in RisingWave and demonstrates stateless streaming jobs, aggregation states, and join states through a running example. Queryable state allows users to easily view what states each streaming job has and query the specific content of the states. Additionally, queryable state allows us to understand the internal implementation of stream processing technologies in depth.
If you are interested in stream processing technologies, you can deepen your learning by exploring RisingWave's source code and using the queryable state feature. Stay tuned for our upcoming articles filled with juicy technical details about queryable state. Get ready to level up your stream processing skills!
RisingWave is a high-performance, flexible, and easy-to-use streaming database that supports various computation models and operators like stream processing, window processing, and state processing.