Cluster setup

  • 6 m5.2xlarge instances
  • Kafka 2.3.0
  • openjdk version "1.8.0_232"
  • CentOS Linux release 7.6.1810 (Core)

Symptom

Every Kafka broker is under heavy CPU load at 15k msgs per broker

procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0      0 286240  44392 24279584    0    0    53   711    8    6 40 11 49  0  0
 5  0      0 287480  44392 24280624    0    0     0     0 48650 41962 15  5 80  0  0
 5  0      0 286992  44392 24281656    0    0     0     0 50061 41004 21  6 73  0  0
15  0      0 289344  44376 24272348    0    0     0     0 53212 44541 30  7 63  0  0
 5  0      0 291032  44340 24271480    0    0     0  9264 51796 43536 19  5 76  0  0
10  0      0 291656  44352 24272676    0    0     0  1604 46937 41888 18  5 77  0  0
 3  0      0 290776  44352 24273524    0    0     0     0 46349 40179 16  4 80  0  0
 2  0      0 289448  44352 24274356    0    0     0     0 50271 41767 18  6 76  0  0
 1  0      0 289444  44352 24275272    0    0     0     0 46326 39414 16  5 79  0  0
 9  0      0 287456  44352 24276560    0    0     0  3704 47776 39765 19  5 76  0  0

Each context switch costs about 4 microseconds
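As a rough sanity check, the direct cost of context switching can be estimated by multiplying the "cs" column of the vmstat output above by the 4 microsecond figure. This is only a back-of-the-envelope sketch: it assumes the instance exposes 8 vCPUs (the m5.2xlarge size) and it ignores indirect costs such as cache misses after a switch.

```python
# Context switches per second ("cs" column) from the vmstat samples above,
# skipping the first row, which reports averages since boot.
cs_per_sec = [41962, 41004, 44541, 43536, 41888, 40179, 41767, 39414, 39765]

SWITCH_COST_SEC = 4e-6  # about 4 microseconds per switch, as stated above
VCPUS = 8               # assumption: an m5.2xlarge exposes 8 vCPUs


def context_switch_overhead(samples, cost_sec, vcpus):
    """Return (core-seconds spent switching per second, share of total CPU)."""
    mean_cs = sum(samples) / len(samples)
    core_seconds = mean_cs * cost_sec
    return core_seconds, core_seconds / vcpus


core_seconds, share = context_switch_overhead(cs_per_sec, SWITCH_COST_SEC, VCPUS)
print(f"{core_seconds:.3f} core-seconds per second, {share:.1%} of total CPU")
```

With these samples the estimate comes to roughly 0.17 core-seconds per second, or about 2% of the machine, so the direct cost of context switching alone does not account for the load.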

Threads:  94 total,   8 running,  86 sleeping,   0 stopped,   0 zombie
%Cpu(s): 56.2 us,  9.9 sy,  0.0 ni, 28.1 id,  0.0 wa,  0.0 hi,  5.8 si,  0.0 st
KiB Mem : 31960748 total,   221784 free,  5007428 used, 26731536 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 26347036 avail Mem 
  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                        
11063 kafka     20   0   34.2g   4.4g  68900 R 53.2 14.4 474:03.72 data-plane-kafk                                
10495 kafka     20   0   34.2g   4.4g  68900 R 47.8 14.4 470:24.77 data-plane-kafk                                
10499 kafka     20   0   34.2g   4.4g  68900 S 46.5 14.4 470:24.25 data-plane-kafk                                
10492 kafka     20   0   34.2g   4.4g  68900 S 45.8 14.4 470:21.15 data-plane-kafk                                
10494 kafka     20   0   34.2g   4.4g  68900 S 45.5 14.4 469:54.86 data-plane-kafk                                
10493 kafka     20   0   34.2g   4.4g  68900 S 45.2 14.4 470:30.89 data-plane-kafk                                
10497 kafka     20   0   34.2g   4.4g  68900 S 45.2 14.4 470:19.32 data-plane-kafk                                
10491 kafka     20   0   34.2g   4.4g  68900 R 44.9 14.4 470:20.41 data-plane-kafk                                
10498 kafka     20   0   34.2g   4.4g  68900 S 43.9 14.4 470:25.80 data-plane-kafk                                
11062 kafka     20   0   34.2g   4.4g  68900 R 39.2 14.4 372:52.36 data-plane-kafk                                
11060 kafka     20   0   34.2g   4.4g  68900 S 35.5 14.4 397:22.49 data-plane-kafk                                
13119 kafka     20   0   34.2g   4.4g  68900 S  9.3 14.4 103:45.58 ReplicaFetcherT

Analysis

Common thread dumps

Thread 11063: (state = IN_NATIVE)
 - sun.nio.ch.EPollArrayWrapper.epollWait(long, int, long, int) @bci=0 (Compiled frame; information may be imprecise)
 - sun.nio.ch.EPollArrayWrapper.poll(long) @bci=18, line=269 (Compiled frame)
 - sun.nio.ch.EPollSelectorImpl.doSelect(long) @bci=28, line=93 (Compiled frame)
 - sun.nio.ch.SelectorImpl.lockAndDoSelect(long) @bci=37, line=86 (Compiled frame)
 - sun.nio.ch.SelectorImpl.select(long) @bci=30, line=97 (Compiled frame)
 - org.apache.kafka.common.network.Selector.select(long) @bci=35, line=794 (Compiled frame)
 - org.apache.kafka.common.network.Selector.poll(long) @bci=191, line=467 (Compiled frame)
 - kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
 - kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 10494: (state = BLOCKED)
 - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
 - java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object, long) @bci=20, line=215 (Compiled frame)
 - java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) @bci=78, line=2078 (Compiled frame)
 - java.util.concurrent.ArrayBlockingQueue.poll(long, java.util.concurrent.TimeUnit) @bci=49, line=418 (Compiled frame)
 - kafka.network.RequestChannel.receiveRequest(long) @bci=8, line=344 (Compiled frame)
 - kafka.server.KafkaRequestHandler.run() @bci=72, line=54 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 11063: (state = IN_JAVA)
 - java.util.HashMap.getNode(int, java.lang.Object) @bci=143, line=581 (Compiled frame; information may be imprecise)
 - java.util.HashMap.get(java.lang.Object) @bci=6, line=557 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Schema.get(java.lang.String) @bci=5, line=158 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.get(java.lang.String) @bci=5, line=177 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.getInt(java.lang.String) @bci=2, line=233 (Compiled frame)
 - org.apache.kafka.common.protocol.types.Struct.get(org.apache.kafka.common.protocol.types.Field$Int32) @bci=5, line=84 (Compiled frame)
 - org.apache.kafka.common.requests.ProduceRequest.<init>(org.apache.kafka.common.protocol.types.Struct, short) @bci=116, line=249 (Compiled frame)
 - org.apache.kafka.common.requests.AbstractRequest.parseRequest(org.apache.kafka.common.protocol.ApiKeys, short, org.apache.kafka.common.protocol.types.Struct) @bci=210, line=147 (Compiled frame)
 - org.apache.kafka.common.requests.RequestContext.parseRequest(java.nio.ByteBuffer) @bci=64, line=64 (Compiled frame)
 - kafka.network.RequestChannel$Request.<init>(int, org.apache.kafka.common.requests.RequestContext, long, org.apache.kafka.common.memory.MemoryPool, java.nio.ByteBuffer, kafka.network.RequestChannel$Metrics) @bci=114, line=89 (Compiled frame)
 - kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(org.apache.kafka.common.network.NetworkReceive) @bci=260, line=890 (Compiled frame)
 - kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(java.lang.Object) @bci=5, line=873 (Compiled frame)

Thread 11062: (state = IN_JAVA)
 - java.util.HashMap$HashIterator.<init>(java.util.HashMap) @bci=75, line=1433 (Compiled frame; information may be imprecise)
 - java.util.HashMap$KeyIterator.<init>(java.util.HashMap) @bci=7, line=1467 (Compiled frame)
 - java.util.HashMap$KeySet.iterator() @bci=8, line=917 (Compiled frame)
 - java.util.HashSet.iterator() @bci=7, line=173 (Compiled frame)
 - sun.nio.ch.Util$3.iterator() @bci=4, line=324 (Compiled frame)
 - org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=5, line=518 (Compiled frame)
 - org.apache.kafka.common.network.Selector.poll(long) @bci=312, line=483 (Compiled frame)
 - kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
 - kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
 - java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)

Thread 11060: (state = IN_NATIVE)
 - sun.nio.ch.FileChannelImpl.transferTo0(java.io.FileDescriptor, long, long, java.io.FileDescriptor) @bci=0 (Compiled frame; information may be imprecise)
 - sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(long, int, java.nio.channels.WritableByteChannel, java.io.FileDescriptor) @bci=107, line=428 (Compiled frame)
 - sun.nio.ch.FileChannelImpl.transferToDirectly(long, int, java.nio.channels.WritableByteChannel) @bci=217, line=493 (Compiled frame)
 - sun.nio.ch.FileChannelImpl.transferTo(long, long, java.nio.channels.WritableByteChannel) @bci=133, line=605 (Compiled frame)
 - org.apache.kafka.common.network.PlaintextTransportLayer.transferFrom(java.nio.channels.FileChannel, long, long) @bci=8, line=215 (Compiled frame)
 - org.apache.kafka.common.record.FileRecords.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=123, line=283 (Compiled frame)
 - org.apache.kafka.common.record.DefaultRecordsSend.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=11, line=33 (Compiled frame)
 - org.apache.kafka.common.record.RecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=25, line=58 (Compiled frame)
 - org.apache.kafka.common.record.MultiRecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=24, line=93 (Compiled frame)
 - org.apache.kafka.common.network.KafkaChannel.send(org.apache.kafka.common.network.Send) @bci=10, line=429 (Compiled frame)
 - org.apache.kafka.common.network.KafkaChannel.write() @bci=14, line=399 (Compiled frame)
 - org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=498, line=589 (Compiled frame)

ss shows that the number of network connections is within the normal range

Total: 7010 (kernel 0)
TCP:   6804 (estab 6792, closed 0, orphaned 0, synrecv 0, timewait 0/0), ports 0

Transport Total     IP        IPv6
*	  0         -         -        
RAW	  0         0         0        
UDP	  8         4         4        
TCP	  6804      7         6797     
INET	  6812      11        6801     
FRAG	  0         0         0  

The Kafka process is not constrained by any resource limit

Limit                     Soft Limit           Hard Limit           Units     
Max cpu time              unlimited            unlimited            seconds   
Max file size             unlimited            unlimited            bytes     
Max data size             unlimited            unlimited            bytes     
Max stack size            8388608              unlimited            bytes     
Max core file size        0                    unlimited            bytes     
Max resident set          unlimited            unlimited            bytes     
Max processes             124668               124668               processes 
Max open files            128000               128000               files     
Max locked memory         65536                65536                bytes     
Max address space         unlimited            unlimited            bytes     
Max file locks            unlimited            unlimited            locks     
Max pending signals       124668               124668               signals   
Max msgqueue size         819200               819200               bytes     
Max nice priority         0                    0                    
Max realtime priority     0                    0                    
Max realtime timeout      unlimited            unlimited            us  

GC configs

Memory: 4k page, physical 31960748k(31339152k free), swap 0k(0k free)
CommandLine flags: -XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=4294967296 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation 

GC actions

2020-05-27T19:52:59.673+0000: 221.678: [GC pause (G1 Evacuation Pause) (young), 0.0221126 secs]
2020-05-27T19:53:00.458+0000: 222.463: [GC pause (G1 Evacuation Pause) (young), 0.0298531 secs]
2020-05-27T19:53:00.758+0000: 222.763: [GC pause (G1 Evacuation Pause) (young), 0.0145942 secs]
2020-05-27T19:53:01.661+0000: 223.667: [GC pause (G1 Evacuation Pause) (young), 0.0193434 secs]
2020-05-27T19:53:01.995+0000: 224.000: [GC pause (G1 Evacuation Pause) (young), 0.0157352 secs]
2020-05-27T19:53:02.286+0000: 224.292: [GC pause (G1 Evacuation Pause) (young), 0.0410610 secs]
2020-05-27T19:53:02.564+0000: 224.570: [GC pause (G1 Evacuation Pause) (young), 0.0161346 secs]
2020-05-27T19:53:02.852+0000: 224.857: [GC pause (G1 Evacuation Pause) (young), 0.0131654 secs]
2020-05-27T19:53:03.298+0000: 225.304: [GC pause (G1 Evacuation Pause) (young), 0.0145538 secs]
2020-05-27T19:53:03.938+0000: 225.943: [GC pause (G1 Evacuation Pause) (young), 0.0149772 secs]
2020-05-27T19:53:04.449+0000: 226.454: [GC pause (G1 Evacuation Pause) (young), 0.0178123 secs]
2020-05-27T19:53:04.675+0000: 226.680: [GC pause (G1 Evacuation Pause) (young), 0.0134442 secs]
2020-05-27T19:53:04.976+0000: 226.981: [GC pause (G1 Evacuation Pause) (young), 0.0185317 secs]
2020-05-27T19:53:05.388+0000: 227.393: [GC pause (G1 Evacuation Pause) (young), 0.0114091 secs]
2020-05-27T19:53:05.877+0000: 227.882: [GC pause (G1 Evacuation Pause) (young), 0.0133841 secs]
2020-05-27T19:53:06.355+0000: 228.361: [GC pause (G1 Evacuation Pause) (young), 0.0156590 secs]
2020-05-27T19:53:06.903+0000: 228.908: [GC pause (G1 Evacuation Pause) (young), 0.0147414 secs]
2020-05-27T19:53:07.211+0000: 229.216: [GC pause (G1 Evacuation Pause) (young), 0.0126670 secs]
2020-05-27T19:53:07.681+0000: 229.687: [GC pause (G1 Evacuation Pause) (young), 0.0160053 secs]
2020-05-27T19:53:08.148+0000: 230.153: [GC pause (G1 Evacuation Pause) (young), 0.0123430 secs]
2020-05-27T19:53:08.618+0000: 230.623: [GC pause (G1 Evacuation Pause) (young), 0.0157585 secs]
2020-05-27T19:53:09.344+0000: 231.349: [GC pause (G1 Evacuation Pause) (young), 0.0152343 secs]
2020-05-27T19:53:10.220+0000: 232.225: [GC pause (G1 Evacuation Pause) (young), 0.0129275 secs]
2020-05-27T19:53:11.181+0000: 233.186: [GC pause (G1 Evacuation Pause) (young), 0.0126668 secs]

GC is a suspect because the JVM pauses more than once per second. Our goal is to keep 90% of GC pauses at or under 20 ms, with no more than one pause per second on average.
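The goal above can be checked mechanically against a GC log. The following is a minimal sketch, assuming log lines in the same format as the ones shown above; the helper names (parse_pauses, summarize) are made up for illustration, and the 90th percentile uses the simple nearest-rank method.

```python
import math
import re

# Captures the JVM uptime and the pause duration from lines such as:
# 2020-05-27T19:52:59.673+0000: 221.678: [GC pause (G1 Evacuation Pause) (young), 0.0221126 secs]
PAUSE_RE = re.compile(r":\s+(\d+\.\d+): \[GC pause .*?, (\d+\.\d+) secs\]")


def parse_pauses(lines):
    """Return a list of (jvm_uptime_sec, pause_sec) tuples."""
    pauses = []
    for line in lines:
        m = PAUSE_RE.search(line)
        if m:
            pauses.append((float(m.group(1)), float(m.group(2))))
    return pauses


def summarize(pauses, target_sec=0.020):
    """Compute pause frequency, p90 pause, and share of pauses within target."""
    uptimes = [t for t, _ in pauses]
    durations = sorted(d for _, d in pauses)
    window = uptimes[-1] - uptimes[0]
    rank = max(1, math.ceil(0.9 * len(durations)))  # nearest-rank percentile
    return {
        "pauses_per_sec": (len(pauses) - 1) / window,
        "p90_pause_sec": durations[rank - 1],
        "within_target": sum(d <= target_sec for d in durations) / len(durations),
    }


# First five pauses from the log above.
sample_log = """\
2020-05-27T19:52:59.673+0000: 221.678: [GC pause (G1 Evacuation Pause) (young), 0.0221126 secs]
2020-05-27T19:53:00.458+0000: 222.463: [GC pause (G1 Evacuation Pause) (young), 0.0298531 secs]
2020-05-27T19:53:00.758+0000: 222.763: [GC pause (G1 Evacuation Pause) (young), 0.0145942 secs]
2020-05-27T19:53:01.661+0000: 223.667: [GC pause (G1 Evacuation Pause) (young), 0.0193434 secs]
2020-05-27T19:53:01.995+0000: 224.000: [GC pause (G1 Evacuation Pause) (young), 0.0157352 secs]
""".splitlines()

pauses = parse_pauses(sample_log)
stats = summarize(pauses)
meets_goal = stats["p90_pause_sec"] <= 0.020 and stats["pauses_per_sec"] <= 1.0
print(stats, "meets goal:", meets_goal)
```

On this five-line sample the check fails on both counts: pauses arrive more often than once per second, and the 90th-percentile pause is above 20 ms. Running it over the full log file gives a more representative picture.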

So we changed the JVM flags to:

CommandLine flags: -XX:CICompilerCount=4 -XX:ConcGCThreads=2 -XX:+ExplicitGCInvokesConcurrent -XX:G1HeapRegionSize=2097152 -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=6442450944 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MarkStackSize=4194304 -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=6442450944 -XX:MaxNewSize=3865051136 -XX:MinHeapDeltaBytes=2097152 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation

After the change, the GC log looks like this:

2020-05-30T19:53:09.518+0000: 170448.394: [GC pause (G1 Evacuation Pause) (young), 0.0181099 secs]
2020-05-30T19:53:12.206+0000: 170451.082: [GC pause (G1 Evacuation Pause) (young), 0.0199838 secs]
2020-05-30T19:53:15.498+0000: 170454.375: [GC pause (G1 Evacuation Pause) (young), 0.0196726 secs]
2020-05-30T19:53:18.248+0000: 170457.124: [GC pause (G1 Evacuation Pause) (young), 0.0189805 secs]
2020-05-30T19:53:21.330+0000: 170460.206: [GC pause (G1 Evacuation Pause) (young), 0.0199247 secs]
2020-05-30T19:53:24.483+0000: 170463.359: [GC pause (G1 Evacuation Pause) (young), 0.0202763 secs]
2020-05-30T19:53:27.667+0000: 170466.543: [GC pause (G1 Evacuation Pause) (young), 0.0194760 secs]
2020-05-30T19:53:31.174+0000: 170470.051: [GC pause (G1 Evacuation Pause) (young), 0.0198198 secs]
2020-05-30T19:53:34.597+0000: 170473.473: [GC pause (G1 Evacuation Pause) (young), 0.0206716 secs]
2020-05-30T19:53:37.317+0000: 170476.193: [GC pause (G1 Evacuation Pause) (young), 0.0197347 secs]
2020-05-30T19:53:40.785+0000: 170479.661: [GC pause (G1 Evacuation Pause) (young), 0.0182224 secs]
2020-05-30T19:53:44.389+0000: 170483.266: [GC pause (G1 Evacuation Pause) (young), 0.0195625 secs]
2020-05-30T19:53:47.709+0000: 170486.585: [GC pause (G1 Evacuation Pause) (young), 0.0207940 secs]
2020-05-30T19:53:51.014+0000: 170489.890: [GC pause (G1 Evacuation Pause) (young), 0.0176806 secs]
2020-05-30T19:53:54.912+0000: 170493.788: [GC pause (G1 Evacuation Pause) (young), 0.0192957 secs]
2020-05-30T19:53:58.615+0000: 170497.491: [GC pause (G1 Evacuation Pause) (young), 0.0196635 secs]
2020-05-30T19:54:02.422+0000: 170501.298: [GC pause (G1 Evacuation Pause) (young), 0.0210456 secs]
2020-05-30T19:54:05.657+0000: 170504.533: [GC pause (G1 Evacuation Pause) (young), 0.0185793 secs]
2020-05-30T19:54:09.633+0000: 170508.509: [GC pause (G1 Evacuation Pause) (young), 0.0197715 secs]

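We can see that the overall time spent in GC pauses is reduced: pauses now occur roughly every 3 seconds instead of more than once per second, while each pause stays around 20 ms. This holds even though the region size is kept roughly the same.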
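To see exactly what differs between the two sets of command-line flags shown above, they can be parsed and compared. This is a small sketch with a made-up helper name (parse_flags). Note that the listings do not say which of the newly listed flags were set by hand and which were derived by the JVM itself.

```python
BEFORE = """
-XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600
-XX:InitialHeapSize=4294967296 -XX:InitiatingHeapOccupancyPercent=35
-XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296
-XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers
-XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation
"""

AFTER = """
-XX:CICompilerCount=4 -XX:ConcGCThreads=2 -XX:+ExplicitGCInvokesConcurrent
-XX:G1HeapRegionSize=2097152 -XX:GCLogFileSize=104857600
-XX:InitialHeapSize=6442450944 -XX:InitiatingHeapOccupancyPercent=35
-XX:+ManagementServer -XX:MarkStackSize=4194304 -XX:MaxGCPauseMillis=20
-XX:MaxHeapSize=6442450944 -XX:MaxNewSize=3865051136
-XX:MinHeapDeltaBytes=2097152 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC
-XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
-XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC
-XX:+UseGCLogFileRotation
"""


def parse_flags(command_line):
    """Map each -XX flag name to its value (True/False for +/- switches)."""
    flags = {}
    for token in command_line.split():
        body = token[len("-XX:"):]
        if body[0] in "+-":
            flags[body[1:]] = body[0] == "+"
        else:
            name, _, value = body.partition("=")
            flags[name] = int(value)  # every valued flag here is an integer
    return flags


before, after = parse_flags(BEFORE), parse_flags(AFTER)
changed = {k: (before[k], after[k])
           for k in before.keys() & after.keys() if before[k] != after[k]}
added = {k: after[k] for k in after.keys() - before.keys()}
removed = before.keys() - after.keys()

for name, (old, new) in sorted(changed.items()):
    print(f"changed {name}: {old / 2**30:.1f} GiB -> {new / 2**30:.1f} GiB")
for name, value in sorted(added.items()):
    print(f"added   {name}={value}")
```

The comparison shows that the only values that changed are the initial and maximum heap size, which went from 4 GiB to 6 GiB. The remaining differences are flags that appear only in the second listing, including G1HeapRegionSize at 2 MiB.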