MongoDB CDC uses change streams, a native feature built on the oplog. Debezium wraps those change streams and publishes events to Kafka. RisingWave subscribes to change streams directly and materializes the data into queryable SQL tables. If your destination is analytics rather than a message bus, RisingWave removes the Kafka layer and the consumer code entirely.
How MongoDB Change Streams Work
MongoDB change streams expose a cursor over the oplog, allowing applications to receive real-time notifications of insert, update, replace, and delete events. They require a replica set or sharded cluster — standalone MongoDB instances do not support change streams.
```js
// Example change stream event for an insert
{
  "_id": { "_data": "826604..." },
  "operationType": "insert",
  "fullDocument": {
    "_id": ObjectId("66049a1b..."),
    "customer_id": "C-1042",
    "items": [
      { "sku": "WIDGET-A", "qty": 3, "price": 9.99 }
    ],
    "total": 29.97,
    "created_at": ISODate("2026-04-01T14:22:00Z")
  },
  "ns": { "db": "ecommerce", "coll": "orders" },
  "documentKey": { "_id": ObjectId("66049a1b...") }
}
```
The nested structure — items is an array of objects — is the challenge. Both Debezium and RisingWave must deal with flattening this into a relational representation.
Debezium MongoDB Connector Setup
Debezium's MongoDB connector uses the change stream API (not oplog tailing, which was deprecated). It requires Kafka Connect and publishes events to Kafka topics.
```json
{
  "name": "mongodb-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0",
    "mongodb.user": "debezium",
    "mongodb.password": "secret",
    "collection.include.list": "ecommerce.orders,ecommerce.customers",
    "topic.prefix": "mongo",
    "snapshot.mode": "initial",
    "capture.mode": "change_streams_update_full_document"
  }
}
```
Setting capture.mode to change_streams_update_full_document is important. Without it, update events only contain the changed fields, not the full document. Downstream consumers then need to maintain state to reconstruct the current document.
Events land in topics like mongo.ecommerce.orders. The payload uses Debezium's envelope format:
```json
{
  "schema": { ... },
  "payload": {
    "op": "c",
    "after": "{\"_id\": ..., \"customer_id\": \"C-1042\", \"items\": [...]}",
    "source": { "collection": "orders", ... }
  }
}
```
The after field is a JSON string. Consumers must parse it, then decide how to handle the nested items array.
RisingWave MongoDB CDC Source
RisingWave connects directly to MongoDB and subscribes to change streams using the embedded Debezium engine — no Kafka required.
```sql
CREATE SOURCE mongodb_cdc
WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0',
    username = 'debezium',
    password = 'secret',
    database = 'ecommerce'
);
```
Tables map MongoDB collections to SQL:
```sql
CREATE TABLE orders (
    _id VARCHAR,
    customer_id VARCHAR,
    items JSONB,
    total DECIMAL(10,2),
    created_at TIMESTAMPTZ,
    PRIMARY KEY (_id)
) FROM mongodb_cdc COLLECTION 'ecommerce.orders';
```
The items array is stored as JSONB. You can query it directly or flatten it with a materialized view.
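Before flattening, the array can be inspected in place with JSONB operators. A minimal sketch against the `orders` table above (the `WHERE` clause is purely illustrative):

```sql
-- Peek at the first line item's SKU and the array length, straight from the JSONB column
SELECT
    _id,
    items -> 0 ->> 'sku' AS first_sku,
    jsonb_array_length(items) AS item_count
FROM orders
WHERE total > 20;
```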
Flattening Nested BSON Documents in RisingWave
This is where RisingWave shows practical value. Using SQL, you can explode nested arrays into relational rows and maintain them as a live materialized view.
```sql
-- Flatten order line items into individual rows
CREATE MATERIALIZED VIEW order_line_items AS
SELECT
    o._id AS order_id,
    o.customer_id,
    item->>'sku' AS sku,
    (item->>'qty')::INT AS qty,
    (item->>'price')::DECIMAL(10,2) AS unit_price,
    (item->>'qty')::INT
        * (item->>'price')::DECIMAL(10,2) AS line_total,
    o.created_at
FROM orders o,
    jsonb_array_elements(o.items) AS item;
```
This materialized view updates in real time as MongoDB documents change. Query it like any SQL table:
```sql
SELECT sku, SUM(qty) AS units_sold, SUM(line_total) AS revenue
FROM order_line_items
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY sku
ORDER BY revenue DESC;
```
No Kafka consumer. No Spark job. No ETL pipeline. One materialized view definition, results always fresh.
Replica Set Requirements
Both Debezium and RisingWave require MongoDB to be running as a replica set. This is necessary because change streams are built on the oplog, which only exists in replica sets.
| Requirement | Debezium | RisingWave |
|---|---|---|
| Replica set or sharded cluster | Required | Required |
| Standalone MongoDB | Not supported | Not supported |
| MongoDB version | 4.0+ (change streams) | 4.0+ |
| User role | `read` + `changeStream` on target DBs | Same |
| `fullDocument` enrichment | Requires `capture.mode` config | Automatic |
For development, a single-node replica set works:
```sh
# Start mongod with a replica set name (binding to all interfaces is for dev only)
mongod --replSet rs0 --bind_ip 0.0.0.0

# Then initiate the replica set from mongosh
mongosh --eval 'rs.initiate({ _id: "rs0", members: [{ _id: 0, host: "localhost:27017" }] })'
```
Comparison Table
| Dimension | Debezium + Kafka | RisingWave |
|---|---|---|
| Infrastructure | MongoDB RS, Kafka, Kafka Connect | MongoDB RS, RisingWave |
| Nested document handling | Consumer-side parsing | SQL JSONB operators |
| Real-time aggregations | Kafka Streams / ksqlDB | SQL materialized views |
| Fan-out to multiple consumers | Native (Kafka topics) | Limited (one consumer role) |
| Operational overhead | High | Low |
| Schema flexibility | Schemaless (JSON string payload) | JSONB + typed columns |
When to Keep Debezium
MongoDB + Debezium makes sense when multiple independent services need to consume the same change stream. A search indexing service, a cache invalidation service, and an analytics pipeline can all subscribe to the same Kafka topic independently, without knowing about each other.
RisingWave is better when the consumer is a query engine. If you are building dashboards, running aggregations, or joining MongoDB data with other sources, RisingWave's SQL layer handles that directly.
FAQ
Can RisingWave join MongoDB data with PostgreSQL data in real time?
Yes. Create a PostgreSQL CDC source and a MongoDB CDC source, then write a JOIN in a materialized view. RisingWave handles the multi-source join continuously.
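As a sketch, assuming a hypothetical `pg_customers` table already ingested from a PostgreSQL CDC source (the table and column names here are illustrative, not from the example schema above):

```sql
-- Continuously maintained join of MongoDB orders with PostgreSQL customers
CREATE MATERIALIZED VIEW revenue_by_region AS
SELECT
    c.region,
    SUM(o.total) AS revenue
FROM orders o
JOIN pg_customers c ON o.customer_id = c.customer_id
GROUP BY c.region;
```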
What happens to BSON ObjectId fields?
MongoDB _id values of type ObjectId are converted to strings in both Debezium and RisingWave. Declare the column as VARCHAR and join on the string value.
Does RisingWave support sharded MongoDB clusters?
RisingWave's MongoDB CDC connector supports replica sets. Sharded cluster support depends on the connector version; check the RisingWave release notes for current status.
How does update handling differ?
MongoDB change streams for updates can emit only the changed fields (a delta) or the full document, depending on configuration. RisingWave requests the full document on updates automatically, so materialized views always have complete row state.
Can I filter which collections to capture?
Yes. In RisingWave, create a TABLE only for the collections you need. Collections without a corresponding table are not captured. In Debezium, use collection.include.list to restrict capture to specific namespaces.
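For example, to capture the customers collection alongside orders, add one more table against the same source (the column list is an assumption about the document shape):

```sql
-- Only collections with a matching table are read from the change stream
CREATE TABLE customers (
    _id VARCHAR,
    email VARCHAR,
    PRIMARY KEY (_id)
) FROM mongodb_cdc COLLECTION 'ecommerce.customers';
```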

