We at RisingWave are happy to announce the release of v1.9 this month! This release is packed with updates and changes to upstream and downstream connectors, including the addition of the Snowflake sink connector. The previous s3 connector has been deprecated; the AWS S3 source connector should now be specified as s3_v2. New SQL commands and functions are also introduced, including creating subscriptions.
Due to some critical bugs found in v1.9.0, v1.9.1 is the official release. If you are looking to upgrade your clusters, please upgrade to v1.9.1.
If you are interested in the full list of v1.9 updates, see the full release notes.
Recover command
Support for the RECOVER command is now available. This command triggers an ad-hoc recovery, which may be necessary when barrier latency is high. Note that only superusers can run the RECOVER command, limiting who can trigger recoveries. Furthermore, RisingWave can only recover from a committed epoch, and this command does not wait for the recovery to complete.
By triggering a recovery, you can choose to alter the streaming rate limit to ensure that RisingWave continues to run smoothly. Additionally, any DROP or CANCEL commands will take effect.
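For instance, a superuser can trigger an ad-hoc recovery with a single statement:
RECOVER;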
For more information, see:
Configure a parameter server-wide
To set a runtime or system parameter server-wide, you can use the ALTER SYSTEM command. The new parameter value then becomes the default for every new session, which makes setup much easier if you prefer to run your sessions with non-default parameters. For instance, the following SQL command sets the runtime parameter rw_enable_join_ordering to true.
ALTER SYSTEM SET rw_enable_join_ordering TO true;
To see all runtime parameters, use the SHOW ALL command. If you would like to check the value of a specific runtime parameter, use the SHOW parameter_name command. To see all system parameters along with their current values, use the SHOW PARAMETERS command.
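For example, the following statements use these commands together, continuing with the runtime parameter set above:
SHOW ALL;                      -- list all runtime parameters and their values
SHOW rw_enable_join_ordering;  -- check a single runtime parameter
SHOW PARAMETERS;               -- list all system parameters and their values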
For more details, see:
Temporal joins enhancement
Non-append-only temporal joins are now supported, meaning that the outer (left) side of the temporal join no longer needs to be an append-only source or table. The syntax is the same in both cases. This improvement gives you more flexibility in how you join your data.
The following SQL command performs a temporal join, where stream_source is a non-append-only source and prod is a table.
SELECT *
FROM stream_source
JOIN prod FOR SYSTEM_TIME AS OF PROC_TIME()
ON source_id = prod_id;
For more information, see:
Support subscriptions and cursors for subscriptions
With this release, you can now create subscription queues from tables and materialized views, as well as cursors for those subscription queues. These new features are designed to make retrieving data more convenient.
To create a subscription, use the CREATE SUBSCRIPTION command. You can also drop, alter, and show existing subscriptions. When creating a subscription, you can choose how long the incremental data is retained and accessible through the subscription. The following SQL query creates a subscription on a table and retains the data for one day.
CREATE SUBSCRIPTION sub1 FROM tbl1 WITH (
    retention = '1D'
);
Additionally, you can alter the name, owner, schema, and parallelism of a subscription.
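As a minimal sketch of one of these operations, renaming a subscription follows the familiar ALTER ... RENAME TO pattern (sub1_new is an illustrative name):
ALTER SUBSCRIPTION sub1 RENAME TO sub1_new;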
Creating a cursor for the subscription enables you to read the query results in smaller batches, retrieving a few rows at a time to avoid memory overflow. To do so, create a cursor and then fetch results from it. The following SQL query creates a cursor on the subscription created above.
DECLARE cursor1 SUBSCRIPTION CURSOR FOR sub1;
Next, you can fetch results from the cursor.
FETCH NEXT FROM cursor1;
----RESULT
 col1 | col2 | col3 | op | rw_timestamp
------+------+------+----+--------------
    1 |    2 |    3 |  1 |
(1 row)
While col1, col2, and col3 are columns from the table, the op column is a special column generated when fetching from a cursor. Its value can be 1, 2, 3, or 4, corresponding to INSERT, DELETE, UPDATE_DELETE, and UPDATE_INSERT respectively. UPDATE statements are transformed into an UPDATE_DELETE followed by an UPDATE_INSERT. Each time you fetch from the cursor, only one row of data from the table is returned; to see additional rows, run the FETCH command again.
For more information, see:
Time travel with Iceberg source
With Iceberg sources, you can now select data from a specific point in time or version using the AS OF syntax. Since RisingWave currently only supports batch querying from an Iceberg source, this capability makes it easier to track changes to the dataset over time. Selecting data from specific versions also enables reproducibility, which is critical for debugging: if errors are detected in the data, you can easily select data from a previous version.
The following SQL query selects data from an Iceberg source from a specific snapshot ID.
SELECT * FROM iceberg_source FOR system_version AS OF 1567123456789;
This SQL query selects data after a specific timestamp from an Iceberg source.
SELECT * FROM iceberg_source FOR system_time AS OF '2006-09-21 00:00:00+00:00';
For more details, see:
New configurations for PostgreSQL and MySQL connectors
A host of new configuration options are included in this release, most of them applicable only to MySQL and PostgreSQL CDC sources. This provides you with greater control and flexibility when creating a CDC table, allowing you to match your unique requirements and use cases. Let us go over some of the new configuration options.
Configure SSL/TLS
For MySQL and PostgreSQL CDC sources, you can now use the ssl.mode parameter to set the SSL/TLS encryption level. Configuring SSL/TLS ensures the integrity and confidentiality of the data being transmitted, protecting sensitive information from threats and attacks. Many regulatory standards and best practices also call for SSL/TLS, making it a crucial feature for many users.
The ssl.mode parameter accepts three values: disabled, preferred, and required. By default, it is set to disabled.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = '127.0.0.1',
    port = '5432',
    username = 'user',
    password = 'password',
    database.name = 'mydb',
    slot.name = 'mydb_slot',
    ssl.mode = 'required'
);
Configure snapshots
In addition to configuring snapshots for CDC tables, you can now also configure the barrier interval and the batch size of a snapshot read. When establishing a connection with a MySQL or PostgreSQL database in RisingWave, you can create either a CDC source or a CDC table: a CDC source connects to an entire database, whereas a CDC table connects to a single MySQL or PostgreSQL table. The snapshot-related parameters are only applicable when creating a table.
The new snapshot.interval parameter allows you to set the barrier interval for starting a snapshot read, while the snapshot.batch_size parameter configures the batch size of a snapshot read.
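As a rough sketch of where these options go, the parameters are set in the WITH clause when creating a CDC table from an existing shared CDC source (mysql_source and mydb.orders are illustrative names, and the values shown are examples rather than recommendations):
CREATE TABLE orders_rw (
    order_id INT PRIMARY KEY,
    amount DECIMAL
) WITH (
    snapshot.interval = '1',      -- barrier interval for the snapshot read
    snapshot.batch_size = '1000'  -- batch size for the snapshot read
) FROM mysql_source TABLE 'mydb.orders';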
Configure timeouts
Finally, a new runtime parameter is introduced to change the timeout of CDC sources. Some CDC sources may take a while to get established for a multitude of reasons, such as a large amount of data or a complex table schema. By default, the timeout is set to 30 seconds, which may not be enough in some edge cases. If your CDC source creation fails due to an error from the control stream, you can try increasing the timeout threshold using the runtime parameter cdc_source_wait_streaming_start_timeout.
SET cdc_source_wait_streaming_start_timeout to 90;
For more details, see:
New sink options
This release also features a new sink connector and enhancements to existing sink connectors. Sink connectors play a crucial part in streaming data pipelines, allowing seamless integration with various downstream systems. If there is a connector that you are interested in, please check out the Integrations page to see what we currently support and vote on what connectors you would like to see next. RisingWave always aims to improve its data integration capabilities, making it easy to include RisingWave in any stream processing pipeline.
Support for Snowflake sink connector
Snowflake is a cloud-native, versatile data warehousing platform that is extremely scalable. It offers a fully managed service, making it easy to store and analyze large quantities of data securely. As Snowflake only supports sinking from external storage, the data is first uploaded to an external storage system before it can be sinked to Snowflake. Currently, RisingWave only supports Amazon S3 as the external storage.
The process to create a Snowflake sink in RisingWave is very straightforward. Provided that you have already set up your Amazon S3 bucket, all that is necessary is a CREATE SINK SQL statement.
CREATE SINK snowflake_sink FROM mv WITH (
    connector = 'snowflake',
    type = 'append-only',
    snowflake.database = 'db',
    snowflake.schema = 'schema',
    snowflake.pipe = 'pipe',
    snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
    snowflake.user = 'user',
    snowflake.rsa_public_key_fp = 'fp',
    snowflake.private_key = 'pk',
    snowflake.s3_bucket = 's3_bucket',
    snowflake.aws_access_key_id = 'aws_id',
    snowflake.aws_secret_access_key = 'secret_key',
    snowflake.aws_region = 'region',
    snowflake.max_batch_row_num = '1030',
    snowflake.s3_path = 's3_path'
);
Support for upsert type BigQuery sinks
Previously, you could only create append-only type BigQuery sinks, meaning that only INSERT operations were delivered to the BigQuery table. Now, upsert type sinks are also supported, allowing UPDATE and DELETE operations to be sinked as well. When creating an upsert sink, additional configuration is needed in BigQuery, where the corresponding permissions and primary keys must be set.
CREATE SINK big_query_sink
FROM mv
WITH (
    connector = 'bigquery',
    type = 'upsert',
    bigquery.s3.path = '${s3_service_account_json_path}',
    bigquery.project = '${project_id}',
    bigquery.dataset = '${dataset_id}',
    bigquery.table = '${table_id}',
    access_key = '${aws_access_key}',
    secret_access = '${aws_secret_access}',
    region = '${aws_region}'
);
Support for Delta Lake sinks with GCS
You can now create Delta Lake sinks with Google Cloud Storage serving as the storage location for Delta Lake files. Previously, only AWS S3 and MinIO object store were supported. As Google Cloud Storage is commonly used for data storage, this feature makes the Delta Lake sink connector more accessible. RisingWave wants to ensure hassle-free integration.
To create a Delta Lake sink with GCS, use a CREATE SINK command with the matching parameters.
CREATE SINK s1_sink FROM s1_source
WITH (
    connector = 'deltalake',
    type = 'append-only',
    location = 'gs://bucket-name/path/to/file',
    gcs.service.account = '{type: service_acct, project_id: id, private_key: key, client_email: email}'
);
For more information, see:
These are some of the new standout features in the v1.9 release. To see the entire list of updates, which includes bug fixes, additional changes to connectors, and more, please refer to the detailed release notes.
Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.
Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.