Source code: How Kafka GroupCoordinator handles membership
- Server side of the coordination logic. Removed/rewrite corner cases and error handling
- Source code verison 2.4
- See
kafka.coordinator.group.GroupState
for the state transition
Started inside KafkaServer.startup
/**
* Startup logic executed at the same time when the server starts up.
*/
def startup(enableMetadataExpiration: Boolean = true): Unit = {
info("Starting up.")
groupManager.startup(enableMetadataExpiration)
isActive.set(true)
info("Startup complete.")
}
How LEAVE_GROUP request is handled
In handleLeaveGroup
.It will be triggered when
- Consumer calls
unsubscribe
- Consumer heartbeat times out. Consumer will send LEAVE_GROUP request
//Group is locked for concurrent edit
val memberId = leavingMember.memberId
val groupInstanceId = Option(leavingMember.groupInstanceId)
if (group.isPendingMember(memberId)) {
// if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
// and if necessary, prompt a JoinGroup completion.
info(s"Pending member $memberId is leaving group ${group.groupId}.")
removePendingMemberAndUpdateGroup(group, memberId)
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
} else {
val member = if (group.hasStaticMember(groupInstanceId))
group.get(group.getStaticMemberId(groupInstanceId))
else
group.get(memberId)
member.isLeaving = true
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
info(s"Member[group.instance.id ${member.groupInstanceId}, member.id ${member.memberId}] " +
s"in group ${group.groupId} has left, removing it from the group")
// New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
// to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
// will retry the JoinGroup request if is still active.
group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))
group.remove(member.memberId)
group.removeStaticMember(member.groupInstanceId)
group.currentState match {
case Dead | Empty =>
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
How is heartbeat handled
//when group is still loading, respond blindly
//group is mutex is locked in the whole process
group.currentState match {
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case CompletingRebalance =>
responseCallback(Errors.REBALANCE_IN_PROGRESS)
case PreparingRebalance =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS)
case Stable =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
case Dead =>
throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
// complete current heartbeat expectation
member.latestHeartbeat = time.milliseconds()
val memberKey = MemberKey(member.groupId, member.memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
// reschedule the next heartbeat expiration deadline
val deadline = member.latestHeartbeat + timeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
group.inLock {
if (group.is(Dead)) {
info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
} else if (isPending) {
info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
removePendingMemberAndUpdateGroup(group, memberId)
} else if (!group.has(memberId)) {
debug(s"Member $memberId has already been removed from the group.")
} else {
val member = group.get(memberId)
if (!member.shouldKeepAlive(heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
}