Highlights of RisingWave v2.2

Follow along to learn about some of the standout features in this release. For the complete list of v2.2 updates, see the full release note.

Reusable connections for Kafka connector and schema registries

In v2.2, the old connection method used for AWS PrivateLink has been deprecated. Previously, connections were used when you wanted to consume messages from or deliver messages to a Kafka service located in a different VPC from the RisingWave cluster.

The updated connection mechanism enables you to reuse connector properties, such as the broker address and security protocol details, across Kafka sources, sinks, and schema registries. With this approach, you only need to specify credentials once per cluster, simplifying setup for multiple sources while enhancing security by reducing exposure to sensitive information.

To create a connection, use the CREATE CONNECTION command and specify the desired parameters.

CREATE CONNECTION kafka_conn1 WITH (
    type = 'kafka',
    properties.bootstrap.server = 'localhost:9092'
);

The only required parameters are type and properties.bootstrap.server. Additional parameters for SSL/SASL authentication, PrivateLink connections, and AWS authentication are also supported.

Using this connection, you can create Kafka sources, tables, or sinks. This is especially convenient when you need several Kafka sources that ingest data from different topics in the same cluster: each source only has to specify its own topic and format.

CREATE SOURCE kafka1 (
    id int,
    name varchar,
    email varchar,
    age int
) WITH (
    connector = 'kafka',
    connection = 'kafka_conn1',
    topic = 'topic1',
    scan.startup.mode = 'latest'
) FORMAT PLAIN ENCODE JSON;
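Because the broker details live in the connection, per-topic sources differ only in their name, topic, and schema. The Python sketch below renders one CREATE SOURCE statement per topic from a single connection name, following the syntax of the example above. The helper `kafka_source_ddl`, the extra topic names, and the shared column list are hypothetical; the generated statements are plain strings that you would still execute through a PostgreSQL-compatible client connected to RisingWave.

```python
# Hypothetical helper: renders CREATE SOURCE statements that all reuse one
# Kafka connection, following the syntax shown in the example above.

COLUMNS = [("id", "int"), ("name", "varchar"), ("email", "varchar"), ("age", "int")]


def kafka_source_ddl(source_name: str, topic: str, connection: str = "kafka_conn1") -> str:
    """Return a CREATE SOURCE statement for one topic behind a shared connection."""
    cols = ",\n".join(f"    {col} {typ}" for col, typ in COLUMNS)
    return (
        f"CREATE SOURCE {source_name} (\n{cols}\n) WITH (\n"
        "    connector = 'kafka',\n"
        f"    connection = '{connection}',\n"
        f"    topic = '{topic}',\n"
        "    scan.startup.mode = 'latest'\n"
        ") FORMAT PLAIN ENCODE JSON;"
    )


if __name__ == "__main__":
    # Only the source name and topic change; the broker address and any
    # credentials stay in the connection definition.
    for i, topic in enumerate(["topic1", "topic2", "topic3"], start=1):
        print(kafka_source_ddl(f"kafka{i}", topic))
        print()
```

The point of the sketch is that no broker address or credential appears in any generated statement; changing the cluster means updating only the connection.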

You can also use the CREATE CONNECTION command to define a reusable connection to a schema registry.

CREATE CONNECTION schema_1 WITH (
  type = 'schema_registry',
  schema.registry = 'http://...',
  schema.registry.username = 'superuser',
  schema.registry.password = 'pass123'
);


EXPLAIN FORMAT for query optimization

The EXPLAIN command in RisingWave returns the execution plan of a statement, such as a SELECT query or a CREATE MATERIALIZED VIEW statement. For complex statements, the plan can help you figure out how to optimize them: you can spot potentially expensive operators and look for ways to restructure the query. If a query isn’t behaving the way you expected, the plan also helps pinpoint problems in the query design.

The new FORMAT option lets you view the execution plan in formats such as JSON, TEXT, XML, and YAML, so you can choose the one that is easiest to read or to process with other tools.

For instance, if you run EXPLAIN on the following statement without the FORMAT option, you get the default text output.

CREATE TABLE t(v1 int);

EXPLAIN CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamTableScan { table: t, columns: [v1, _row_id] }

Using the FORMAT option, you can see the query execution plan in JSON format.

EXPLAIN(physical, FORMAT json) CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;
---
{
  "name": "StreamMaterialize",
  "fields": {
    "columns": [
      "v1",
      "t._row_id(hidden)"
    ],
    "pk_columns": [
      "t._row_id"
    ],
    "pk_conflict": "NoCheck",
    "stream_key": [
      "t._row_id"
    ]
  },
  "children": [
    {
      "name": "StreamTableScan",
      "fields": {
        "columns": [
          "v1",
          "_row_id"
        ],
        "table": "t"
      },
      "children": []
    }
  ]
}
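One benefit of the JSON format is that the plan can be processed programmatically. The Python sketch below loads the JSON plan shown above, in which operators nest through `name`, `fields`, and `children` keys, prints one line per operator indented by depth, and finds the operators that read a given table. It assumes only the structure visible in this example; the helper names are illustrative, and other plans may carry fields this sketch ignores.

```python
import json

# The JSON plan from the example above, as returned by
# EXPLAIN (physical, FORMAT json).
PLAN_JSON = """
{
  "name": "StreamMaterialize",
  "fields": {
    "columns": ["v1", "t._row_id(hidden)"],
    "pk_columns": ["t._row_id"],
    "pk_conflict": "NoCheck",
    "stream_key": ["t._row_id"]
  },
  "children": [
    {
      "name": "StreamTableScan",
      "fields": {"columns": ["v1", "_row_id"], "table": "t"},
      "children": []
    }
  ]
}
"""


def operator_outline(node: dict, depth: int = 0) -> list[str]:
    """Return one indented line per operator, parents before children."""
    lines = ["  " * depth + node["name"]]
    for child in node.get("children", []):
        lines.extend(operator_outline(child, depth + 1))
    return lines


def operators_reading_table(node: dict, table: str) -> list[str]:
    """Collect the names of operators whose fields reference the given table."""
    found = [node["name"]] if node.get("fields", {}).get("table") == table else []
    for child in node.get("children", []):
        found.extend(operators_reading_table(child, table))
    return found


if __name__ == "__main__":
    plan = json.loads(PLAN_JSON)
    print("\n".join(operator_outline(plan)))
    # StreamMaterialize
    #   StreamTableScan
    print(operators_reading_table(plan, "t"))  # ['StreamTableScan']
```

The outline reproduces the tree shape of the default text output, which makes it easy to compare plans before and after a query change.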


ALTER...SWAP WITH... for easy updates

The ALTER...SWAP WITH... command swaps the definitions of two objects, so dependent objects keep working while you update or maintain them. It is well suited to moving between versions of an object without downtime.

For instance, say you have a materialized view, mv_old, with the following definition.

CREATE MATERIALIZED VIEW mv_old AS
SELECT 
    product_category, 
    SUM(order_amount) AS total_sales
FROM 
    orders
GROUP BY 
    product_category;

Suppose you want to change how total_sales is calculated, but other relations, such as materialized views and sinks, depend on mv_old.

In this case, you can create a new materialized view, mv_new, and then use ALTER...SWAP WITH... to replace mv_old with mv_new without any downtime.

CREATE MATERIALIZED VIEW mv_new AS
SELECT 
    product_category, 
    SUM(order_amount - discount_amount) AS total_sales
FROM 
    orders
GROUP BY 
    product_category;

ALTER MATERIALIZED VIEW mv_old SWAP WITH mv_new;
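To see what the swap changes for downstream consumers, the Python sketch below evaluates both definitions on a few hypothetical rows of the orders table. It mirrors the two GROUP BY queries: mv_old sums order_amount per category, while mv_new subtracts discount_amount first. After the swap, readers of mv_old see results computed the second way. The rows and the `total_sales` helper are illustrative only, and NULL handling is ignored.

```python
from collections import defaultdict

# Hypothetical rows of the orders table:
# (product_category, order_amount, discount_amount)
ORDERS = [
    ("books", 40.0, 5.0),
    ("books", 60.0, 0.0),
    ("games", 80.0, 10.0),
]


def total_sales(rows, net_of_discount: bool) -> dict:
    """Mimic SUM(...) ... GROUP BY product_category for either definition."""
    totals = defaultdict(float)
    for category, amount, discount in rows:
        totals[category] += (amount - discount) if net_of_discount else amount
    return dict(totals)


if __name__ == "__main__":
    print(total_sales(ORDERS, net_of_discount=False))  # mv_old: {'books': 100.0, 'games': 80.0}
    print(total_sales(ORDERS, net_of_discount=True))   # mv_new: {'books': 95.0, 'games': 70.0}
```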

The ALTER...SWAP WITH... command is supported for the following database objects:

  • Tables

  • Materialized views

  • Views

  • Sources

  • Sinks

  • Subscriptions


Ingest data from webhooks

Ingesting data from webhook sources into RisingWave is available as a Premium Edition feature. Learn more about RisingWave Premium in Everything You Want to Know about RisingWave Premium.

RisingWave now serves as a webhook destination, allowing you to ingest and process external HTTP requests directly and in real time. With the new webhook connector, you no longer need to maintain an intermediate Kafka cluster, which adds operational overhead to a data pipeline. This makes it easier to build applications that depend on real-time processing and automation. Webhooks are used in a variety of scenarios, such as sending alerts and notifications, synchronizing data, and triggering CI/CD pipelines.

The webhook connector in RisingWave allows you to ingest events from the following sources:

  • GitHub

  • Segment

  • HubSpot

  • Amazon EventBridge

  • RudderStack

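Using the CREATE TABLE command, you can quickly start accepting webhook requests in RisingWave. In the following example, a SECRET is used to validate incoming requests. The parts in braces are placeholders: replace them with the header that carries the request signature and with the expression that recomputes that signature, both of which depend on the webhook provider.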

CREATE SECRET test_secret WITH (backend = 'meta') AS 'secret_value';

CREATE TABLE wbhtable (
  data JSONB
) WITH (
  connector = 'webhook'
) VALIDATE SECRET test_secret AS secure_compare(
  headers->>'{header of signature}',
  {signature generation expressions}
);
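Webhook signatures are commonly an HMAC of the raw request body keyed with a shared secret. As an illustration of what the placeholders might stand for, the Python sketch below reproduces the scheme GitHub uses for its `X-Hub-Signature-256` header (`sha256=` followed by the hex-encoded HMAC-SHA256 of the body) and compares signatures in constant time, which is the purpose of a function like secure_compare. The payload is hypothetical, and the exact expression to place in VALIDATE SECRET depends on the provider and on RisingWave's webhook documentation; this is a sender-side sketch, not RisingWave's validation code.

```python
import hashlib
import hmac

SECRET = b"secret_value"  # same value as test_secret in the example above


def github_style_signature(secret: bytes, body: bytes) -> str:
    """Return 'sha256=<hex HMAC-SHA256 of body>', the X-Hub-Signature-256 format."""
    return "sha256=" + hmac.new(secret, body, hashlib.sha256).hexdigest()


def is_valid(secret: bytes, body: bytes, header_value: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = github_style_signature(secret, body)
    return hmac.compare_digest(expected, header_value)


if __name__ == "__main__":
    body = b'{"action": "opened", "number": 1}'  # hypothetical webhook payload
    header = github_style_signature(SECRET, body)  # what the sender would attach
    print(is_valid(SECRET, body, header))          # True
    print(is_valid(SECRET, body + b" ", header))   # False: the body was altered
```

Constant-time comparison matters because an ordinary string comparison can leak, through timing, how many leading characters of a forged signature are correct.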


MySQL table-valued function

This release introduces a new table-valued function (TVF), mysql_query, which lets you retrieve data from a MySQL table on demand instead of creating a CDC source.

When a CDC source is unnecessary, mysql_query provides a lightweight alternative. It is particularly useful for querying static or infrequently updated MySQL tables, since it avoids the computational overhead of maintaining a CDC source.

For instance, say you have the following table, users, in MySQL.

first_name | last_name | age | city
-----------+-----------+-----+------
Aaron      | Jones     | 24  | NYC
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC

Using the mysql_query function, you can query the table directly. You can also create a table from the function's results with a CREATE TABLE ... AS statement.

SELECT * FROM mysql_query(
    'localhost',
    '3336',
    'superuser',
    'password123',
    'mydb',
    'SELECT * FROM users WHERE age >= 25;'
);
---
first_name | last_name | age | city
-----------+-----------+-----+------
Shelly     | Doe       | 36  | SF
Taylor     | Smith     | 52  | NYC
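Because the MySQL query is passed to mysql_query as a SQL string literal, any single quote inside it must be doubled, which is easy to get wrong in longer queries. The Python sketch below builds the call with standard SQL literal escaping applied to every argument. The helper names are hypothetical, and the result is only a string that you would still run through a PostgreSQL-compatible client connected to RisingWave. Avoid building the inner query from untrusted input, since it is executed on the MySQL server as written.

```python
def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def mysql_query_call(host, port, user, password, database, query) -> str:
    """Render a SELECT over mysql_query with every argument quoted as a literal."""
    args = ",\n    ".join(
        sql_literal(str(a)) for a in (host, port, user, password, database, query)
    )
    return f"SELECT * FROM mysql_query(\n    {args}\n);"


if __name__ == "__main__":
    # The quotes around NYC become '' inside the outer literal.
    print(mysql_query_call(
        "localhost", 3336, "superuser", "password123", "mydb",
        "SELECT * FROM users WHERE city = 'NYC';",
    ))
```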


New Rust PostgreSQL sink

RisingWave now includes a native PostgreSQL sink connector implemented in Rust, removing the dependency on the JDBC API. The new connector improves performance, gives RisingWave greater control over PostgreSQL connections, and supports both upsert and append-only sinks.

To use the Rust sink connector instead of the JDBC sink connector, set the streaming.developer configuration stream_switch_jdbc_pg_to_native to true.

Then you can create a Postgres sink as usual by using the CREATE SINK command.

CREATE SINK rust_pg_sink FROM sink_content WITH (
    connector = 'postgres',
    host = 'localhost',
    port = 4566,
    user = 'superuser',
    password = 'pass123',
    database = 'dev',
    table = 'sink_table',
    type = 'append-only'
);
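The two sink types differ in how rows land in the target table. As a rough mental model only, not the connector's implementation, the Python sketch below applies a change stream to an in-memory table both ways: append-only adds every inserted row, while upsert keeps one row per primary key, so later changes overwrite or delete earlier ones. The `id` key, the sample changes, and the choice to reject updates and deletes in append-only mode are assumptions made for illustration.

```python
# Toy model of the two sink types; not RisingWave's implementation.
# Each change is (op, row), where op is "insert", "update", or "delete"
# and rows are dicts keyed by a hypothetical primary key column "id".


def apply_append_only(changes):
    """Append every inserted row; this sketch rejects updates and deletes."""
    table = []
    for op, row in changes:
        if op != "insert":
            raise ValueError(f"append-only sink received a {op!r} change")
        table.append(dict(row))
    return table


def apply_upsert(changes, key="id"):
    """Keep one row per key: later changes overwrite or remove earlier ones."""
    table = {}
    for op, row in changes:
        if op == "delete":
            table.pop(row[key], None)
        else:  # insert and update both write the latest version of the row
            table[row[key]] = dict(row)
    return list(table.values())


if __name__ == "__main__":
    inserts = [("insert", {"id": 1, "v": "a"}), ("insert", {"id": 1, "v": "b"})]
    print(apply_append_only(inserts))  # [{'id': 1, 'v': 'a'}, {'id': 1, 'v': 'b'}]
    print(apply_upsert(inserts))       # [{'id': 1, 'v': 'b'}]
    print(apply_upsert(inserts + [("delete", {"id": 1})]))  # []
```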


Conclusion

These are some of the highlighted features in v2.2. For the complete list of updates, including changes to source and sink connectors, see the full release note.

Stay tuned for next month’s updates as we continue to enhance RisingWave with new features. Visit the RisingWave GitHub repository to explore the latest developments and planned releases.

Sign up for our monthly newsletter if you’d like to keep up to date on all the happenings with RisingWave. Follow us on Twitter and LinkedIn, and join our Slack community to talk to our engineers and hundreds of streaming enthusiasts worldwide.
