Yaroslav Tkachenko

Apache Flink: Postgres to Postgres Replication with Flink CDC

Flink CDC is a great data integration framework that’s getting more and more popular. It started as a collection of Flink sources, but it has since evolved into a standalone tool that provides declarative (YAML) pipelines.

Declarative pipelines are great to start with, but they’re not as flexible as writing Flink DataStream or Table API pipelines yourself. That’s why I still prefer using the Flink source connectors provided by Flink CDC and combining them with regular Flink connectors in a typical Flink pipeline.

This tutorial shows how to combine a Flink CDC Postgres source with a JDBC sink to build full Postgres to Postgres replication. To make it realistic, the pipeline doesn’t hardcode any table definitions: it can read from an arbitrary set of tables, and it can automatically create tables at the destination.

The full source code is at github.com/irontools/flink-tutorials/postgres-replication.

All examples use Flink 2.2.0 with flink-sql-connector-postgres-cdc 3.6.0-2.2 and flink-connector-jdbc-postgres 4.0.0-2.0.

Postgres Replication 101

Postgres logical replication is built around replication slots. A slot pins a position in the WAL, gets a dedicated walsender backend process once a consumer connects, and forces Postgres to retain WAL segments until that consumer catches up. Each slot also holds the active state of the logical decoding plugin (in our case pgoutput) and counts against the cluster-wide max_replication_slots limit (default 10).

If you spin up one CDC source per table, you pay this cost N times. And that’s exactly what the Table API and SQL in Flink CDC force on you: one CDC source per table. The DataStream API, however, lets you work around this limitation.

What we want is a single CDC source with a multi-table publication that consumes the WAL once and emits change events for every table you’ve registered. The upstream cost is one walsender, one snapshot pass, and one slot - regardless of how many tables you replicate.
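
With the DataStream API this can be a single PostgresIncrementalSource. Here’s a minimal sketch of that wiring - the connection settings are illustrative, and RawDebeziumDeserializationSchema is the pass-through deserializer introduced later in this post:

// One incremental source for every table in the publication: one slot, one walsender.
PostgresIncrementalSource<SourceRecord> source =
    PostgresIncrementalSource.<SourceRecord>builder()
        .hostname("source-host")                           // illustrative connection settings
        .port(5432)
        .database("app")
        .schemaList("public")
        .tableList("public.orders", "public.customers")    // N tables, one source
        .username("replicator")
        .password("secret")
        .slotName("flink_cdc_slot")                        // exactly one replication slot
        .decodingPluginName("pgoutput")
        .deserializer(new RawDebeziumDeserializationSchema())
        .build();

DataStream<SourceRecord> raw =
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Postgres CDC");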

Pipeline overview

                         ┌──────────────── per table ────────────────┐
PostgresCDC source       │ flatMap(filter)     flatMap(deserialize)  │
   (raw SourceRecord) ──>│  SourceRecord  ──>  RowData  ──>  Row     │──> fromChangelogStream ──> JDBC sink
                         └───────────────────────────────────────────┘

Why use the Table API JDBC sink and not the DataStream API? Two reasons:

  1. Upserts and deletes for free. Given a schema with a primary key, the Table API JDBC sink understands changelog semantics and generates upserts (INSERT … ON CONFLICT (pk) DO UPDATE) and deletes by PK, whereas the DataStream JdbcSink would need hand-written SQL per table.
  2. Schema-driven configuration. The sink is described by a TableDescriptor plus the Schema discovered from the source catalog, so adding a table requires no new sink-side code.

Schema discovery via PostgresCatalog

Flink’s flink-connector-jdbc-postgres ships a PostgresCatalog that introspects a Postgres database via JDBC and returns Flink Schema objects (column types + primary key) for any table.

PostgresCatalog catalog = new PostgresCatalog(
    Thread.currentThread().getContextClassLoader(),
    "source_pg",
    config.sourceDatabase(),
    config.sourceUser(),
    config.sourcePassword(),
    String.format("jdbc:postgresql://%s:%d/", config.sourceHost(), config.sourcePort()));
catalog.open();

Map<FullyQualifiedTableName, CatalogBaseTable> result = new LinkedHashMap<>();
for (FullyQualifiedTableName id : config.tables()) {
  ObjectPath path = new ObjectPath(config.sourceDatabase(), id.schema() + "." + id.table());
  result.put(id, catalog.getTable(path));
}
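
Each CatalogBaseTable carries the unresolved Schema (column types plus primary key) that drives everything downstream; the project’s Utils.extractRowDataTypeFromSchema helper later resolves it into a DataType. A tiny sketch of pulling it out per table:

// The discovered schema is reused for both the per-table deserializer and the JDBC sink.
CatalogBaseTable catalogTable = result.get(id);
Schema schema = catalogTable.getUnresolvedSchema();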

Choosing the deserializer

The Flink CDC Postgres source connector ships four implementations of DebeziumDeserializationSchema<T> that can be used when defining a source in the DataStream API. Each one has a very different cost profile, and only one is appropriate for a single-source / many-tables design.

StringDebeziumDeserializationSchema

@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    out.collect(record.toString());
}

Output is just SourceRecord.toString(). Useful for debugging, useless for anything programmatic - there’s no parser for this format.

JsonDebeziumDeserializationSchema

@Override
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
    if (jsonConverter == null) {
        initializeJsonConverter();
    }
    byte[] bytes =
            jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
    out.collect(new String(bytes));
}

Serializes the entire Debezium envelope (op, before, after, source, ts_ms, …) to JSON via Kafka Connect’s JsonConverter, then wraps it as a String. This is the format you’ve probably seen in CDC tutorials.

For multi-table use, you’d then need to parse the JSON back to discover which table it belongs to, then parse it again to extract each column’s value, then convert to RowData/Row for the sink. Three extra passes over data that originated as a Kafka Connect Struct.

DebeziumEventDeserializationSchema

public abstract class DebeziumEventDeserializationSchema
    extends SourceRecordEventDeserializer
    implements DebeziumDeserializationSchema<org.apache.flink.cdc.common.event.Event> {
    ...
    public void deserialize(SourceRecord record, Collector<Event> out);
    public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record);
    ...
}

This produces org.apache.flink.cdc.common.event.Event, the unit of work for the Flink CDC Pipeline framework - the YAML-based, schema-evolution-aware engine in the flink-cdc-pipeline-* connectors. It tracks per-table schemas internally and emits CreateTableEvent / DataChangeEvent / SchemaChangeEvent records.

If you’re writing a CDC Pipeline job, this is what you use. But it doesn’t compose with the regular DataStream / Table API - the Event type and the operators that consume it live in a different ecosystem.

RowDataDebeziumDeserializeSchema

public static RowDataDebeziumDeserializeSchema.Builder newBuilder();
public void deserialize(SourceRecord record, Collector<RowData> out);

This is the implementation Flink’s SQL CDC connector uses internally. It converts Kafka Connect Struct straight into RowData using the physical RowType you supply via the builder. No JSON anywhere. It’s the cheapest way to land a CDC record in Flink’s internal binary row format.

The catch: a single instance is bound to one RowType. There’s no way to say “look up the right RowType per record.” If you wire it to a CDC source that reads multiple tables with different schemas, the conversion will fail the first time a record from a different table arrives.

So RowDataDebeziumDeserializeSchema is exactly the converter we want - but it has to run after per-table demux, not at the source.

Our custom deserializer: pass-through SourceRecord

public class RawDebeziumDeserializationSchema implements DebeziumDeserializationSchema<SourceRecord> {
  @Override
  public void deserialize(SourceRecord record, Collector<SourceRecord> out) {
    out.collect(record);
  }

  @Override
  public TypeInformation<SourceRecord> getProducedType() {
    return TypeInformation.of(SourceRecord.class);
  }
}

This deserializer doesn’t deserialize anything - it forwards Kafka Connect’s SourceRecord (the Struct-backed in-memory representation Debezium hands us) downstream untouched.

This is the only choice that satisfies all three constraints we have:

  1. One source, many tables. The source can’t pick a RowType because there isn’t one - different records will land in different tables. Pass-through preserves that flexibility.
  2. Cheapest possible per-record cost at the source. The source operator just forwards the same SourceRecord reference Debezium handed it. No JSON serialization, no RowData conversion, no allocations beyond what the source itself does.
  3. Filter before deserializing. The demux flat-map only needs to read two fields from the envelope (source.schema, source.table) to decide whether to keep a record. With pass-through, records that get dropped never pay the conversion cost. With JsonDebeziumDeserializationSchema, every record - including the ones we discard - would have already been serialized to JSON by the source.

The work that was in the deserializer is now per-stream, scoped to records that actually belong to a given table:

DataType dataType = Utils.extractRowDataTypeFromSchema(schema, tEnv);
RowType rowType = (RowType) dataType.getLogicalType();

DebeziumDeserializationSchema<RowData> deserializer = RowDataDebeziumDeserializeSchema.newBuilder()
    .setPhysicalRowType(rowType)
    .setResultTypeInfo(InternalTypeInfo.of(rowType))
    .build();

RowRowConverter rowConverter = RowRowConverter.create(dataType);

RowDataDebeziumDeserializeSchema (the one we couldn’t use at the source) is exactly right here, because at this point we know which table the record belongs to and what its RowType is.

Demultiplexing the source stream

The Debezium envelope embeds the originating table identifier in source.schema / source.table. The demux is a filter on those Struct fields:

public static final class TableFilter implements FlatMapFunction<SourceRecord, SourceRecord> {
  private final FullyQualifiedTableName id;

  public TableFilter(FullyQualifiedTableName id) { this.id = id; }

  @Override
  public void flatMap(SourceRecord record, Collector<SourceRecord> out) {
    Struct envelope = (Struct) record.value();
    Struct source = envelope.getStruct("source");
    if (id.schema().equals(source.getString("schema"))
        && id.table().equals(source.getString("table"))) {
      out.collect(record);
    }
  }
}

Two Struct field reads per record - Kafka Connect Struct is just a typed view over a backing array, so this is essentially two array accesses and two String.equals calls. No serialization round-trips.

Per-table conversion: SourceRecord → RowData → Row

After the filter, each table has its own stream of SourceRecords that we know match its RowType. We chain the RowDataDebeziumDeserializeSchema and the RowRowConverter into a single flat-map so the conversion is fully fused - RowData is produced and consumed in the same operator without ever showing up between operator boundaries:

public static final class SourceRecordToRow implements FlatMapFunction<SourceRecord, Row> {
  private final DebeziumDeserializationSchema<RowData> deserializer;
  private final RowRowConverter rowConverter;

  public SourceRecordToRow(
      DebeziumDeserializationSchema<RowData> deserializer,
      RowRowConverter rowConverter) {
    this.deserializer = deserializer;
    this.rowConverter = rowConverter;
  }

  @Override
  public void flatMap(SourceRecord record, Collector<Row> out) throws Exception {
    deserializer.deserialize(record, new Collector<RowData>() {
      @Override
      public void collect(RowData rowData) {
        out.collect(rowConverter.toExternal(rowData));
      }
      @Override
      public void close() {}
    });
  }
}

The inner Collector<RowData> is the bridge - RowDataDebeziumDeserializeSchema writes into it as it would for a downstream operator, but we intercept and convert on the spot. Net result per record: one Struct → RowData decode, one RowData → Row lift. No JSON, no intermediate Map<String, Object>, no reflection.

The output is a DataStream<Row> tagged with a RowKind derived from the Debezium operation (+I for inserts and snapshot reads, -U/+U for updates, -D for deletes) - exactly what StreamTableEnvironment.fromChangelogStream(stream, schema) expects to lift the stream into a Table. The full per-table operator chain (filter → deserialize → bridge to Table) is in the source; a rough sketch of the wiring follows below.
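
Roughly, the per-table wiring looks like this (a sketch using the pieces defined above; ExternalTypeInfo is just one way to hand Flink the Row type information):

// Demux filter -> fused SourceRecord-to-Row conversion -> changelog Table.
DataStream<Row> rows = raw
    .flatMap(new TableFilter(id))
    .flatMap(new SourceRecordToRow(deserializer, rowConverter))
    .returns(ExternalTypeInfo.of(dataType));   // Row type info derived from the discovered DataType

// Lift the changelog stream into the Table API with the discovered Schema.
Table changelog = tEnv.fromChangelogStream(rows, schema);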

Sink: Table API JDBC + auto-created targets

For each per-table source Table, we build a JDBC sink via TableDescriptor.forConnector("jdbc") and hand it the same discovered Schema (sketched below). Because the schema carries the primary key, the JDBC sink generates an upsert (INSERT … ON CONFLICT (pk) DO UPDATE) and a delete by PK out of the box - no hand-written per-table SQL on our side. All sinks accumulate into a single StatementSet so they share one Flink job (and therefore one CDC source):

StatementSet stmtSet = tEnv.createStatementSet();
for (FullyQualifiedTableName id : config.tables()) {
  registerTablePipeline(tEnv, stmtSet, config, id, catalogTables.get(id), raw);
}
stmtSet.execute();
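
registerTablePipeline isn’t reproduced here, but its sink side looks roughly like this - the target* config accessors and option values are illustrative, not the repo’s exact code:

// A JDBC sink descriptor built from the same discovered Schema; the primary key
// in the Schema is what switches the sink into upsert mode.
TableDescriptor sink = TableDescriptor.forConnector("jdbc")
    .schema(schema)
    .option("url", String.format("jdbc:postgresql://%s:%d/%s",
        config.targetHost(), config.targetPort(), config.targetDatabase()))
    .option("table-name", id.schema() + "." + id.table())
    .option("username", config.targetUser())
    .option("password", config.targetPassword())
    .build();

stmtSet.add(changelog.insertInto(sink));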

The JDBC sink expects the target table to exist. To keep the “no manual DDL” promise on the sink side too, the project includes a small PgTableCreator that runs CREATE SCHEMA IF NOT EXISTS + CREATE TABLE IF NOT EXISTS against the target before the sink opens, with column types derived from the source RowType and the primary key from Schema.getPrimaryKey(). The Flink → Postgres type mapping is the inverse of Flink’s PostgresTypeMapper — every type root the catalog mapper recognises round-trips (including arrays), with a TEXT fallback for anything unsupported.
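
A heavily simplified sketch of that provisioning step - targetUrl, targetUser, targetPassword, and toPostgresType stand in for the repo’s actual configuration and type-mapping logic, and quoting/error handling are omitted:

// Create the target schema and table up front so the JDBC sink can open against them.
try (Connection conn = DriverManager.getConnection(targetUrl, targetUser, targetPassword);
     Statement stmt = conn.createStatement()) {
  stmt.execute("CREATE SCHEMA IF NOT EXISTS " + id.schema());

  StringJoiner columns = new StringJoiner(", ");
  for (RowType.RowField field : rowType.getFields()) {
    // toPostgresType is a placeholder for the inverse type mapping (TEXT as the fallback).
    columns.add(field.getName() + " " + toPostgresType(field.getType()));
  }

  String pk = String.join(", ", schema.getPrimaryKey().orElseThrow().getColumnNames());
  stmt.execute("CREATE TABLE IF NOT EXISTS " + id.schema() + "." + id.table()
      + " (" + columns + ", PRIMARY KEY (" + pk + "))");
}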

Adding a new table to the source Postgres and a new entry to the TABLES env var is enough — no Java change, no manual DDL on either end.

Summary

The shape of the pipeline is:

  1. One PostgresIncrementalSource for N tables, with a custom pass-through RawDebeziumDeserializationSchema<SourceRecord>. One replication slot, one snapshot pass, one walsender on the upstream database.
  2. Per-table demux on source.schema / source.table - two struct field reads, no serialization.
  3. Per-table RowDataDebeziumDeserializeSchema (built from a RowType discovered via PostgresCatalog) chained with RowRowConverter in a single fused flat-map - the only place we actually pay deserialization cost, and only for records we’re going to keep.
  4. tEnv.fromChangelogStream(...) lifts the Row stream into the Table API. Sinks are TableDescriptor.forConnector("jdbc") instances accumulated into a single StatementSet.
  5. PgTableCreator provisions target schemas / tables from the discovered Schema so sink-side DDL stays in lockstep with source-side DDL automatically.

The deserializer choice is the most important decision. None of the four built-in DebeziumDeserializationSchema implementations work for a single-source / many-tables / Table-API design. A ten-line pass-through deserializer, plus RowDataDebeziumDeserializeSchema applied per-stream after the demux, gives us the cheapest possible end-to-end path: each SourceRecord traverses one cheap field-access filter, then exactly one Struct → RowData decode, then one RowData → Row lift. No JSON, no reflection, no per-record allocations beyond what Flink CDC and the Table API already do.

Ready to Master Apache Flink?

This tutorial is just the beginning. Efficient data serialization, pipeline design, DataStream API and Table API interop — these are the kinds of deep, production-critical topics that separate Flink practitioners from Flink experts.

The Advanced Apache Flink course covers all of this and more:

  • State & checkpointing deep dive — RocksDB state backend, savepoint compatibility, and the State Processor API with hands-on exercises
  • Connector internals — connector design, Table API vs DataStream API, upsert support
  • Performance tuning — configuration best practices, flame graphs, and memory profiling

Whether you're debugging a stuck checkpoint at 2 AM or designing a pipeline that needs to handle millions of events per second, this course gives you the mental models and practical skills to do it confidently.

Enroll now →