We at RisingWave are thrilled to announce the release of v2.1 this month! This update introduces exciting new features, including updates to subscriptions, schemaless ingestion from Kafka sources, and ASOF joins. Additionally, starting from v2.1 the etcd metastore is no longer supported. We encourage users who previously utilized etcd to migrate to a SQL backend. If you would like some guidance with this process, refer to the migration guide or connect with us on Slack.

Follow along to learn more about some of the standout features in this version release. If you are interested in the full list of v2.1 updates, see the full release note.


Enhancements made to subscriptions and subscription cursors


After being introduced in v1.9, subscriptions and subscription cursors continue to receive significant updates that make the features more user-friendly. In this update, the observability of subscriptions and subscription cursors is improved, and subscription cursor timeouts are introduced.


Improved observability


The SHOW CURSORS and SHOW SUBSCRIPTION CURSOR commands have been updated to output cursor information from all active RisingWave sessions on the same frontend node. Additionally, the information returned by these commands now provides a more detailed description of each cursor and subscription cursor. Previously, only the names of the objects were listed. Now, the commands also output the user, host, and database of each cursor.


Cursor timeouts


You can now configure cursors to be blocking or non-blocking by specifying the timeout parameter when fetching from a cursor. Previously, all cursors were non-blocking, meaning you can fetch from a cursor even when it has no new updates to return; in that case, the cursor returns an empty result.

A blocking cursor waits until it receives new updates or until it reaches the timeout value. This design is much better suited to event-driven architectures, as you no longer need to repeatedly poll the cursor, saving on resource consumption. If fetching from your cursor takes too long because your subscription contains too many updates and the command times out, the cursor will return all values fetched so far.

The following SQL query fetches from a blocking cursor. The value for the timeout should be specified as a string in interval format. In this case, it is set to 20 seconds.

FETCH 50 FROM cursor_name WITH (timeout = '20 second');
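For context, a typical end-to-end workflow might look like the following sketch, where `mv`, `sub`, and `cur` are hypothetical names and the retention value is illustrative:

```sql
-- Create a subscription on an existing materialized view (hypothetical names).
CREATE SUBSCRIPTION sub FROM mv WITH (retention = '1D');

-- Declare a cursor against the subscription.
DECLARE cur SUBSCRIPTION CURSOR FOR sub;

-- Block for up to 20 seconds waiting for new updates.
FETCH 50 FROM cur WITH (timeout = '20 second');
```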

For more details, see:


ASOF joins


RisingWave now supports both inner and left outer ASOF joins, which allow you to join tables on timestamp values that do not match exactly. Timestamp values rarely match perfectly in real life, so it can be difficult to merge sets of data. ASOF joins solve this problem by allowing you to join data on timestamps that are close to each other.

When using an ASOF join in RisingWave, exactly two conditions are required. One condition must be an inequality condition (usually on the timestamp column), and the other condition must be an equality condition. Additionally, ASOF joins are currently only available for streaming. Support for ad-hoc queries will be introduced in a future release.

For instance, the following query performs an ASOF LEFT JOIN on t1 and t2 based on two conditions:

  • t1.col_id = t2.col_id: An equality condition that ensures rows from both tables have the same col_id.
  • t1.timestamp <= t2.timestamp: An inequality condition that matches rows where t2.timestamp is the closest time greater than or equal to t1.timestamp.
SELECT
  t1.col_id,
  t1.timestamp,
  t2.col2
FROM t1
ASOF LEFT JOIN t2
ON
  t1.col_id = t2.col_id AND
  t1.timestamp <= t2.timestamp;

For more details, see:


Shared Kafka sources


We introduce the concept of “shared sources”, which is currently applicable only to Kafka sources and is enabled by default. This only affects Kafka sources created after upgrading to v2.1 and does not affect any existing Kafka sources.

To disable this behavior, set the session variable streaming_use_shared_source to false.

Sources in RisingWave are not considered streaming jobs, unlike materialized views and sinks. Data ingestion only starts when a streaming job references the source and a SourceExecutor is created.

Previously, a new SourceExecutor was created each time another streaming job referenced the source. With shared sources, a SourceExecutor is created together with the source itself, and any materialized views referencing the source reuse that SourceExecutor. Now, each source has only one SourceExecutor.

With shared sources, resource utilization and consistency among multiple streaming jobs will improve.
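As a sketch (the source schema, topic, and connection properties are placeholders), two materialized views over the same Kafka source would now share a single SourceExecutor:

```sql
-- With shared sources, a SourceExecutor is created with the source itself.
CREATE SOURCE kafka_src (user_id INT, action VARCHAR)
WITH (
    connector = 'kafka',
    topic = 'user_actions',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Both materialized views reuse the same SourceExecutor
-- instead of each spawning their own.
CREATE MATERIALIZED VIEW mv1 AS
SELECT user_id, COUNT(*) FROM kafka_src GROUP BY user_id;

CREATE MATERIALIZED VIEW mv2 AS
SELECT action, COUNT(*) FROM kafka_src GROUP BY action;

-- Opt out for the current session if needed.
SET streaming_use_shared_source TO false;
```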

For more details, see:


Schemaless ingestion


Unless you are reading a schema from a Web location or a registry, you must specify the data schema when creating a source or table in RisingWave. This is often time-consuming as data schemas can be large and complex, leading you to spend extra time checking for accuracy. But now, RisingWave allows for schemaless ingestion as long as your data is in JSON format.

This feature is currently available for data in JSON format ingested from the following sources:

  • Apache Kafka
  • Apache Pulsar
  • AWS Kinesis
  • AWS S3
  • Azure Blob

By including the INCLUDE payload clause when creating a source or table, you no longer need to specify the data schema. For instance, the payload column of kafka_table will contain the complete JSON messages from the Kafka topic, schemaless_ingestion.

CREATE TABLE kafka_table (c1 int)
INCLUDE payload
WITH (
    connector = 'kafka',
    topic = 'schemaless_ingestion',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

This makes ingesting data from sources much easier. If necessary, you can clean up the table afterward by removing unnecessary columns.
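One hedged sketch of such a cleanup step, assuming the table was created with an aliased clause such as INCLUDE payload AS raw_data (so the JSON payload is exposed as a queryable column), with hypothetical field names id, name, and ts:

```sql
-- Hypothetical cleanup: extract typed fields from the raw JSON payload
-- into a materialized view with a proper schema.
CREATE MATERIALIZED VIEW cleaned AS
SELECT
    (raw_data ->> 'id')::INT       AS id,
    raw_data ->> 'name'            AS name,
    (raw_data ->> 'ts')::TIMESTAMP AS ts
FROM kafka_table;
```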

For more details, see the JSON example from:


PostgreSQL table-valued function


In this version update, we introduce a new table-valued function (TVF), postgres_query. This function allows you to retrieve data from a PostgreSQL table as needed, rather than through CDC updates.

If you do not want or need to set up a CDC source in RisingWave, this TVF serves as a more lightweight alternative. It is ideal in cases where your PostgreSQL table contains static data and is rarely updated: running the function once saves computation resources compared to creating a CDC source and table.

For instance, say you have the following table, users, in Postgres.

first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC       

Using the postgres_query function, you can query the table directly. Optionally, you can persist what the function returns using a CREATE TABLE ... AS statement.

SELECT * FROM postgres_query(
	'localhost',
	'5432',
	'postgres',
	'postgres',
	'mydb',
	'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC   
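To persist the result instead of only querying it, the same call can be wrapped in a CREATE TABLE ... AS statement; the table name below is hypothetical and the connection details are placeholders:

```sql
-- Materialize the one-off query result as a RisingWave table.
CREATE TABLE users_over_25 AS
SELECT * FROM postgres_query(
    'localhost',
    '5432',
    'postgres',
    'postgres',
    'mydb',
    'SELECT * FROM users WHERE age >= 25;'
);
```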

For more details, see:


PostgreSQL source connector updates


Partitioned tables


RisingWave supports ingesting data from a partitioned PostgreSQL table but it requires some additional setup. Table partitioning divides a large table into smaller parts based on one or more columns. For instance, a table can be partitioned based on the date column, where each partition has data from a certain time range.

First, ensure that publish_via_partition_root is set to true in your PostgreSQL database when creating a publication, so data changes are published as if they come from the root table rather than from individual partitions.

CREATE PUBLICATION publication_name
FOR table_name WITH 
	(publish_via_partition_root = true);

Then, in RisingWave, create separate sources, each with a dedicated publication name, to ingest data from each partition.


Auto schema change


Auto schema change for Postgres CDC sources is available as a Premium Edition feature. Learn more about RisingWave Premium in Everything You Want to Know about RisingWave Premium.

RisingWave now supports automatic schema changes for PostgreSQL CDC sources and tables, making it easier to manage your data. If the schema of the upstream PostgreSQL table changes, such as a column being added or dropped, the CDC table in RisingWave also updates to reflect the change.

To enable this feature, set the parameter auto.schema.change to true when creating your PostgreSQL source.
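A minimal sketch of such a source definition, with placeholder connection details:

```sql
-- auto.schema.change opts this CDC source in to automatic
-- schema synchronization with the upstream PostgreSQL table.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'secret',
    database.name = 'mydb',
    schema.name = 'public',
    auto.schema.change = 'true'
);
```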

For more details, see:


WebHDFS sink


WebHDFS is a RESTful API that enables interaction with the Hadoop Distributed File System (HDFS). With WebHDFS, you can read, write, and delete files in HDFS over HTTP, without relying on Hadoop's native Java client libraries.

By using the CREATE SINK command, you can sink data from RisingWave to HDFS directly through WebHDFS.

CREATE SINK webhdfs_sink AS
SELECT * FROM mv
WITH (
    connector = 'webhdfs',
    webhdfs.path = 'test/path',
    webhdfs.endpoint = 'hdfs_endpoint',
    type = 'append-only'
);

For more details, see:

Conclusion

These are some of the highlight features included in v2.1. To see the entire list of updates, which includes new SQL commands and changes to existing connectors, please refer to the full release note.

Look out for next month’s edition to see what new, exciting updates will be made. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.

Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.


Emily Le

Developer Advocate
