The Debezium MongoDB connector reads MongoDB's change streams, built on top of the oplog (operations log), to capture every insert, update, and delete from your collections and stream them to Kafka as structured events. With RisingWave, a PostgreSQL-compatible streaming database, you can then run SQL analytics on MongoDB data in real time, without complex ETL pipelines.
## MongoDB and the Analytics Gap
MongoDB excels at flexible document storage and high-throughput writes, but its document model and lack of native window functions make complex analytics cumbersome. Analytics teams typically work around this by:
- Periodically exporting MongoDB data to a data warehouse (high latency)
- Running aggregation pipelines directly on MongoDB (competes with application traffic)
- Manually coding change listeners in application code (fragile and hard to scale)
CDC with Debezium offers a better path: stream changes out of MongoDB as they happen, and let a dedicated streaming SQL engine handle the analytics.
## How the Debezium MongoDB Connector Works
Unlike relational connectors that read a binary replication log, the Debezium MongoDB connector uses MongoDB's change streams API (available since MongoDB 3.6), which is built on top of the oplog. Change streams provide a reliable, resumable cursor of all changes on a collection, database, or entire deployment.
Earlier connector versions could also tail the oplog directly for pre-3.6 deployments, but current Debezium releases capture changes exclusively through change streams.
Key differences from relational CDC:
- MongoDB documents are schema-less, so Debezium emits the document payload as a JSON string within the event envelope
- The `before` field is not populated by default in MongoDB CDC (it requires `changeStreamPreAndPostImages` on MongoDB 6.0+)
- The `patch` field in update events contains a MongoDB update document rather than a full row image (unless pre/post images are enabled)
- Operations include `c` (insert), `u` (update), `d` (delete), and `r` (initial snapshot read)
## Step-by-Step Tutorial
### Step 1: Prepare MongoDB
Ensure MongoDB is running as a replica set (required for change streams):
```javascript
// Check replica set status
rs.status()

// Create a Debezium user with the required privileges
use admin
db.createUser({
  user: "debezium",
  pwd: "dbz_secret",
  roles: [
    { role: "read", db: "local" },
    { role: "readAnyDatabase", db: "admin" },
    { role: "readWrite", db: "ecommerce" }
  ]
})
```
For MongoDB 6.0+, enable pre/post images on the collection to capture the full before state:
```javascript
use ecommerce
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
})
```
### Step 2: Deploy the Debezium MongoDB Connector
```json
{
  "name": "mongodb-orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongodb.internal:27017",
    "mongodb.name": "mongo1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz_secret",
    "database.include.list": "ecommerce",
    "collection.include.list": "ecommerce.orders,ecommerce.products",
    "topic.prefix": "mongo1",
    "snapshot.mode": "initial",
    "capture.mode": "change_streams_update_full"
  }
}
```
The `capture.mode = "change_streams_update_full"` setting asks MongoDB to look up and attach the full post-update document to each update event. Populating the `before` field additionally requires pre/post images on MongoDB 6.0+ and the `change_streams_update_full_with_pre_image` capture mode.
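Each captured collection gets its own Kafka topic, named `<topic.prefix>.<database>.<collection>` under Debezium's default topic naming. A small sketch of how the topics for this config come out:

```python
def topic_for(prefix: str, database: str, collection: str) -> str:
    # Debezium's default MongoDB topic naming: <topic.prefix>.<database>.<collection>
    return f"{prefix}.{database}.{collection}"

# Collections from collection.include.list in the connector config above
include_list = ["ecommerce.orders", "ecommerce.products"]
topics = [topic_for("mongo1", *entry.split(".", 1)) for entry in include_list]
print(topics)  # ['mongo1.ecommerce.orders', 'mongo1.ecommerce.products']
```

This is why the RisingWave source below subscribes to `mongo1.ecommerce.orders`.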
```bash
curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mongodb-connector.json
```
### Step 3: Connect RisingWave to the Kafka Topic
RisingWave is a PostgreSQL-compatible streaming database. Because MongoDB documents are flexible JSON, RisingWave provides a dedicated `DEBEZIUM_MONGO` format that exposes each event as an `_id` key column plus the full document in a `payload` JSONB column:

```sql
CREATE TABLE mongo_orders (
    _id JSONB PRIMARY KEY,
    payload JSONB
)
WITH (
    connector = 'kafka',
    topic = 'mongo1.ecommerce.orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
```
Because the documents arrive as nested JSON, you can then extract and cast individual fields:
```sql
-- Extract typed fields from the document JSON payload
CREATE MATERIALIZED VIEW orders_normalized AS
SELECT
    (payload->>'order_id')::BIGINT AS order_id,
    payload->>'customer_email' AS customer_email,
    (payload->>'total_amount')::DECIMAL AS total_amount,
    payload->>'status' AS status,
    (payload->>'created_at')::TIMESTAMPTZ AS created_at
FROM mongo_orders;
```
### Step 4: Build Analytics Materialized Views
Once normalized, run real-time analytics:
```sql
CREATE MATERIALIZED VIEW order_status_breakdown AS
SELECT
    status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_value,
    AVG(total_amount) AS avg_order_value
FROM orders_normalized
GROUP BY status;

-- Revenue trend with tumbling windows
CREATE MATERIALIZED VIEW hourly_mongo_revenue AS
SELECT
    window_start AS hour,
    COUNT(*) AS orders,
    SUM(total_amount) AS revenue
FROM TUMBLE(orders_normalized, created_at, INTERVAL '1 HOUR')
GROUP BY window_start;
```
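`TUMBLE` assigns each row to exactly one fixed-size, non-overlapping window based on its event timestamp. The equivalent bucketing logic, sketched in Python with invented sample orders:

```python
from collections import defaultdict
from datetime import datetime, timedelta

def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    return epoch + ((ts - epoch) // size) * size

# Invented sample orders: (created_at, total_amount)
orders = [
    (datetime(2024, 5, 1, 10, 5), 20.0),
    (datetime(2024, 5, 1, 10, 55), 30.0),
    (datetime(2024, 5, 1, 11, 10), 15.0),
]

# Sum revenue per one-hour window, mirroring hourly_mongo_revenue
revenue = defaultdict(float)
for created_at, amount in orders:
    revenue[tumble_start(created_at, timedelta(hours=1))] += amount

print(revenue[datetime(2024, 5, 1, 10, 0)])  # 50.0
```

The key property: the 10:05 and 10:55 orders land in the same 10:00 window, while the 11:10 order starts a new one.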
### Step 5: Sink to an Analytics Store
```sql
CREATE SINK order_analytics_sink
FROM order_status_breakdown
WITH (
    connector = 'kafka',
    topic = 'mongo-order-analytics',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'status'
) FORMAT UPSERT ENCODE JSON;
```
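`FORMAT UPSERT` means downstream consumers keep only the latest value per key, with a null value acting as a delete. A sketch of those compaction semantics, using an invented stream of keyed records:

```python
def apply_upserts(records):
    """Replay keyed upsert records; a None value tombstones (deletes) the key."""
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state

# Invented sample stream of (status, aggregate-row) records
stream = [
    ("pending", {"order_count": 3}),
    ("shipped", {"order_count": 1}),
    ("pending", {"order_count": 4}),  # later record overwrites the earlier one
]
print(apply_upserts(stream))
# {'pending': {'order_count': 4}, 'shipped': {'order_count': 1}}
```

This is why the sink declares `status` as its key: as the `GROUP BY status` aggregates update, each new row simply replaces the previous value for that status.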
## Comparison Table
| Feature | MongoDB Change Streams | MongoDB Oplog Tailing |
|---|---|---|
| MongoDB version required | 3.6+ | Any replica set |
| Resumability | Built-in resume token | Manual offset tracking |
| Pre-image (before state) | MongoDB 6.0+ only | Limited |
| Filtering | Collection/database/deployment level | Post-filter only |
| Recommended for new deployments | Yes | Legacy only |
## FAQ
Does the Debezium MongoDB connector support sharded clusters?
Yes. For sharded clusters, connect Debezium to the `mongos` router or directly to each shard. The connector uses the change streams API, which works across sharded deployments on MongoDB 4.0+.
Why is the `before` field null in MongoDB CDC events?
MongoDB's oplog does not store the full pre-update document by default. Enable `changeStreamPreAndPostImages` on MongoDB 6.0+ collections to populate the `before` field. On earlier versions, the `patch` field contains the MongoDB update operator document that was applied.
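To make the `patch` behavior concrete, here is a sketch of applying an update operator document to a document you already hold. It is deliberately simplified: real MongoDB update documents support many more operators than `$set` and `$unset`.

```python
def apply_patch(doc: dict, patch: dict) -> dict:
    """Apply a simplified MongoDB update document (only $set and $unset)."""
    result = dict(doc)
    for field, value in patch.get("$set", {}).items():
        result[field] = value
    for field in patch.get("$unset", {}):
        result.pop(field, None)
    return result

# Invented example: the state you hold, plus a patch from an update event
before = {"_id": 1, "status": "pending", "eta": "2024-05-02"}
patch = {"$set": {"status": "shipped"}, "$unset": {"eta": ""}}
print(apply_patch(before, patch))  # {'_id': 1, 'status': 'shipped'}
```

In other words, without pre/post images a consumer can only reconstruct the new state if it already tracks the old one.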
Can RisingWave query the nested document structure directly?
Yes. RisingWave supports JSON operators (`->`, `->>`, `#>`) for navigating nested JSON fields. You can project nested fields in your materialized view definitions and even index into arrays.
## Key Takeaways
- The Debezium MongoDB connector uses change streams (MongoDB 3.6+) to capture inserts, updates, and deletes
- Configure it with `mongodb.hosts`, `mongodb.name`, and `collection.include.list`
- Enable `changeStreamPreAndPostImages` on MongoDB 6.0+ for full before/after state in update events
- RisingWave ingests MongoDB CDC events from Kafka sources using its Debezium JSON formats
- Use RisingWave's JSON operators to normalize flexible document payloads into typed materialized views

