How kafka consumer commits offsets

2020-06-17 00:00:00 +0000

Version 2.3

Data structure


public class KafkaConsumer<K, V> implements Consumer<K, V> {

   private final ConsumerCoordinator coordinator;
   private final SubscriptionState subscriptions;
}

public class SubscriptionState {

    /* the partitions that are currently assigned, note that the order of partition matters (see FetchBuilder for more details) */
    private final PartitionStates<TopicPartitionState> assignment;
    /* the list of topics the user has requested */
    private Set<String> subscription;


    private static class TopicPartitionState {

        private FetchState fetchState;
        private FetchPosition position; // last consumed position
}

public class PartitionStates<S> {

    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
    private final Set<TopicPartition> partitionSetView = Collections.unmodifiableSet(map.keySet());

    public static class PartitionState<S> {
        private final TopicPartition topicPartition;
        private final S value;
}
}
}

Inside ConsumerCoordinator

In org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - the local proxy for consumer coordinator.

    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        Node coordinator = checkAndGetCoordinator();

        // create the offset commit request
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
                    .getOrDefault(topicPartition.topic(),
                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                    .setName(topicPartition.topic())
                    );

            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                    .setCommittedMetadata(offsetAndMetadata.metadata())
            );
            requestTopicDataMap.put(topicPartition.topic(), topic);
        }

        final Generation generation = generation();
            // if the generation is null, we are not part of an active group (and we expect to be).
            // the only thing we can do is fail the commit and let the user rejoin the group in poll()
            if (generation == null) {
                log.info("Failing OffsetCommit request since the consumer is not part of an active group");
                return RequestFuture.failure(new CommitFailedException());
            }

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                new OffsetCommitRequestData()
                        .setGroupId(this.groupId)
                        .setGenerationId(generation.generationId)
                        .setMemberId(generation.memberId)
                        .setGroupInstanceId(groupInstanceId.orElse(null))
                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
        );

        log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

        return client.send(coordinator, builder)
                .compose(new OffsetCommitResponseHandler(offsets));
    }

ElasticCache redis catches

2020-06-16 00:00:00 +0000

Losing data on rebooting primary in clustered mode

  • If we reboot the primary, both primary and its read replica/slave’s data will be wiped
    • Reboot takes about 2 mins from start to primary health check passed again. Auto failover will not be triggered in this process
    • Note that if we deploy redis on EC2 directly, we can turn on AOF, so that on server reboot, all data upto the last fsync will be in memory. However, EC redis disabled AOF in cross-AZ mode and cannot be turned on! This suggests a risk of cache avalanche and stakeholders need to eval the risk/reward ratio
  • If we reboot the read/replica, the primary data remains intact
  • Promoting the replica to master will keep the data in the new master and old master intact

Retro: tidb migration for txn histroy re-architecturing

2020-06-14 00:00:00 +0000

Scenario and goals

  • Major refactoring of centralization of multiple payment history tables into one To act as OLTP and small OLAP bedrock for front line services
  • 2k writes per sec. 1k reads with p99 < 2 sec
  • Gain confidence for the critial path migration later on

Timeline

  • June - PoC with load testing
  • July - Dev starts
  • Aug - Load testing with problems. DR drills
  • Sep - Turn on incremental traffic switch
  • Oct - 100% traffic switch

Major decision dilemmas/Known unknowns and how I solved them

  • DB selection
    • PoCed vitess, tidb, cockroach db
    • Vitess has hugh operational problem even during PoC
    • Cockroach is feasible, but we are mysql based so favored tdib
  • Hosting solution: EKS + operator or EC2 + Ansible
    • Ansible proved too much effort
    • Because we can do incremental traffic switch, it is OK we take risks with k8s operator, since it is the direction of the future
  • Data transfer approach: direct between DBs or done by kafka
    • Kafka approach means we don’t need coordination logic
    • But our kafka pipeline implmentation uses the single row actions, which reduced speed by a large margin
    • Personal preference is between DBs, but the implmentation team prefers the kafka approach. In the end I approved the team’s decision
      • I have no way to prove the kafka approach won’t perform as well.
      • I didn’t force a PoC, because that would be implementing coordination logic regardless, which we all know it is not trivial
      • I trusted the team, which I didn’t work too much with in prior
      • In retro, I should have forced a PoC on the team’s preferred approach even against the timeline.

How did we migrate

  • Based on big query, we know in effect data becomes immutable after 3 months. So we migration data up to T - 3 months first, and take a backup of that. This means our kafka pipeline needs to run at most 3 months data (in effect, a little over one month) every time we need to re-run
  • A recon job that calcuates checksum between old and new architecture based on domain logics
  • A new service powered by the tidb running in parallel with the old one. Openresty in from of the both to do traffic swtich

Setbacks/Unknown unknowns run into

  • By late July, find that kafka pipeline performance is much lower than expected - only 2k writes per sec
  • By late August, find that we had to scope down the project - one low traffic /big effort component is dropped
  • Schema changed twice AFTER staging migration is completed. Each time it addes an extra week of delay
  • First week in production, the cluster disappeared from all monitors for 10 minutes

(TODO: add how we solve such problems)

Retro: tidb migration for a mission critical system

2020-06-08 00:00:00 +0000

Scenario

  • Aurora spends 75% of waits on cross region binlog replication, and unable to satify the TPS requirement even after upgrading to highest hardware spec. Such bottleneck is limiting business growth

Timeline

  • Nov - PoC with load testing
  • Dec - Migration plan discussion and got buy-ins
  • Jan - Develop verificaiton tools and processes
    • Set up clusters and drills on operations
  • Feb - Run the verificaiton schemes on prod
    • 30+ case DR drills from stg to prod

Major decision dilemmas/Known unknowns and how I solved them

  • Tidb vs Dynamo
    • Both sides run PoC and run a 4 hour debate session to present and argue for the solution
    • Neither side can show the other side is not feasible. So tie is broken by the LCA in the reporting line.
  • Hosting solution: EKS + operator or EC2 + Ansible
    • Both sides has data to support its claim, and neither side can prove the other side is not feasible
    • Again, the tie is broken by the LCA in the reporting line, who in retro, made the correct decision
  • Migration approach: one-shot vs incremental
    • The intuition says we should migrate data incrementally. But I lobbied for one shot approach after my research * I researched 5 cases with clients in similar industries. Talked to 2 of them directly to understand their motivation of not choosing incremental
      • Got confirmation from the solution architects of pingcap on not choosing incremental
    • Such proposal caused stress in higher-ups. As a remedy, all migration runbooks includes near-real time verification and rollback plan

How I did verification

  • query replay - to verify server side behavior is behaving as expected. Note since it is Aurora, we can’t use a proxy service to traffic capure and relay to a side car without having down time for prod
  • Binlog + EMR job to check the data consistency cross domain
    • to make sure binlog replication is behaving as expected ,
    • Standard sync-diff verification applies too
  • Traffic replay - to verify client side library (jdbc, connection pool) etc are maintaining same behavior as mysql

How we performed migraiton drill

  • Detailed runbook with DRI and reviwer at each step
  • I ask as the coordinator to measure process and overall correctness
  • Each step is timed and publicly annouced by me and the action taker
  • Drill is done 4 times before the actual migration. The last time is the complete drill with all stakeholders’ attendance
  • Last 2 drills completed without any error

Setbacks/Unknown unknowns run into

  • Two weeks before going live. Query replay verification failed on prod
  • 36 hours before going live. Run out of snowflake id on prod
  • Keep seeing concurrent read-write error even though the code shows no conrrency during writes

Common prometheus problems

2020-06-05 00:00:00 +0000

Range vector

  • With counter type, the it is more common to use rate() rather than increase(), because the latter is changed by the length of the bucket, i.e., it will compute the different between values at the both ends of the bucket, so longer the bucket bigger the value
  • For the same reason above, don’t use rate() or increase() over gauge metrics because the underlying assumption does not match - Counter is always increasing, while gauge can go up and down
    • Any decrease in counter is treated as counter reset
  • Because of reset and extrapolation logic, recommend to set time interval 5 times of the scrapping interval. So we we are resilient enough to have 2 values in the bucket

Agents behind LB

  • We need to expose all agents so that they can be scrapable by the P server
  • If the above requirement is hard, then we need addtional sidecar/proxy on each node. Alternatively, add separate URL path for each server to the LB

Applying peak-end rule at work

2020-06-01 00:00:00 +0000

  • Surprises in the middle as peak. Rewards/compliment at the end
  • Identify the target person you want to influent. His feeling decides the moment’s effectiveness
  • The effect should align with the expectation, and exceeds expectation
  • At the end of the project, do a retro with team and client, to summarize exp and lessons
    • Proved next stage roadmap even if not asked
  • Similarly, highest and most recent prices carry more weights in setting internal price benchmark

Kafka high CPU usage problem

2020-05-29 00:00:00 +0000

Cluster setup

  • 6 m5.2xlarge
  • Kafka 2.3.0
  • openjdk version “1.8.0_232”
  • CentOS Linux release 7.6.1810 (Core)

Symptom

Every kafka broker is experiencing heavy CPU load at 15k msgs on each broker

procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0      0 286240  44392 24279584    0    0    53   711    8    6 40 11 49  0  0
 5  0      0 287480  44392 24280624    0    0     0     0 48650 41962 15  5 80  0  0
 5  0      0 286992  44392 24281656    0    0     0     0 50061 41004 21  6 73  0  0
15  0      0 289344  44376 24272348    0    0     0     0 53212 44541 30  7 63  0  0
 5  0      0 291032  44340 24271480    0    0     0  9264 51796 43536 19  5 76  0  0
10  0      0 291656  44352 24272676    0    0     0  1604 46937 41888 18  5 77  0  0
 3  0      0 290776  44352 24273524    0    0     0     0 46349 40179 16  4 80  0  0
 2  0      0 289448  44352 24274356    0    0     0     0 50271 41767 18  6 76  0  0
 1  0      0 289444  44352 24275272    0    0     0     0 46326 39414 16  5 79  0  0
 9  0      0 287456  44352 24276560    0    0     0  3704 47776 39765 19  5 76  0  0

Each context swtich costs about 4 microseconds

Threads:  94 total,   8 running,  86 sleeping,   0 stopped,   0 zombie
%Cpu(s): 56.2 us,  9.9 sy,  0.0 ni, 28.1 id,  0.0 wa,  0.0 hi,  5.8 si,  0.0 st
KiB Mem : 31960748 total,   221784 free,  5007428 used, 26731536 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 26347036 avail Mem 
  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                        
11063 kafka     20   0   34.2g   4.4g  68900 R 53.2 14.4 474:03.72 data-plane-kafk                                
10495 kafka     20   0   34.2g   4.4g  68900 R 47.8 14.4 470:24.77 data-plane-kafk                                
10499 kafka     20   0   34.2g   4.4g  68900 S 46.5 14.4 470:24.25 data-plane-kafk                                
10492 kafka     20   0   34.2g   4.4g  68900 S 45.8 14.4 470:21.15 data-plane-kafk                                
10494 kafka     20   0   34.2g   4.4g  68900 S 45.5 14.4 469:54.86 data-plane-kafk                                
10493 kafka     20   0   34.2g   4.4g  68900 S 45.2 14.4 470:30.89 data-plane-kafk                                
10497 kafka     20   0   34.2g   4.4g  68900 S 45.2 14.4 470:19.32 data-plane-kafk                                
10491 kafka     20   0   34.2g   4.4g  68900 R 44.9 14.4 470:20.41 data-plane-kafk                                
10498 kafka     20   0   34.2g   4.4g  68900 S 43.9 14.4 470:25.80 data-plane-kafk                                
11062 kafka     20   0   34.2g   4.4g  68900 R 39.2 14.4 372:52.36 data-plane-kafk                                
11060 kafka     20   0   34.2g   4.4g  68900 S 35.5 14.4 397:22.49 data-plane-kafk                                
13119 kafka     20   0   34.2g   4.4g  68900 S  9.3 14.4 103:45.58 ReplicaFetcherT

Analysis

Common thread dumps

Thread 11063: (state = IN_NATIVE)
 - sun.nio.ch.EPollArrayWrapper.epollWait(long, int, long, int) @bci=0 (Compiled frame; information may be imprecise)
 - sun.nio.ch.EPollArrayWrapper.poll(long) @bci=18, line=269 (Compiled frame)
 - sun.nio.ch.EPollSelectorImpl.doSelect(long) @bci=28, line=93 (Compiled frame)
 - sun.nio.ch.SelectorImpl.lockAndDoSelect(long) @bci=37, line=86 (Compiled frame)
 - sun.nio.ch.SelectorImpl.select(long) @bci=30, line=97 (Compiled frame)
 - org.apache.kafka.common.network.Selector.select(long) @bci=35, line=794 (Compiled frame)
 - org.apache.kafka.common.network.Selector.poll(long) @bci=191, line=467 (Compiled frame)
 - kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
 - kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 10494: (state = BLOCKED)
 - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
 - java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object, long) @bci=20, line=215 (Compiled frame)
 - java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) @bci=78, line=2078 (Compiled frame)
 - java.util.concurrent.ArrayBlockingQueue.poll(long, java.util.concurrent.TimeUnit) @bci=49, line=418 (Compiled frame)
 - kafka.network.RequestChannel.receiveRequest(long) @bci=8, line=344 (Compiled frame)
 - kafka.server.KafkaRequestHandler.run() @bci=72, line=54 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 11063: (state = IN_JAVA)
 - java.util.HashMap.getNode(int, java.lang.Object) @bci=143, line=581 (Compiled frame; information may be imprecise)
 - java.util.HashMap.get(java.lang.Object) @bci=6, line=557 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Schema.get(java.lang.String) @bci=5, line=158 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.get(java.lang.String) @bci=5, line=177 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.getInt(java.lang.String) @bci=2, line=233 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.get(org.apache.kafka.common.protocol.types.Field$Int32) @bci=5, line=84 (Compiled frame)
 - org.apache.kafka.common.requests.ProduceRequest.<init>(org.apache.kafka.common.protocol.types.Struct, short) @bci=116, line=249 (Compiled frame)
 - org.apache.kafka.common.requests.AbstractRequest.parseRequest(org.apache.kafka.common.protocol.ApiKeys, short, org.apache.kafka.common.protocol.types.Struct) @bci=210, line=147 (Compiled frame)
 - org.apache.kafka.common.requests.RequestContext.parseRequest(java.nio.ByteBuffer) @bci=64, line=64 (Compiled frame)
 - kafka.network.RequestChannel$Request.<init>(int, org.apache.kafka.common.requests.RequestContext, long, org.apache.kafka.common.memory.MemoryPool, java.nio.ByteBuffer, kafka.network.RequestChannel$Metrics) @bci=114, line=89 (Compiled frame)
 - kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(org.apache.kafka.common.network.NetworkReceive) @bci=260, line=890 (Compiled frame)
 - kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(java.lang.Object) @bci=5, line=873 (Compiled frame)
Thread 11062: (state = IN_JAVA) 
 - java.util.HashMap$HashIterator.<init>(java.util.HashMap) @bci=75, line=1433 (Compiled frame; information may be imprecise)
 - java.util.HashMap$KeyIterator.<init>(java.util.HashMap) @bci=7, line=1467 (Compiled frame)
 - java.util.HashMap$KeySet.iterator() @bci=8, line=917 (Compiled frame)
 - java.util.HashSet.iterator() @bci=7, line=173 (Compiled frame)
 - sun.nio.ch.Util$3.iterator() @bci=4, line=324 (Compiled frame)
 - org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=5, line=518 (Compiled frame)
 - org.apache.kafka.common.network.Selector.poll(long) @bci=312, line=483 (Compiled frame)
 - kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
 - kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 11060: (state = IN_NATIVE)
 - sun.nio.ch.FileChannelImpl.transferTo0(java.io.FileDescriptor, long, long, java.io.FileDescriptor) @bci=0 (Compiled frame; information may be imprecise)
 - sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(long, int, java.nio.channels.WritableByteChannel, java.io.FileDescriptor) @bci=107, line=428 (Compiled frame)
 - sun.nio.ch.FileChannelImpl.transferToDirectly(long, int, java.nio.channels.WritableByteChannel) @bci=217, line=493 (Compiled frame)
 - sun.nio.ch.FileChannelImpl.transferTo(long, long, java.nio.channels.WritableByteChannel) @bci=133, line=605 (Compiled frame)
 - org.apache.kafka.common.network.PlaintextTransportLayer.transferFrom(java.nio.channels.FileChannel, long, long) @bci=8, line=215 (Compiled frame)
 - org.apache.kafka.common.record.FileRecords.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=123, line=283 (Compiled frame)
 - org.apache.kafka.common.record.DefaultRecordsSend.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=11, line=33 (Compiled frame)
 - org.apache.kafka.common.record.RecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=25, line=58 (Compiled frame)
 - org.apache.kafka.common.record.MultiRecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=24, line=93 (Compiled frame)
 - org.apache.kafka.common.network.KafkaChannel.send(org.apache.kafka.common.network.Send) @bci=10, line=429 (Compiled frame)
 - org.apache.kafka.common.network.KafkaChannel.write() @bci=14, line=399 (Compiled frame)
 - org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=498, line=589 (Compiled frame)

ss shows that network connection is at normal range

Total: 7010 (kernel 0)
TCP:   6804 (estab 6792, closed 0, orphaned 0, synrecv 0, timewait 0/0), ports 0

Transport Total     IP        IPv6
*	  0         -         -        
RAW	  0         0         0        
UDP	  8         4         4        
TCP	  6804      7         6797     
INET	  6812      11        6801     
FRAG	  0         0         0  

The kafka process is not limited by any config

Limit                     Soft Limit           Hard Limit           Units     
Max cpu time              unlimited            unlimited            seconds   
Max file size             unlimited            unlimited            bytes     
Max data size             unlimited            unlimited            bytes     
Max stack size            8388608              unlimited            bytes     
Max core file size        0                    unlimited            bytes     
Max resident set          unlimited            unlimited            bytes     
Max processes             124668               124668               processes 
Max open files            128000               128000               files     
Max locked memory         65536                65536                bytes     
Max address space         unlimited            unlimited            bytes     
Max file locks            unlimited            unlimited            locks     
Max pending signals       124668               124668               signals   
Max msgqueue size         819200               819200               bytes     
Max nice priority         0                    0                    
Max realtime priority     0                    0                    
Max realtime timeout      unlimited            unlimited            us  

GC configs

Memory: 4k page, physical 31960748k(31339152k free), swap 0k(0k free)
CommandLine flags: -XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=4294967296 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation 

GC actions

2020-05-27T19:52:59.673+0000: 221.678: [GC pause (G1 Evacuation Pause) (young), 0.0221126 secs]
2020-05-27T19:53:00.458+0000: 222.463: [GC pause (G1 Evacuation Pause) (young), 0.0298531 secs]
2020-05-27T19:53:00.758+0000: 222.763: [GC pause (G1 Evacuation Pause) (young), 0.0145942 secs]
2020-05-27T19:53:01.661+0000: 223.667: [GC pause (G1 Evacuation Pause) (young), 0.0193434 secs]
2020-05-27T19:53:01.995+0000: 224.000: [GC pause (G1 Evacuation Pause) (young), 0.0157352 secs]
2020-05-27T19:53:02.286+0000: 224.292: [GC pause (G1 Evacuation Pause) (young), 0.0410610 secs]
2020-05-27T19:53:02.564+0000: 224.570: [GC pause (G1 Evacuation Pause) (young), 0.0161346 secs]
2020-05-27T19:53:02.852+0000: 224.857: [GC pause (G1 Evacuation Pause) (young), 0.0131654 secs]
2020-05-27T19:53:03.298+0000: 225.304: [GC pause (G1 Evacuation Pause) (young), 0.0145538 secs]
2020-05-27T19:53:03.938+0000: 225.943: [GC pause (G1 Evacuation Pause) (young), 0.0149772 secs]
2020-05-27T19:53:04.449+0000: 226.454: [GC pause (G1 Evacuation Pause) (young), 0.0178123 secs]
2020-05-27T19:53:04.675+0000: 226.680: [GC pause (G1 Evacuation Pause) (young), 0.0134442 secs]
2020-05-27T19:53:04.976+0000: 226.981: [GC pause (G1 Evacuation Pause) (young), 0.0185317 secs]
2020-05-27T19:53:05.388+0000: 227.393: [GC pause (G1 Evacuation Pause) (young), 0.0114091 secs]
2020-05-27T19:53:05.877+0000: 227.882: [GC pause (G1 Evacuation Pause) (young), 0.0133841 secs]
2020-05-27T19:53:06.355+0000: 228.361: [GC pause (G1 Evacuation Pause) (young), 0.0156590 secs]
2020-05-27T19:53:06.903+0000: 228.908: [GC pause (G1 Evacuation Pause) (young), 0.0147414 secs]
2020-05-27T19:53:07.211+0000: 229.216: [GC pause (G1 Evacuation Pause) (young), 0.0126670 secs]
2020-05-27T19:53:07.681+0000: 229.687: [GC pause (G1 Evacuation Pause) (young), 0.0160053 secs]
2020-05-27T19:53:08.148+0000: 230.153: [GC pause (G1 Evacuation Pause) (young), 0.0123430 secs]
2020-05-27T19:53:08.618+0000: 230.623: [GC pause (G1 Evacuation Pause) (young), 0.0157585 secs]
2020-05-27T19:53:09.344+0000: 231.349: [GC pause (G1 Evacuation Pause) (young), 0.0152343 secs]
2020-05-27T19:53:10.220+0000: 232.225: [GC pause (G1 Evacuation Pause) (young), 0.0129275 secs]
2020-05-27T19:53:11.181+0000: 233.186: [GC pause (G1 Evacuation Pause) (young), 0.0126668 secs]

GC is a suspoect as we pause more than once per second. Our goal is to maintain 90% gc time at 20ms, and on average no more than once per sec

so we change to

CommandLine flags: -XX:CICompilerCount=4 -XX:ConcGCThreads=2 -XX:+ExplicitGCInvokesConcurrent -XX:G1HeapRegionSize=2097152 -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=6442450944 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MarkStackSize=4194304 -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=6442450944 -XX:MaxNewSize=3865051136 -XX:MinHeapDeltaBytes=2097152 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation

and GC changes to

2020-05-30T19:53:09.518+0000: 170448.394: [GC pause (G1 Evacuation Pause) (young), 0.0181099 secs]
2020-05-30T19:53:12.206+0000: 170451.082: [GC pause (G1 Evacuation Pause) (young), 0.0199838 secs]
2020-05-30T19:53:15.498+0000: 170454.375: [GC pause (G1 Evacuation Pause) (young), 0.0196726 secs]
2020-05-30T19:53:18.248+0000: 170457.124: [GC pause (G1 Evacuation Pause) (young), 0.0189805 secs]
2020-05-30T19:53:21.330+0000: 170460.206: [GC pause (G1 Evacuation Pause) (young), 0.0199247 secs]
2020-05-30T19:53:24.483+0000: 170463.359: [GC pause (G1 Evacuation Pause) (young), 0.0202763 secs]
2020-05-30T19:53:27.667+0000: 170466.543: [GC pause (G1 Evacuation Pause) (young), 0.0194760 secs]
2020-05-30T19:53:31.174+0000: 170470.051: [GC pause (G1 Evacuation Pause) (young), 0.0198198 secs]
2020-05-30T19:53:34.597+0000: 170473.473: [GC pause (G1 Evacuation Pause) (young), 0.0206716 secs]
2020-05-30T19:53:37.317+0000: 170476.193: [GC pause (G1 Evacuation Pause) (young), 0.0197347 secs]
2020-05-30T19:53:40.785+0000: 170479.661: [GC pause (G1 Evacuation Pause) (young), 0.0182224 secs]
2020-05-30T19:53:44.389+0000: 170483.266: [GC pause (G1 Evacuation Pause) (young), 0.0195625 secs]
2020-05-30T19:53:47.709+0000: 170486.585: [GC pause (G1 Evacuation Pause) (young), 0.0207940 secs]
2020-05-30T19:53:51.014+0000: 170489.890: [GC pause (G1 Evacuation Pause) (young), 0.0176806 secs]
2020-05-30T19:53:54.912+0000: 170493.788: [GC pause (G1 Evacuation Pause) (young), 0.0192957 secs]
2020-05-30T19:53:58.615+0000: 170497.491: [GC pause (G1 Evacuation Pause) (young), 0.0196635 secs]
2020-05-30T19:54:02.422+0000: 170501.298: [GC pause (G1 Evacuation Pause) (young), 0.0210456 secs]
2020-05-30T19:54:05.657+0000: 170504.533: [GC pause (G1 Evacuation Pause) (young), 0.0185793 secs]
2020-05-30T19:54:09.633+0000: 170508.509: [GC pause (G1 Evacuation Pause) (young), 0.0197715 secs]

We can see overall pause is GC is reduced, even though we keep the region size roughly same.

Editorial: Coding set 4

2020-05-25 00:00:00 +0000

Good for whiteboarding

1494. Parallel Courses II

  • My gut feeling was wrong
  • Took me too long

Basic Calculator

  • Multiple solutions
  • Follow up:
  • Support a ternary operator like in Java.
  • A method to print the expression string that identicaly to the input, preserves all the formats like extra spaces.

Create Maximum Number

  • My gut feeling was wrong

312. Burst Balloons

  • Was not able to see the insight

sqrt decomposition

2020-05-19 00:00:00 +0000

Motivation

  • Range operation on an array that costs O(n), we want to improve to O(sqrt(n)) with O(sqrt(n)) additional memory
  • May have write operation along with reads, write/read ratio could be high

Why each block has size sqrt?

The reasoning is similar to master theorem, i.e., the number of subproblems vs the cost of combining results from subproblems. The worst case is optimized when the two parts of the total cost is equal

Example

  • No write operation, answer abitrary range query, each would cost O(n) without this techinque
  • A mixture of read-write operation, one with O(1) and one O(n). So overall cost will be O(n). After applying the technique the cost will be dropped back to O(sqrt(n)).
    • Works best when write/read ratio could be high
    • Often an alternative for segment tree

Compare with meet in the middle

  • After computing each sqrt bucket, most likely we just keep the result in the form of aggregation, e.g., SUM, MAX, MIN, but in MitM, we do have a full result set of brute force search.
  • After sqrt decomposition, we know that solution either does not exist across buckets or can be calculated quickly by the aggregation results on each bucket. In MitM, we have to combine search results from both sides to find the solution
  • Because we only split problem into half. MitM is less likely to handle update well compared with sqrt decomposition

Example

Editorial: Coding set 3

2020-05-16 00:00:00 +0000

System design: web scale chat systems

2020-05-12 00:00:00 +0000

Requirements

  • Multiple clients for a single user is allowed, and all messages should be synced between the clients
  • Messages are displayed in the same order as closely as possible on all clients. See the discussion below
  • Web scale
  • Use WebSocket to deliver messages
  • User Authentication needed
  • Can ignore friend list feature for now
  • Need to support group chat

What kind of order must be maintained?

  • Partial order within the same sender on the same client must be maintained. From a sender’s pov, the message it sends should arrive in the same order in all clients,e.g., If the user sends messages in the order of is 1, 2, it is OK to delay the delivery of 2 until 1 is delivered, but it is not ok to display 2 on the receiver’s client, and then insert 1 in front of 2.
  • Complete order between senders within the same dialog should not be maintained. Otherwise, the whole group will be slowed down by the slowest participant - Everyone will be fighting for the same distributed lock within the group

    Capacity estimation

  • 1B user clients. Each sends 1 message every 2 min in 12 hours = 1B * 1 * 30 * 12 = 360B messsages
    • We will assume they are in the same region instead of distributed across regions
  • Each message 1kb = 360 TB storage per day
  • Peak write traffic = 4 times of average about 35 mil writes per sec
  • Because the read is powered by websocket, the read request will be higher by write but not much higher.
  • Each websocket server maintains 100k connections, so we need 10k node
  • Each DB node handles 10k request per sec peak, so need a 350 node cluster

High level components breakdown

Excluding common services, we need

  • User authenticaiton and authorization module (UAA) for concrete auth logic
  • User client service that manages 1-to-many user-client relationship
  • User domain service that stores detailed user info. Can be combined with user client service
  • Message domain service manages the details and life-cycle of individual message
  • Dialog domain service that manages dialog lifecycle, including dialog-> message mapping, and dialog->user, mapping
  • Notification service to host websocket connections to clients

Why separate message domain and dialog domain?

Aside from conceptually reasons, the practical reasons:

  • Message is subject to extension, e.g., video/voice messages.
  • Need way to locate message by its id for various purposes

Why separate notification domain and dialog domain

The two have different characterisitcs

  • Dialog: heavy state + short connections
  • notificaiton: light state + long connections/websocket

Otherwise, heavy state + long connection is a much harder problem

User authenticaion and authorization(UAA)

  • SSO such as open id, gives the access_token and refresh_token to every single client. For each request, user will provide
    • access_token
    • openid
    • login_type
  • When a request hits the API gateway, API gateway will hit the UAA and get the corresponding JWT token before forwarding to the actual target service
  • UAA makes sure (login_type, openid) exists, and return the code to the client
  • Client will use the return code to get the token
  • Need to maintain
    • (user_id, auth_type) -> (token, expiry, meta)
    • (user_id, auth_id) -> (name, salted hash, meta) - can be combined with above table
    • auth_id -> (user_id, openid, login_type, access_token)

Message service

  • Maintains the mapping (msg_id) -> (sender_id, dialog_id, content, status, create_at, updated_at). msg_id is a snowflake id which is roughly sorted by the processing time
  • Status is to mark if the message has been delivered to the dialog service. This is required as 2PC/TCC state
  • We need 2PC/TCC support between the message domain and dialog domain because messages from the same sender must display in the same relative order across all clients.
  • Only after the we deliver to the dialog service that we return to the client the message has been delivered. Again, if we don’t have to maintain a consistent order for messages from the same sender, we can return to client the moment it is persisted to message db, and do the delivery async

TCC vs MQ for tranferring data between message domain and dialog domain

Basically a decision between sync and async communication. In this case TCC is preferred over MQ

  • Chat favors low latency over throughput. We can fail early and let user try again
  • Reverting operation in MQ is a lot trickier than sync communication

This also means we accept the risk that we will lose th buffer capability of the MQ when there is a major outage across the cluster

Dialog service

Maintains the following mappings

  • (dialogue_id, user_ID) -> metadata - members in a dialouge
  • (user_id, dialogue_id) -> metadata - dialogues a user is in
  • (dialog_id, message_created_at) –> message_id - for the content of dialogue
  • partitioned by the dialogue_id, order key is the PK so that transactions remain on the same node

  • Upon persisting message from the message service, dialog service send a notification via notification service to client that new messages are available. Client will then poll from the dialog servic.
  • Can also push directly to the client as part of notification instead of notifying client to poll. In production, it is most likely a hybrid to be reactive
  • Delivered messaged can be purged from the DB, since we have message service as source of truth

User client service

  • Maintains (user_id, client_id) -> (notification node, metadata)
  • Acts as router to route the incoming notifiction request to the real server
  • 1B client * 1KB each entry = 1T storage
  • Simple ID based sharding + 10 nodes is enough to host the metadata

Where do we maintain the checkpoint on the messages user’s poll

Either local client side or user client service side could work. In any case, server side needs to defend against malicious poll requests

Local cilent

  • In local storage maintains received message ids and their content so that client can do local dedup and reordering.
  • Client does not send out another message until the previous one has been acked by the server side.
    • Inputing message is a low throughput activity so such delay is OK.
    • If we really want to support concurrent send, then we have to maintain a sequence number for each dialog, which further complicates the design
  • Note we can’t rely on client time because of the consist, in order requirement - the processing time in the message service is the source of truth time

What if I want to maintain the same relative order of the message i sent across all clients

  • This means we need to introduce a sequence number for each (dialog_id, user_id) which acts as the logical distributed lock, and fetch and then increment it every one i send a message.
  • In theory we could still use the message creation time instead of sequence number, but in practice hard to deal with time drift issues

Notification service

  • This is the stateless layer that maintains the long-living client connections
  • Long connection gateway (LCG) connects mulitple clients and multiple business backend (many-to-many). Between them only 1 long conneciton.
  • LCG supports pub/sub model.
  • LCG should provide allow callback to business backend to support domain-specific auth
  • Open resty layer to keep session and LB.
  • Long connection broker for protocal, auth, session, pub/sub management

If we have to deploy it globally to serve the global traffic, which areas need to change

TODO: dive deep on this later

How pd schedules regions

2020-04-29 00:00:00 +0000

  • Store : each tikv server maps to a store
  • Pending: learner/follower log lags behind leader by a large margin
  • PD use standard revision + lease for leader failover

  • Scheduler: generate the operator, which encapsulates the steps related to rebalancing for a single region,i.e., one region has mutliple steps
    • balance-leader-scheduler
    • balance-region-scheduler
    • hot-region-scheduler

type Scheduler interface {
	GetName() string
	GetResourceKind() ResourceKind
	Schedule(cluster *clusterInfo) Operator
}

After operator is generated, it will be put in a queue and execute sequentially (possibly in parallel), by sending the operator step to the region leader as a response to the heartbeat

  • It will wait for timeout with ack

type Operator interface {
	GetRegionID() uint64
	GetResourceKind() ResourceKind
	Do(region *regionInfo) (*pdpb.RegionHeartbeatResponse, bool)
}

Source code: how kafka consumer commits offset

2020-04-29 00:00:00 +0000

  • Source code version 2.4
  • What does asyncCommitFenced do?

In ConsumerCoordinator


    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        Node coordinator = checkAndGetCoordinator();

        // create the offset commit request
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();

            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
                    .getOrDefault(topicPartition.topic(),
                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                    .setName(topicPartition.topic())
                    );

            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                    .setCommittedMetadata(offsetAndMetadata.metadata())
            );
            requestTopicDataMap.put(topicPartition.topic(), topic);
        }

        final Generation generation = generationIfStable();

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                new OffsetCommitRequestData()
                        .setGroupId(this.rebalanceConfig.groupId)
                        .setGenerationId(generation.generationId)
                        .setMemberId(generation.memberId)
                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
        );

        log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

        return client.send(coordinator, builder)
                .compose(new OffsetCommitResponseHandler(offsets));
    }

On server side, the operation detection is done by checking the value request.header.apiKey, e.g.,


        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)

The response is


   public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
        Map<String, OffsetCommitResponseTopic>
                responseTopicDataMap = new HashMap<>();

        for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            String topicName = topicPartition.topic();

            OffsetCommitResponseTopic topic = responseTopicDataMap.getOrDefault(
                topicName, new OffsetCommitResponseTopic().setName(topicName));

            topic.partitions().add(new OffsetCommitResponsePartition()
                                       .setErrorCode(entry.getValue().code())
                                       .setPartitionIndex(topicPartition.partition()));
            responseTopicDataMap.put(topicName, topic);
        }

        data = new OffsetCommitResponseData()
                .setTopics(new ArrayList<>(responseTopicDataMap.values()))
                .setThrottleTimeMs(requestThrottleMs);
    }

Coding set 2

2020-04-27 00:00:00 +0000

Source code: How Kafka GroupCoordinator handles membership

2020-04-26 00:00:00 +0000

  • Server side of the coordination logic. Removed/rewrite corner cases and error handling
  • Source code verison 2.4
  • See kafka.coordinator.group.GroupState for the state transition

Started inside KafkaServer.startup


  /**
   * Startup logic executed at the same time when the server starts up.
   */
  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

How LEAVE_GROUP request is handled

In handleLeaveGroup.It will be triggered when

  • Consumer calls unsubscribe
  • Consumer heartbeat times out. Consumer will send LEAVE_GROUP request

//Group is locked for concurrent edit
                  val memberId = leavingMember.memberId
                  val groupInstanceId = Option(leavingMember.groupInstanceId)
                  if (group.isPendingMember(memberId)) {
                      // if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
                      // and if necessary, prompt a JoinGroup completion.
                      info(s"Pending member $memberId is leaving group ${group.groupId}.")
                      removePendingMemberAndUpdateGroup(group, memberId)
                      heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
                  } else {
                    val member = if (group.hasStaticMember(groupInstanceId))
                      group.get(group.getStaticMemberId(groupInstanceId))
                    else
                      group.get(memberId)
  member.isLeaving = true
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)
                    info(s"Member[group.instance.id ${member.groupInstanceId}, member.id ${member.memberId}] " +
                      s"in group ${group.groupId} has left, removing it from the group")
  // New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
    // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
    // will retry the JoinGroup request if is still active.
    group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))

    group.remove(member.memberId)
    group.removeStaticMember(member.groupInstanceId)

    group.currentState match {
      case Dead | Empty =>
      case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
                  }

How is heartbeat handled


//when group is still loading, respond blindly
//group is mutex is locked in the whole process

          group.currentState match {
            case Empty =>
              responseCallback(Errors.UNKNOWN_MEMBER_ID)

            case CompletingRebalance =>
                responseCallback(Errors.REBALANCE_IN_PROGRESS)

            case PreparingRebalance =>
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS)

            case Stable =>
                val member = group.get(memberId)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE)

            case Dead =>
              throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")


  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)

    // reschedule the next heartbeat expiration deadline
    val deadline = member.latestHeartbeat + timeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
  }

  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
    group.inLock {
      if (group.is(Dead)) {
        info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
      } else if (isPending) {
        info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
        removePendingMemberAndUpdateGroup(group, memberId)
      } else if (!group.has(memberId)) {
        debug(s"Member $memberId has already been removed from the group.")
      } else {
        val member = group.get(memberId)
        if (!member.shouldKeepAlive(heartbeatDeadline)) {
          info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
          removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
        }
      }
    }
  }

Source code: How Kafka consumer decides which partitions to poll

2020-04-24 00:00:00 +0000

  • Client side of the coordination logic. Removed/rewrite corner cases and error handling
  • Source code verison 2.4

In org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
	coordinator.poll(timer); //note we poll coordinator every time!
	fetcher.validateOffsetsIfNeeded();
	coordinator.refreshCommittedOffsetsIfNeeded(timer)
	subscriptions.resetMissingPositions();
	fetcher.resetOffsetsIfNeeded();
  	final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

 	if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) 
		client.transmitSends();

	return this.interceptors.onConsume(new ConsumerRecords<>(records));
}

The paritition assignment refresh logic is in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. Call it CC below

Data structure of CC

public final class ConsumerCoordinator extends AbstractCoordinator {
    private boolean isLeader = false;
    private final SubscriptionState subscriptions;
    private Set<String> joinedSubscription; //updated by subscrptions
    private MetadataSnapshot metadataSnapshot; 
    private MetadataSnapshot assignmentSnapshot; //updated by metadataSnapshot but not vice-versa. Updated only if current CC is the leader
    private boolean rejoinNeeded = true;
    private boolean needsJoinPrepare = true;
    private MemberState state = MemberState.UNJOINED;
    private HeartbeatThread heartbeatThread = null;

  • Note the heartbeat thread inside the coordinator does not update the internal state

How ConsumerCoordinator polls

   public boolean rejoinNeededOrPending() {
        if (!subscriptions.partitionsAutoAssigned())
            return false;

        // we need to rejoin if we performed the assignment and metadata has changed;
        // also for those owned-but-no-longer-existed partitions we should drop them as lost
        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
            return true;

        // we need to join if our subscription has changed since the last join
        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
            return true;
        }

	return rejoinNeeded || joinFuture != null; //rejoinNeeded defaults to true
    }

    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
                log.debug("Received successful JoinGroup response: {}", joinResponse);

                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        // if the consumer was woken up before a rebalance completes, we may have already left
                        // the group. In this case, we do not want to continue with the sync group.
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                                joinResponse.data().memberId(), joinResponse.data().protocolName());
                        if (joinResponse.isLeader()) {
                            onJoinLeader(joinResponse).chain(future);
                        } else {
    SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(rebalanceConfig.groupId)
                                .setMemberId(generation.memberId)
                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                .setAssignments(Collections.emptyList())
                );
        log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
        syncFuture = client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
                            syncFuture.chain(future);
                        }
                    }
                }
        }
    }

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
            state = MemberState.REBALANCING;
  	int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
    log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                new JoinGroupRequestData()
                        .setGroupId(rebalanceConfig.groupId)
                        .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
                        .setMemberId(this.generation.memberId)
                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                        .setProtocolType(protocolType())
                        .setProtocols(metadata()) //this one internally update joined subscription!
                        .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
        );

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
            joinFuture = client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                .compose(new JoinGroupResponseHandler());
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    synchronized (AbstractCoordinator.this) {
                            log.info("Successfully joined group with generation {}", generation.generationId);
                            state = MemberState.STABLE;
                            rejoinNeeded = false;
                            // record rebalance latency
                            if (heartbeatThread != null)
                                heartbeatThread.enable();
                    }
                }
            });
        }
        return joinFuture;
    }

    boolean joinGroupIfNeeded(final Timer timer) {
            // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
            // time if the client is woken up before a pending rebalance completes. This must be called
            // on each iteration of the loop because an event requiring a rebalance (such as a metadata
            // refresh which changes the matched subscription set) can occur while another rebalance is
            // still in progress.
            if (needsJoinPrepare) {
                // need to set the flag before calling onJoinPrepare since the user callback may throw
                // exception, in which case upon retry we should not retry onJoinPrepare either.
                needsJoinPrepare = false;
                onJoinPrepare(generation.generationId, generation.memberId);
            }

            final RequestFuture<ByteBuffer> future = initiateJoinGroup();
            client.poll(future, timer);
    Generation generationSnapshot;

                // Generation data maybe concurrently cleared by Heartbeat thread.
                // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
                // and  shouldn't block hearbeat thread.
                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
                synchronized (AbstractCoordinator.this) {
                    generationSnapshot = this.generation;
                }

                    // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                    ByteBuffer memberAssignment = future.value().duplicate();

                    onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);

                    // Generally speaking we should always resetJoinGroupFuture once the future is done, but here
                    // we can only reset the join group future after the completion callback returns. This ensures
                    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                    // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
                    resetJoinGroupFuture();
                    needsJoinPrepare = true; 
}
}

public boolean poll(Timer timer) {
	if (rejoinNeededOrPending()) { //the internal states refershed by a heartbeat thread. Here we only check the staleness
		joinGroupIfNeeded(timer);
	}
}

How heartbeat works - HeartbeatResponseHandler


  public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.info("Attempt to heartbeat failed since group is rebalancing");
		this.rejoinNeeded = true
                future.raise(error);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
		 this.generation = Generation.NO_GENERATION;
  		rejoinNeeded = true;
        state = MemberState.UNJOINED;
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }

On PermGen

2020-04-22 00:00:00 +0000

  • Class loader reads the class file and loads the interpreted data into the JVM
  • Static non-final variables are stored in the memory area, but static final variables are stored in runtime constant pool - still in the PermGen though. Also, final class variables are stored as part of the data that any type can use, whereas class variables is stores as part of the data for the type that declares them
  • For each type:
    • FQN of the type
    • FQN of super class
    • Is this type a class or interface?
    • Type modifier set - public, abstract, final?
    • FQN of direct superinterfaces it implements
    • Constant pool: an array holding literals and symbolic reference to types, fields, and methods
    • For each field:
      • name
      • type
      • modifier
    • For each method:
      • name
      • return type
      • parameter # and types
      • modifier set (public, private, abstract, final…)
      • If method is not abstract or native:
        • method byte codes
        • size of the operand stack and local variables
        • Exception table
    • Class variables, i.e., static but not final
    • Reference to ClassLoader to support user-defined ClassLoader
    • Reference to class Class
    • For non-abstract class, method table - array of direct references to array instance methods that maybe invoked from the instance

How is the method area involved when we execute the main()

class Volcano {

    public static void main(String[] args) {
        Lava lava = new Lava();
        lava.flow();
    }
}
  • Class loader loads the class the main() method is in, populate method area with the type info listed above
  • VM runs the byte codes of main() from method area, and maintain a pointer to the contant pool (in method area) of main()’s class
  • Lava is a symbol in constant pool, so the class loader loads the Lava type and populate the corresponding method area
  • VM replace the symbol with reference to the Lava class’s class data
  • VM allocates memory for Lava, by look up Lava in method area to find out its heap space
  • VM inits all fields in lava to its default value
  • Pushs the reference to the new Lava object into (operand) stack
  • use the reference to init the private value of Lava

How kafka manages consumers

2020-04-21 00:00:00 +0000

ConsumerCoordinator

  • Fetch thread to pull msgs.
  • Heartbeat thread to the coordinator. Default timeout at 10s (session.timeout.ms)
    • Will get the state of the coordinator, if it is rebalancing, send the JoinGroup request
  • Every consumer group has a coordinator on the broker. The coordinator manages the membership of consumers

Kafka group states

  • Empty: init state, no group member
  • PreparingBalance
  • InitialRebalance: delay entering PreparingBalance state to reduce rebalance times
  • AwaitSync: waiting for the new assignment from the group leader
  • Stable
  • Dead: empty group. Metadata no longer available

When joining new group, the state is Empty -> PreparingBalance -> AwaitSync -> Stable

Rebalance consumer

When the consumer membership changes, coordinator will rebalance consumer to partition association

  • Coordinator replys the heartbeat request from all consumers with rebalance notification. All consumers need to resend joinGroup request
  • JoinGroup: When new one joins the group
    • the coordinater will let the consumer group leader know the membership assignment, generaton
    • Other consumers will know the leader, generation and topic and its partition info
  • SyncGroup: after JoinGroup stage is over. Consumer leader will rebalance and return the assignment to the coordinator, which in turn broadcast to other consumers as reponse of SyncGroup call from followers
    • Default assignment strategy is RangeAssignor
  • If leader is lost during the rebalance, it will be detected by coordinator and the leader will be removed

Coding set 1

2020-04-20 00:00:00 +0000

Design: delayed task

2020-04-19 00:00:00 +0000

The purpose of delayed task is to execute some task in the the future

Common solutions:

  • DelayQueue
  • ScheduledExecutorService
  • zset in redis with activation time as key, need to check if deletion is successful to see if it is consumed multiple times
  • Use key space notification (Default is off in redis)
  • Timer wheel-based implementation

What if we can have up to 1 B tasks to be run the future

For example, we want to send different notifications to user 1 day after performing different types of actions. How do you design this?

Assume we have 100 mil users, each user can get up to 10 different delayed tasks per day, and we have to execute the task within 12 hours of the scheduled execution time

Then we have to persist with (virtual shard, timestamp, seq id) as the key prefix, and use timestamp as the checkpoint offset