We at RisingWave are thrilled to announce the release of v1.10 this month! This update introduces exciting new features, including user-defined aggregate functions, improvements to CDC connectors, new sink connectors, and more. Read on to learn more about some of the standout features in this release.
If you are interested in the full list of v1.10 updates, see the full release notes.
Support for UDAF
This update allows you to create embedded user-defined aggregate functions (UDAFs) in Python and JavaScript. These functions are defined within RisingWave and executed using an embedded interpreter. Once defined, they can be used like any built-in SQL aggregate function. Use the CREATE AGGREGATE command to create aggregate functions. The general syntax is as follows:
CREATE AGGREGATE function_name ( argument_type [, ...] )
RETURNS return_type
LANGUAGE language_name
AS $$ function_body $$;
The body of the function can be written in either Python or JavaScript and consists of several functions that together maintain the state and return the aggregated value.
The function create_state must be defined; it creates a new state, which maintains the ongoing computation so that the aggregate function can compute its result efficiently.
The function accumulate must also be defined; it updates and returns the current value of the state. It takes the state and the input parameters from the aggregate function definition as parameters.
Optionally, you can include the function finish, which takes the state as input and returns the result of the aggregate function. If finish is not defined, the current state is returned as the result.
You can also include the function retract, which retracts an input row from the state and returns the updated state.
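As a minimal sketch, here is a hypothetical embedded Python UDAF that computes a weighted average, defining all four functions described above (the function name and arguments are illustrative):
CREATE AGGREGATE weighted_avg(value float, weight float) RETURNS float
LANGUAGE python AS $$
def create_state():
    # state: (weighted sum, total weight)
    return (0.0, 0.0)

def accumulate(state, value, weight):
    if value is None or weight is None:
        return state
    (s, w) = state
    return (s + value * weight, w + weight)

def retract(state, value, weight):
    # remove a previously accumulated row from the state
    if value is None or weight is None:
        return state
    (s, w) = state
    return (s - value * weight, w - weight)

def finish(state):
    (s, w) = state
    return s / w if w != 0 else None
$$;
Once created, it can be called like any built-in aggregate, for example SELECT weighted_avg(price, quantity) FROM orders.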
UDAFs bring more complex computing power to RisingWave, allowing for greater flexibility and customization when handling and processing data.
For more details, see the documentation on user-defined aggregate functions.
Fetch multiple updates from cursors
In v1.9, we introduced subscriptions and cursors, allowing you to retrieve updates made to a table or materialized view. Previously, you could only retrieve updates row by row from the cursor using the FETCH command. Now, you can specify how many rows to retrieve at once. The following SQL query retrieves the next four updates from the cursor cur1.
FETCH 4 FROM cur1;
This makes it much easier to see recent changes made to your tables and materialized views.
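For context, here is a minimal end-to-end sketch, assuming a materialized view mv1 (the subscription and cursor names are illustrative):
-- Create a subscription that retains one day of changes.
CREATE SUBSCRIPTION sub1 FROM mv1 WITH (retention = '1D');
-- Declare a cursor on the subscription, then fetch several updates at once.
DECLARE cur1 SUBSCRIPTION CURSOR FOR sub1;
FETCH 4 FROM cur1;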
Additionally, the column names of the result table now match the column names of the source table or materialized view. Previously, they were formatted as table_name.col_name.
For more details, see the documentation on subscriptions and cursors.
Support for spill-able hash joins
To improve RisingWave’s performance when joining two large tables, we now support spill-able hash joins. Currently, RisingWave relies on hash joins, where the hash table is built in memory. Although hash joins parallelize and scale well, building the hash table requires a lot of memory, which may lead to out-of-memory errors when the tables are large. Spill-able hash joins resolve this issue by allowing RisingWave to leverage the disk when memory usage is high during a join query.
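As an illustration, a batch join between two large tables like the following can now spill its hash table to disk instead of failing with an out-of-memory error (the table names are hypothetical):
-- Both tables are assumed to be too large for the hash table to fit in memory.
SELECT o.order_id, c.name
FROM orders AS o
JOIN customers AS c ON o.customer_id = c.customer_id;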
Enhancements to CDC source connectors
We continue to improve our existing CDC source connectors, providing you with a smoother streaming experience. This release contains two new features: automatic schema mapping and metadata columns. These new features do not apply to all available CDC connectors, so read on for more details.
Automatic schema mapping
RisingWave now automatically maps the schema of the upstream table to the RisingWave table when creating a MySQL or PostgreSQL CDC table. You can use * when creating a table to ingest all columns from the source table, eliminating the need to define columns individually. However, * cannot be used if other columns are specified in the table creation process.
Let us go over a brief example showing this process. The following SQL query connects to a MySQL database. When ingesting CDC data from MySQL or PostgreSQL, you must create a source first to connect to the database before ingesting data from individual tables.
CREATE SOURCE mysql_source WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '3306',
username = 'root',
password = 'password',
database.name = 'mydb',
server.id = 5888
);
Next, we create a table that ingests all columns from the upstream table tbl1 in the MySQL database. The columns of mysql_tbl will correspond to the columns of tbl1.
CREATE TABLE mysql_tbl (*)
FROM mysql_source TABLE 'mydb.tbl1';
This feature makes creating CDC tables in RisingWave more efficient.
Include metadata columns
When creating a MongoDB, MySQL, or PostgreSQL CDC table, you can use the INCLUDE clause to append metadata columns. If you need to add metadata columns to an existing CDC table, you must recreate the table in RisingWave.
For MongoDB, MySQL, and PostgreSQL CDC tables, you can ingest the upstream commit timestamp using the INCLUDE clause. For historical data, this column is filled with 1970-01-01 00:00:00+00:00 by default.
For MongoDB, you can ingest the collection_name using the INCLUDE clause.
For MySQL and PostgreSQL, you can ingest the database_name, schema_name, and table_name using the INCLUDE clause.
The syntax of the INCLUDE clause is as follows:
INCLUDE metadata_col [AS col_name];
metadata_col can be any of the previously mentioned metadata columns. This clause can be used when creating a table, after the table’s schema is defined.
The following SQL query ingests the metadata columns timestamp and database_name from a MySQL table.
CREATE TABLE tbl_meta (
id int,
name varchar,
age int,
PRIMARY KEY (id)
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME AS db_name
FROM mysql_source TABLE 'mydb.tbl2';
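For MongoDB, a hedged sketch might look like the following; the connection details and names are illustrative, and MongoDB CDC tables use a fixed schema of an _id key plus a payload column:
CREATE TABLE users (
_id jsonb PRIMARY KEY,
payload jsonb
) INCLUDE TIMESTAMP AS commit_ts
INCLUDE COLLECTION_NAME AS coll_name
WITH (
-- illustrative connection details for a local MongoDB replica set
connector = 'mongodb-cdc',
mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
collection.name = 'mydb.users'
);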
For more details, see the documentation on the INCLUDE clause for CDC tables.
Enhancements to existing sink connectors
Default sink decoupling
Sink decoupling is now enabled by default for ClickHouse, Google Pub/Sub, Kafka, Kinesis, MQTT, NATS, and Pulsar sinks. Previously, it was enabled by default only for append-only sinks; this restriction has been removed. Sink decoupling inserts a buffering queue between RisingWave and the downstream system to ensure that RisingWave is not impacted by any performance issues the downstream system may experience.
If you would like to disable sink decoupling, use the session variable sink_decouple.
SET sink_decouple = false;
Checkpoint decouple option
For Delta Lake and StarRocks sinks, you can choose to decouple the downstream system’s commit from RisingWave’s commit by using the commit_checkpoint_interval parameter. This means that instead of committing data to the downstream system at every barrier, RisingWave will commit data only when the specified checkpoint interval is reached. For instance, if commit_checkpoint_interval is set to five, RisingWave will commit data every five checkpoints. By doing so, query performance improves as fewer versions of the destination table are generated.
The commit_checkpoint_interval parameter should be specified under the WITH options when creating a Delta Lake or StarRocks sink.
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'deltalake',
type = 'append-only',
location = 's3a://my-delta-lake-bucket/path/to/table',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
commit_checkpoint_interval = 5
);
For more details, see the documentation on sink decoupling and the Delta Lake and StarRocks sink connectors.
New sink connectors
RisingWave continues to expand its ecosystem by adding new connectors to downstream systems. We now support sinking data to Amazon DynamoDB, Microsoft SQL Server, and OpenSearch. If you are interested in a particular connector, see our Integrations page. You can vote to show your interest in a particular connector, or ask to be notified when it becomes available.
Amazon DynamoDB
Amazon DynamoDB is a NoSQL database designed to handle high-volume structured and semi-structured data. It offers consistently high performance and easy scalability. To sink data from RisingWave to a DynamoDB table, use the CREATE SINK command. When sinking to a DynamoDB table, your RisingWave source table must have a composite primary key consisting of two columns, which should correspond to the partition key and sort key defined in the DynamoDB target table.
For instance, if you would like to sink to a DynamoDB table named books_dynamo, which has the partition key isbn and the sort key edition, your RisingWave table’s schema should be defined as follows:
CREATE TABLE IF NOT EXISTS books_rw (
isbn varchar,
edition int,
title varchar,
author varchar,
primary key (isbn, edition)
);
Then you can create a sink to deliver data from books_rw to books_dynamo.
CREATE SINK dynamo_sink
FROM books_rw
WITH (
connector = 'dynamodb',
table = 'books_dynamo',
primary_key = 'isbn, edition',
endpoint = 'http://localhost:8000',
region = 'region',
access_key = 'access_key',
secret_key = 'secret_key'
);
Microsoft SQL Server
Microsoft SQL Server is a powerful relational database management system that supports a wide array of transaction processing, business intelligence, and analytics workloads. It uses T-SQL and includes tools like SQL Server Integration Services, Reporting Services, and Analysis Services. RisingWave supports sinking data to both self-hosted SQL Server and Azure SQL.
Use the CREATE SINK command to start sinking data. The following SQL query creates a sink, sqlserver_sink, that sinks data from the materialized view mv1 to the SQL Server table sqlserver_tbl. Since it is an upsert sink, we define the primary keys pk1 and pk2.
CREATE SINK sqlserver_sink
FROM mv1
WITH (
connector = 'sqlserver',
type = 'upsert',
sqlserver.host = 'sqlserver-server',
sqlserver.port = 1433,
sqlserver.user = 'user',
sqlserver.password = 'password',
sqlserver.database = 'mydb',
sqlserver.table = 'sqlserver_tbl',
primary_key = 'pk1, pk2'
);
OpenSearch
OpenSearch is an open-source search and analytics engine designed to search, analyze, and visualize large volumes of data in real time. Derived from Elasticsearch, it is suitable for various applications, such as log and event data analysis, enterprise search, and monitoring and observability.
To sink data to OpenSearch from RisingWave, use the CREATE SINK command.
CREATE SINK opensearch_sink
FROM table1
WITH (
connector = 'opensearch',
index = 'id1',
primary_key = 'types_id',
url = 'http://opensearch:8080',
username = 'user',
password = 'password'
);
For more details, see the documentation on the DynamoDB, SQL Server, and OpenSearch sink connectors.
Change in allotted reserved memory
The algorithm used to calculate the default reserved memory has changed. Previously, 20% of the compute node’s total memory was set aside as reserved memory. Reserved memory gives RisingWave headroom to adjust its memory usage when there is an additional influx of input data. Now the reserved memory is calculated as the sum of 30% of the first 16 GB of memory and 20% of the remaining memory. This calculation method allows the reserved memory to scale according to your setup, better balancing system performance and memory utilization.
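For example, on a compute node with 64 GB of memory, the reserved memory is now 30% × 16 GB + 20% × 48 GB = 4.8 GB + 9.6 GB = 14.4 GB, compared to 12.8 GB under the previous flat 20% rule.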
If this method does not suit your setup, you can specify the reserved memory with either the startup option --reserve-memory-bytes or the environment variable RW_RESERVED_MEMORY_BYTES. The reserved memory must be at least 512 MB.
For more details, see the documentation on reserved memory.
These are some of the standout features in the v1.10 release. To see the entire list of updates, which includes new SQL functions and support for ingesting new data types, please refer to the detailed release notes.
Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.
Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.