Kafka
Supermetal replicates to Apache Kafka in a Debezium compatible format for existing CDC consumers, or a Supermetal native format for minimal post processing, serialized as JSON or Avro. Optional transactions align delivery with source database transaction boundaries.
Message Formats
Debezium
Compatible with Debezium's output format. See Debezium change events.
Use when: You have existing Debezium consumers, need before and after images, or require full source metadata.
{
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": {
      "version": "3.4.1.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "ts_us": 1559033904863123,
      "ts_ns": 1559033904863123000,
      "snapshot": true,
      "db": "postgres",
      "sequence": "[\"24023119\",\"24023128\"]",
      "schema": "public",
      "table": "customers",
      "txId": 555,
      "lsn": 24023128,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1559033904863,
    "ts_us": 1559033904863841,
    "ts_ns": 1559033904863841257
  }
}
Supermetal
Upsert based format requiring little to no post processing. Compact payload with minimal metadata.
Use when: You don't need Debezium compatibility and want minimal post processing.
{
  "id": 1,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]",
  "_sm_version": 1559033904863841257,
  "_sm_deleted": false
}
| Field | Purpose |
|---|---|
| _sm_version | Monotonic timestamp for deduplication and ordering |
| _sm_deleted | Soft delete flag (true on delete operations) |
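
As an illustration of how a consumer might use the two metadata fields, the sketch below keeps an in-memory copy of a table. It assumes each message has already been decoded into a primary key, a column map, and the `_sm_version` and `_sm_deleted` values; the `UpsertTable` class and its method names are hypothetical and not part of Supermetal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer-side state: latest row per primary key. */
final class UpsertTable {
    private final Map<Long, Map<String, Object>> rows = new HashMap<>();
    private final Map<Long, Long> versions = new HashMap<>();

    /** Returns true if the record was applied, false if it was stale or a duplicate. */
    boolean apply(long id, Map<String, Object> row, long smVersion, boolean smDeleted) {
        Long seen = versions.get(id);
        if (seen != null && seen >= smVersion) {
            return false; // older or redelivered record: _sm_version did not advance
        }
        versions.put(id, smVersion); // remember the version even for deletes
        if (smDeleted) {
            rows.remove(id);         // _sm_deleted = true: drop the row
        } else {
            rows.put(id, row);       // otherwise insert or overwrite (upsert)
        }
        return true;
    }

    /** Current row for the key, or null if absent or deleted. */
    Map<String, Object> get(long id) {
        return rows.get(id);
    }
}
```

Keeping the last seen version for deleted keys is a design choice in this sketch: it prevents a late, older upsert from resurrecting a deleted row.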
Prerequisites
- A Kafka cluster, self hosted, Confluent Cloud, Amazon MSK, Redpanda, or any Kafka compatible broker.
- Credentials, username and password or mTLS certificates, when authentication is enabled.
- A Confluent Schema Registry URL when using Avro.
Setup
Connection
- Brokers. Comma separated bootstrap servers (broker1:9092, broker2:9092).
- Security protocol. Plaintext, Ssl, SaslPlaintext, or SaslSsl.
- SASL mechanism. Plain, ScramSha256, or ScramSha512, when SASL is enabled.
- Username and password. For SASL authentication.
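
Consumers reading the replicated topics need matching connection settings. The sketch below builds standard Kafka client properties for a SASL over TLS connection. The property keys and login module class names are the Kafka client's own; the mapping from the option names above (for example SaslSsl to `SASL_SSL`, ScramSha512 to `SCRAM-SHA-512`) is an assumption, and `KafkaClientSettings` is a hypothetical helper.

```java
import java.util.Properties;

/** Hypothetical helper that assembles Kafka client connection properties. */
final class KafkaClientSettings {
    /**
     * @param brokers   comma separated bootstrap servers, e.g. "broker1:9092,broker2:9092"
     * @param mechanism "PLAIN", "SCRAM-SHA-256", or "SCRAM-SHA-512"
     */
    static Properties saslSsl(String brokers, String mechanism, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", mechanism);
        // SCRAM and PLAIN use different JAAS login modules.
        String loginModule = mechanism.startsWith("SCRAM")
                ? "org.apache.kafka.common.security.scram.ScramLoginModule"
                : "org.apache.kafka.common.security.plain.PlainLoginModule";
        // Note: credentials containing double quotes would need escaping; omitted in this sketch.
        props.put("sasl.jaas.config",
                loginModule + " required username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```

The returned `Properties` can be passed to a `KafkaConsumer` constructor together with the usual deserializer and group settings.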
Message format
- Format. Debezium or Supermetal.
- Value SerDe. JSON (no registry required) or Avro (requires Schema Registry).
- Key SerDe. Optional, defaults to JSON with primary key columns.
If using Avro, configure the Schema Registry URL and credentials.
Topics
Set the topic name template to control routing. The default is supermetal.{database_or_schema}.{table}.
Supermetal creates missing topics with 1 partition and a replication factor of 1, both configurable. Ensure your Kafka principal has CreateTopic permission, or create the topics yourself in advance.
Debezium Options
| Option | Debezium Config | Default | Description |
|---|---|---|---|
| Include Before | - | true | Include before image in update and delete events |
| Tombstones on Delete | tombstones.on.delete | true | Emit a null value record after deletes |
| Emit TX Metadata | provide.transaction.metadata | false | Publish transaction begin and end events |
| TX Metadata Topic | topic.transaction | transaction | Topic name for transaction metadata |
| Skipped Operations | skipped.operations | t | Operations to skip (c, u, d, t, none) |
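
To show how these options surface on the consumer side, here is a minimal sketch that applies Debezium format events to an in-memory table. It assumes the record key and the `payload` object have already been decoded into plain Java values, and that a tombstone arrives as a null value; `DebeziumApplier` is a hypothetical class, not part of Supermetal or Debezium.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer-side handler for Debezium format events. */
final class DebeziumApplier {
    private final Map<Object, Map<String, Object>> table = new HashMap<>();

    /**
     * @param key     decoded record key (primary key value)
     * @param payload decoded "payload" object, or null for a tombstone record
     */
    @SuppressWarnings("unchecked")
    void apply(Object key, Map<String, Object> payload) {
        if (payload == null) {
            // Tombstone (Tombstones on Delete = true): the delete event came first,
            // so removing the key again is a harmless no-op.
            table.remove(key);
            return;
        }
        String op = (String) payload.get("op");
        if (op == null) {
            return; // not a change event; ignored in this sketch
        }
        switch (op) {
            case "c": // create
            case "r": // snapshot read
            case "u": // update
                table.put(key, (Map<String, Object>) payload.get("after"));
                break;
            case "d": // delete: "after" is null, "before" holds the old row if included
                table.remove(key);
                break;
            default:
                // For example "t" (truncate) when it is not skipped; ignored here.
                break;
        }
    }

    Map<String, Object> get(Object key) {
        return table.get(key);
    }
}
```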
Headers
| Header | Default | Description |
|---|---|---|
| op | off | Operation type (c, r, u, d) |
| db | off | Database name |
| schema | off | Schema name |
| tbl | off | Table name |
| context | on | __debezium.context.* headers |
| pk_update | on | __debezium.newkey / __debezium.oldkey headers |
Example: With prefix: "dbz", op: true, source_template: "{schema}.{table}", context: false:
dbz.op: "c"
dbz.source: "public.accounts"
Data Type Handling
| Option | Debezium Config | Default | Values |
|---|---|---|---|
| Decimal Handling | decimal.handling.mode | Precise | Precise, Double, String |
| Binary Handling | binary.handling.mode | Bytes | Bytes, Base64, Base64UrlSafe, Hex |
| Time Precision | time.precision.mode | Adaptive | Adaptive, AdaptiveTimeMicroseconds, Connect, IsoString, Microseconds, Nanoseconds |
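
In Debezium's precise decimal mode with JSON serialization, a decimal is carried as the Base64 encoding of its unscaled value (big-endian two's complement bytes), with the scale stored in the schema. The sketch below decodes such a value, assuming Supermetal's Precise mode follows the same encoding; `PreciseDecimal` is a hypothetical helper and the scale must be supplied by the caller.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Hypothetical decoder for decimals encoded as Base64 unscaled bytes plus a scale. */
final class PreciseDecimal {
    /**
     * @param base64Unscaled Base64 text of the big-endian two's complement unscaled value
     * @param scale          number of digits to the right of the decimal point (from the schema)
     */
    static BigDecimal decode(String base64Unscaled, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64Unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }
}
```

For example, `PreciseDecimal.decode("MDk=", 2)` yields 123.45, because the bytes 0x30 0x39 are the unscaled integer 12345.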
SerDe
| SerDe | Notes |
|---|---|
| JSON | No registry required. Optionally use Schema Registry or embed the schema in each message. |
| Avro | Requires Confluent Schema Registry. |
Values and keys serialize separately. Key SerDe is optional and defaults to JSON with primary key columns.
Topics
Each table maps to a Kafka topic through the topic name template. The default is supermetal.{database_or_schema}.{table}.
Placeholders
| Placeholder | Description |
|---|---|
| {database} | Source database name |
| {schema} | Source schema name |
| {table} | Table name |
| {database_or_schema} | Database if present, otherwise schema |
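
The placeholder substitution can be pictured with the following sketch. It is an illustration only, not Supermetal's implementation: `TopicTemplate` is hypothetical, and treating a null or empty database name as "not present" is an assumption.

```java
/** Hypothetical renderer for topic name templates. */
final class TopicTemplate {
    static String render(String template, String database, String schema, String table) {
        boolean hasDatabase = database != null && !database.isEmpty();
        String databaseOrSchema = hasDatabase ? database : schema;
        // Simple sequential replacement; values containing "{...}" are not handled specially.
        return template
                .replace("{database_or_schema}", orEmpty(databaseOrSchema))
                .replace("{database}", orEmpty(database))
                .replace("{schema}", orEmpty(schema))
                .replace("{table}", orEmpty(table));
    }

    private static String orEmpty(String s) {
        return s == null ? "" : s;
    }
}
```

With the default template, `TopicTemplate.render("supermetal.{database_or_schema}.{table}", null, "public", "customers")` returns `supermetal.public.customers`.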
Naming Mode
| Mode | Description |
|---|---|
| Default | Replace invalid characters with underscore |
| Unicode | Encode invalid characters as _uXXXX |
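
The two modes can be sketched as follows. Kafka topic names may contain only ASCII letters, digits, `.`, `_`, and `-`; everything else is treated as invalid here. The details are assumptions: lowercase hex digits, one escape per UTF-16 code unit, and leaving a literal underscore unescaped may differ from what Supermetal actually emits. `TopicNaming` is a hypothetical class.

```java
/** Hypothetical sanitizer illustrating the Default and Unicode naming modes. */
final class TopicNaming {
    private static boolean isValid(char c) {
        return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                || (c >= '0' && c <= '9') || c == '.' || c == '_' || c == '-';
    }

    /** Default mode: every invalid character becomes an underscore. */
    static String defaultMode(String name) {
        StringBuilder out = new StringBuilder(name.length());
        for (char c : name.toCharArray()) {
            out.append(isValid(c) ? c : '_');
        }
        return out.toString();
    }

    /** Unicode mode: every invalid character becomes _u followed by four hex digits. */
    static String unicodeMode(String name) {
        StringBuilder out = new StringBuilder(name.length());
        for (char c : name.toCharArray()) {
            if (isValid(c)) {
                out.append(c);
            } else {
                out.append(String.format("_u%04x", (int) c));
            }
        }
        return out.toString();
    }
}
```

Default mode is lossy (`a$b` and `a b` both become `a_b`), while the Unicode form keeps distinct source names distinct as long as they do not already contain `_uXXXX` sequences.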
Transactions
Optional transactions align Kafka delivery with source database transaction boundaries, so consumers never observe partial transactions. Enabling them forces acks=All and an idempotent producer.
Consumer configuration
Consumers must set isolation.level=read_committed to only see committed transactions:
props.put("isolation.level", "read_committed");
Apache® and Apache Kafka® are either registered trademarks or trademarks of the Apache Software Foundation. Debezium is an unregistered trademark of the Commonhaus Foundation. No endorsement by these organizations is implied by the use of these marks.
Changelog
0.1.5
2026-06-08
Failed producer delivery acks are now retried indefinitely.