How the Kafka consumer commits offsets
Kafka version: 2.3
Data structure
/**
 * Excerpt of the consumer class, reduced to the two collaborators shown in these notes.
 * Constructors, methods and any other fields are omitted from this excerpt.
 */
public class KafkaConsumer<K, V> implements Consumer<K, V> {
// coordinator helper; sendOffsetCommitRequest (shown later in these notes) is declared in ConsumerCoordinator
private final ConsumerCoordinator coordinator;
// subscription and per-partition state; see the SubscriptionState excerpt below
private final SubscriptionState subscriptions;
}
/**
 * Excerpt of the consumer's subscription bookkeeping: the set of requested topics plus
 * a per-partition state entry for every assigned partition. Only fields are shown here.
 */
public class SubscriptionState {
/* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
private final PartitionStates<TopicPartitionState> assignment;
/* the list of topics the user has requested */
private Set<String> subscription;
/** State kept for a single assigned partition. */
private static class TopicPartitionState {
// fetch state for this partition (FetchState is declared outside this excerpt)
private FetchState fetchState;
private FetchPosition position; // last consumed position
}
/**
 * Associates each TopicPartition with a state value of type S, backed by a LinkedHashMap.
 * NOTE(review): by brace placement this class is nested inside SubscriptionState in this
 * excerpt; presumably it is a separate class upstream - confirm against the Kafka source.
 */
public class PartitionStates<S> {
// LinkedHashMap iterates in insertion order, so the partition order is preserved
private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
// unmodifiable view over the map's key set; it reflects later changes to the map
private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());
/** Pairing of one partition with its state value; both fields are final. */
public static class PartitionState<S> {
private final TopicPartition topicPartition;
private final S value;
}
}
}
Inside ConsumerCoordinator
The request is built in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, the local (client-side) proxy for the consumer coordinator:
/**
 * Builds an OffsetCommit request for the given offsets and sends it to the coordinator.
 *
 * <p>Partitions are grouped by topic name so that the request carries one topic entry per
 * topic, each holding the partitions (offset, leader epoch, metadata) to commit.
 *
 * @param offsets the offsets and metadata to commit, keyed by partition
 * @return a successful future if {@code offsets} is empty; a failed future if no coordinator
 *         is available, if any offset is negative, or (with {@link CommitFailedException})
 *         if {@code generation()} returns null; otherwise the future of the sent request
 *         composed with an {@code OffsetCommitResponseHandler}
 */
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    // nothing to commit, so skip building and sending a request
    if (offsets.isEmpty()) {
        return RequestFuture.voidSuccess();
    }

    Node coordinator = checkAndGetCoordinator();
    // NOTE(review): upstream Kafka guards against a null coordinator at this point;
    // never hand a null node to client.send
    if (coordinator == null) {
        return RequestFuture.coordinatorNotAvailable();
    }

    // create the offset commit request, one topic entry per distinct topic name
    Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        TopicPartition topicPartition = entry.getKey();
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        // reject invalid offsets before anything is sent
        if (offsetAndMetadata.offset() < 0) {
            return RequestFuture.failure(
                    new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
        }

        // computeIfAbsent creates the topic entry only the first time a topic is seen,
        // instead of allocating a default on every iteration and re-putting it
        OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.computeIfAbsent(
                topicPartition.topic(),
                name -> new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(name));

        topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(topicPartition.partition())
                .setCommittedOffset(offsetAndMetadata.offset())
                .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                .setCommittedMetadata(offsetAndMetadata.metadata())
        );
    }

    final Generation generation = generation();
    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null) {
        log.info("Failing OffsetCommit request since the consumer is not part of an active group");
        return RequestFuture.failure(new CommitFailedException());
    }

    OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
            new OffsetCommitRequestData()
                    .setGroupId(this.groupId)
                    .setGenerationId(generation.generationId)
                    .setMemberId(generation.memberId)
                    .setGroupInstanceId(groupInstanceId.orElse(null))
                    .setTopics(new ArrayList<>(requestTopicDataMap.values()))
    );

    log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

    return client.send(coordinator, builder)
            .compose(new OffsetCommitResponseHandler(offsets));
}