Single Message Transforms (SMTs) let you reshape Debezium change events in-flight, inside Kafka Connect, before they land in Kafka topics. You can flatten the Debezium envelope, rename fields, mask PII, filter irrelevant events, and convert timestamps—all without touching your source database or downstream consumer code.
Why SMTs Matter for CDC Pipelines
The value of a raw Debezium change event looks like this (simplified; the full envelope also carries a source metadata block and schema information):
{
"before": { "id": 1, "email": "user@example.com", "amount": 99.99 },
"after": { "id": 1, "email": "user@example.com", "amount": 149.99 },
"op": "u",
"ts_ms": 1710000000000
}
Most downstream consumers want a flat record, not a nested envelope. And you probably don't want email in your data warehouse. SMTs solve both problems at the Kafka Connect layer—no separate stream processing job needed.
The alternative is writing a Flink or Spark job just to flatten a JSON envelope. SMTs handle the common cases in a few lines of connector configuration.
Core SMTs for Debezium Pipelines
ExtractNewRecordState
This is the most important SMT for any Debezium pipeline. It unwraps the after payload (or the before state for deletes) and produces a flat record:
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,ts_ms,source.ts_ms",
"transforms.unwrap.add.headers": "db"
}
After this SMT, the Kafka message looks like:
{ "id": 1, "email": "user@example.com", "amount": 149.99, "__op": "u", "__ts_ms": 1710000000000 }
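With delete.handling.mode set to rewrite, a delete is not dropped: it is rewritten into a record carrying the row's last known values plus a __deleted marker, roughly like this (values are illustrative):

```json
{
  "id": 1,
  "email": "user@example.com",
  "amount": 149.99,
  "__deleted": "true",
  "__op": "d",
  "__ts_ms": 1710000000000
}
```

Downstream systems can then filter on __deleted instead of having to interpret Kafka tombstones.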
ReplaceField — Remove PII
Drop or rename fields before events reach downstream systems. Note that on Kafka Connect 3.0+ the option is named exclude; the older blacklist alias was removed:
{
"transforms": "dropPII",
"transforms.dropPII.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPII.exclude": "email,phone,ssn,credit_card_number"
}
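ReplaceField can also rename fields via comma-separated old:new pairs, which is handy for aligning source column names with warehouse conventions (the field names here are illustrative):

```json
{
  "transforms": "renameFields",
  "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.renameFields.renames": "amount:order_amount,ts_ms:event_time_ms"
}
```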
TimestampConverter — Normalize Timestamps
Convert epoch milliseconds to ISO-8601 strings or vice versa:
{
"transforms": "tsConvert",
"transforms.tsConvert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsConvert.field": "created_at",
"transforms.tsConvert.target.type": "string",
"transforms.tsConvert.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
}
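Applied to a record whose created_at field holds epoch milliseconds, the field is rewritten in place; for example, a value of 1710000000000 becomes:

```json
{ "id": 1, "created_at": "2024-03-09T16:00:00.000+0000" }
```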
Filter — Drop Unwanted Events
Discard events based on field values:
{
"transforms": "filterDeletes",
"transforms.filterDeletes.type": "io.debezium.transforms.Filter",
"transforms.filterDeletes.language": "jsr223.groovy",
"transforms.filterDeletes.condition": "value.op != 'd'"
}
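Note that the scripting-based Filter SMT requires a JSR-223 engine (for example, the Groovy jars) on the Connect worker's plugin path; it ships separately from the core Debezium connector. Conditions can reference any part of the envelope, e.g. keeping only high-value order changes (the threshold and field name are illustrative):

```json
{
  "transforms": "bigOrders",
  "transforms.bigOrders.type": "io.debezium.transforms.Filter",
  "transforms.bigOrders.language": "jsr223.groovy",
  "transforms.bigOrders.condition": "value.after != null && value.after.amount > 100"
}
```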
Flatten — Collapse Nested Structures
Useful when the source metadata block contains fields you want at the top level:
{
"transforms": "flattenSource",
"transforms.flattenSource.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flattenSource.delimiter": "_"
}
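For instance, a nested source block (illustrative input) such as:

```json
{ "source": { "db": "shop", "table": "orders" } }
```

flattens to { "source_db": "shop", "source_table": "orders" } with the underscore delimiter configured above.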
Step-by-Step Tutorial
Step 1: Define a Connector with Chained SMTs
SMTs are applied in the order they're listed, separated by commas:
{
"name": "postgres-orders-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "shop",
"topic.prefix": "pgserver1",
"table.include.list": "public.orders",
"plugin.name": "pgoutput",
"transforms": "unwrap,dropPII,tsConvert",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op,ts_ms",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.dropPII.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPII.exclude": "email,phone",
"transforms.tsConvert.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsConvert.field": "created_at",
"transforms.tsConvert.target.type": "string",
"transforms.tsConvert.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}
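The connector definition above is deployed by POSTing it to the Kafka Connect REST API (the host, port, and file name here are assumptions based on a default distributed worker):

```shell
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-orders-connector.json
```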
Step 2: Connect RisingWave to the Transformed Topic
With ExtractNewRecordState applied, the events are flat JSON—use FORMAT PLAIN for the already-unwrapped format, or keep FORMAT DEBEZIUM if you skip ExtractNewRecordState:
-- For Debezium → Kafka → RisingWave pipeline (with ExtractNewRecordState applied):
CREATE SOURCE orders_flat (
id BIGINT,
customer_id BIGINT,
total NUMERIC,
status VARCHAR,
created_at VARCHAR,
__op VARCHAR,
__ts_ms BIGINT
) WITH (
connector = 'kafka',
topic = 'pgserver1.public.orders',
properties.bootstrap.server = 'kafka:9092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
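A quick sanity check that rows are flowing, using the table and column names defined above:

```sql
SELECT id, __op, __ts_ms FROM orders_flat LIMIT 5;
```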
Step 3: Build Analytics on Enriched Data
-- Operations breakdown over the last hour
CREATE MATERIALIZED VIEW ops_last_hour AS
SELECT
__op,
COUNT(*) AS event_count
FROM orders_flat
WHERE TO_TIMESTAMP(__ts_ms / 1000.0) > NOW() - INTERVAL '1 hour'
GROUP BY __op;
-- Inserts per minute trend
CREATE MATERIALIZED VIEW insert_trend AS
SELECT
DATE_TRUNC('minute', TO_TIMESTAMP(__ts_ms / 1000.0)) AS minute,
COUNT(*) AS inserts
FROM orders_flat
WHERE __op = 'c'
GROUP BY 1
ORDER BY 1 DESC;
Step 4: Validate Transforms with Kafka Console Consumer
Before wiring up RisingWave, verify that your SMT chain produces the expected output:
kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic pgserver1.public.orders \
--from-beginning \
--max-messages 3 \
| python3 -m json.tool
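If the SMT chain is working, each message should be a flat object rather than a before/after envelope, e.g.:

```json
{
  "id": 1,
  "total": 149.99,
  "__op": "u",
  "__ts_ms": 1710000000000
}
```

If you still see before and after keys, the unwrap transform is not being applied; check the transforms list for typos.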
Comparison Table
| SMT | Use Case | Runs In | Performance Impact |
| --- | --- | --- | --- |
| ExtractNewRecordState | Flatten Debezium envelope | Kafka Connect | Low |
| ReplaceField | Remove/rename fields | Kafka Connect | Negligible |
| TimestampConverter | Normalize date formats | Kafka Connect | Negligible |
| Filter (Groovy) | Conditional event dropping | Kafka Connect | Low-medium (script eval) |
| Flatten | Collapse nested JSON | Kafka Connect | Low |
| Custom SMT | Complex business logic | Kafka Connect | Depends on implementation |
FAQ
Q: Should I use SMTs or a stream processor like Flink for data enrichment?
SMTs are ideal for stateless, field-level transformations: renaming, dropping, converting types. For stateful enrichments (joins with reference data, aggregations, windowing), use a stream processor or RisingWave's materialized views. The rule of thumb: if you need to look up external data to enrich a record, use a stream processor.
Q: Does ExtractNewRecordState handle schema evolution?
Yes—it reads whatever fields are in the after payload and passes them through. If you add a column to the source table, it automatically appears in the downstream message. However, if you're using Avro with Schema Registry, schema compatibility rules apply and you may need a schema migration.
Q: Can I apply SMTs selectively to only certain tables?
By default, SMTs apply to every event a connector processes. For table-specific logic you can deploy separate connectors per table, use a Filter SMT keyed on the source.table field to conditionally drop or pass events, or gate individual transforms with Kafka Connect predicates.
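Kafka Connect predicates (available since Kafka 2.6) gate a transform per record without a scripting engine. A sketch that applies dropPII only to the orders topic, assuming the topic naming used earlier in this article:

```json
{
  "transforms": "dropPII",
  "transforms.dropPII.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropPII.exclude": "email,phone",
  "transforms.dropPII.predicate": "isOrders",
  "predicates": "isOrders",
  "predicates.isOrders.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isOrders.pattern": "pgserver1\\.public\\.orders"
}
```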
Key Takeaways
- ExtractNewRecordState is the foundational SMT for any Debezium pipeline: it converts the complex envelope into a flat, usable record.
- Chain multiple SMTs with the comma-separated transforms list; they execute in order.
- Use ReplaceField with an exclude list to drop PII fields before events ever reach the Kafka topic.
- SMTs run synchronously in the Kafka Connect worker, so complex logic (especially Groovy scripts in Filter) can reduce connector throughput.
- For data that has already been SMT-processed, use FORMAT PLAIN ENCODE JSON in RisingWave rather than FORMAT DEBEZIUM ENCODE JSON.

