• Client side of the coordination logic. Corner cases and error handling have been removed or rewritten for brevity.
  • Source code version 2.4

In org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
	coordinator.poll(timer); //note we poll coordinator every time!
	fetcher.validateOffsetsIfNeeded();
	coordinator.refreshCommittedOffsetsIfNeeded(timer)
	subscriptions.resetMissingPositions();
	fetcher.resetOffsetsIfNeeded();
  	final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

 	if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) 
		client.transmitSends();

	return this.interceptors.onConsume(new ConsumerRecords<>(records));
}

The partition assignment refresh logic is in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator, referred to as CC below.

Data structure of CC

// Abridged field listing of the consumer-side group coordinator (methods follow in the notes below).
public final class ConsumerCoordinator extends AbstractCoordinator {
    // NOTE(review): presumably true while this member is the group leader -- not set anywhere in this excerpt.
    private boolean isLeader = false;
    // Current subscription state; consulted by rejoinNeededOrPending().
    private final SubscriptionState subscriptions;
    // Subscription captured at the last join; updated from subscriptions.
    // rejoinNeededOrPending() compares it with subscriptions.subscription() to detect a change.
    private Set<String> joinedSubscription; // updated from subscriptions
    // Latest metadata view; compared against assignmentSnapshot to detect metadata changes.
    private MetadataSnapshot metadataSnapshot; 
    // Updated from metadataSnapshot but not vice versa. Updated only if the current CC is the leader.
    private MetadataSnapshot assignmentSnapshot; //updated by metadataSnapshot but not vice-versa. Updated only if current CC is the leader
    // Starts as true; cleared when a join succeeds and set again by the heartbeat response handler.
    private boolean rejoinNeeded = true;
    // Ensures onJoinPrepare runs once per rebalance; re-armed after onJoinComplete returns.
    private boolean needsJoinPrepare = true;
    // Membership state: REBALANCING while joining, STABLE after a successful join,
    // UNJOINED initially and after an illegal-generation heartbeat error.
    private MemberState state = MemberState.UNJOINED;
    // Background heartbeat thread; enabled once a join succeeds (null until created).
    private HeartbeatThread heartbeatThread = null;

  • Note the heartbeat thread inside the coordinator does not update the internal state

How ConsumerCoordinator polls

   /**
    * Tells whether this member has to (re)join the group, or already has a join in flight.
    *
    * @return {@code false} when partitions are not auto-assigned; otherwise {@code true} if the
    *         metadata no longer matches the snapshot used for the assignment, if the subscription
    *         differs from the one sent with the last join, if a rejoin was requested, or if a
    *         join future is still outstanding
    */
   public boolean rejoinNeededOrPending() {
        // Manually assigned partitions never take part in group rebalancing.
        if (!subscriptions.partitionsAutoAssigned()) {
            return false;
        }

        // Metadata moved on since the assignment was computed: rejoin so that
        // owned-but-no-longer-existing partitions get dropped as lost.
        final boolean metadataChanged =
                assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot);
        if (metadataChanged) {
            return true;
        }

        // The subscription was altered after the last join.
        final boolean subscriptionChanged =
                joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription());

        // rejoinNeeded starts out as true, so a fresh coordinator always joins.
        return subscriptionChanged || rejoinNeeded || joinFuture != null;
    }

    /**
     * Handles the JoinGroup response: records the new generation and moves on to the SyncGroup
     * phase, as leader or as follower. The outcome of the sync is chained into {@code future},
     * so the join future completes only once the sync has completed.
     */
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        /**
         * @param joinResponse the JoinGroup response (error handling elided in this excerpt)
         * @param future the join future; raised with UnjoinedGroupException if the member is no
         *               longer rebalancing, otherwise chained to the sync-group future
         */
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
                log.debug("Received successful JoinGroup response: {}", joinResponse);

                // State and generation are read/written under the coordinator lock.
                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        // if the consumer was woken up before a rebalance completes, we may have already left
                        // the group. In this case, we do not want to continue with the sync group.
                        future.raise(new UnjoinedGroupException());
                    } else {
                        // Adopt the generation id, member id and protocol name returned in the response.
                        AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                                joinResponse.data().memberId(), joinResponse.data().protocolName());
                        if (joinResponse.isLeader()) {
                            // Leader path: onJoinLeader produces the sync future that is chained into the join future.
                            onJoinLeader(joinResponse).chain(future);
                        } else {
    // Follower path: send a SyncGroup request carrying an empty assignment list.
    // NOTE(review): this section looks pasted inline from a separate follower helper -- the
    // `this.`-qualified names and the undeclared `syncFuture` only resolve in that context; confirm.
    SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(rebalanceConfig.groupId)
                                .setMemberId(generation.memberId)
                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                .setAssignments(Collections.emptyList())
                );
        log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
        syncFuture = client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
                            // Complete the join future with whatever the sync-group request yields.
                            syncFuture.chain(future);
                        }
                    }
                }
        }
    }

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
            state = MemberState.REBALANCING;
  	int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
    log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                new JoinGroupRequestData()
                        .setGroupId(rebalanceConfig.groupId)
                        .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
                        .setMemberId(this.generation.memberId)
                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                        .setProtocolType(protocolType())
                        .setProtocols(metadata()) //this one internally update joined subscription!
                        .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
        );

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
            joinFuture = client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                .compose(new JoinGroupResponseHandler());
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    synchronized (AbstractCoordinator.this) {
                            log.info("Successfully joined group with generation {}", generation.generationId);
                            state = MemberState.STABLE;
                            rejoinNeeded = false;
                            // record rebalance latency
                            if (heartbeatThread != null)
                                heartbeatThread.enable();
                    }
                }
            });
        }
        return joinFuture;
    }

    /**
     * Drives one join attempt: prepares for the rebalance, sends (or re-uses) the JoinGroup
     * request, waits for it within {@code timer}, and applies the resulting assignment.
     * Handling of failed join futures is elided in this excerpt.
     *
     * @param timer bounds how long to wait for the join future
     * @return {@code true} once the join finished and {@code onJoinComplete} has run;
     *         {@code false} if the timer expired first (the pending join is resumed on the next call)
     */
    boolean joinGroupIfNeeded(final Timer timer) {
        // Call onJoinPrepare if needed. The flag makes sure it is not called a second time if the
        // client is woken up before a pending rebalance completes. It is checked on every call
        // because an event requiring a rebalance (such as a metadata refresh which changes the
        // matched subscription set) can occur while another rebalance is still in progress.
        if (needsJoinPrepare) {
            // Clear the flag before calling onJoinPrepare: the user callback may throw, and on
            // retry onJoinPrepare must not be run again.
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }

        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // The timer ran out before the join completed; the value is not available yet.
            // joinFuture is left in place so the next call picks up the same request.
            return false;
        }

        // Generation data may be concurrently cleared by the heartbeat thread, so take a snapshot
        // under the lock. onJoinComplete itself is not run under the lock because it can take long
        // and must not block the heartbeat thread.
        // See PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
        final Generation generationSnapshot;
        synchronized (AbstractCoordinator.this) {
            generationSnapshot = this.generation;
        }

        // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
        ByteBuffer memberAssignment = future.value().duplicate();

        onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);

        // The join future is reset only after the completion callback returns. This ensures that
        // if the callback is woken up, it is retried on the next joinGroupIfNeeded.
        resetJoinGroupFuture();
        needsJoinPrepare = true;
        return true;
    }
}

/**
 * Coordinator poll: (re)joins the group when the membership state is stale.
 *
 * <p>Only staleness is checked here; the {@code rejoinNeeded} flag itself can be set elsewhere,
 * e.g. by the heartbeat response handler when the broker reports a rebalance.
 *
 * @param timer bounds the time spent joining the group
 * @return {@code true} if no rejoin was needed or the join completed; {@code false} if the
 *         join did not complete within {@code timer}
 */
public boolean poll(Timer timer) {
    if (rejoinNeededOrPending()) {
        return joinGroupIfNeeded(timer);
    }
    return true;
}

How heartbeat works - HeartbeatResponseHandler


  public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.info("Attempt to heartbeat failed since group is rebalancing");
		this.rejoinNeeded = true
                future.raise(error);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
		 this.generation = Generation.NO_GENERATION;
  		rejoinNeeded = true;
        state = MemberState.UNJOINED;
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }