Kafka

Supermetal replicates to Apache Kafka in either a Debezium-compatible format, for existing CDC consumers, or a Supermetal-native format that needs minimal post-processing. Messages are serialized as JSON or Avro. Optional transactions align delivery with source database transaction boundaries.

Message Formats

Debezium

Compatible with Debezium's output format. See Debezium change events.

Use when: You have existing Debezium consumers, need before and after images, or require full source metadata.

{
    "payload": { 
        "before": null, 
        "after": { 
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "[email protected]"
        },
        "source": { 
            "version": "3.4.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863123,
            "ts_ns": 1559033904863123000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", 
        "ts_ms": 1559033904863, 
        "ts_us": 1559033904863841, 
        "ts_ns": 1559033904863841257 
    }
}

Supermetal

Upsert-based format requiring little to no post-processing. Compact payload with minimal metadata.

Use when: You don't need Debezium compatibility and want minimal post-processing.

{
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "[email protected]",
    "_sm_version": 1559033904863841257,
    "_sm_deleted": false
}

| Field | Purpose |
| --- | --- |
| _sm_version | Monotonic timestamp for deduplication and ordering |
| _sm_deleted | Soft delete flag (true on delete operations) |
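
As a rough illustration of how a consumer might use these two fields, the sketch below keeps only the newest row per primary key and treats `_sm_deleted` as a removal. It assumes messages have already been decoded into maps and that the key is the primary key value. The class name `SupermetalUpsertSketch` and its methods are hypothetical, not part of Supermetal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer-side sketch: keeps the newest row per primary key. */
final class SupermetalUpsertSketch {
    // Highest _sm_version applied per key. Kept after deletes so that an
    // older record arriving late cannot resurrect a deleted row.
    private final Map<Object, Long> versions = new HashMap<>();
    // Current live rows, keyed by primary key.
    private final Map<Object, Map<String, Object>> rows = new HashMap<>();

    /** Applies one decoded message; returns false when it is stale or a duplicate. */
    boolean apply(Object key, Map<String, Object> message) {
        long version = ((Number) message.get("_sm_version")).longValue();
        Long seen = versions.get(key);
        if (seen != null && seen >= version) {
            return false; // this version or a newer one was already applied
        }
        versions.put(key, version);
        if (Boolean.TRUE.equals(message.get("_sm_deleted"))) {
            rows.remove(key); // soft delete flag set: drop the row
        } else {
            rows.put(key, message);
        }
        return true;
    }

    /** Returns the current row for a key, or null if absent or deleted. */
    Map<String, Object> get(Object key) {
        return rows.get(key);
    }
}
```

Comparing versions before writing makes redelivered or out-of-order messages harmless, which is the deduplication and ordering role the table describes.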

Prerequisites

  • A Kafka cluster: self-hosted, Confluent Cloud, Amazon MSK, Redpanda, or any Kafka-compatible broker.
  • Credentials (username and password, or mTLS certificates) when authentication is enabled.
  • A Confluent Schema Registry URL when using Avro.

Setup

Connection

  • Brokers. Comma separated bootstrap servers (broker1:9092, broker2:9092).
  • Security protocol. Plaintext, Ssl, SaslPlaintext, or SaslSsl.
  • SASL mechanism. Plain, ScramSha256, or ScramSha512, when SASL is enabled.
  • Username and password. For SASL authentication.
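
For reference, a downstream Java client connecting to the same cluster needs equivalent settings expressed as standard Kafka client properties. The sketch below assumes that Supermetal's SaslSsl and ScramSha512 options correspond to Kafka's `SASL_SSL` and `SCRAM-SHA-512` values. That mapping is inferred from the names, not stated on this page. The class `KafkaClientPropsSketch` is hypothetical.

```java
import java.util.Properties;

/** Hypothetical helper: Kafka client properties for SASL/SCRAM over TLS. */
final class KafkaClientPropsSketch {
    static Properties saslSsl(String brokers, String username, String password) {
        Properties props = new Properties();
        // Comma separated bootstrap servers, e.g. "broker1:9092,broker2:9092".
        props.put("bootstrap.servers", brokers);
        // Assumed equivalents of the SaslSsl and ScramSha512 options above.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // JAAS entry for SCRAM. Note: quotes inside the credentials are not
        // escaped in this sketch.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```

The returned `Properties` can be passed to a Kafka consumer or admin client constructor. For the Plain mechanism, the login module would instead be `org.apache.kafka.common.security.plain.PlainLoginModule` with `sasl.mechanism` set to `PLAIN`.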

Message format

  • Format. Debezium or Supermetal.
  • Value SerDe. JSON (no registry required) or Avro (requires Schema Registry).
  • Key SerDe. Optional, defaults to JSON with primary key columns.

If using Avro, configure the Schema Registry URL and credentials.

Topics

Set the topic name template to control routing. The default is supermetal.{database_or_schema}.{table}.

Supermetal creates missing topics with 1 partition and a replication factor of 1, both configurable. Ensure your Kafka principal has CreateTopic permission, or create the topics yourself in advance.

Debezium Options

| Option | Debezium Config | Default | Description |
| --- | --- | --- | --- |
| Include Before | - | true | Include before image in update and delete events |
| Tombstones on Delete | tombstones.on.delete | true | Emit a null value record after deletes |
| Emit TX Metadata | provide.transaction.metadata | false | Publish transaction begin and end events |
| TX Metadata Topic | topic.transaction | transaction | Topic name for transaction metadata |
| Skipped Operations | skipped.operations | t | Operations to skip (c, u, d, t, none) |
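
To show how these options look from the consumer side, here is a minimal sketch that applies Debezium-format events to an in-memory table. It assumes each record value has already been decoded into a map holding the `payload` object, and that a tombstone arrives as a null value. The class `DebeziumApplySketch` is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical consumer-side sketch for Debezium-format events. */
final class DebeziumApplySketch {
    private final Map<Object, Map<String, Object>> table = new HashMap<>();

    /** payload is the decoded "payload" object, or null for a tombstone record. */
    @SuppressWarnings("unchecked")
    String apply(Object key, Map<String, Object> payload) {
        if (payload == null) {
            // Tombstone: the preceding "d" event already removed the row.
            return "tombstone";
        }
        Object op = payload.get("op");
        if (op == null) {
            return "ignored";
        }
        switch (op.toString()) {
            case "c": // create
            case "r": // snapshot read
            case "u": // update
                table.put(key, (Map<String, Object>) payload.get("after"));
                return "upsert";
            case "d": // delete
                table.remove(key);
                return "delete";
            default:  // e.g. "t" (truncate), skipped by default
                return "ignored";
        }
    }

    /** Returns the current row for a key, or null if absent. */
    Map<String, Object> get(Object key) {
        return table.get(key);
    }
}
```

With Tombstones on Delete enabled, a consumer should expect two records per delete: the `d` event followed by a null value record with the same key.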

Headers

| Header | Default | Description |
| --- | --- | --- |
| op | off | Operation type (c, r, u, d) |
| db | off | Database name |
| schema | off | Schema name |
| tbl | off | Table name |
| context | on | __debezium.context.* headers |
| pk_update | on | __debezium.newkey / __debezium.oldkey headers |

Example: With prefix: "dbz", op: true, source_template: "{schema}.{table}", context: false:

dbz.op: "c"
dbz.source: "public.accounts"

Data Type Handling

| Option | Debezium Config | Default | Values |
| --- | --- | --- | --- |
| Decimal Handling | decimal.handling.mode | Precise | Precise, Double, String |
| Binary Handling | binary.handling.mode | Bytes | Bytes, Base64, Base64UrlSafe, Hex |
| Time Precision | time.precision.mode | Adaptive | Adaptive, AdaptiveTimeMicroseconds, Connect, IsoString, Microseconds, Nanoseconds |
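
In Debezium's own JSON output, a decimal in precise mode is carried as the base64 text of its unscaled value (big-endian two's complement bytes), with the scale stored in the field's schema parameters. Assuming Supermetal's Precise mode follows that convention, which this page implies but does not spell out, a consumer could decode such a field roughly as follows. The class and method names are illustrative.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

/** Hypothetical decoder for precise-mode decimals in JSON messages. */
final class PreciseDecimalSketch {
    /**
     * @param base64Unscaled base64 text of the unscaled value's bytes
     * @param scale          number of fractional digits, taken from the schema
     */
    static BigDecimal decode(String base64Unscaled, int scale) {
        byte[] unscaled = Base64.getDecoder().decode(base64Unscaled);
        // BigInteger(byte[]) reads big-endian two's complement, so negative
        // values decode correctly without extra handling.
        return new BigDecimal(new BigInteger(unscaled), scale);
    }
}
```

For example, `PreciseDecimalSketch.decode("MDk=", 2)` yields 123.45, because the two bytes 0x30 0x39 are the unscaled integer 12345. With Avro, the bytes arrive directly and the base64 step is not needed.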

SerDe

| SerDe | Notes |
| --- | --- |
| JSON | No registry required. Optionally use Schema Registry or embed the schema in each message. |
| Avro | Requires Confluent Schema Registry. |

Values and keys serialize separately. Key SerDe is optional and defaults to JSON with primary key columns.

Topics

Each table maps to a Kafka topic through the topic name template. The default is supermetal.{database_or_schema}.{table}.

Placeholders

| Placeholder | Description |
| --- | --- |
| {database} | Source database name |
| {schema} | Source schema name |
| {table} | Table name |
| {database_or_schema} | Database if present, otherwise schema |
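
The placeholder substitution can be pictured with the sketch below, which is handy for predicting which topic a given table lands on. It is an approximation of the behavior described in the table, not Supermetal's implementation. `TopicTemplateSketch` is a hypothetical name, and treating an empty database name as "not present" is an assumption.

```java
/** Hypothetical renderer for topic name templates. */
final class TopicTemplateSketch {
    static String render(String template, String database, String schema, String table) {
        // {database_or_schema}: database if present, otherwise schema.
        String dbOrSchema = (database != null && !database.isEmpty()) ? database : schema;
        return template
            .replace("{database_or_schema}", orEmpty(dbOrSchema))
            .replace("{database}", orEmpty(database))
            .replace("{schema}", orEmpty(schema))
            .replace("{table}", orEmpty(table));
    }

    // Missing values render as an empty string in this sketch.
    private static String orEmpty(String value) {
        return value == null ? "" : value;
    }
}
```

With the default template, a source that reports only a schema (`public`) and the table `customers` would produce `supermetal.public.customers`.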

Naming Mode

| Mode | Description |
| --- | --- |
| Default | Replace invalid characters with underscore |
| Unicode | Encode invalid characters as _uXXXX |
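
The two modes can be compared with the sketch below. Kafka topic names may contain only ASCII letters, digits, `.`, `_`, and `-`, so anything else counts as invalid here. The exact behavior is assumed rather than documented on this page: the sketch uses uppercase hex digits, works per UTF-16 code unit, and does not escape literal underscores in Unicode mode. `TopicNamingSketch` is a hypothetical name.

```java
/** Hypothetical illustration of the Default and Unicode naming modes. */
final class TopicNamingSketch {
    // Characters Kafka accepts in a topic name.
    private static boolean isLegal(char c) {
        return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
            || (c >= '0' && c <= '9') || c == '.' || c == '_' || c == '-';
    }

    static String sanitize(String name, boolean unicodeMode) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (isLegal(c)) {
                out.append(c);
            } else if (unicodeMode) {
                // Unicode mode: _u followed by four hex digits of the code unit.
                out.append(String.format("_u%04X", (int) c));
            } else {
                // Default mode: every invalid character becomes an underscore.
                out.append('_');
            }
        }
        return out.toString();
    }
}
```

Default mode is lossy: `my table` and `my$table` both become `my_table`, so distinct source names can collide. Unicode mode keeps them distinct (`my_u0020table` and `my_u0024table`) at the cost of longer topic names.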

Transactions

Optional transactions align Kafka delivery with source database transaction boundaries, so consumers never observe partial transactions. Enabling them forces acks=All and an idempotent producer.

Consumer configuration

Consumers must set isolation.level=read_committed to only see committed transactions:

props.put("isolation.level", "read_committed");

Apache® and Apache Kafka® are either registered trademarks or trademarks of the Apache Software Foundation. Debezium is an unregistered trademark of the Commonhaus Foundation. No endorsement by these organizations is implied by the use of these marks.

Changelog

0.1.5

2026-06-08

Failed producer delivery acks are now retried indefinitely.
