Kafka

Replicate data to Apache Kafka with Debezium-compatible or Supermetal-native message formats.

  • Two formats: Debezium for existing CDC consumers, Supermetal for minimal post-processing
  • Transactions: Optional atomic writes aligned to source database transaction boundaries

Message Formats

Debezium

Fully compatible with Debezium's output format for seamless integration with existing CDC consumers. See Debezium change events.

Use when: You have existing Debezium consumers, need before/after images, or require full source metadata.

{
    "payload": { 
        "before": null, 
        "after": { 
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "[email protected]"
        },
        "source": { 
            "version": "3.4.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863123,
            "ts_ns": 1559033904863123000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 
        "ts_ms": 1559033904863, 
        "ts_us": 1559033904863841, 
        "ts_ns": 1559033904863841257 
    }
}
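As an illustration of consuming this envelope, here is a minimal Python sketch (assuming the key column is `id`, as in the sample above) that folds change events into an in-memory table:

```python
import json

def apply_debezium_event(state, raw):
    """Fold one Debezium change event into an in-memory table.

    Sketch only: assumes the primary key column is "id" and ignores
    the optional "schema" portion of the envelope.
    """
    payload = json.loads(raw)["payload"]
    op = payload["op"]
    if op in ("c", "r", "u"):          # create, snapshot read, update
        row = payload["after"]
        state[row["id"]] = row
    elif op == "d":                    # delete: key comes from "before"
        state.pop(payload["before"]["id"], None)
    return state

event = '{"payload": {"before": null, "after": {"id": 1, "email": "[email protected]"}, "op": "c"}}'
state = apply_debezium_event({}, event)
```

Note that delete events carry the row key in `before`, not `after`, which is why consumers typically enable the before image.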

Supermetal

Upsert-based format requiring little to no post-processing. Compact payload with minimal metadata.

Use when: You don't need Debezium compatibility and want minimal post-processing.

{
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "[email protected]",
    "_sm_version": 1559033904863841257,
    "_sm_deleted": false
}
Field         Purpose
_sm_version   Monotonic timestamp for deduplication and ordering
_sm_deleted   Soft delete flag (true on delete operations)
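Because `_sm_version` is monotonic per row, a consumer can treat the stream as a last-writer-wins upsert. A minimal Python sketch (the `id` key column is an assumption carried over from the example above):

```python
def apply_supermetal_message(table, msg, key="id"):
    """Upsert one Supermetal message into an in-memory table.

    Keeping only the highest _sm_version per key deduplicates retried
    deliveries and reorders out-of-order ones. Deletes are kept as
    tombstones so a late, older update cannot resurrect the row.
    """
    k = msg[key]
    current = table.get(k)
    if current is None or msg["_sm_version"] > current["_sm_version"]:
        table[k] = msg
    return table

def live_rows(table):
    """View of the table without soft-deleted rows."""
    return {k: v for k, v in table.items() if not v["_sm_deleted"]}
```

For example, applying a delete with version 2 and then a stale update with version 1 leaves the row deleted.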

Prerequisites

  • Kafka cluster: Self-hosted, Confluent Cloud, Amazon MSK, Redpanda, or any Kafka-compatible broker
  • Credentials: Username/password or mTLS certificates (if authentication enabled)
  • Schema Registry: Confluent Schema Registry URL (if using Avro)

Supported Sources

PostgreSQL and MySQL are supported. Additional sources coming soon.


Setup

Connection

  • Brokers: Comma-separated bootstrap servers (e.g., broker1:9092, broker2:9092)
  • Security Protocol: Plaintext, Ssl, SaslPlaintext, SaslSsl
  • SASL Mechanism: Plain, ScramSha256, ScramSha512 (if SASL enabled)
  • Username / Password: For SASL authentication
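For readers wiring up a test client against the same cluster, the fields above map roughly onto librdkafka-style settings. The key names below assume a confluent-kafka/librdkafka client and the values are placeholders; Supermetal collects the equivalents in its UI:

```python
# Illustrative mapping only; not Supermetal configuration syntax.
kafka_connection = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "security.protocol": "SASL_SSL",    # Plaintext / Ssl / SaslPlaintext / SaslSsl
    "sasl.mechanism": "SCRAM-SHA-256",  # Plain / ScramSha256 / ScramSha512
    "sasl.username": "replicator",
    "sasl.password": "change-me",
}
```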

Message Format

  • Format Type: Debezium or Supermetal
  • Value SerDe: JSON (no registry) or Avro (requires Schema Registry)
  • Key SerDe: Optional—defaults to JSON with primary key columns

If using Avro, configure the Schema Registry URL and credentials.

Topics

Set Topic Name Template to control routing. Default: supermetal.{database_or_schema}.{table}

Configure partitions and replication factor for auto-created topics. Ensure your Kafka principal is allowed to create topics, or pre-create them manually.


Debezium Options

Option                 Debezium Config                Default       Description
Include Before         -                              true          Include before image in update/delete events
Tombstones on Delete   tombstones.on.delete           true          Emit null-value record after deletes
Emit TX Metadata       provide.transaction.metadata   false         Publish transaction begin/end events
TX Metadata Topic      topic.transaction              transaction   Topic name for transaction metadata
Skipped Operations     skipped.operations             t             Operations to skip (c, u, d, t, none)

Headers

Header      Default   Description
op          off       Operation type (c, r, u, d)
db          off       Database name
schema      off       Schema name
tbl         off       Table name
context     on        __debezium.context.* headers
pk_update   on        __debezium.newkey / __debezium.oldkey headers

Example: with prefix: "dbz", op: true, source_template: "{schema}.{table}", and context: false, each record carries:

dbz.op: "c"
dbz.source: "public.accounts"

Data Type Handling

Option             Debezium Config         Default    Values
Decimal Handling   decimal.handling.mode   Precise    Precise, Double, String
Binary Handling    binary.handling.mode    Bytes      Bytes, Base64, Base64UrlSafe, Hex
Time Precision     time.precision.mode     Adaptive   Adaptive, AdaptiveTimeMicroseconds, Connect, IsoString, Microseconds, Nanoseconds
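Precise mode is the least lossy but needs decoding on the consumer side: in JSON output the decimal arrives as a base64 string of the unscaled big-endian two's-complement integer, with the scale carried in the field's schema parameters. A Python sketch of the decode step:

```python
import base64
from decimal import Decimal

def decode_precise_decimal(b64_value, scale):
    """Decode a Debezium Precise-mode decimal from a JSON payload."""
    unscaled = int.from_bytes(base64.b64decode(b64_value), "big", signed=True)
    return Decimal(unscaled).scaleb(-scale)
```

For example, `"MDk="` with scale 2 decodes to 123.45, since the bytes spell out the unscaled integer 12345.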

SerDe

SerDe   Notes
JSON    No registry required. Optionally use Schema Registry or embed schema in each message.
Avro    Requires Confluent Schema Registry.

Separate serializers for values and keys. Key SerDe is optional—defaults to JSON with primary key columns.
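When Avro with Schema Registry is used, each message is framed with Confluent's wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded body. A Python sketch of splitting that frame (the function name is illustrative):

```python
import struct

def split_confluent_frame(message: bytes):
    """Return (schema_id, avro_body) from a Schema Registry-framed message."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]
```

Deserializing the body itself requires fetching the schema for that ID from the registry.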


Topics

Each table maps to a Kafka topic via the Topic Name Template.

Default: supermetal.{database_or_schema}.{table}

Placeholders

Placeholder            Description
{database}             Source database name
{schema}               Source schema name
{table}                Table name
{database_or_schema}   Database if present, otherwise schema
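The `{database_or_schema}` placeholder exists because sources differ: PostgreSQL routes on schema, MySQL on database. A sketch of the substitution (the function name is illustrative, not Supermetal's API):

```python
def render_topic(template, database=None, schema=None, table=None):
    """Fill the topic-name template placeholders documented above."""
    return template.format(
        database=database or "",
        schema=schema or "",
        table=table or "",
        database_or_schema=database or schema or "",
    )
```

For example, a PostgreSQL `public.customers` table with the default template yields the topic `supermetal.public.customers`.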

Naming Mode

Mode      Description
Default   Replace invalid characters with underscore
Unicode   Encode invalid characters as _uXXXX
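Kafka topic names only allow the characters `[a-zA-Z0-9._-]`, so anything else in a source identifier must be rewritten. A sketch of the two modes (illustrative, not Supermetal's exact implementation):

```python
import re

_VALID = re.compile(r"[A-Za-z0-9._-]")

def sanitize_topic(name, mode="Default"):
    """Rewrite characters Kafka rejects, per the naming mode above."""
    parts = []
    for ch in name:
        if _VALID.fullmatch(ch):
            parts.append(ch)
        elif mode == "Unicode":
            parts.append("_u%04X" % ord(ch))  # e.g. é -> _u00E9
        else:
            parts.append("_")
    return "".join(parts)
```

Unicode mode is reversible, which matters if distinct source names would otherwise collapse to the same underscored topic.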

Creation

Option               Default    Description
Partitions           1          Number of partitions
Replication Factor   1          Number of replicas
Operation Timeout    30000 ms   Admin operation timeout

Transactions

Align Kafka delivery with source database transaction boundaries. Consumers never observe partial transactions.

Setting             Default    Description
Enabled             false      Enable transactional producer
Transactional ID    auto       Custom transactional.id (auto-derived if empty)
Timeout             60000 ms   transaction.timeout.ms (broker-side)
Operation Timeout   30000 ms   Client-side deadline for commit/abort
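For reference, the same knobs expressed as librdkafka-style producer settings. This is an illustrative mapping only; Supermetal manages the producer internally and the transactional.id value is a placeholder:

```python
# Assumed librdkafka key names, shown to clarify the semantics above.
transactional_producer = {
    "transactional.id": "supermetal-dest-1",  # auto-derived when left empty
    "transaction.timeout.ms": 60000,          # broker-side transaction timeout
    "acks": "all",                            # required by transactions
    "enable.idempotence": True,               # turned on automatically
}
```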

Requirements

Transactions require acks=All and enable idempotence automatically.

Consumer Configuration

Consumers must set isolation.level=read_committed to only see committed transactions:

props.put("isolation.level", "read_committed");

Apache® and Apache Kafka® are either registered trademarks or trademarks of the Apache Software Foundation. Debezium is an unregistered trademark of the Commonhaus Foundation. No endorsement by these organizations is implied by the use of these marks.
