Debezium SMTs (Single Message Transforms): A Practical Guide

Debezium SMTs (Single Message Transforms): A Practical Guide

Single Message Transforms (SMTs) are lightweight, per-message operations applied inside Kafka Connect before events reach their destination topic. For Debezium specifically, they solve the problem that raw Debezium messages include metadata (before/after envelopes, operation type, source info) that downstream consumers often don't want.

What a Raw Debezium Message Looks Like

Without SMTs, a Debezium PostgreSQL change event looks like this:

{
  "before": null,
  "after": {
    "id": 1001,
    "customer_id": 42,
    "total": 99.99,
    "status": "placed"
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "postgresql",
    "name": "prod-server",
    "ts_ms": 1711900800000,
    "schema": "public",
    "table": "orders",
    "txId": 7823,
    "lsn": 24023424,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1711900800100,
  "transaction": null
}

Most downstream consumers only want the after fields. The metadata is useful for exactly-once processing and audit, but for a simple sink into a data warehouse, it's noise.

The Most Useful SMTs

ExtractNewRecordState

This is the SMT you'll use in almost every Debezium pipeline. It flattens the envelope, promoting after fields to the top level and optionally adding operation metadata as header fields.

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
    "transforms.unwrap.add.headers": "db"
  }
}

After this SMT, the message becomes:

{
  "id": 1001,
  "customer_id": 42,
  "total": 99.99,
  "status": "placed",
  "__op": "c",
  "__table": "orders",
  "__lsn": 24023424,
  "__source_ts_ms": 1711900800000
}

The __op field (c=create, u=update, d=delete, r=read/snapshot) is the most commonly needed metadata for downstream deduplication logic.

ReplaceField

Use this to rename or drop specific fields. Useful when column names in the source don't match naming conventions in the destination.

"transforms": "unwrap,rename",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "customer_id:customerId,total:totalAmount",
"transforms.rename.blacklist": "internal_notes,admin_flag"

Router (TopicNameRouter)

By default, Debezium sends all events from a table to a single topic like prod-server.public.orders. The RegexRouter SMT lets you rename topics:

"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "prod-server.public.(.*)",
"transforms.route.replacement": "cdc.$1"

This renames prod-server.public.orders to cdc.orders.

Filter

The Filter SMT drops messages matching a condition. Available in Debezium 1.9+ via the io.debezium.transforms.Filter class, which uses a Groovy expression language.

"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.after.status == 'test' || value.op == 'd'"

This drops all events where status is "test" or the operation is a delete. Filtering at the connector level reduces downstream load but requires the Groovy dependency.

TimestampConverter

Debezium represents timestamps as epoch milliseconds by default. Use this to convert to a string format your sink expects:

"transforms": "ts",
"transforms.ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ts.field": "created_at",
"transforms.ts.target.type": "string",
"transforms.ts.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"

Chaining Multiple SMTs

SMTs are applied in the order listed in the transforms property, left to right. A common pattern is unwrap first, then rename:

"transforms": "unwrap,rename,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,lsn",
"transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.rename.renames": "customer_id:customerId",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "prod-server.public.(.*)",
"transforms.route.replacement": "cdc.$1"

Order matters: you must unwrap before renaming, because the rename operates on top-level fields, and the fields only become top-level after unwrapping.

Common SMT Pitfalls

Performance: SMTs run in the connector's worker thread. Complex Groovy Filter expressions or expensive transforms can reduce connector throughput. For high-volume tables (>10K events/sec), profile the impact.

Tombstones: When delete.handling.mode=tombstone, Debezium sends a null-value tombstone after each delete. Many SMTs don't handle null values well. Set drop.tombstones=true unless your consumer explicitly needs tombstones for log compaction.

Schema registry conflicts: ExtractNewRecordState changes the schema of the Kafka message. If you're using Confluent Schema Registry with compatibility checks, a new schema version is registered after adding SMTs. Ensure compatibility is set to FORWARD or NONE before deploying.

The RisingWave Approach: SQL Instead of SMT Config

RisingWave handles the same transformations through standard SQL. There is no SMT configuration syntax to learn.

The equivalent of ExtractNewRecordState in RisingWave is simply writing a SELECT that chooses the columns you want:

-- Create source (raw CDC events)
CREATE SOURCE orders_source WITH (
  connector = 'postgres-cdc',
  hostname = 'db',
  port = '5432',
  username = 'rwuser',
  password = 'secret',
  database.name = 'prod',
  schema.name = 'public',
  table.name = 'orders'
);

-- "ExtractNewRecordState" equivalent: just select the columns you want
CREATE MATERIALIZED VIEW orders_clean AS
SELECT
  id,
  customer_id    AS "customerId",
  total          AS "totalAmount",
  status,
  updated_at
FROM orders_source
WHERE status != 'test';

The equivalent of a Filter SMT is a WHERE clause. The equivalent of ReplaceField is a column alias. The equivalent of TimestampConverter is CAST(updated_at AS VARCHAR) or a TO_CHAR function.

This comparison shows the philosophical difference clearly:

GoalDebezium SMT ConfigRisingWave SQL
Flatten envelopeExtractNewRecordStateImplicit — columns are already flat
Drop fieldsReplaceField blacklistOmit from SELECT
Rename fieldsReplaceField renamesColumn alias (AS)
Filter rowsFilter + GroovyWHERE clause
Convert timestampTimestampConverterCAST() or TO_CHAR()
Route to different topicRegexRouterDifferent materialized view

For engineers who already know SQL, the RisingWave approach removes an entire learning curve. The SMT configuration syntax is non-obvious — the fact that transforms must be a comma-separated string, and each transform is configured via transforms.<name>.<property> prefixes, is a common source of errors.

If you already have Kafka and Kafka Connect deployed, SMTs work in-process and are efficient for stateless transforms. For teams building new pipelines or consolidating infrastructure, RisingWave offers the SQL approach as a simpler alternative to SMT configuration chains.

FAQ

Can I write custom SMTs? Yes. SMTs are Java classes implementing org.apache.kafka.connect.transforms.Transformation. You can package them in a JAR and deploy alongside Kafka Connect. This is useful for organization-specific transformations that aren't covered by built-in SMTs.

When should I use Kafka Streams or ksqlDB instead of SMTs? SMTs are stateless. If your transformation requires joining across multiple events, maintaining state, or windowed aggregations, you need Kafka Streams or ksqlDB. SMTs only see one message at a time.

Does ExtractNewRecordState work for all Debezium connectors? Yes, it works for all Debezium relational connectors (PostgreSQL, MySQL, SQL Server, Oracle, MongoDB has a different version). The field names in add.fields differ slightly per connector (e.g., MySQL uses gtid instead of lsn).

What is delete.handling.mode and which value should I use? rewrite promotes the before-state fields to the top level with __deleted=true appended. tombstone sends a null-value message (good for log compaction). drop silently discards delete events. Use rewrite if your downstream needs to know about deletes; use drop for append-only pipelines.

Can SMTs add latency? At normal throughputs, SMTs add microseconds per message. At very high throughputs (millions of events/sec), the CPU cost accumulates. Profile with and without SMTs on your target hardware before production deployment.

Best-in-Class Event Streaming
for Agents, Apps, and Analytics
GitHubXLinkedInSlackYouTube
Sign up for our to stay updated.