If you are interested in the full list of v1.7 updates, see the release note.
Adaptive parallelism
Adaptive parallelism is enabled by default for all new streaming jobs from v1.7 onward. This means that if additional nodes or CPUs are added to the cluster, RisingWave will automatically adjust the parallelism to take advantage of the added resources, which makes scaling smoother. However, you can choose to disable this option for the entire session by setting disable_automatic_parallelism_control to true for maximal readiness and stability.
If necessary, you can set parallelism to be fixed for a table, materialized view, or sink. This enables you to throttle stream bandwidth and have a predictable resource allocation.
ALTER TABLE table_name SET PARALLELISM = 8;
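As a rough sketch of how the same setting might be applied to the other job types mentioned above, the statements below fix the parallelism of a materialized view and a sink, then hand the table back to adaptive scheduling. The names mv_name and sink_name are placeholders, and the ADAPTIVE keyword is an assumption not shown in this post, so verify it against the documentation for your version.

```sql
-- Fix a materialized view at a parallelism of 4 (mv_name is a placeholder).
ALTER MATERIALIZED VIEW mv_name SET PARALLELISM = 4;

-- Fix a sink in the same way (sink_name is a placeholder).
ALTER SINK sink_name SET PARALLELISM = 4;

-- Assumed syntax: return the table to adaptive parallelism.
ALTER TABLE table_name SET PARALLELISM = ADAPTIVE;
```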
For more details, see:
Read all columns from an external source with schema
When creating a table or source in RisingWave that reads data from an external source, the schema must be defined, meaning each column name and data type is specified. The only exception is when the data is in Protobuf format. When there are a lot of columns, it can be laborious to define each column. With this update, you can use * to indicate that all columns from the schema registry should be included within the table or source. Additional generated columns can be included as well. Remember that this method only works if an external schema registry is defined.
CREATE TABLE from_kafka (
*,
gen_i32_field int AS int32_field + 2)
WITH (
connector = 'kafka',
topic = 'test',
properties.bootstrap.server = 'message_queue:29092')
FORMAT UPSERT ENCODE AVRO (
schema.registry = 'http://message_queue:8081');
For more details, see:
INCLUDE clause
The INCLUDE clause for CREATE SOURCE and CREATE TABLE is now supported, allowing you to add message components as additional columns that are not part of the payload. For instance, the INCLUDE clause can add columns like the message key, timestamp, or topic header for further analysis. You can add a timestamp column when creating a table or source like so:
CREATE SOURCE s_name(
...
) INCLUDE timestamp AS include_ts
WITH (...)
FORMAT ... ENCODE ...;
It is optional to specify an alias using AS, but if one is not specified, the column name will be automatically generated based on the type of connector used. So if this source connects to a Kafka message queue, the generated column name would be _rw_kafka_timestamp.
There are options to include a key, partition, offset, timestamp, or header when ingesting data from data streams.
INCLUDE {key | partition | offset | timestamp | header} [AS col_name]
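To illustrate, here is a sketch of a table that includes several of these components at once by repeating the INCLUDE clause. The table name, payload column, alias names, topic, and broker address are all placeholders, and FORMAT PLAIN ENCODE JSON is an assumption chosen only to keep the example short.

```sql
-- Hypothetical table: one payload column plus four message components.
CREATE TABLE kafka_with_metadata (
    user_id int
)
INCLUDE key AS kafka_key
INCLUDE partition AS kafka_partition
INCLUDE offset AS kafka_offset
INCLUDE timestamp AS kafka_ts
WITH (
    connector = 'kafka',
    topic = 'test',
    properties.bootstrap.server = 'message_queue:29092'
)
FORMAT PLAIN ENCODE JSON;
```

Each alias then behaves like an ordinary column, so it can be selected or filtered on alongside the payload.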
Furthermore, for UPSERT types of sources and tables, INCLUDE KEY is required, as RisingWave will use this column to perform upsert semantics. A primary key cannot be defined as multiple columns in this case.
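A minimal sketch of an upsert table might look like the following, where the included key column doubles as the single-column primary key. The table name, the alias rw_key, the topic, and the addresses are placeholders rather than values from this release.

```sql
-- Hypothetical upsert table: the message key is included as rw_key
-- and declared as the (single-column) primary key.
CREATE TABLE upsert_orders (
    PRIMARY KEY (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
    connector = 'kafka',
    topic = 'test',
    properties.bootstrap.server = 'message_queue:29092'
)
FORMAT UPSERT ENCODE AVRO (
    schema.registry = 'http://message_queue:8081'
);
```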
For more details, see:
Improved UDF support
This version of RisingWave brings improved support for user-defined functions (UDFs). UDFs provide more flexibility and control when it comes to data transformations and computations. You can choose to define external UDFs using a programming language, or SQL UDFs within RisingWave. Once a UDF is created, you can use it in SQL queries like any built-in function. This update supports external UDFs defined in JavaScript and Rust, and SQL UDFs.
JavaScript UDFs
You can create JavaScript UDFs in RisingWave by using the CREATE FUNCTION command. Compared to defining UDFs in other languages, creating JavaScript UDFs is the most straightforward because a QuickJS virtual machine is embedded in RisingWave. There is no need for additional setup on your local machine, nor do you need to define the functions in a separate file. JavaScript UDFs are limited to computational tasks only, making them perfect if your data needs simple but repetitive transformations.
CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE javascript AS $$
if(a == null || b == null) {
return null;
}
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
$$;
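Once created, the function can be called like a built-in. A quick check, assuming the gcd function above exists in the current database:

```sql
SELECT gcd(25, 15);
----RESULT
5
```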
Rust UDFs
UDFs in RisingWave can also be created using Rust. You can choose to either embed WebAssembly (WASM) binaries into SQL using base64 encoding or load WASM binaries from the object store. UDFs defined in Rust are beneficial in that they have better performance in comparison to UDFs defined in Python and Java but are limited to computational tasks. Unlike UDFs created using JavaScript, there is an additional setup required for Rust. See the documentation linked below for a full guide.
SQL UDFs
UDFs can also be defined using SQL in RisingWave internally. With this, you can encapsulate commonly used logic and abstract complex operations and calculations into reusable functions, improving the readability of your queries. Furthermore, you can call on built-in SQL functions and pre-defined SQL UDFs when creating a UDF. However, recursion is currently not supported.
Here is a simple SQL UDF that adds two given parameters. You can choose to either name the input parameters or leave them anonymous. This example has unnamed parameters.
CREATE FUNCTION add(INT, INT)
RETURNS int
LANGUAGE SQL
AS $$select $1 + $2$$;
SELECT add(1, -1);
----RESULT
0
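For comparison, a sketch of the same idea with named parameters could look like this. The function name add_named is hypothetical, and the body refers to the parameters by name instead of by position.

```sql
-- Hypothetical named-parameter variant of the add function above.
CREATE FUNCTION add_named(a INT, b INT)
RETURNS int
LANGUAGE SQL
AS $$select a + b$$;
SELECT add_named(2, 3);
----RESULT
5
```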
For more details, see:
Supports cryptographic functions
Cryptographic functions are algorithms that transform the given input into an unreadable format so that it can be secured and protected. The encrypted data can be decrypted back into its original form using the specific key or algorithm. RisingWave now allows you to encrypt and decrypt data using the raw encryption functions encrypt and decrypt. These functions apply a cipher based on the given algorithm to the given input but do not provide additional security measures.
encrypt(data bytea, key bytea, type text) -> bytea
decrypt(data bytea, key bytea, type text) -> bytea
The input data is encrypted or decrypted based on the given type, which is composed of an algorithm and, optionally, a mode and padding. For the full list of supported options, see the documentation linked below.
algorithm [ - mode ] [ /pad: padding ]
Here is an example of encrypting the message “Hello, World!”. The algorithm is aes, the mode is cbc, and the padding is pkcs.
SELECT encrypt('Hello, World!', 'my_secret_key', 'aes-cbc/pad:pkcs');
----RESULT
330317204357327367206241253024303013215030231257
(1 row)
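The reverse direction can be sketched by feeding the output of encrypt straight into decrypt with the same key and type. This assumes the key and type strings from the example above are accepted by your RisingWave version. The result is a bytea value that should contain the original message bytes.

```sql
-- Round trip: decrypt what encrypt produced, using the same key and type.
SELECT decrypt(
    encrypt('Hello, World!', 'my_secret_key', 'aes-cbc/pad:pkcs'),
    'my_secret_key',
    'aes-cbc/pad:pkcs'
);
```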
For more details, see:
Enhancements for CDC connectors
This month, a few enhancements have been introduced for the MySQL and PostgreSQL CDC connectors.
First, the default value of the transactional parameter has been changed to true when creating shared MySQL and PostgreSQL sources. If creating a CDC table, the default value is false. This parameter allows you to enable or disable transactions for the CDC table or source.
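As a hedged illustration, a shared PostgreSQL source could state the transactional parameter explicitly. All connection values are placeholders, and the parameter names other than transactional are assumptions about the PostgreSQL CDC connector that should be checked against its documentation.

```sql
-- Hypothetical shared CDC source; connection values are placeholders.
-- transactional is shown only for illustration, since true is already
-- the default for shared sources according to the text above.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'myuser',
    password = 'mypassword',
    database.name = 'mydb',
    transactional = 'true'
);
```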
The snapshot parameter is also now available when creating MySQL and PostgreSQL CDC tables. By setting this parameter to false, the CDC table in RisingWave will only consume upstream events that have occurred after the table was created. Any existing data within the source MySQL or PostgreSQL table will not be present in the RisingWave table.
CREATE TABLE orders_no_backfill (
order_id int,
order_date date,
customer_name string,
PRIMARY KEY (order_id)
) WITH (
snapshot = 'false'
) FROM pg_source TABLE 'public.orders_tx';
For more details, see:
These are just some of the new features included with the release of RisingWave v1.7. To see the entire list of updates, which also contains new functions and cluster configuration options, please refer to the detailed release notes.
Look out for next month’s edition to see what new, exciting features will be added. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.
Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.