Source code: how the Kafka consumer commits offsets
- Source code version 2.4
- What does asyncCommitFenced do?
In ConsumerCoordinator, sendOffsetCommitRequest builds the OffsetCommit request and sends it to the group coordinator:
    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Note: this excerpt leaves out the null checks on coordinator and generation.
        Node coordinator = checkAndGetCoordinator();

        // create the offset commit request
        // Partitions are grouped by topic name: one OffsetCommitRequestTopic per topic.
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();

            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
                    .getOrDefault(topicPartition.topic(),
                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                    .setName(topicPartition.topic())
                    );

            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                    .setCommittedMetadata(offsetAndMetadata.metadata())
            );
            requestTopicDataMap.put(topicPartition.topic(), topic);
        }

        // The request carries the group id plus the member's current generation and member id.
        final Generation generation = generationIfStable();
        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                new OffsetCommitRequestData()
                        .setGroupId(this.rebalanceConfig.groupId)
                        .setGenerationId(generation.generationId)
                        .setMemberId(generation.memberId)
                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
        );

        log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

        return client.send(coordinator, builder)
                .compose(new OffsetCommitResponseHandler(offsets));
    }
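The loop in sendOffsetCommitRequest turns a flat map of per-partition offsets into one entry per topic, each holding that topic's partitions. Below is a minimal sketch of the same grouping step using only plain Java collections. PartitionOffset and groupByTopic are hypothetical stand-ins invented for illustration, not Kafka classes, and computeIfAbsent is used here in place of the getOrDefault/put pair in the original.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class OffsetGroupingSketch {

    // Hypothetical stand-in for one (TopicPartition, OffsetAndMetadata) entry.
    static final class PartitionOffset {
        final String topic;
        final int partition;
        final long offset;

        PartitionOffset(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Groups a flat list of offsets by topic: topic name -> (partition index -> offset).
    static Map<String, Map<Integer, Long>> groupByTopic(List<PartitionOffset> offsets) {
        Map<String, Map<Integer, Long>> byTopic = new LinkedHashMap<>();
        for (PartitionOffset po : offsets) {
            // Create the per-topic entry on first sight, then add the partition to it.
            byTopic.computeIfAbsent(po.topic, t -> new TreeMap<>())
                   .put(po.partition, po.offset);
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<PartitionOffset> offsets = new ArrayList<>();
        offsets.add(new PartitionOffset("orders", 0, 42L));
        offsets.add(new PartitionOffset("payments", 1, 7L));
        offsets.add(new PartitionOffset("orders", 2, 13L));

        // prints {orders={0=42, 2=13}, payments={1=7}}
        System.out.println(groupByTopic(offsets));
    }
}
```

Grouping by topic means each topic name appears once in the request, no matter how many of its partitions are being committed.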
On the server side, the broker (in KafkaApis.handle) decides which operation was requested by matching on request.header.apiKey, e.g.,
    case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
    case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
    case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
    case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
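The match above is a plain dispatch on an enum value: each API key maps to exactly one handler. A rough Java equivalent is sketched below. The ApiKey enum here is a hypothetical three-entry subset, and the handlers just return strings. None of this is the broker's actual code.

```java
class ApiKeyDispatchSketch {

    // Hypothetical subset of API keys, for illustration only.
    enum ApiKey { OFFSET_COMMIT, OFFSET_FETCH, FIND_COORDINATOR }

    // Routes a request to a handler based on its API key.
    // The payload and the returned strings are placeholders for real request/response objects.
    static String handle(ApiKey key, String payload) {
        switch (key) {
            case OFFSET_COMMIT:
                return "commit:" + payload;
            case OFFSET_FETCH:
                return "fetch:" + payload;
            case FIND_COORDINATOR:
                return "find-coordinator:" + payload;
            default:
                throw new IllegalArgumentException("Unknown api key: " + key);
        }
    }

    public static void main(String[] args) {
        // prints commit:group-1
        System.out.println(handle(ApiKey.OFFSET_COMMIT, "group-1"));
    }
}
```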
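The response is built by the OffsetCommitResponse constructor, which stores one error code per partition, grouped by topic: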
    public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
        Map<String, OffsetCommitResponseTopic> responseTopicDataMap = new HashMap<>();

        for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            String topicName = topicPartition.topic();

            OffsetCommitResponseTopic topic = responseTopicDataMap.getOrDefault(
                    topicName, new OffsetCommitResponseTopic().setName(topicName));

            topic.partitions().add(new OffsetCommitResponsePartition()
                    .setErrorCode(entry.getValue().code())
                    .setPartitionIndex(topicPartition.partition()));
            responseTopicDataMap.put(topicName, topic);
        }

        data = new OffsetCommitResponseData()
                .setTopics(new ArrayList<>(responseTopicDataMap.values()))
                .setThrottleTimeMs(requestThrottleMs);
    }
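Because the response carries a separate error code for every partition, the receiver has to walk the topic/partition structure and check each code. The sketch below shows that walk with plain Java maps. The nested-map layout and the failedPartitions helper are assumptions made for illustration, not the client's real OffsetCommitResponseHandler. The only Kafka fact relied on is that error code 0 means no error (Errors.NONE).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OffsetCommitResponseSketch {

    // Error code 0 corresponds to Errors.NONE in Kafka.
    static final short NO_ERROR = 0;

    // Returns "topic-partition" labels for every partition whose error code is nonzero.
    // The input layout (topic -> partition index -> error code) is a simplified stand-in.
    static List<String> failedPartitions(Map<String, Map<Integer, Short>> response) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, Short>> topic : response.entrySet()) {
            for (Map.Entry<Integer, Short> partition : topic.getValue().entrySet()) {
                if (partition.getValue() != NO_ERROR) {
                    failed.add(topic.getKey() + "-" + partition.getKey());
                }
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<Integer, Short> orders = new LinkedHashMap<>();
        orders.put(0, NO_ERROR);
        orders.put(1, (short) 25); // an arbitrary nonzero code, used here as an example failure

        Map<String, Map<Integer, Short>> response = new LinkedHashMap<>();
        response.put("orders", orders);

        // prints [orders-1]
        System.out.println(failedPartitions(response));
    }
}
```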