Debezium MongoDB Connector: Streaming NoSQL Changes to Analytics

The Debezium MongoDB connector reads MongoDB's change streams, built on the oplog (operations log), to capture every insert, update, and delete from your collections and stream them to Kafka as structured events. With RisingWave, a PostgreSQL-compatible streaming database, you can run SQL analytics on MongoDB data in real time, without complex ETL pipelines.

MongoDB and the Analytics Gap

MongoDB excels at flexible document storage and high-throughput writes, but its document model and lack of native window functions make complex analytics cumbersome. Analytics teams typically work around this by:

  • Periodically exporting MongoDB data to a data warehouse (high latency)
  • Running aggregation pipelines directly on MongoDB (competes with application traffic)
  • Manually coding change listeners in application code (fragile and hard to scale)

CDC with Debezium offers a better path: stream changes out of MongoDB as they happen, and let a dedicated streaming SQL engine handle the analytics.

How the Debezium MongoDB Connector Works

Unlike relational connectors that read a binary replication log, the Debezium MongoDB connector uses MongoDB's change streams API (available since MongoDB 3.6), which is built on top of the oplog. Change streams provide a reliable, resumable cursor of all changes on a collection, database, or entire deployment.

For older MongoDB versions, earlier Debezium releases also supported tailing the oplog directly; that mode is deprecated and suited only to legacy deployments (see the comparison table below).

Key differences from relational CDC:

  • MongoDB documents are schema-less, so Debezium emits the document payload as a JSON string within the event envelope (see the sample event after this list)
  • The before field is not populated by default in MongoDB CDC (it requires changeStreamPreAndPostImages on MongoDB 6.0+)
  • The patch field in update events contains a MongoDB update document rather than a full row image (unless pre/post images are enabled)
  • Operations include c (insert), u (update), d (delete), and r (initial snapshot read)
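
For illustration, here is an abbreviated sketch of what an update event envelope might look like with full-document capture enabled (field values are hypothetical and the source block is trimmed):

{
  "before": null,
  "after": "{\"_id\": {\"$oid\": \"65a1f00c2ab79c7d9f2e8b11\"}, \"order_id\": 1001, \"status\": \"shipped\", \"total_amount\": 99.5}",
  "source": { "connector": "mongodb", "db": "ecommerce", "collection": "orders" },
  "op": "u",
  "ts_ms": 1700000000000
}

Note that after is a string containing the serialized document, not a nested JSON object.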

Step-by-Step Tutorial

Step 1: Prepare MongoDB

Ensure MongoDB is running as a replica set (required for change streams):

// Check replica set status
rs.status()

// Create a Debezium user with required privileges
use admin
db.createUser({
  user: "debezium",
  pwd: "dbz_secret",
  roles: [
    { role: "read", db: "local" },
    { role: "readAnyDatabase", db: "admin" },
    { role: "readWrite", db: "ecommerce" }
  ]
})

For MongoDB 6.0+, enable pre/post images on the collection so update events can carry the full before state:

use ecommerce
db.runCommand({
  collMod: "orders",
  changeStreamPreAndPostImages: { enabled: true }
})

Step 2: Deploy the Debezium MongoDB Connector

{
  "name": "mongodb-orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongodb.internal:27017",
    "mongodb.name": "mongo1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz_secret",
    "database.include.list": "ecommerce",
    "collection.include.list": "ecommerce.orders,ecommerce.products",
    "topic.prefix": "mongo1",
    "snapshot.mode": "initial",
    "capture.mode": "change_streams_update_full"
  }
}

The capture.mode = "change_streams_update_full" setting requests the full document in update events. To also populate the before field, use change_streams_update_full_with_pre_image instead, which requires MongoDB 6.0+ with pre/post images enabled.

Save the configuration as mongodb-connector.json, then register it with Kafka Connect:

curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @mongodb-connector.json

Step 3: Connect RisingWave to the Kafka Topic

RisingWave is a PostgreSQL-compatible streaming database. Because MongoDB documents are schema-flexible, RisingWave provides a MongoDB-specific FORMAT DEBEZIUM_MONGO, which exposes each event as an _id key plus the full document in a payload JSONB column. Debezium changelogs carry a primary key, so they are ingested into a table rather than a plain source:

CREATE TABLE mongo_orders (
    -- _id mirrors the MongoDB document key; payload holds the full document
    _id JSONB PRIMARY KEY,
    payload JSONB
)
WITH (
    connector = 'kafka',
    topic = 'mongo1.ecommerce.orders',
    properties.bootstrap.server = 'kafka:9092',
    scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

The document arrives as JSONB in the payload column; extract and type the fields you need with JSON operators:

-- Extract fields from the document JSON payload
CREATE MATERIALIZED VIEW orders_normalized AS
SELECT
    (payload->>'order_id')::BIGINT AS order_id,
    payload->>'customer_email' AS customer_email,
    (payload->>'total_amount')::DECIMAL AS total_amount,
    payload->>'status' AS status,
    (payload->>'created_at')::TIMESTAMPTZ AS created_at
FROM mongo_orders;
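
To sanity-check the normalized view, query it from psql or any other PostgreSQL client (the field names follow the example order schema used above):

SELECT order_id, status, total_amount
FROM orders_normalized
ORDER BY created_at DESC
LIMIT 10;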

Step 4: Build Analytics Materialized Views

Once normalized, run real-time analytics:

CREATE MATERIALIZED VIEW order_status_breakdown AS
SELECT
    status,
    COUNT(*) AS order_count,
    SUM(total_amount) AS total_value,
    AVG(total_amount) AS avg_order_value
FROM orders_normalized
GROUP BY status;

-- Revenue trend with tumbling windows
CREATE MATERIALIZED VIEW hourly_mongo_revenue AS
SELECT
    WINDOW_START AS hour,
    COUNT(*) AS orders,
    SUM(total_amount) AS revenue
FROM TUMBLE(orders_normalized, created_at, INTERVAL '1 HOUR')
GROUP BY WINDOW_START;
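
Both views maintain themselves incrementally as change events arrive; query them like ordinary tables:

SELECT hour, orders, revenue
FROM hourly_mongo_revenue
ORDER BY hour DESC
LIMIT 24;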

Step 5: Sink to Analytics Store

CREATE SINK order_analytics_sink
FROM order_status_breakdown
WITH (
    connector = 'kafka',
    topic = 'mongo-order-analytics',
    properties.bootstrap.server = 'kafka:9092',
    primary_key = 'status'
) FORMAT UPSERT ENCODE JSON;

Comparison Table

Feature                         | MongoDB Change Streams               | MongoDB Oplog Tailing
--------------------------------|--------------------------------------|-----------------------
MongoDB version required        | 3.6+                                 | Any replica set
Resumability                    | Built-in resume token                | Manual offset tracking
Pre-image (before state)        | MongoDB 6.0+ only                    | Limited
Filtering                       | Collection/database/deployment level | Post-filter only
Recommended for new deployments | Yes                                  | Legacy only

FAQ

Does the Debezium MongoDB connector support sharded clusters? Yes. For sharded clusters, connect Debezium to the mongos router or directly to each shard. The connector uses the change streams API which works across sharded deployments on MongoDB 4.0+.

Why is the before field null in MongoDB CDC events? MongoDB's oplog does not store the full document before an update by default. Enable changeStreamPreAndPostImages on MongoDB 6.0+ collections to populate the before field. On earlier versions, the patch field contains the MongoDB update operator document that was applied.

Can RisingWave query the nested document structure directly? Yes. RisingWave supports JSON operators (->, ->>, #>) for navigating nested JSON fields. You can project nested fields in your materialized view definitions and even index into arrays.
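
For example, assuming the order document also carries a nested shipping object and an items array (hypothetical fields, not part of the schema above), you could project:

-- Navigate nested objects and index into arrays on the JSONB payload
SELECT
    payload->'shipping'->>'city'            AS ship_city,
    payload#>>'{items,0,sku}'               AS first_item_sku,
    (payload->'items'->0->>'quantity')::INT AS first_item_qty
FROM mongo_orders;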

Key Takeaways

  • Debezium MongoDB connector uses change streams (MongoDB 3.6+) to capture inserts, updates, and deletes
  • Configure with mongodb.hosts, topic.prefix, and collection.include.list
  • Enable changeStreamPreAndPostImages on MongoDB 6.0+ for full before/after state in update events
  • RisingWave ingests MongoDB CDC events via FORMAT DEBEZIUM_MONGO ENCODE JSON on Kafka-backed tables
  • Use RisingWave's JSON operators to normalize flexible document payloads into typed materialized views
