Happy 2024, RisingWave community! The new year brings the launch of RisingWave v1.6, which is filled with fixes, updates, and new features. This update includes some breaking changes, so please take note if you are currently running RisingWave. We will go more in-depth on these changes, as well as new connectors, SQL clauses, and more. Read on to learn more about some of the standout features.

If you are interested in the full list of v1.6 updates, see the release notes.


Breaking changes


RisingWave v1.6 includes two breaking changes that may affect the results and behavior of existing SQL streaming jobs. Please take note of the following changes:


Correctness of SOME, ALL, ANY expressions


This update fixes the correctness of the results generated by SOME, ALL, and ANY expressions. If you are running any SQL queries that use one of these expressions, your previous output may contain errors. We recommend dropping any existing materialized views that include these expressions and recreating them to ensure accurate results.
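
As a rough sketch of that drop-and-recreate step, the statements below rebuild a view that filters with ANY. The view name high_value_orders, the orders table, and its id and price columns are all hypothetical; substitute your own definition.

```sql
-- Hypothetical view and table names, for illustration only.
-- Drop the view that was built before the fix...
DROP MATERIALIZED VIEW IF EXISTS high_value_orders;

-- ...and recreate it so its contents are recomputed with the corrected semantics.
CREATE MATERIALIZED VIEW high_value_orders AS
SELECT id, price
FROM orders
WHERE price > ANY (ARRAY[100, 500, 1000]);
```

If other materialized views or sinks depend on the view, they would likely need to be dropped and recreated as well.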


Data mapping change regarding timestamps for ClickHouse sinks


If you have a sink that writes timestamp or timestamptz data from RisingWave to ClickHouse, note that the timestamptz type in RisingWave can be sinked to the DateTime64 type in ClickHouse. However, the timestamp type in RisingWave cannot be sinked directly. It must first be converted to timestamptz before being sinked to ClickHouse.
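
One way to do the conversion is to cast the column in the sink's query. The sketch below assumes a hypothetical page_views table with a timestamp column viewed_at; the connection values are placeholders, and the clickhouse.* option names reflect our understanding of the ClickHouse sink connector, so check them against the connector documentation.

```sql
-- Hypothetical table with a plain timestamp column.
CREATE TABLE page_views (
    id int primary key,
    viewed_at timestamp
);

-- Cast timestamp to timestamptz in the sink query so it can map to DateTime64.
-- The cast interprets the value in the session time zone.
CREATE SINK page_views_clickhouse AS
SELECT id, viewed_at::timestamptz AS viewed_at
FROM page_views
WITH (
    connector = 'clickhouse',
    type = 'append-only',
    clickhouse.url = 'http://clickhouse-server:8123',  -- placeholder
    clickhouse.user = 'default',                       -- placeholder
    clickhouse.password = '',                          -- placeholder
    clickhouse.database = 'default',                   -- placeholder
    clickhouse.table = 'page_views',                   -- placeholder
    force_append_only = 'true'
);
```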


Sinking into tables


You can sink data from sources and tables in RisingWave into another table in RisingWave using the CREATE SINK ... INTO ... command. This streaming job behaves the same way as an external sink, and the target table maintains snapshot consistency with the upstream table. The general syntax is outlined below.

CREATE SINK [ IF NOT EXISTS ] sink_name INTO table_name
[FROM sink_from | AS select_query]

Being able to sink data into tables gives you another way to merge data from different sources into one table. Adding or removing a data source is more flexible with the CREATE SINK ... INTO ... and DROP SINK ... statements. The results of the table are still updated continuously as new data is streamed in.

For instance, we can combine data from two different Kafka topics into the same target table. Here, we sink data from the Kafka sources orders_s0 and orders_s1 into the table orders.

CREATE TABLE orders (
    id int primary key,
    price int,
    item_id int,
    customer_id int
);

CREATE SOURCE orders_s0 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH (
    connector = 'kafka',
    topic = 'topic_0',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE SOURCE orders_s1 (
    id int primary key,
    price int,
    item_id int,
    customer_id int
) WITH (
    connector = 'kafka',
    topic = 'topic_1',
    ...
) FORMAT PLAIN ENCODE JSON;

CREATE SINK orders_sink0 INTO orders FROM orders_s0;
CREATE SINK orders_sink1 INTO orders FROM orders_s1;

Sink orders_sink0 sinks data from orders_s0 into orders and sink orders_sink1 sinks data from orders_s1 into orders. If we no longer want to sink data from orders_s0 into orders, we can drop the sink.

DROP SINK orders_sink0;
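
The syntax above also accepts a query in place of a source name. As a small sketch, the sink below forwards only a filtered subset of orders_s1 into orders. The sink name orders_sink_large and the price threshold are made up for illustration.

```sql
-- Hypothetical sink that uses the AS select_query form to filter rows
-- before they reach the target table.
CREATE SINK orders_sink_large INTO orders AS
SELECT id, price, item_id, customer_id
FROM orders_s1
WHERE price > 100;
```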

For more details see:


New connectors


Two new connectors are introduced this month: the GCS source connector and the StarRocks sink connector. They give you more flexibility in how you build your stream processing ecosystem. If there is a connector that you are interested in, please check out the Integrations page to see what we currently support and vote on what connectors you would like to see next.


GCS source


Google Cloud Storage (GCS) is a cloud-based object storage service that allows you to store and retrieve data. As with other source connectors in RisingWave, you can establish a connection between RisingWave and GCS to start consuming data by using the CREATE SOURCE or CREATE TABLE command.

CREATE TABLE gcs (
    id int,
    name varchar
) WITH (
    connector = 'gcs',
    match_pattern = '%Ring%*.ndjson',
    gcs.bucket_name = 'example-source',
    gcs.credential = 'xxxxx',
    gcs.service_account = 'xxxxx'
) FORMAT PLAIN ENCODE JSON;


StarRocks sink


StarRocks, formerly known as DorisDB, is an open-source analytical database well-suited for real-time analytics and interactive query processing. To sink data from RisingWave to StarRocks, use the CREATE SINK command. Note that the StarRocks sink connector cannot sink struct and json types because StarRocks's stream load does not support these data types.

CREATE SINK starrocks_sink
FROM mv WITH (
    connector = 'starrocks',
    type = 'append-only',
    starrocks.host = 'starrocks-fe',
    starrocks.mysqlport = '9030',
    starrocks.httpport = '8030',
    starrocks.user = 'users',
    starrocks.password = '123456',
    starrocks.database = 'demo',
    starrocks.table = 'demo_bhv_table',
    force_append_only = 'true'
);
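
If the upstream data contains a json column, one possible workaround is to cast it to a string in a materialized view and sink from that view instead. This is only a sketch: the user_events table and the user_events_flat view are hypothetical, and the corresponding StarRocks column is assumed to be a string type.

```sql
-- Hypothetical table with a jsonb column.
CREATE TABLE user_events (
    id int primary key,
    payload jsonb
);

-- Cast jsonb to varchar so the view only exposes types the sink can write.
CREATE MATERIALIZED VIEW user_events_flat AS
SELECT id, payload::varchar AS payload
FROM user_events;
```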

For more details see:


Shared CDC source enhancements


Shared PostgreSQL source for multiple CDC tables


Last month, RisingWave introduced a more convenient method to create multiple CDC tables using the same MySQL source. This method now also works for PostgreSQL CDC sources, allowing you to easily ingest data from multiple tables located within different schemas in the same database without specifying the database credentials for each source table.

Use the CREATE SOURCE command to establish a connection with the target database and then use the CREATE TABLE command to ingest data from individual tables within the database. The database credentials only need to be specified once when establishing the connection, streamlining the process.

The following SQL query creates a CDC table in RisingWave that ingests data from the tt3 table within the public schema. pg_mydb is the name of the source created in RisingWave that connects to a PostgreSQL database.

CREATE TABLE tt3 (
    v1 integer primary key,
    v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';
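
Because the credentials live in the source, a second table from a different schema can reuse pg_mydb in the same way. In this sketch the sales schema, the orders table, and its columns are hypothetical.

```sql
-- Hypothetical second CDC table, ingested from another schema
-- through the same shared source.
CREATE TABLE sales_orders (
    order_id integer primary key,
    amount integer
) FROM pg_mydb TABLE 'sales.orders';
```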


Multi-table transactions


Multi-table transactions for shared MySQL and PostgreSQL sources are now supported. To enable this, set transactional = true in the WITH options when creating the CDC source. This feature atomically imports a multi-table transaction from an upstream database, so materialized views based on those source tables will be updated atomically. The following SQL query creates a PostgreSQL source with transactions enabled.

CREATE SOURCE pg_cdc WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '8306',
    username = 'root',
    password = '123456',
    database.name = 'mydb',
    slot.name = 'mydb_slot',
    transactional = 'true'
);
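
To illustrate where atomicity matters, the sketch below creates two CDC tables on the pg_cdc source and a materialized view that joins them. The upstream tables public.accounts and public.transfers and their columns are hypothetical.

```sql
-- Hypothetical upstream tables, both ingested through the transactional source.
CREATE TABLE accounts (
    id integer primary key,
    balance integer
) FROM pg_cdc TABLE 'public.accounts';

CREATE TABLE transfers (
    id integer primary key,
    from_account integer,
    to_account integer,
    amount integer
) FROM pg_cdc TABLE 'public.transfers';

-- A view over both tables. With transactional = 'true', an upstream transaction
-- that writes to both tables should become visible here as a single update.
CREATE MATERIALIZED VIEW transfer_balances AS
SELECT t.id AS transfer_id, t.amount, a.balance AS sender_balance
FROM transfers t
JOIN accounts a ON a.id = t.from_account;
```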

For more details see:


SSL settings


This version provides two new ways to configure SSL settings, granting you more flexibility.

First, we added the properties.ssl.endpoint.identification.algorithm parameter for the Kafka source and sink, which allows you to bypass the verification of CA certificates. This parameter is worth looking into if you run into an “SSL handshake failed” error when creating a Kafka source or sink.
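
As a sketch of how this might look in practice, the Kafka source below sets the parameter to 'none'. The source name, columns, broker address, and certificate path are placeholders, and the other SSL options shown are our assumption of a typical setup rather than a required configuration.

```sql
-- Placeholder names and addresses throughout; only the last option is the new parameter.
CREATE SOURCE kafka_ssl_source (
    id int,
    payload varchar
) WITH (
    connector = 'kafka',
    topic = 'topic_0',
    properties.bootstrap.server = 'broker:9093',
    properties.security.protocol = 'SSL',
    properties.ssl.ca.location = '/path/to/ca-cert',
    properties.ssl.endpoint.identification.algorithm = 'none'
) FORMAT PLAIN ENCODE JSON;
```

Skipping verification weakens the security of the connection, so it is probably best reserved for troubleshooting or for environments you trust.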

We also introduce the RW_SSL_CERT and RW_SSL_KEY environment variables, which are used to specify the SSL certificate and the key file location for frontend nodes, respectively.

For more details see:


Temporal filter enhancements


Temporal filters select or exclude data based on time-related criteria, such as timestamps or date ranges. These filters are invaluable when you want to create small windows over your unbounded data. RisingWave already supports temporal filters, but this update introduces new features that allow you to construct more complex ones.

Previously, only simple comparisons were supported: you could compare data against a specific date or the current time, but NOW() could not be the upper bound.

Now, temporal filters allow NOW() to be the upper bound condition, and you can connect a temporal filter to a normal condition using the OR operator. However, connecting multiple temporal filters together using the OR operator is not supported.

t > NOW() - INTERVAL '1 hour' OR t IS NULL 
t + INTERVAL '5' SECOND < NOW()
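
In a full query, these conditions go in the WHERE clause of a materialized view. The sketch below uses a hypothetical events table with an event_time column in place of t.

```sql
-- Hypothetical table with an event-time column.
CREATE TABLE events (
    id int primary key,
    event_time timestamptz
);

-- Temporal filter combined with a normal condition using OR:
-- keep rows from the last hour, plus rows whose timestamp is missing.
CREATE MATERIALIZED VIEW recent_or_undated_events AS
SELECT id, event_time
FROM events
WHERE event_time > NOW() - INTERVAL '1 hour' OR event_time IS NULL;

-- NOW() as the upper bound: rows appear once they are at least five seconds old.
CREATE MATERIALIZED VIEW settled_events AS
SELECT id, event_time
FROM events
WHERE event_time + INTERVAL '5' SECOND < NOW();
```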

More temporal filter enhancements are in the works so stay tuned for future releases!

For more details see:


SIMILAR TO clause


The SIMILAR TO clause determines whether an expression matches a pattern written as a SQL regular expression, returning true or false. Metacharacters can be included within the pattern. Additionally, an escape character can be specified with ESCAPE at the end of the clause so that a metacharacter is matched literally. Optionally, the NOT keyword can be included to determine if the specified expression does not match the given pattern. For the full list of supported metacharacters, see the link to the documentation provided below.

The general syntax is as follows.

expression [ NOT ] SIMILAR TO sql_regex_pattern [ ESCAPE <esc_text> ]

Here is an example of how SIMILAR TO works. We check if the string “abc” matches the given SQL regular expression.

'abc' SIMILAR TO '(a|b|c)+' -> true

The SIMILAR TO clause gives you the ability to look for specific strings, allowing for more complex and nuanced querying. You can also ensure the validity of your data by checking whether it matches a specific pattern, as in the case of emails, phone numbers, and more.
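
A few more variations, written as standalone queries, may help show how the optional parts fit together. The patterns and literals are made up for illustration, and the comments describe the intent under standard SIMILAR TO semantics.

```sql
-- The example from above, as a full query.
SELECT 'abc' SIMILAR TO '(a|b|c)+';

-- NOT inverts the match. 'abc' contains a 'c', which '(a|b)+' does not allow,
-- so the string does not match the pattern.
SELECT 'abc' NOT SIMILAR TO '(a|b)+';

-- ESCAPE names a character that makes the following metacharacter literal.
-- Here '!' is intended to make '_' match a literal underscore.
SELECT 'a_c' SIMILAR TO 'a!_c' ESCAPE '!';

-- Simple format check: three digits, a hyphen, then four digits.
SELECT '555-0123' SIMILAR TO '[0-9]{3}-[0-9]{4}';
```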

For more details see:

Conclusion

These are just some of the new features included with the release of RisingWave v1.6. To see the entire list of updates, which includes more SQL features and connector updates such as changes to the Elasticsearch sink connector, please refer to the detailed release notes.

Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.

Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.

Emily Le

Developer Advocate
