With the exponential growth in data generation through various events, the significance of real-time analytics has become increasingly evident. However, what exactly does real-time analytics involve? Is it merely faster data analysis? Can fast historical data analysis be considered real-time analytics?

The key point is not to analyze data fast, but to analyze fresh and well-formatted data fast. That means data needs to be ingested and transformed in real time before analytics is performed.

To achieve seamless real-time data ingestion, transformation, and analytics, a powerful combination to explore is RisingWave and ClickHouse. RisingWave is a PostgreSQL-compatible database specifically designed for stream processing. It excels at ingesting real-time data streams, performing diverse transformations, and enabling instant querying of results. ClickHouse® is a high-performance, column-oriented SQL database management system (DBMS) purpose-built for efficient online analytical processing (OLAP) of large data volumes and complex analytical queries.

By utilizing RisingWave's real-time data transformation capabilities, you can preprocess and enhance data prior to ingestion into ClickHouse, ensuring that the data aligns with your precise analytical requirements when it reaches ClickHouse.


Use case: Enrich cart events with product details


In this blog post, I’ll use an online retail example to show you how to build a system that ingests, transforms, and analyzes data in real time.

We have a stream that records an event each time a customer adds an item to their cart.

Typical cart events look like this:

(customerId, eventTime, itemId)
--------------------------------
("1234","2023-02-01 10:01:00","P001")
("1234","2023-02-01 10:05:00","P002")

Because this stream carries no product information, it’s hard to analyze on its own. There are several ways to make it useful: we can join it with the orders stream to find items that were added to the cart but not purchased, or join it with a product catalog table to form an enriched stream.

In our demo, we’ll join this stream with the product catalog table, and output the enriched stream to ClickHouse for further analysis.

The product_catalog table may look like this:

(itemId, name, price, category)
-------------------------------------
("P001","Red T-Shirt",9.99,"Apparel")
("P002","Blue Jeans",39.95,"Apparel")


Architecture


We use RisingWave to do real-time data ingestion and enrichment. We then output the enriched data to ClickHouse, so that further analysis can be performed.

Figure: Real-time data ingestion and enrichment flow.


Preparation

  • Ensure psql is installed. To install psql without the rest of the PostgreSQL components, see Install psql without PostgreSQL.
  • Ensure a Kafka producer is up and running. I started Kafka with KRaft. Please find the instructions here.
  • Install and connect to RisingWave.
# Install RisingWave
brew tap risingwavelabs/risingwave
brew install risingwave
# Start RisingWave
risingwave playground

# Connect to RisingWave in a new terminal window
psql -h localhost -p 4566 -d dev -U root
  • Install and connect to ClickHouse.
# Download the binaries
curl https://clickhouse.com/ | sh
# Start the server
./clickhouse server
# Start the client in a new terminal window
./clickhouse client


Write events to Kafka


Let’s now create a topic and inject several events:

# Create a topic
bin/kafka-topics.sh --create --topic cart-events --bootstrap-server localhost:9092
# Write three events to the topic
bin/kafka-console-producer.sh --topic cart-events --bootstrap-server localhost:9092

{"cust_id": "1234", "event_time": "2023-02-01 10:01:00", "item_id": "P001"}
{"cust_id": "1232","event_time": "2023-02-01 10:05:00", "item_id": "P002"}
{"cust_id": "1235","event_time": "2023-02-01 10:10:00","item_id": "P003"}

# Keep the producer active, as we'll write more events later
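
If you would rather not type the messages by hand, a short script can print them as JSON lines for the console producer to read from standard input. The following is a minimal sketch in Python; the function name make_cart_event and the file name gen_events.py are hypothetical, and the field names simply mirror the messages above.

```python
import json


def make_cart_event(cust_id: str, event_time: str, item_id: str) -> str:
    """Serialize one cart event as a single-line JSON message."""
    return json.dumps(
        {"cust_id": cust_id, "event_time": event_time, "item_id": item_id}
    )


# The three sample events written to the topic above.
SAMPLE_EVENTS = [
    ("1234", "2023-02-01 10:01:00", "P001"),
    ("1232", "2023-02-01 10:05:00", "P002"),
    ("1235", "2023-02-01 10:10:00", "P003"),
]

if __name__ == "__main__":
    # One JSON object per line, which is what the console producer expects.
    for event in SAMPLE_EVENTS:
        print(make_cart_event(*event))
```

Piping the output into the producer, for example python gen_events.py | bin/kafka-console-producer.sh --topic cart-events --bootstrap-server localhost:9092, should send one message per line. The producer exits when its input ends, so start it again before writing later events.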


Ingest data into RisingWave


Let’s now create a table in RisingWave to ingest these events. In RisingWave, you can create either a source or a table for ingesting events. The difference is that events will be persisted in RisingWave if you use a table.

CREATE TABLE IF NOT EXISTS cart_event (
   cust_id VARCHAR,
   event_time TIMESTAMP,
   item_id VARCHAR
)
WITH (
   connector='kafka',
   topic='cart-events',
   properties.bootstrap.server='localhost:9092',
   scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;
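
To picture what this table definition does with each Kafka message, it can help to think of the JSON keys being matched to columns by name. The sketch below illustrates that mapping in Python. It is not RisingWave’s decoder: the names CartEventRow and decode_cart_event are hypothetical, and treating a missing key as None is an assumption made for this example.

```python
import json
from datetime import datetime
from typing import NamedTuple, Optional


class CartEventRow(NamedTuple):
    """One row with the same columns as the cart_event table."""
    cust_id: Optional[str]
    event_time: Optional[datetime]
    item_id: Optional[str]


def decode_cart_event(message: str) -> CartEventRow:
    """Map JSON keys to cart_event columns by name.

    Assumption for this sketch: an absent key yields None.
    """
    payload = json.loads(message)
    raw_time = payload.get("event_time")
    event_time = (
        datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S") if raw_time else None
    )
    return CartEventRow(payload.get("cust_id"), event_time, payload.get("item_id"))


row = decode_cart_event(
    '{"cust_id": "1234", "event_time": "2023-02-01 10:01:00", "item_id": "P001"}'
)
print(row)
```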

Now create a local table in RisingWave to store the product catalog and insert some data, so that we can enrich cart events with product information.

CREATE TABLE product_catalog (
item_id varchar,
name varchar,
price double precision,
category varchar
);

INSERT INTO product_catalog (item_id, name, price, category)
VALUES 
  ('P001','Red T-Shirt',9.99,'Apparel'),
  ('P002','Blue Jeans',39.95,'Apparel'),
  ('P003','Smart Watch',199.99,'Electronics'),
  ('P004','Yoga Mat',29.95,'Fitness'), 
  ('P005','Wireless Headphones',99.99,'Electronics'),
  ('P006','Coffee Mug',5.99,'Kitchen');


Join the stream with the table


Now let’s join the cart_event stream with the product_catalog table to form an enriched data stream. We can use a materialized view to perform the stream-table join if we also want to do some other transformations. If we just want to enrich the stream with the product catalog, we can simply create a sink to do the join. For this demo, we’ll use a materialized view.

In RisingWave, materialized views over streaming data are maintained incrementally, so they continuously reflect up-to-date results as new events arrive.

CREATE MATERIALIZED VIEW data_enrichment AS SELECT 
  c.cust_id,
  c.event_time,
  p.name,
  p.price,
  p.category
FROM
  cart_event c
JOIN
  product_catalog p 
ON 
  c.item_id = p.item_id;

With this stream-table join, an enriched event will be produced as soon as a raw event comes into RisingWave.
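
To make the idea of incremental updates concrete, here is a toy sketch in Python of a view that processes one event at a time and emits only the rows that event adds, instead of recomputing the whole join. It is an illustration under simplifying assumptions, not how RisingWave is implemented: the class name EnrichedView and the method on_event are hypothetical, and the sketch ignores later changes to the product catalog.

```python
from typing import Dict, List, Tuple

# Product catalog keyed by item_id: (name, price, category).
Catalog = Dict[str, Tuple[str, float, str]]
# Enriched row: (cust_id, event_time, name, price, category).
EnrichedRow = Tuple[str, str, str, float, str]


class EnrichedView:
    """Toy stand-in for an incrementally maintained stream-table join."""

    def __init__(self, catalog: Catalog) -> None:
        self.catalog = catalog
        self.rows: List[EnrichedRow] = []

    def on_event(self, cust_id: str, event_time: str, item_id: str) -> List[EnrichedRow]:
        """Apply one cart event and return only the rows it adds."""
        product = self.catalog.get(item_id)
        if product is None:
            # Inner join: an event with no matching product adds nothing.
            return []
        name, price, category = product
        delta = [(cust_id, event_time, name, price, category)]
        self.rows.extend(delta)
        return delta


view = EnrichedView({"P001": ("Red T-Shirt", 9.99, "Apparel")})
delta = view.on_event("1234", "2023-02-01 10:01:00", "P001")
print(delta)
```

Each call touches only the new event and a single catalog lookup, which is why the result can stay current without rescanning earlier events.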


Deliver the enriched stream to ClickHouse


We can sink the enriched stream to ClickHouse for further analysis. To make it happen, we need to create a table in ClickHouse with the same schema as the object in RisingWave. As we want to sink data from the data_enrichment materialized view, let’s create a table with the same schema as data_enrichment.

-- Run this query in ClickHouse
CREATE TABLE enriched_cart_events
(
cust_id String,
event_time DateTime64,
name String,
price Float64,
category String
)
ENGINE = ReplacingMergeTree()
ORDER BY (cust_id, event_time);

When the sink destination is available, we can create the sink and start streaming data out of RisingWave to ClickHouse.

-- Run this query in RisingWave
CREATE SINK sink_to_clickhouse
FROM
    data_enrichment WITH (
    connector = 'clickhouse',
    type = 'append-only',
    force_append_only = 'true',
    clickhouse.url = 'http://0.0.0.0:8123',
    clickhouse.user = 'default',
    clickhouse.password = '',
    clickhouse.database = 'default',
    clickhouse.table = 'enriched_cart_events'
);

Let’s now query the ClickHouse table to see if the data has come through.

SELECT * FROM enriched_cart_events;

------ RESULTS
┌─cust_id─┬──────────────event_time─┬─name────────┬──price─┬─category────┐
│ 1234    │ 2023-02-01 18:01:00.000 │ Red T-Shirt │   9.99 │ Apparel     │
│ 1232    │ 2023-02-01 18:05:00.000 │ Blue Jeans  │  39.95 │ Apparel     │
│ 1235    │ 2023-02-01 18:10:00.000 │ Smart Watch │ 199.99 │ Electronics │
└─────────┴─────────────────────────┴─────────────┴────────┴─────────────┘

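We can see that the three events have been enriched and are available for use in ClickHouse. Note that event_time is displayed eight hours later than the values written to Kafka; this is most likely a time zone effect, with ClickHouse rendering DateTime64 values in the server’s local time zone.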
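
If you want to run the same check from a script rather than the interactive client, ClickHouse also accepts queries over its HTTP interface on port 8123. The sketch below uses only the Python standard library. The helper names build_query_url, parse_json_each_row, and fetch_rows are hypothetical, and the code assumes a local server with the default user and an empty password, as in this demo.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, List


def build_query_url(sql: str, base_url: str = "http://localhost:8123/") -> str:
    """Build a ClickHouse HTTP-interface URL that carries the query."""
    return base_url + "?" + urllib.parse.urlencode({"query": sql})


def parse_json_each_row(body: str) -> List[Dict[str, object]]:
    """Parse a JSONEachRow response: one JSON object per non-empty line."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


def fetch_rows(sql: str) -> List[Dict[str, object]]:
    """Run a read-only query; needs a ClickHouse server on localhost:8123."""
    with urllib.request.urlopen(build_query_url(sql)) as response:
        return parse_json_each_row(response.read().decode("utf-8"))


QUERY = "SELECT * FROM enriched_cart_events ORDER BY event_time FORMAT JSONEachRow"
# rows = fetch_rows(QUERY)  # uncomment once the ClickHouse server is running
```

Asking for FORMAT JSONEachRow keeps parsing simple, because every line of the response is a standalone JSON object.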

To mock up a stream, let’s write one event at a time, and then immediately query the table in ClickHouse.

# Write one event at a time
{"cust_id": "1236","event_time": "2023-02-01 10:15:00","item_id": "P001"}
{"cust_id": "1237","event_time": "2023-02-01 10:20:00","item_id": "P004"}
{"cust_id": "1238", "event_time": "2023-02-01 10:25:00", "item_id": "P002"}
{"cust_id": "1239", "event_time": "2023-02-01 10:30:00", "item_id": "P005"}
{"cust_id": "1240", "event_time": "2023-02-01 10:35:00", "item_id": "P003"}
{"cust_id": "1241", "event_time": "2023-02-01 10:40:00", "item_id": "P006"}
{"cust_id": "1242", "event_time": "2023-02-01 10:45:00", "item_id": "P007"}

The enriched data is available in the ClickHouse table as soon as we write an event to Kafka. Each time you query the table in ClickHouse, you’ll see one more row of data.

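After you write all of the above messages to Kafka, you should see something like this. Note that the last event references item P007, which does not exist in product_catalog, so the inner join drops it and only nine rows appear.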

SELECT * FROM enriched_cart_events;

------ RESULTS

┌─cust_id─┬──────────────event_time─┬─name────────────────┬──price─┬─category────┐
│ 1232    │ 2023-02-01 18:05:00.000 │ Blue Jeans          │  39.95 │ Apparel     │
│ 1234    │ 2023-02-01 18:01:00.000 │ Red T-Shirt         │   9.99 │ Apparel     │
│ 1235    │ 2023-02-01 18:10:00.000 │ Smart Watch         │ 199.99 │ Electronics │
│ 1236    │ 2023-02-01 18:15:00.000 │ Red T-Shirt         │   9.99 │ Apparel     │
│ 1237    │ 2023-02-01 18:20:00.000 │ Yoga Mat            │  29.95 │ Fitness     │
│ 1238    │ 2023-02-01 18:25:00.000 │ Blue Jeans          │  39.95 │ Apparel     │
│ 1239    │ 2023-02-01 18:30:00.000 │ Wireless Headphones │  99.99 │ Electronics │
│ 1240    │ 2023-02-01 18:35:00.000 │ Smart Watch         │ 199.99 │ Electronics │
│ 1241    │ 2023-02-01 18:40:00.000 │ Coffee Mug          │   5.99 │ Kitchen     │
└─────────┴─────────────────────────┴─────────────────────┴────────┴─────────────┘
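
The event for item P007 is missing from these results because the view uses an inner join and the catalog has no such item. The sketch below reproduces that behavior with Python’s built-in sqlite3 module, using an in-memory SQLite database purely as a stand-in for the join semantics (it is not RisingWave or ClickHouse). It also shows how a LEFT JOIN would keep the unmatched event with empty product fields, should you prefer that.

```python
import sqlite3

# In-memory SQLite database holding a two-event subset of the demo data.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cart_event (cust_id TEXT, event_time TEXT, item_id TEXT);
    CREATE TABLE product_catalog (item_id TEXT, name TEXT, price REAL, category TEXT);
    INSERT INTO cart_event VALUES
      ('1241', '2023-02-01 10:40:00', 'P006'),
      ('1242', '2023-02-01 10:45:00', 'P007');
    INSERT INTO product_catalog VALUES ('P006', 'Coffee Mug', 5.99, 'Kitchen');
    """
)

JOIN_SQL = """
    SELECT c.cust_id, p.name
    FROM cart_event c
    {kind} JOIN product_catalog p ON c.item_id = p.item_id
    ORDER BY c.cust_id
"""

# INNER JOIN keeps only events with a matching product.
inner_rows = conn.execute(JOIN_SQL.format(kind="INNER")).fetchall()
# LEFT JOIN keeps every event and fills missing product columns with NULL.
left_rows = conn.execute(JOIN_SQL.format(kind="LEFT")).fetchall()

print(inner_rows)
print(left_rows)
```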

Conclusion

For simplicity, this demo joins a single stream with a table to enrich data. In real-world scenarios, we can also join multiple streams and filter or aggregate the data before streaming it to ClickHouse. By using RisingWave for real-time data ingestion and transformation, you can ensure that ClickHouse receives cleaner, analysis-ready data as soon as it is produced. The two systems complement each other well.

Heng Ma

Content Lead
