Source code: How Kafka consumer decides which partitions to poll
- Client side of the coordination logic. Corner cases and error handling have been removed or rewritten.
- Source code version 2.4
In org.apache.kafka.clients.consumer.KafkaConsumer
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
coordinator.poll(timer); //note we poll coordinator every time!
fetcher.validateOffsetsIfNeeded();
coordinator.refreshCommittedOffsetsIfNeeded(timer)
subscriptions.resetMissingPositions();
fetcher.resetOffsetsIfNeeded();
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.transmitSends();
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
The partition assignment refresh logic is in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. Call it CC below.
Data structure of CC
// Excerpt of the coordinator state that the methods below read and write.
// This is not the full field list: e.g. joinFuture and generation are used below but not shown here.
public final class ConsumerCoordinator extends AbstractCoordinator {
private boolean isLeader = false;
// Queried by rejoinNeededOrPending() via partitionsAutoAssigned() and subscription().
private final SubscriptionState subscriptions;
// Compared against subscriptions.subscription() in rejoinNeededOrPending() to detect a changed subscription.
private Set<String> joinedSubscription; //updated by subscriptions
private MetadataSnapshot metadataSnapshot;
// rejoinNeededOrPending() asks for a rejoin when this is non-null and no longer matches metadataSnapshot.
private MetadataSnapshot assignmentSnapshot; //updated by metadataSnapshot but not vice-versa. Updated only if current CC is the leader
// Starts true; cleared when a join succeeds; set back to true by the heartbeat response handler
// on REBALANCE_IN_PROGRESS or ILLEGAL_GENERATION.
private boolean rejoinNeeded = true;
// Cleared just before onJoinPrepare runs in joinGroupIfNeeded, set again after onJoinComplete.
private boolean needsJoinPrepare = true;
// UNJOINED initially; REBALANCING once a join is initiated; STABLE on join success;
// back to UNJOINED when a heartbeat reports ILLEGAL_GENERATION.
private MemberState state = MemberState.UNJOINED;
// Enabled on join success when non-null.
private HeartbeatThread heartbeatThread = null;
- Note the heartbeat thread inside the coordinator does not update the internal state
How ConsumerCoordinator polls
/**
 * Tells the caller whether a (re)join of the group has to be started or is already in flight.
 *
 * @return {@code false} whenever partitions are not auto-assigned; otherwise {@code true} if
 *         any of the conditions listed in the body holds
 */
public boolean rejoinNeededOrPending() {
    // Nothing to rejoin unless partitions are auto-assigned.
    final boolean autoAssigned = subscriptions.partitionsAutoAssigned();

    // The clauses are evaluated lazily, in the order listed.
    return autoAssigned
            && (
                // We performed the assignment and metadata has changed since then; partitions
                // that are owned but no longer exist should be dropped as lost.
                (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
                // Our subscription has changed since the last join.
                || (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
                // A rejoin was requested (rejoinNeeded defaults to true) ...
                || rejoinNeeded
                // ... or a join is already pending.
                || joinFuture != null);
}
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
log.debug("Received successful JoinGroup response: {}", joinResponse);
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// if the consumer was woken up before a rebalance completes, we may have already left
// the group. In this case, we do not want to continue with the sync group.
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
onJoinLeader(joinResponse).chain(future);
} else {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList())
);
log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
syncFuture = client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
syncFuture.chain(future);
}
}
}
}
}
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
state = MemberState.REBALANCING;
int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
log.info("(Re-)joining group");
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
new JoinGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setProtocolType(protocolType())
.setProtocols(metadata()) //this one internally update joined subscription!
.setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
);
log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
joinFuture = client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler());
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
@Override
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
state = MemberState.STABLE;
rejoinNeeded = false;
// record rebalance latency
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
});
}
return joinFuture;
}
boolean joinGroupIfNeeded(final Timer timer) {
// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes. This must be called
// on each iteration of the loop because an event requiring a rebalance (such as a metadata
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
if (needsJoinPrepare) {
// need to set the flag before calling onJoinPrepare since the user callback may throw
// exception, in which case upon retry we should not retry onJoinPrepare either.
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
}
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer);
Generation generationSnapshot;
// Generation data maybe concurrently cleared by Heartbeat thread.
// Can't use synchronized for {@code onJoinComplete}, because it can be long enough
// and shouldn't block hearbeat thread.
// See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
synchronized (AbstractCoordinator.this) {
generationSnapshot = this.generation;
}
// Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);
// Generally speaking we should always resetJoinGroupFuture once the future is done, but here
// we can only reset the join group future after the completion callback returns. This ensures
// that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
// And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
resetJoinGroupFuture();
needsJoinPrepare = true;
}
}
/**
 * Coordinator poll (abridged): (re)joins the group when the current membership is stale.
 *
 * The flags consulted by {@code rejoinNeededOrPending()} (e.g. {@code rejoinNeeded}) can be
 * flipped by the heartbeat response handler; this method only checks for staleness and acts on it.
 *
 * @param timer bounds the time spent joining the group
 * @return {@code true} if no rejoin was needed; otherwise the result of {@code joinGroupIfNeeded(timer)}
 */
public boolean poll(Timer timer) {
    if (rejoinNeededOrPending()) {
        return joinGroupIfNeeded(timer);
    }
    return true;
}
How heartbeat works - HeartbeatResponseHandler
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
Errors error = heartbeatResponse.error();
if (error == Errors.NONE) {
log.debug("Received successful Heartbeat response");
future.complete(null);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("Attempt to heartbeat failed since group is rebalancing");
this.rejoinNeeded = true
future.raise(error);
} else if (error == Errors.ILLEGAL_GENERATION) {
log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
this.generation = Generation.NO_GENERATION;
rejoinNeeded = true;
state = MemberState.UNJOINED;
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}