Version 2.3

Data structure


public class KafkaConsumer<K, V> implements Consumer<K, V> {

   private final ConsumerCoordinator coordinator;
   private final SubscriptionState subscriptions;
}

public class SubscriptionState {

    /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
    private final PartitionStates<TopicPartitionState> assignment;
    /* the list of topics the user has requested */
    private Set<String> subscription;


    private static class TopicPartitionState {

        private FetchState fetchState;
        private FetchPosition position; // last consumed position
}

public class PartitionStates<S> {

    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
    private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());

    public static class PartitionState<S> {
        private final TopicPartition topicPartition;
        private final S value;
}
}
}

Inside ConsumerCoordinator

In org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - the local proxy for consumer coordinator.

    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        Node coordinator = checkAndGetCoordinator();

        // create the offset commit request
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
                    .getOrDefault(topicPartition.topic(),
                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                    .setName(topicPartition.topic())
                    );

            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                    .setCommittedMetadata(offsetAndMetadata.metadata())
            );
            requestTopicDataMap.put(topicPartition.topic(), topic);
        }

        final Generation generation = generation();
            // if the generation is null, we are not part of an active group (and we expect to be).
            // the only thing we can do is fail the commit and let the user rejoin the group in poll()
            if (generation == null) {
                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
                return RequestFuture.failure(new CommitFailedException());
            }

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                new OffsetCommitRequestData()
                        .setGroupId(this.groupId)
                        .setGenerationId(generation.generationId)
                        .setMemberId(generation.memberId)
                        .setGroupInstanceId(groupInstanceId.orElse(null))
                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
        );

        log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

        return client.send(coordinator, builder)
                .compose(new OffsetCommitResponseHandler(offsets));
    }