Apache Flink: Reading and Modifying Kafka Consumer Offsets Using the State Processor API

Kafka consumer offset management in Flink's Kafka source is notoriously easy to misunderstand. Offsets live in two places: they are stored in Flink state, and they can also be committed back to Kafka on checkpoints. On restore, Flink uses the offsets stored in its own state; the committed consumer-group offsets only matter when a job starts fresh with a configuration that reads them (e.g. OffsetsInitializer.committedOffsets()).

So, understanding what offsets Flink actually stores can be very important for many situations. Maybe you need to rewind them due to a data corruption bug. Or maybe a Flink job started from a savepoint consumes data it’s not supposed to, and you’d like to understand what a savepoint actually contains. Or maybe you’re migrating a job between clusters and need to bootstrap state from scratch.

Flink’s State Processor API lets you read and write savepoint state programmatically. This tutorial shows how to use it with the KafkaSource connector – reading partition offsets from an existing savepoint and writing modified offsets into a new one.

The full source code is available at github.com/irontools/flink-tutorials/kafka-offsets.

All examples use Flink 2.1.1 with flink-connector-kafka 4.0.1-2.0.

How KafkaSource State Is Stored

The KafkaSource connector stores two distinct pieces of state in a savepoint:

Reader state (operator list state) – each subtask stores its assigned Kafka partition splits in a ListState<byte[]> named SourceReaderState. Each element is a serialized KafkaPartitionSplit containing the topic, partition, starting offset, and stopping offset.

Coordinator state – the KafkaSourceEnumerator stores which partitions have been discovered and assigned. This is written as coordinator state bytes attached to the operator in the savepoint metadata.

Both are serialized as raw bytes using Flink’s SimpleVersionedSerialization wrapper format:

[serializerVersion (int)][dataLength (int)][data (bytes)]

The split data inside follows the KafkaPartitionSplitSerializer format:

[topic (UTF)][partition (int)][startingOffset (long)][stoppingOffset (long)]

Identifying the Kafka Source Operator

Before reading or writing state, you need the operator’s UID hash – a 32-character hex string that identifies the Kafka source operator in the savepoint. For SQL/Table API jobs (where you don’t set UIDs explicitly), you can find it using Flink’s built-in savepoint_metadata() function:

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
settings.getConfiguration().setString("table.display.max-column-width", "36");
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql("LOAD MODULE state");
tEnv.executeSql("SELECT * FROM savepoint_metadata('" + savepointPath + "')").print();

This prints all operators with their names and UID hashes. Look for the one named Source: YourTableName[1].

Reading Kafka Offsets

Reading is straightforward. Use SavepointReader to read the SourceReaderState list state, then deserialize each byte[] element:

static KafkaPartitionSplitState deserializeSplit(byte[] raw) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
        int serializerVersion = in.readInt(); // SimpleVersionedSerialization header
        int dataLength = in.readInt();        // both read here but not validated

        String topic = in.readUTF();
        int partition = in.readInt();
        long startingOffset = in.readLong();
        long stoppingOffset = in.readLong();

        return new KafkaPartitionSplitState(topic, partition, startingOffset, stoppingOffset);
    }
}

Wire it up with the State Processor API:

SavepointReader savepoint = SavepointReader.read(env, savepointPath, new HashMapStateBackend());

savepoint.readListState(
        OperatorIdentifier.forUidHash(operatorUidHash),
        "SourceReaderState",
        TypeInformation.of(byte[].class)
    )
    .map(ReadKafkaOffsets::deserializeSplit)
    .print();

env.execute("Read Kafka Offsets");

The output shows each partition’s current offset:

KafkaPartitionSplitState{topic='loadtest.json', partition=0, startingOffset=33, stoppingOffset=NONE}
KafkaPartitionSplitState{topic='loadtest.json', partition=1, startingOffset=29, stoppingOffset=NONE}
KafkaPartitionSplitState{topic='loadtest.json', partition=2, startingOffset=26, stoppingOffset=NONE}

Writing Modified Offsets

Writing offsets is more involved because you need to handle two layers of state: the per-subtask reader state and the coordinator state.

Step 1: Write the Reader State

Serialization is the reverse of deserialization. Wrap the split data with SimpleVersionedSerialization:

static byte[] serializeSplit(String topic, int partition, long offset) throws IOException {
    ByteArrayOutputStream splitBaos = new ByteArrayOutputStream();
    DataOutputStream splitOut = new DataOutputStream(splitBaos);
    splitOut.writeUTF(topic);
    splitOut.writeInt(partition);
    splitOut.writeLong(offset);
    splitOut.writeLong(Long.MIN_VALUE); // no stopping offset
    splitOut.flush();
    byte[] splitData = splitBaos.toByteArray();

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeInt(0); // KafkaPartitionSplitSerializer version
    out.writeInt(splitData.length);
    out.write(splitData);
    out.flush();
    return baos.toByteArray();
}
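A quick, JDK-only sanity check – feeding serializeSplit's output back through the deserializer from the reading section – confirms the byte layout round-trips. The two methods here simply mirror the snippets above; no Flink classes are involved:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SplitRoundTrip {

    // Mirrors serializeSplit above: split payload wrapped in the
    // SimpleVersionedSerialization header [version][length][data].
    static byte[] serializeSplit(String topic, int partition, long offset) throws IOException {
        ByteArrayOutputStream splitBaos = new ByteArrayOutputStream();
        DataOutputStream splitOut = new DataOutputStream(splitBaos);
        splitOut.writeUTF(topic);
        splitOut.writeInt(partition);
        splitOut.writeLong(offset);
        splitOut.writeLong(Long.MIN_VALUE); // no stopping offset
        splitOut.flush();
        byte[] splitData = splitBaos.toByteArray();

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(0); // KafkaPartitionSplitSerializer version
        out.writeInt(splitData.length);
        out.write(splitData);
        out.flush();
        return baos.toByteArray();
    }

    // Mirrors deserializeSplit above: skip the header, read the payload fields.
    static Object[] deserializeSplit(byte[] raw) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            in.readInt(); // serializer version, unused
            in.readInt(); // data length, unused
            return new Object[] {in.readUTF(), in.readInt(), in.readLong(), in.readLong()};
        }
    }
}
```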

Then use a non-keyed StateBootstrapFunction to write operator list state:

public class KafkaSourceStateBootstrapFunction extends StateBootstrapFunction<byte[]> {
    private ListState<byte[]> sourceReaderState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        sourceReaderState = context.getOperatorStateStore().getListState(
            new ListStateDescriptor<>("SourceReaderState", byte[].class)
        );
    }

    @Override
    public void processElement(byte[] value, Context ctx) throws Exception {
        sourceReaderState.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do: the list state registered in initializeState() is snapshotted automatically.
    }
}

Then use SavepointWriter.fromExistingSavepoint() to read an existing savepoint, remove the old Kafka source operator, and write a new one with the modified splits. Here transformation is a StateBootstrapTransformation<byte[]>, built with OperatorTransformation.bootstrapWith(...) over a DataStream of the serialized split bytes, followed by .transform(new KafkaSourceStateBootstrapFunction()):

OperatorIdentifier operatorId = OperatorIdentifier.forUidHash(operatorUidHash);

SavepointWriter
    .fromExistingSavepoint(env, inputPath, new HashMapStateBackend())
    .removeOperator(operatorId)
    .withOperator(operatorId, transformation)
    .write(outputPath);

env.execute("Write Kafka Offsets");

This preserves all other operators in the savepoint while replacing only the Kafka source state.

Step 2: Write the Coordinator State

If you stop here, the savepoint will appear to work – but the offsets will be ignored. With no coordinator state, the KafkaSourceEnumerator starts fresh: it re-discovers all partitions and assigns them to subtasks from the configured starting offset (e.g. earliest), ignoring the offsets you wrote into the reader state.

The fix is to also write the enumerator’s coordinator state. This tells the enumerator “these partitions are already discovered and assigned – don’t re-assign them.”

The coordinator state has a three-layer serialization format:

Layer 1: [coordinatorSerdeVersion = 1 (int)]
Layer 2: [enumSerializerVersion = 2 (int)][dataLength (int)]
Layer 3: [numPartitions (int)]
           for each: [topic (UTF)][partition (int)][assignmentStatus = 0 (int)]
         [initialDiscoveryFinished = true (boolean)]

The corresponding serialization code:

static byte[] serializeEnumeratorState(List<SplitInfo> splits) throws IOException {
    // KafkaSourceEnumStateSerializer v2 format
    ByteArrayOutputStream enumBaos = new ByteArrayOutputStream();
    DataOutputStream enumOut = new DataOutputStream(enumBaos);
    enumOut.writeInt(splits.size());
    for (SplitInfo split : splits) {
        enumOut.writeUTF(split.topic);
        enumOut.writeInt(split.partition);
        enumOut.writeInt(0); // ASSIGNED status
    }
    enumOut.writeBoolean(true); // initialDiscoveryFinished
    enumOut.flush();
    byte[] enumData = enumBaos.toByteArray();

    // Coordinator serde version + SimpleVersionedSerialization wrapper
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeInt(1); // SOURCE_COORDINATOR_SERDE_VERSION
    out.writeInt(2); // KAFKA_ENUM_STATE_SERIALIZER_VERSION
    out.writeInt(enumData.length);
    out.write(enumData);
    out.flush();
    return baos.toByteArray();
}
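To sanity-check those bytes, you can peel the three layers back off with the JDK alone. The serialize method below mirrors serializeEnumeratorState for a single partition; parse is a hypothetical inverse written for this check, not a Flink API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EnumStateCheck {

    // Mirrors serializeEnumeratorState above for a single (topic, partition).
    static byte[] serialize(String topic, int partition) throws IOException {
        ByteArrayOutputStream enumBaos = new ByteArrayOutputStream();
        DataOutputStream enumOut = new DataOutputStream(enumBaos);
        enumOut.writeInt(1);        // numPartitions
        enumOut.writeUTF(topic);
        enumOut.writeInt(partition);
        enumOut.writeInt(0);        // ASSIGNED status
        enumOut.writeBoolean(true); // initialDiscoveryFinished
        enumOut.flush();
        byte[] enumData = enumBaos.toByteArray();

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(1); // layer 1: coordinator serde version
        out.writeInt(2); // layer 2: enum state serializer version
        out.writeInt(enumData.length);
        out.write(enumData);
        out.flush();
        return baos.toByteArray();
    }

    // Reads the layers back and returns a printable summary of layer 3.
    static String parse(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int coordinatorVersion = in.readInt(); // expect 1
            int enumVersion = in.readInt();        // expect 2
            in.readInt();                          // dataLength, unused
            StringBuilder sb = new StringBuilder("v" + coordinatorVersion + "/v" + enumVersion + ":");
            int numPartitions = in.readInt();
            for (int i = 0; i < numPartitions; i++) {
                sb.append(' ').append(in.readUTF()).append('-').append(in.readInt());
                sb.append("(status=").append(in.readInt()).append(')');
            }
            sb.append(" discovered=").append(in.readBoolean());
            return sb.toString();
        }
    }
}
```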

The SavepointWriter API has no public method to set coordinator state, so you need to post-process the savepoint’s _metadata file after writing:

static void injectCoordinatorState(String savepointPath, String operatorUidHash,
                                    byte[] coordinatorState) throws Exception {
    Path metadataPath = Paths.get(savepointPath, "_metadata");

    CheckpointMetadata metadata;
    try (DataInputStream dis = new DataInputStream(Files.newInputStream(metadataPath))) {
        metadata = Checkpoints.loadCheckpointMetadata(
            dis, Thread.currentThread().getContextClassLoader(), savepointPath);
    }

    OperatorID targetId = operatorIdFromHex(operatorUidHash);
    for (OperatorState opState : metadata.getOperatorStates()) {
        if (opState.getOperatorID().equals(targetId)) {
            opState.setCoordinatorState(
                new ByteStreamStateHandle("kafka-enumerator", coordinatorState));
            break;
        }
    }

    try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(metadataPath))) {
        Checkpoints.storeCheckpointMetadata(metadata, dos);
    }
}
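The operatorIdFromHex helper referenced above converts the 32-character UID hash into the 16 raw bytes that an OperatorID wraps. A minimal sketch of the hex parsing (plain JDK; the final new OperatorID(bytes) call needs Flink on the classpath, so it is only shown as a comment):

```java
public class OperatorIdHex {

    // Parses a 32-char hex UID hash into 16 raw bytes.
    // In the savepoint tooling you would then wrap it:
    //   OperatorID id = new OperatorID(bytesFromHex(operatorUidHash));
    static byte[] bytesFromHex(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("expected a 32-char UID hash, got: " + hex);
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < 16; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }
}
```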

Call this after env.execute() to inject the coordinator state into the savepoint that was just written.

The Complete Write Flow

Putting it all together:

// 1. Define the desired offsets
List<SplitInfo> splits = Arrays.asList(
    new SplitInfo("loadtest.json", 0, 100),
    new SplitInfo("loadtest.json", 1, 200),
    new SplitInfo("loadtest.json", 2, 150)
);

// 2. Write the reader state into a new savepoint
SavepointWriter
    .fromExistingSavepoint(env, inputPath, new HashMapStateBackend())
    .removeOperator(operatorId)
    .withOperator(operatorId, transformation)
    .write(outputPath);
env.execute("Write Kafka Offsets");

// 3. Inject coordinator state
byte[] enumeratorState = serializeEnumeratorState(splits);
injectCoordinatorState(outputPath, operatorUidHash, enumeratorState);

Then start your Flink job from the generated savepoint:

settings.getConfiguration().setString(
    "execution.state-recovery.path", "/tmp/flink-savepoints/generated-savepoint");

Summary

Reading Kafka offsets from a Flink savepoint is a simple deserialization exercise. Writing them back requires understanding two undocumented details:

  1. Kafka source state is operator list state, not keyed state – use StateBootstrapFunction, not KeyedStateBootstrapFunction, and skip .keyBy() in the transformation.

  2. You must also write the enumerator coordinator state with the correct three-layer serialization format. Without it, the KafkaSourceEnumerator will re-discover partitions on startup and assign them from earliest-offset, ignoring your modified reader state entirely.

The State Processor API doesn’t expose coordinator state through its public API, but the savepoint’s _metadata file is small and easy to post-process using Flink’s internal Checkpoints utility.

This tutorial barely scratches the surface. The State Processor API, savepoints, Kafka connector internals – these are the kinds of deep, production-critical topics that separate Flink practitioners from Flink experts.

The Advanced Apache Flink course covers all of this and more.

Whether you’re debugging a stuck checkpoint at 2 AM or designing a pipeline that needs to handle millions of events per second, this course gives you the mental models and practical skills to do it confidently.

Enroll now →