Highlights of RisingWave v1.9: The Open-Source Streaming Database

We at RisingWave are happy to announce the release of v1.9 this month! This release is packed with updates and changes to upstream and downstream connectors, including a new Snowflake sink connector. The previous s3 source connector has been deprecated; AWS S3 sources should now be created with the s3_v2 connector. New SQL commands and functions are also introduced, including support for creating subscriptions.

Due to some critical bugs found in v1.9.0, v1.9.1 is the official release. If you are looking to upgrade your clusters, please upgrade to v1.9.1.

If you are interested in the full list of v1.9 updates, see the full release notes.

Recover command

Support for the RECOVER command is now available. This command triggers an ad-hoc recovery, which may be necessary when there is high barrier latency. Note that only superusers can run the RECOVER command, which limits who can trigger a recovery. Furthermore, RisingWave can only recover from a committed epoch, and the command does not wait for the recovery to complete.

By triggering a recovery, you can also alter the streaming rate limit to keep RisingWave running smoothly, and any pending DROP or CANCEL commands will take effect.
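For example, assuming you are connected as a superuser, triggering an ad-hoc recovery is a single statement. Keep in mind that it returns immediately rather than waiting for the recovery to finish.

RECOVER;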

Configure a parameter server-wide

To set a runtime or system parameter server-wide, you can use the ALTER SYSTEM command. The new parameter value then becomes the default for every new session, which makes configuring RisingWave much easier if you want your sessions to run with parameter values other than the defaults.

For instance, the following SQL command sets the runtime parameter rw_enable_join_ordering to true.

ALTER SYSTEM SET rw_enable_join_ordering TO true;

To see all runtime parameters, use the SHOW ALL command. If you would like to check the current parameter value, use the SHOW parameter_name command.

To see all system parameters, use the SHOW PARAMETERS command. Along with all system parameters, their current values will also be shown.
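For example, the following statements inspect the current configuration; rw_enable_join_ordering is reused here purely as an illustration.

-- List all runtime parameters and their current values
SHOW ALL;

-- Check the current value of a single runtime parameter
SHOW rw_enable_join_ordering;

-- List all system parameters and their current values
SHOW PARAMETERS;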

Temporal joins enhancement

Non-append-only temporal joins are now supported, meaning that the outer (left) side of a temporal join no longer needs to be an append-only source or table. The syntax is the same in both cases. This improvement gives you more flexibility in how you join your data.

The following SQL command joins two tables using a temporal join. Let us assume that stream_source is a non-append-only source and prod is a table.

SELECT *
FROM stream_source
JOIN prod FOR SYSTEM_TIME AS OF PROC_TIME()
ON source_id = prod_id;

Support subscriptions and cursors for subscriptions

With this release, you can now create subscription queues from tables and materialized views, as well as cursors for subscription queues. These new features are designed to make retrieving data more convenient.

To create a subscription, use the CREATE SUBSCRIPTION command. You can also drop, alter, and show existing subscriptions. When creating a subscription, you can choose how long the incremental data will be retained and accessible by the subscription. The following SQL query creates a subscription on a table and retains the data for one day.

CREATE SUBSCRIPTION sub1 FROM tbl1 WITH (
    retention = '1D'
);

Additionally, you can alter the name, owner, schema, and parallelism of a subscription.
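For example, the following statements show some of these operations; this is a sketch that assumes RisingWave's Postgres-style ALTER syntax, and the new name, owner, and schema are placeholders.

-- Rename the subscription
ALTER SUBSCRIPTION sub1 RENAME TO sub1_renamed;

-- Transfer ownership to another user
ALTER SUBSCRIPTION sub1 OWNER TO another_user;

-- Move the subscription into a different schema
ALTER SUBSCRIPTION sub1 SET SCHEMA analytics;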

Creating a cursor for the subscription enables you to read the query results in smaller batches by retrieving a few rows at once to avoid memory overflow. To do so, you need to create a cursor and then fetch the results from the cursor. The following SQL query creates a cursor on the subscription created above.

DECLARE cursor1 SUBSCRIPTION CURSOR FOR sub1;

Next, you can fetch results from the cursor.

FETCH NEXT FROM cursor1;

----RESULT
 col1 | col2 | col3 | op | rw_timestamp
------+------+------+----+--------------
    1 |    2 |    3 |  1 |
(1 row)

While col1, col2, and col3 are columns from the table, op is a special column generated when fetching from a cursor. Its value can be 1, 2, 3, or 4, corresponding to INSERT, DELETE, UPDATE_DELETE, and UPDATE_INSERT; UPDATE statements are transformed into UPDATE_DELETE and UPDATE_INSERT events. Each time you fetch from the cursor, only one row of data from the table is returned. To see additional rows, call the FETCH command again.

Time travel with Iceberg source

With Iceberg sources, you can now select data from a specific time period or version using the AS OF syntax. Since RisingWave currently only supports batch querying from an Iceberg source, this capability makes it easier to track changes to the dataset over time. Selecting data from a specific version enables reproducibility, which is critical for debugging: if errors are detected in the data, you can easily query a previous version.

The following SQL query selects data from an Iceberg source from a specific snapshot ID.

SELECT * FROM iceberg_source FOR system_version AS OF 1567123456789;

This SQL query selects data from an Iceberg source as of a specific timestamp.

SELECT * FROM iceberg_source FOR system_time AS OF '2006-09-21 00:00:00+00:00';

New configurations for PostgreSQL and MySQL connectors

A host of new configuration options are included in this release, most of them applicable only to MySQL and PostgreSQL CDC sources. These provide you with greater control and flexibility when creating a CDC source or table, allowing you to match your unique requirements and use cases. Let us go over some of the new configuration options.

Configure SSL/TLS

For MySQL and PostgreSQL CDC sources, you can now use the ssl.mode parameter to set the SSL/TLS encryption level. Configuring SSL/TLS ensures the integrity and confidentiality of the data being transmitted, protecting sensitive information from threats and attacks. Many regulatory standards and best practices utilize SSL/TLS as well, making it a crucial feature for many users.

The ssl.mode parameter accepts three inputs: disabled, preferred, and required. By default, it is set to disabled.

CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'user',
    password = 'password',
    database.name = 'mydb',
    slot.name = 'mydb_slot',
    ssl.mode = 'required'
);

Configure snapshots

Alongside being able to configure whether a snapshot is taken for CDC tables, you can now also configure the barrier interval and the batch size of a snapshot read. When establishing a connection with a MySQL or PostgreSQL database in RisingWave, you can create a CDC source or a CDC table. A CDC source connects to an entire database, whereas a CDC table connects to a single MySQL or PostgreSQL table. The snapshot-related parameters are only applicable when creating a table.

The new snapshot.interval parameter allows you to set the barrier interval when starting a snapshot read.

The snapshot.batch_size parameter configures the batch size for a snapshot read.
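As a minimal sketch, the following statement sets both snapshot options when creating a CDC table; it assumes a shared CDC source named pg_source already exists, and the table name, columns, and parameter values are placeholders.

-- Create a CDC table from an existing shared source, tuning the snapshot read
CREATE TABLE orders_rw (
    order_id INT,
    order_date DATE,
    PRIMARY KEY (order_id)
) WITH (
    snapshot.interval = '1',      -- barrier interval used when starting the snapshot read
    snapshot.batch_size = '1000'  -- number of rows read per batch during the snapshot
) FROM pg_source TABLE 'public.orders';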

Configure timeouts

Finally, a new runtime parameter is introduced to change the timeout of CDC sources. Some CDC sources may take a while to get established due to a multitude of reasons, such as a large amount of data or a complex table schema. By default, the timeout is set to 30 seconds, which may not be enough in some edge cases. If your CDC source creation fails due to an error from the control stream, you can try increasing the timeout threshold using the runtime parameter cdc_source_wait_streaming_start_timeout.

SET cdc_source_wait_streaming_start_timeout to 90;

New sink options

This release also features a new sink connector and enhancements to existing sink connectors. Sink connectors play a crucial part in streaming data pipelines, allowing seamless integration with various downstream systems. If there is a connector that you are interested in, please check out the Integrations page to see what we currently support and vote on what connectors you would like to see next. RisingWave always aims to improve its data integration capabilities, making it easy to include RisingWave in any stream processing pipeline.

Support for Snowflake sink connector

Snowflake is a cloud-native, versatile data warehousing platform that is extremely scalable. It offers a fully managed service, making it easy to store and analyze large quantities of data securely. As Snowflake only supports loading data from external storage, RisingWave first uploads the data to an external storage system before it is loaded into Snowflake. Currently, RisingWave only supports Amazon S3 as the external storage.

The process to create a Snowflake sink in RisingWave is very straightforward. All that is necessary is a CREATE SINK SQL statement, provided that you have already set up your Amazon S3 bucket.

CREATE SINK snowflake_sink FROM mv WITH (
    connector = 'snowflake',
    type = 'append-only',
    snowflake.database = 'db',
    snowflake.schema = 'schema',
    snowflake.pipe = 'pipe',
    snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
    snowflake.user = 'user',
    snowflake.rsa_public_key_fp = 'fp',
    snowflake.private_key = 'pk',
    snowflake.s3_bucket = 's3_bucket',
    snowflake.aws_access_key_id = 'aws_id',
    snowflake.aws_secret_access_key = 'secret_key',
    snowflake.aws_region = 'region',
    snowflake.max_batch_row_num = '1030',
    snowflake.s3_path = 's3_path'
);

Support for upsert type BigQuery sinks

Previously, you could only create append-only BigQuery sinks, meaning that only INSERT operations were delivered to the BigQuery table. Now, upsert sinks are also supported, allowing UPDATE and DELETE operations to be delivered as well. When creating an upsert sink, additional configuration is needed on the BigQuery side: the corresponding permissions and primary keys must be set.

CREATE SINK big_query_sink
FROM mv
WITH (
    connector = 'bigquery',
    type = 'upsert',
    bigquery.s3.path = '${s3_service_account_json_path}',
    bigquery.project = '${project_id}',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    access_key = '${aws_access_key}',
    secret_access = '${aws_secret_access}',
    region = '${aws_region}'
);

Support for Delta Lake sinks with GCS

You can now create Delta Lake sinks with Google Cloud Storage serving as the storage location for Delta Lake files. Previously, only AWS S3 and the MinIO object store were supported. As Google Cloud Storage is commonly used for data storage, this feature makes the Delta Lake sink connector more accessible and the integration hassle-free.

To create a Delta Lake sink with GCS, use a CREATE SINK command with the matching parameters.

CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'deltalake',
    type = 'append-only',
    location = 'gs://bucket-name/path/to/file',
    gcs.service.account = '{type: service_acct, project_id: id, private_key: key, client_email: email}'
);

These are some of the standout new features in the v1.9 release. To see the entire list of updates, which includes bug fixes, additional changes to connectors, and more, please refer to the detailed release notes.

Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.

Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.
