Follow along to learn more about some of the standout features in this version release. If you are interested in the full list of v2.1 updates, see the full release note.
Enhancements made to subscriptions and subscription cursors
Since their introduction in v1.9, subscriptions and subscription cursors have continued to receive significant updates that make them more user-friendly. In this update, the observability of subscriptions and subscription cursors is improved, and subscription cursor timeouts are introduced.
Improved observability
The SHOW CURSORS and SHOW SUBSCRIPTION CURSORS commands have been updated to output cursor information from all active RisingWave sessions on the same frontend node. Additionally, the commands now return a more detailed description of each cursor and subscription cursor. Previously, only the names of the objects were listed. Now, the commands also output the user, host, and database of each cursor.
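As a minimal sketch, the two commands can be run from any session connected to the frontend node. The plural form SHOW SUBSCRIPTION CURSORS is assumed here, and the exact output columns may differ from the user, host, and database fields described above.

```sql
-- List regular cursors visible on this frontend node.
SHOW CURSORS;

-- List subscription cursors visible on this frontend node
-- (plural command name is an assumption).
SHOW SUBSCRIPTION CURSORS;
```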
Cursor timeouts
You can now also configure cursors to be blocking or non-blocking by specifying the timeout parameter when fetching from a cursor. Previously, all cursors were non-blocking. This means that a fetch returns immediately even when the cursor has no new updates to return. In this case, the cursor returns an empty row.
For blocking cursors, the cursor waits until it receives new updates or until it reaches the timeout value. This design is a better fit for event-driven architectures because you do not need to repeatedly fetch from cursors, which saves on resource consumption. If fetching from your cursor takes too long because your subscription contains too many updates and the command times out, the cursor returns all values fetched so far.
The following SQL query fetches from a blocking cursor. The value for the timeout should be specified as a string in interval format. In this case, it is set to 20 seconds.
FETCH 50 FROM cursor_name WITH (timeout = '20 second');
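To put the fetch in context, here is a hedged end-to-end sketch that creates a subscription, opens a cursor on it, and contrasts a blocking fetch with a non-blocking one. The table, subscription, and cursor names (orders, orders_sub, orders_cur) and the retention value are hypothetical and chosen only for illustration.

```sql
-- Hypothetical table used only for this example.
CREATE TABLE orders (order_id int PRIMARY KEY, amount int);

-- Create a subscription on the table; the retention value is an assumption.
CREATE SUBSCRIPTION orders_sub FROM orders WITH (retention = '1D');

-- Open a subscription cursor on the subscription.
DECLARE orders_cur SUBSCRIPTION CURSOR FOR orders_sub;

-- Blocking fetch: waits for new changes, but no longer than 5 seconds.
FETCH 10 FROM orders_cur WITH (timeout = '5 second');

-- Non-blocking fetch: returns immediately, even if nothing new is available.
FETCH 10 FROM orders_cur;
```

An event-driven consumer would typically loop on the blocking form, so that each round trip either delivers changes or ends at the timeout.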
ASOF joins
RisingWave now supports both inner and left outer ASOF joins, which allow you to join tables on timestamp values that do not exactly match. Timestamp values rarely match perfectly in real life, so it can be difficult to merge sets of data. ASOF joins solve this problem by allowing you to join data on timestamps that are close to each other.
When using an ASOF join in RisingWave, exactly two conditions are required. One condition must be an inequality condition (usually on the timestamp column), and the other condition must be an equality condition. Additionally, ASOF joins are currently only available for streaming. Support for ad-hoc queries will be introduced in a future release.
For instance, the following query performs an ASOF LEFT JOIN on t1 and t2 based on two conditions:
t1.col_id = t2.col_id: An equality condition that ensures rows from both tables have the same col_id.
t1.timestamp <= t2.timestamp: An inequality condition that matches rows where t2.timestamp is the closest time greater than or equal to t1.timestamp.
SELECT
    t1.col_id,
    t1.timestamp,
    t2.col2
FROM
    t1
ASOF LEFT JOIN
    t2
ON
    t1.col_id = t2.col_id AND
    t1.timestamp <= t2.timestamp;
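Because ASOF joins are currently limited to streaming, the query would typically live inside a streaming job such as a materialized view. The sketch below assumes the tables t1 and t2 from the query above already exist; the view name asof_matches is hypothetical.

```sql
-- Wrap the ASOF join in a materialized view so it runs as a streaming job.
CREATE MATERIALIZED VIEW asof_matches AS
SELECT
    t1.col_id,
    t1.timestamp,
    t2.col2
FROM
    t1
ASOF LEFT JOIN
    t2
ON
    t1.col_id = t2.col_id AND       -- equality condition
    t1.timestamp <= t2.timestamp;   -- inequality condition

-- The maintained result can then be read like any other relation.
SELECT * FROM asof_matches;
```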
Shared Kafka sources
We introduce the concept of “shared sources”, which currently applies only to Kafka sources and is enabled by default. This only affects Kafka sources created after upgrading to this version and does not affect any existing Kafka sources.
To disable this behavior, set the session variable streaming_use_shared_source to false.
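For example, the variable can be changed with a standard SET statement. As a session variable, it is expected to apply only to the current session.

```sql
-- Kafka sources created later in this session will not be shared sources.
SET streaming_use_shared_source = false;
```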
Sources in RisingWave are not considered streaming jobs, unlike materialized views and sinks, for example. A SourceExecutor is created, and data ingestion begins, only when a materialized view references the source.
Previously, a SourceExecutor was created each time another streaming job referenced the source. With shared sources, creating a source also creates a SourceExecutor, and any materialized views referencing the source can reuse it. Now, there is only one SourceExecutor for each source.
With shared sources, resource utilization and consistency among multiple streaming jobs will improve.
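As an illustration, consider one Kafka source referenced by two materialized views. The source name, topic, columns, and view names below are hypothetical. With shared sources enabled, both views are expected to reuse the single SourceExecutor created together with the source.

```sql
-- Hypothetical Kafka source; with shared sources enabled, creating it
-- also creates its SourceExecutor.
CREATE SOURCE orders_src (
    order_id int,
    amount double precision
)
WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Both streaming jobs read from the same source.
CREATE MATERIALIZED VIEW order_count AS
SELECT count(*) AS cnt FROM orders_src;

CREATE MATERIALIZED VIEW order_total AS
SELECT sum(amount) AS total FROM orders_src;
```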
Schemaless ingestion
Unless you are reading a schema from a Web location or a registry, you must specify the data schema when creating a source or table in RisingWave. This is often time-consuming as data schemas can be large and complex, leading you to spend extra time checking for accuracy. But now, RisingWave allows for schemaless ingestion as long as your data is in JSON format.
This feature is currently available for data in JSON format ingested from the following sources:
Apache Kafka
Apache Pulsar
AWS Kinesis
AWS S3
Azure Blob
By including the INCLUDE payload clause when creating a source or table, you no longer need to specify the data schema. For instance, the columns of kafka_table will include all data from the Kafka topic schemaless_ingestion.
CREATE TABLE kafka_table (c1 int)
INCLUDE payload
WITH (
    connector = 'kafka',
    topic = 'schemaless_ingestion',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;
This makes ingesting data from sources much easier. If necessary, you can clean up the table afterward by removing unnecessary columns.
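Once the raw messages are ingested, individual fields can be pulled out of the payload with JSON operators. The sketch below rests on several assumptions: that the payload column is stored as JSONB, that the INCLUDE clause accepts an AS alias for it, and that the messages contain the hypothetical fields user_id and event_type. The table name kafka_events and the alias raw_payload are also made up for this example.

```sql
-- Hypothetical table that names the payload column explicitly.
CREATE TABLE kafka_events (c1 int)
INCLUDE payload AS raw_payload
WITH (
    connector = 'kafka',
    topic = 'schemaless_ingestion',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- Extract hypothetical fields from each message as text.
SELECT
    raw_payload ->> 'user_id'    AS user_id,
    raw_payload ->> 'event_type' AS event_type
FROM kafka_events;
```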
PostgreSQL table-valued function
In this version update, we introduce a new table-valued function (TVF), postgres_query. This function allows you to retrieve data from a PostgreSQL table as needed, rather than through CDC updates.
If you do not want or need to set up a CDC source in RisingWave, this TVF serves as a more lightweight alternative. It is ideal in cases where your PostgreSQL database contains mostly static data and is rarely updated. Running the function once saves on computation resources compared to creating a CDC source and table.
For instance, say you have the following table, users, in Postgres.
first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC
Using the postgres_query function, you can query the table. Optionally, you can create a table from what the function returns using the CREATE TABLE command.
SELECT * FROM postgres_query(
'localhost',
'5432',
'postgres',
'postgres',
'mydb',
'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC
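To keep the result around, the same call can feed a CREATE TABLE ... AS statement. This is a sketch that reuses the connection values from the query above; the table name users_snapshot is hypothetical. The resulting table is a one-off copy and is not kept in sync with PostgreSQL.

```sql
-- Materialize the function's result into a RisingWave table.
CREATE TABLE users_snapshot AS
SELECT * FROM postgres_query(
    'localhost',   -- hostname
    '5432',        -- port
    'postgres',    -- username
    'postgres',    -- password
    'mydb',        -- database name
    'SELECT * FROM users WHERE age >= 25;'
);

-- Query the local copy without contacting PostgreSQL again.
SELECT first_name, city FROM users_snapshot;
```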
PostgreSQL source connector updates
Partitioned tables
RisingWave supports ingesting data from a partitioned PostgreSQL table, but it requires some additional setup. Table partitioning divides a large table into smaller parts based on one or more columns. For instance, a table can be partitioned on a date column, where each partition holds data from a certain time range.
First, ensure that publish_via_partition_root is set to true in your PostgreSQL database when creating a publication, so that data changes are published as if they come from the root table rather than from individual partitions.
CREATE PUBLICATION publication_name
FOR TABLE table_name WITH
(publish_via_partition_root = true);
Then in RisingWave, separate sources with dedicated publication names must be created to ingest data from each partition.
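A rough sketch of the RisingWave side is shown below, assuming two publications named pub_orders_2023 and pub_orders_2024 already exist in PostgreSQL. All source names, connection values, slot names, and publication names here are hypothetical, and giving each source its own replication slot is an assumption rather than a documented requirement.

```sql
-- One CDC source per publication (all names are hypothetical).
CREATE SOURCE pg_orders_2023 WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'orders_2023_slot',
    publication.name = 'pub_orders_2023'
);

CREATE SOURCE pg_orders_2024 WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    slot.name = 'orders_2024_slot',
    publication.name = 'pub_orders_2024'
);
```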
Auto schema change
Auto schema change for Postgres CDC sources is available as a Premium Edition feature. Learn more about RisingWave Premium in Everything You Want to Know about RisingWave Premium.
RisingWave now supports automatic schema changes for Postgres CDC sources and tables, making it easy to manage your data. If the schema of the source PostgreSQL table changes, such as the addition or deletion of a column, the CDC table in RisingWave will also update to reflect the change.
To enable this feature, set the parameter auto.schema.change to true when creating your PostgreSQL source.
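A minimal sketch of such a source follows. The source name and connection values are placeholders, and the quoted string form of the value is an assumption. Only the auto.schema.change parameter comes from the description above.

```sql
-- Hypothetical Postgres CDC source with automatic schema change enabled.
CREATE SOURCE pg_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    auto.schema.change = 'true'
);
```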
WebHDFS sink
WebHDFS is a RESTful API that enables interactions with the Hadoop Distributed File System (HDFS). With WebHDFS, you can access HDFS and read, write, and delete files without relying on the Hadoop cluster.
By using the CREATE SINK command, you can sink data from RisingWave to WebHDFS directly, and to HDFS indirectly.
CREATE SINK webhdfs_sink AS
SELECT * FROM mv
WITH (
    connector = 'webhdfs',
    webhdfs.path = 'test/path',
    webhdfs.endpoint = 'hdfs_endpoint',
    type = 'append-only'
);
Conclusion
These are some of the highlighted features included in v2.1. To see the entire list of updates, which includes new SQL commands and changes to existing connectors, please refer to the full release note.
Look out for next month’s edition to see what new, exciting updates will be made. Check out the RisingWave GitHub repository to stay up to date on the newest features and planned releases.
Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.