Kafka
Replicate data to Apache Kafka with Debezium-compatible or Supermetal-native message formats.
- Two formats: Debezium for existing CDC consumers, Supermetal for minimal post-processing
- Transactions: Optional atomic writes aligned to source database transaction boundaries
Message Formats
Debezium
Fully compatible with Debezium's output format for seamless integration with existing CDC consumers. See Debezium change events.
Use when: You have existing Debezium consumers, need before/after images, or require full source metadata.
```json
{
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "[email protected]"
    },
    "source": {
      "version": "3.4.1.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "ts_us": 1559033904863123,
      "ts_ns": 1559033904863123000,
      "snapshot": true,
      "db": "postgres",
      "sequence": "[\"24023119\",\"24023128\"]",
      "schema": "public",
      "table": "customers",
      "txId": 555,
      "lsn": 24023128,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1559033904863,
    "ts_us": 1559033904863841,
    "ts_ns": 1559033904863841257
  }
}
```

Supermetal
Upsert-based format requiring little to no post-processing. Compact payload with minimal metadata.
Use when: You don't need Debezium compatibility and want minimal post-processing.
```json
{
  "id": 1,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]",
  "_sm_version": 1559033904863841257,
  "_sm_deleted": false
}
```

| Field | Purpose |
|---|---|
| `_sm_version` | Monotonic timestamp for deduplication and ordering |
| `_sm_deleted` | Soft-delete flag (`true` on delete operations) |
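The version and soft-delete fields let a consumer maintain an upsert-style materialized view with no join against before/after images. A minimal sketch of that consumer-side logic (the class and method names are hypothetical, not part of Supermetal):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical consumer-side view: applies Supermetal messages as upserts,
// using _sm_version for dedup/ordering and _sm_deleted for soft deletes.
public class SupermetalView {
    private final Map<Long, Map<String, Object>> rows = new HashMap<>();

    public void apply(long id, long version, boolean deleted, Map<String, Object> row) {
        Map<String, Object> current = rows.get(id);
        long currentVersion =
            current == null ? Long.MIN_VALUE : (long) current.get("_sm_version");
        if (version <= currentVersion) {
            return; // duplicate or out-of-order message: skip
        }
        Map<String, Object> copy = new HashMap<>(row);
        copy.put("_sm_version", version);
        copy.put("_sm_deleted", deleted); // keep the tombstone so late arrivals stay deduped
        rows.put(id, copy);
    }

    /** Returns the live row, or null if absent or soft-deleted. */
    public Map<String, Object> get(long id) {
        Map<String, Object> row = rows.get(id);
        return (row == null || (Boolean) row.get("_sm_deleted")) ? null : row;
    }
}
```

Keeping deleted rows as in-memory tombstones (rather than removing them) means a stale pre-delete update arriving late cannot resurrect the row.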
Prerequisites
- Kafka cluster: Self-hosted, Confluent Cloud, Amazon MSK, Redpanda, or any Kafka-compatible broker
- Credentials: Username/password or mTLS certificates (if authentication enabled)
- Schema Registry: Confluent Schema Registry URL (if using Avro)
Supported Sources
PostgreSQL and MySQL are supported. Additional sources coming soon.
Setup
Connection
- Brokers: Comma-separated bootstrap servers (e.g., `broker1:9092,broker2:9092`)
- Security Protocol: `Plaintext`, `Ssl`, `SaslPlaintext`, `SaslSsl`
- SASL Mechanism: `Plain`, `ScramSha256`, `ScramSha512` (if SASL is enabled)
- Username / Password: For SASL authentication
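These settings map onto standard Kafka client properties. A sketch of the equivalent raw client configuration for `SaslSsl` with `ScramSha256` (broker addresses, username, and password are placeholders):

```java
import java.util.Properties;

public class KafkaSecurityConfig {
    // Placeholders: replace the broker list and credentials with your own.
    public static Properties props() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "broker1:9092,broker2:9092");
        p.put("security.protocol", "SASL_SSL");   // corresponds to SaslSsl
        p.put("sasl.mechanism", "SCRAM-SHA-256"); // corresponds to ScramSha256
        p.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"user\" password=\"secret\";");
        return p;
    }
}
```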
Message Format
- Format Type: `Debezium` or `Supermetal`
- Value SerDe: `JSON` (no registry) or `Avro` (requires Schema Registry)
- Key SerDe: Optional; defaults to JSON with primary key columns
If using Avro, configure the Schema Registry URL and credentials.
Topics
Set Topic Name Template to control routing. Default: `supermetal.{database_or_schema}.{table}`
Configure partitions and replication factor for auto-created topics. Ensure your Kafka principal has CreateTopic permissions, or pre-create topics manually.
Debezium Options
| Option | Debezium Config | Default | Description |
|---|---|---|---|
| Include Before | - | true | Include before image in update/delete events |
| Tombstones on Delete | tombstones.on.delete | true | Emit null-value record after deletes |
| Emit TX Metadata | provide.transaction.metadata | false | Publish transaction begin/end events |
| TX Metadata Topic | topic.transaction | transaction | Topic name for transaction metadata |
| Skipped Operations | skipped.operations | t | Operations to skip (c, u, d, t, none) |
Headers
| Header | Default | Description |
|---|---|---|
| `op` | off | Operation type (`c`, `r`, `u`, `d`) |
| `db` | off | Database name |
| `schema` | off | Schema name |
| `tbl` | off | Table name |
| `context` | on | `__debezium.context.*` headers |
| `pk_update` | on | `__debezium.newkey` / `__debezium.oldkey` headers |
Example: With `prefix: "dbz"`, `op: true`, `source_template: "{schema}.{table}"`, `context: false`:

```
dbz.op: "c"
dbz.source: "public.accounts"
```

Data Type Handling
| Option | Debezium Config | Default | Values |
|---|---|---|---|
| Decimal Handling | decimal.handling.mode | Precise | Precise, Double, String |
| Binary Handling | binary.handling.mode | Bytes | Bytes, Base64, Base64UrlSafe, Hex |
| Time Precision | time.precision.mode | Adaptive | Adaptive, AdaptiveTimeMicroseconds, Connect, IsoString, Microseconds, Nanoseconds |
SerDe
| SerDe | Notes |
|---|---|
| JSON | No registry required. Optionally use Schema Registry or embed schema in each message. |
| Avro | Requires Confluent Schema Registry. |
Separate serializers for values and keys. Key SerDe is optional—defaults to JSON with primary key columns.
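On the consumer side, the SerDe choice determines which deserializers to configure. A sketch of both options, using the standard Kafka string deserializer and the Confluent Avro deserializer (the registry URL is a placeholder):

```java
import java.util.Properties;

public class ConsumerSerdeConfig {
    // JSON values: plain strings, parsed with any JSON library downstream.
    public static Properties jsonProps() {
        Properties p = new Properties();
        p.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        p.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    // Avro values: requires kafka-avro-serializer on the classpath
    // and a reachable Confluent Schema Registry.
    public static Properties avroProps() {
        Properties p = new Properties();
        p.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        p.put("schema.registry.url", "http://schema-registry:8081"); // placeholder
        return p;
    }
}
```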
Topics
Each table maps to a Kafka topic via the Topic Name Template.
Default: `supermetal.{database_or_schema}.{table}`
Placeholders
| Placeholder | Description |
|---|---|
| `{database}` | Source database name |
| `{schema}` | Source schema name |
| `{table}` | Table name |
| `{database_or_schema}` | Database if present, otherwise schema |
Naming Mode
| Mode | Description |
|---|---|
| Default | Replace invalid characters with underscore |
| Unicode | Encode invalid characters as `_uXXXX` |
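Template expansion plus Default-mode sanitization can be sketched as follows (the helper name is hypothetical; the character class is Kafka's set of legal topic-name characters, `[a-zA-Z0-9._-]`):

```java
import java.util.Map;

// Hypothetical helper illustrating how the topic name template expands
// and how Default naming mode replaces illegal characters with '_'.
public class TopicNames {
    public static String render(String template, Map<String, String> values) {
        String name = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            name = name.replace("{" + e.getKey() + "}", e.getValue());
        }
        // Kafka topic names only allow ASCII letters, digits, '.', '_' and '-'.
        return name.replaceAll("[^a-zA-Z0-9._-]", "_");
    }
}
```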
Creation
| Option | Default | Description |
|---|---|---|
| Partitions | 1 | Number of partitions |
| Replication Factor | 1 | Number of replicas |
| Operation Timeout | 30000 ms | Admin operation timeout |
Transactions
Align Kafka delivery with source database transaction boundaries. Consumers never observe partial transactions.
| Setting | Default | Description |
|---|---|---|
| Enabled | false | Enable transactional producer |
| Transactional ID | auto | Custom transactional.id (auto-derived if empty) |
| Timeout | 60000 ms | transaction.timeout.ms (broker-side) |
| Operation Timeout | 30000 ms | Client-side deadline for commit/abort |
Requirements
Transactions require `acks=All`; idempotence is enabled automatically.
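These requirements correspond to standard Kafka producer settings. A sketch of the implied configuration (the `transactional.id` value is a placeholder; Supermetal derives one automatically when the field is left empty):

```java
import java.util.Properties;

public class TransactionalProducerConfig {
    public static Properties props() {
        Properties p = new Properties();
        p.put("enable.idempotence", "true");          // required for transactions
        p.put("acks", "all");                         // required with idempotence
        p.put("transactional.id", "supermetal-sink-1"); // placeholder id
        p.put("transaction.timeout.ms", "60000");     // broker-side timeout
        return p;
    }
}
```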
Consumer Configuration
Consumers must set isolation.level=read_committed to only see committed transactions:
```java
props.put("isolation.level", "read_committed");
```

Apache® and Apache Kafka® are either registered trademarks or trademarks of the Apache Software Foundation. Debezium is an unregistered trademark of the Commonhaus Foundation. No endorsement by these organizations is implied by the use of these marks.