Kafka Setup
Prerequisites
- Kafka cluster: Self-hosted, Confluent Cloud, Amazon MSK, Redpanda, or any Kafka-compatible broker
- Credentials: Username/password or mTLS certificates (if authentication enabled)
- Schema Registry: Confluent Schema Registry URL (if using Avro)
Supported Sources
PostgreSQL, MySQL, and MongoDB are supported. Additional sources are coming soon.
Setup
Connection
- Brokers: Comma-separated bootstrap servers (e.g., broker1:9092,broker2:9092)
- Security Protocol: Plaintext, Ssl, SaslPlaintext, SaslSsl
- SASL Mechanism: Plain, ScramSha256, ScramSha512 (if SASL is enabled)
- Username / Password: For SASL authentication
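As a point of reference, the same settings expressed as Kafka client properties might look like the following sketch; the broker addresses, mechanism, and credentials are placeholder values:

```java
import java.util.Properties;

// Placeholder brokers and credentials; adjust to your cluster.
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("security.protocol", "SASL_SSL");    // SaslSsl
props.put("sasl.mechanism", "SCRAM-SHA-512");  // ScramSha512
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
        + "username=\"cdc_user\" password=\"cdc_password\";");
```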
Message Format
- Format Type: Debezium or Supermetal
- Value SerDe: JSON (no registry) or Avro (requires Schema Registry)
- Key SerDe: Optional; defaults to JSON with primary key columns
If using Avro, configure the Schema Registry URL and credentials.
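For consumers reading the Avro output, a minimal deserializer configuration sketch, assuming the Confluent serde classes and placeholder registry URL and credentials:

```java
import java.util.Properties;

// Placeholder registry URL and credentials.
Properties props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "https://schema-registry.example.com:8081");
// Only needed when the registry requires basic auth:
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "registry_user:registry_password");
```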
Topics
Set Topic Name Template to control routing. Default: supermetal.{database_or_schema}.{table}
Configure partitions and replication factor for auto-created topics. Ensure your Kafka principal has CreateTopic permissions, or pre-create topics manually.
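If you pre-create topics yourself, a sketch using the Kafka AdminClient; the topic name, partition count, and replication factor here are illustrative:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCdcTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        try (Admin admin = Admin.create(props)) {
            // Illustrative values: 3 partitions, replication factor 3.
            NewTopic topic = new NewTopic("supermetal.public.accounts", 3, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```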
Debezium Options
| Option | Debezium Config | Default | Description |
|---|---|---|---|
| Include Before | - | true | Include before image in update/delete events |
| Tombstones on Delete | tombstones.on.delete | true | Emit null-value record after deletes |
| Emit TX Metadata | provide.transaction.metadata | false | Publish transaction begin/end events |
| TX Metadata Topic | topic.transaction | transaction | Topic name for transaction metadata |
| Skipped Operations | skipped.operations | t | Operations to skip (c, u, d, t, none) |
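For example, with Tombstones on Delete left at its default, downstream consumers can treat null-valued records as deletes. A minimal sketch, where handleDelete and handleChangeEvent are hypothetical application callbacks and consumer is an already-configured KafkaConsumer:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// `consumer` is an already-configured KafkaConsumer<String, String>.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
    if (record.value() == null) {
        // Tombstone emitted after the delete event; compaction will eventually drop the key.
        handleDelete(record.key());
    } else {
        handleChangeEvent(record.value());
    }
}
```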
Headers
| Header | Default | Description |
|---|---|---|
| op | off | Operation type (c, r, u, d) |
| db | off | Database name |
| schema | off | Schema name |
| tbl | off | Table name |
| context | on | __debezium.context.* headers |
| pk_update | on | __debezium.newkey / __debezium.oldkey headers |
Example: With prefix: "dbz", op: true, source_template: "{schema}.{table}", and context: false, the emitted headers are:
dbz.op: "c"
dbz.source: "public.accounts"
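A consumer can read these headers directly from each record; a short sketch (the header names follow whatever prefix you configure):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// `record` is a ConsumerRecord from a topic written with header prefix "dbz".
Header opHeader = record.headers().lastHeader("dbz.op");
if (opHeader != null) {
    String op = new String(opHeader.value(), StandardCharsets.UTF_8); // "c", "r", "u", or "d"
    System.out.println("operation = " + op);
}
```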
Data Type Handling
| Option | Debezium Config | Default | Values |
|---|---|---|---|
| Decimal Handling | decimal.handling.mode | Precise | Precise, Double, String |
| Binary Handling | binary.handling.mode | Bytes | Bytes, Base64, Base64UrlSafe, Hex |
| Time Precision | time.precision.mode | Adaptive | Adaptive, AdaptiveTimeMicroseconds, Connect, IsoString, Microseconds, Nanoseconds |
SerDe
| SerDe | Notes |
|---|---|
| JSON | No registry required. Optionally use Schema Registry or embed schema in each message. |
| Avro | Requires Confluent Schema Registry. |
Values and keys use separate serializers. The Key SerDe is optional and defaults to JSON with the primary key columns.
Topics
Each table maps to a Kafka topic via the Topic Name Template.
Default: supermetal.{database_or_schema}.{table}
Placeholders
| Placeholder | Description |
|---|---|
| {database} | Source database name |
| {schema} | Source schema name |
| {table} | Table name |
| {database_or_schema} | Database if present, otherwise schema |
Naming Mode
| Mode | Description |
|---|---|
| Default | Replace invalid characters with underscore |
| Unicode | Encode invalid characters as _uXXXX |
Creation
| Option | Default | Description |
|---|---|---|
| Partitions | 1 | Number of partitions |
| Replication Factor | 1 | Number of replicas |
| Operation Timeout | 30000 ms | Admin operation timeout |
Transactions
Align Kafka delivery with source database transaction boundaries. Consumers never observe partial transactions.
| Setting | Default | Description |
|---|---|---|
| Enabled | false | Enable transactional producer |
| Transactional ID | auto | Custom transactional.id (auto-derived if empty) |
| Timeout | 60000 ms | transaction.timeout.ms (broker-side) |
| Operation Timeout | 30000 ms | Client-side deadline for commit/abort |
Requirements
Transactions require acks=All and automatically enable idempotence (enable.idempotence=true).
Consumer Configuration
Consumers must set isolation.level=read_committed to only see committed transactions:
props.put("isolation.level", "read_committed");Last updated on