Kafka high CPU usage problem
Cluster setup
- 6 m5.2xlarge
- Kafka 2.3.0
- openjdk version “1.8.0_232”
- CentOS Linux release 7.6.1810 (Core)
Symptom
Every kafka broker is experiencing heavy CPU load at 15k msgs on each broker
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 0 286240 44392 24279584 0 0 53 711 8 6 40 11 49 0 0
5 0 0 287480 44392 24280624 0 0 0 0 48650 41962 15 5 80 0 0
5 0 0 286992 44392 24281656 0 0 0 0 50061 41004 21 6 73 0 0
15 0 0 289344 44376 24272348 0 0 0 0 53212 44541 30 7 63 0 0
5 0 0 291032 44340 24271480 0 0 0 9264 51796 43536 19 5 76 0 0
10 0 0 291656 44352 24272676 0 0 0 1604 46937 41888 18 5 77 0 0
3 0 0 290776 44352 24273524 0 0 0 0 46349 40179 16 4 80 0 0
2 0 0 289448 44352 24274356 0 0 0 0 50271 41767 18 6 76 0 0
1 0 0 289444 44352 24275272 0 0 0 0 46326 39414 16 5 79 0 0
9 0 0 287456 44352 24276560 0 0 0 3704 47776 39765 19 5 76 0 0
Each context swtich costs about 4 microseconds
Threads: 94 total, 8 running, 86 sleeping, 0 stopped, 0 zombie
%Cpu(s): 56.2 us, 9.9 sy, 0.0 ni, 28.1 id, 0.0 wa, 0.0 hi, 5.8 si, 0.0 st
KiB Mem : 31960748 total, 221784 free, 5007428 used, 26731536 buff/cache
KiB Swap: 0 total, 0 free, 0 used. 26347036 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
11063 kafka 20 0 34.2g 4.4g 68900 R 53.2 14.4 474:03.72 data-plane-kafk
10495 kafka 20 0 34.2g 4.4g 68900 R 47.8 14.4 470:24.77 data-plane-kafk
10499 kafka 20 0 34.2g 4.4g 68900 S 46.5 14.4 470:24.25 data-plane-kafk
10492 kafka 20 0 34.2g 4.4g 68900 S 45.8 14.4 470:21.15 data-plane-kafk
10494 kafka 20 0 34.2g 4.4g 68900 S 45.5 14.4 469:54.86 data-plane-kafk
10493 kafka 20 0 34.2g 4.4g 68900 S 45.2 14.4 470:30.89 data-plane-kafk
10497 kafka 20 0 34.2g 4.4g 68900 S 45.2 14.4 470:19.32 data-plane-kafk
10491 kafka 20 0 34.2g 4.4g 68900 R 44.9 14.4 470:20.41 data-plane-kafk
10498 kafka 20 0 34.2g 4.4g 68900 S 43.9 14.4 470:25.80 data-plane-kafk
11062 kafka 20 0 34.2g 4.4g 68900 R 39.2 14.4 372:52.36 data-plane-kafk
11060 kafka 20 0 34.2g 4.4g 68900 S 35.5 14.4 397:22.49 data-plane-kafk
13119 kafka 20 0 34.2g 4.4g 68900 S 9.3 14.4 103:45.58 ReplicaFetcherT
Analysis
Common thread dumps
Thread 11063: (state = IN_NATIVE)
- sun.nio.ch.EPollArrayWrapper.epollWait(long, int, long, int) @bci=0 (Compiled frame; information may be imprecise)
- sun.nio.ch.EPollArrayWrapper.poll(long) @bci=18, line=269 (Compiled frame)
- sun.nio.ch.EPollSelectorImpl.doSelect(long) @bci=28, line=93 (Compiled frame)
- sun.nio.ch.SelectorImpl.lockAndDoSelect(long) @bci=37, line=86 (Compiled frame)
- sun.nio.ch.SelectorImpl.select(long) @bci=30, line=97 (Compiled frame)
- org.apache.kafka.common.network.Selector.select(long) @bci=35, line=794 (Compiled frame)
- org.apache.kafka.common.network.Selector.poll(long) @bci=191, line=467 (Compiled frame)
- kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
- kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
- java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 10494: (state = BLOCKED)
- sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame; information may be imprecise)
- java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object, long) @bci=20, line=215 (Compiled frame)
- java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) @bci=78, line=2078 (Compiled frame)
- java.util.concurrent.ArrayBlockingQueue.poll(long, java.util.concurrent.TimeUnit) @bci=49, line=418 (Compiled frame)
- kafka.network.RequestChannel.receiveRequest(long) @bci=8, line=344 (Compiled frame)
- kafka.server.KafkaRequestHandler.run() @bci=72, line=54 (Interpreted frame)
- java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 11063: (state = IN_JAVA)
- java.util.HashMap.getNode(int, java.lang.Object) @bci=143, line=581 (Compiled frame; information may be imprecise)
- java.util.HashMap.get(java.lang.Object) @bci=6, line=557 (Compiled frame)
- org.apache.kafka.common.protocol.types.Schema.get(java.lang.String) @bci=5, line=158 (Compiled frame)
- org.apache.kafka.common.protocol.types.Struct.get(java.lang.String) @bci=5, line=177 (Compiled frame)
- org.apache.kafka.common.protocol.types.Struct.getInt(java.lang.String) @bci=2, line=233 (Compiled frame)
- org.apache.kafka.common.protocol.types.Struct.get(org.apache.kafka.common.protocol.types.Field$Int32) @bci=5, line=84 (Compiled frame)
- org.apache.kafka.common.requests.ProduceRequest.<init>(org.apache.kafka.common.protocol.types.Struct, short) @bci=116, line=249 (Compiled frame)
- org.apache.kafka.common.requests.AbstractRequest.parseRequest(org.apache.kafka.common.protocol.ApiKeys, short, org.apache.kafka.common.protocol.types.Struct) @bci=210, line=147 (Compiled frame)
- org.apache.kafka.common.requests.RequestContext.parseRequest(java.nio.ByteBuffer) @bci=64, line=64 (Compiled frame)
- kafka.network.RequestChannel$Request.<init>(int, org.apache.kafka.common.requests.RequestContext, long, org.apache.kafka.common.memory.MemoryPool, java.nio.ByteBuffer, kafka.network.RequestChannel$Metrics) @bci=114, line=89 (Compiled frame)
- kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(org.apache.kafka.common.network.NetworkReceive) @bci=260, line=890 (Compiled frame)
- kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(java.lang.Object) @bci=5, line=873 (Compiled frame)
Thread 11062: (state = IN_JAVA)
- java.util.HashMap$HashIterator.<init>(java.util.HashMap) @bci=75, line=1433 (Compiled frame; information may be imprecise)
- java.util.HashMap$KeyIterator.<init>(java.util.HashMap) @bci=7, line=1467 (Compiled frame)
- java.util.HashMap$KeySet.iterator() @bci=8, line=917 (Compiled frame)
- java.util.HashSet.iterator() @bci=7, line=173 (Compiled frame)
- sun.nio.ch.Util$3.iterator() @bci=4, line=324 (Compiled frame)
- org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=5, line=518 (Compiled frame)
- org.apache.kafka.common.network.Selector.poll(long) @bci=312, line=483 (Compiled frame)
- kafka.network.Processor.poll() @bci=24, line=863 (Compiled frame)
- kafka.network.Processor.run() @bci=31, line=762 (Compiled frame)
- java.lang.Thread.run() @bci=11, line=748 (Interpreted frame)
Thread 11060: (state = IN_NATIVE)
- sun.nio.ch.FileChannelImpl.transferTo0(java.io.FileDescriptor, long, long, java.io.FileDescriptor) @bci=0 (Compiled frame; information may be imprecise)
- sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(long, int, java.nio.channels.WritableByteChannel, java.io.FileDescriptor) @bci=107, line=428 (Compiled frame)
- sun.nio.ch.FileChannelImpl.transferToDirectly(long, int, java.nio.channels.WritableByteChannel) @bci=217, line=493 (Compiled frame)
- sun.nio.ch.FileChannelImpl.transferTo(long, long, java.nio.channels.WritableByteChannel) @bci=133, line=605 (Compiled frame)
- org.apache.kafka.common.network.PlaintextTransportLayer.transferFrom(java.nio.channels.FileChannel, long, long) @bci=8, line=215 (Compiled frame)
- org.apache.kafka.common.record.FileRecords.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=123, line=283 (Compiled frame)
- org.apache.kafka.common.record.DefaultRecordsSend.writeTo(java.nio.channels.GatheringByteChannel, long, int) @bci=11, line=33 (Compiled frame)
- org.apache.kafka.common.record.RecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=25, line=58 (Compiled frame)
- org.apache.kafka.common.record.MultiRecordsSend.writeTo(java.nio.channels.GatheringByteChannel) @bci=24, line=93 (Compiled frame)
- org.apache.kafka.common.network.KafkaChannel.send(org.apache.kafka.common.network.Send) @bci=10, line=429 (Compiled frame)
- org.apache.kafka.common.network.KafkaChannel.write() @bci=14, line=399 (Compiled frame)
- org.apache.kafka.common.network.Selector.pollSelectionKeys(java.util.Set, boolean, long) @bci=498, line=589 (Compiled frame)
ss shows that network connection is at normal range
Total: 7010 (kernel 0)
TCP: 6804 (estab 6792, closed 0, orphaned 0, synrecv 0, timewait 0/0), ports 0
Transport Total IP IPv6
* 0 - -
RAW 0 0 0
UDP 8 4 4
TCP 6804 7 6797
INET 6812 11 6801
FRAG 0 0 0
The kafka process is not limited by any config
Limit Soft Limit Hard Limit Units
Max cpu time unlimited unlimited seconds
Max file size unlimited unlimited bytes
Max data size unlimited unlimited bytes
Max stack size 8388608 unlimited bytes
Max core file size 0 unlimited bytes
Max resident set unlimited unlimited bytes
Max processes 124668 124668 processes
Max open files 128000 128000 files
Max locked memory 65536 65536 bytes
Max address space unlimited unlimited bytes
Max file locks unlimited unlimited locks
Max pending signals 124668 124668 signals
Max msgqueue size 819200 819200 bytes
Max nice priority 0 0
Max realtime priority 0 0
Max realtime timeout unlimited unlimited us
GC configs
Memory: 4k page, physical 31960748k(31339152k free), swap 0k(0k free)
CommandLine flags: -XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=4294967296 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation
GC actions
2020-05-27T19:52:59.673+0000: 221.678: [GC pause (G1 Evacuation Pause) (young), 0.0221126 secs]
2020-05-27T19:53:00.458+0000: 222.463: [GC pause (G1 Evacuation Pause) (young), 0.0298531 secs]
2020-05-27T19:53:00.758+0000: 222.763: [GC pause (G1 Evacuation Pause) (young), 0.0145942 secs]
2020-05-27T19:53:01.661+0000: 223.667: [GC pause (G1 Evacuation Pause) (young), 0.0193434 secs]
2020-05-27T19:53:01.995+0000: 224.000: [GC pause (G1 Evacuation Pause) (young), 0.0157352 secs]
2020-05-27T19:53:02.286+0000: 224.292: [GC pause (G1 Evacuation Pause) (young), 0.0410610 secs]
2020-05-27T19:53:02.564+0000: 224.570: [GC pause (G1 Evacuation Pause) (young), 0.0161346 secs]
2020-05-27T19:53:02.852+0000: 224.857: [GC pause (G1 Evacuation Pause) (young), 0.0131654 secs]
2020-05-27T19:53:03.298+0000: 225.304: [GC pause (G1 Evacuation Pause) (young), 0.0145538 secs]
2020-05-27T19:53:03.938+0000: 225.943: [GC pause (G1 Evacuation Pause) (young), 0.0149772 secs]
2020-05-27T19:53:04.449+0000: 226.454: [GC pause (G1 Evacuation Pause) (young), 0.0178123 secs]
2020-05-27T19:53:04.675+0000: 226.680: [GC pause (G1 Evacuation Pause) (young), 0.0134442 secs]
2020-05-27T19:53:04.976+0000: 226.981: [GC pause (G1 Evacuation Pause) (young), 0.0185317 secs]
2020-05-27T19:53:05.388+0000: 227.393: [GC pause (G1 Evacuation Pause) (young), 0.0114091 secs]
2020-05-27T19:53:05.877+0000: 227.882: [GC pause (G1 Evacuation Pause) (young), 0.0133841 secs]
2020-05-27T19:53:06.355+0000: 228.361: [GC pause (G1 Evacuation Pause) (young), 0.0156590 secs]
2020-05-27T19:53:06.903+0000: 228.908: [GC pause (G1 Evacuation Pause) (young), 0.0147414 secs]
2020-05-27T19:53:07.211+0000: 229.216: [GC pause (G1 Evacuation Pause) (young), 0.0126670 secs]
2020-05-27T19:53:07.681+0000: 229.687: [GC pause (G1 Evacuation Pause) (young), 0.0160053 secs]
2020-05-27T19:53:08.148+0000: 230.153: [GC pause (G1 Evacuation Pause) (young), 0.0123430 secs]
2020-05-27T19:53:08.618+0000: 230.623: [GC pause (G1 Evacuation Pause) (young), 0.0157585 secs]
2020-05-27T19:53:09.344+0000: 231.349: [GC pause (G1 Evacuation Pause) (young), 0.0152343 secs]
2020-05-27T19:53:10.220+0000: 232.225: [GC pause (G1 Evacuation Pause) (young), 0.0129275 secs]
2020-05-27T19:53:11.181+0000: 233.186: [GC pause (G1 Evacuation Pause) (young), 0.0126668 secs]
GC is a suspoect as we pause more than once per second. Our goal is to maintain 90% gc time at 20ms, and on average no more than once per sec
so we change to
CommandLine flags: -XX:CICompilerCount=4 -XX:ConcGCThreads=2 -XX:+ExplicitGCInvokesConcurrent -XX:G1HeapRegionSize=2097152 -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=6442450944 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ManagementServer -XX:MarkStackSize=4194304 -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=6442450944 -XX:MaxNewSize=3865051136 -XX:MinHeapDeltaBytes=2097152 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation
and GC changes to
2020-05-30T19:53:09.518+0000: 170448.394: [GC pause (G1 Evacuation Pause) (young), 0.0181099 secs]
2020-05-30T19:53:12.206+0000: 170451.082: [GC pause (G1 Evacuation Pause) (young), 0.0199838 secs]
2020-05-30T19:53:15.498+0000: 170454.375: [GC pause (G1 Evacuation Pause) (young), 0.0196726 secs]
2020-05-30T19:53:18.248+0000: 170457.124: [GC pause (G1 Evacuation Pause) (young), 0.0189805 secs]
2020-05-30T19:53:21.330+0000: 170460.206: [GC pause (G1 Evacuation Pause) (young), 0.0199247 secs]
2020-05-30T19:53:24.483+0000: 170463.359: [GC pause (G1 Evacuation Pause) (young), 0.0202763 secs]
2020-05-30T19:53:27.667+0000: 170466.543: [GC pause (G1 Evacuation Pause) (young), 0.0194760 secs]
2020-05-30T19:53:31.174+0000: 170470.051: [GC pause (G1 Evacuation Pause) (young), 0.0198198 secs]
2020-05-30T19:53:34.597+0000: 170473.473: [GC pause (G1 Evacuation Pause) (young), 0.0206716 secs]
2020-05-30T19:53:37.317+0000: 170476.193: [GC pause (G1 Evacuation Pause) (young), 0.0197347 secs]
2020-05-30T19:53:40.785+0000: 170479.661: [GC pause (G1 Evacuation Pause) (young), 0.0182224 secs]
2020-05-30T19:53:44.389+0000: 170483.266: [GC pause (G1 Evacuation Pause) (young), 0.0195625 secs]
2020-05-30T19:53:47.709+0000: 170486.585: [GC pause (G1 Evacuation Pause) (young), 0.0207940 secs]
2020-05-30T19:53:51.014+0000: 170489.890: [GC pause (G1 Evacuation Pause) (young), 0.0176806 secs]
2020-05-30T19:53:54.912+0000: 170493.788: [GC pause (G1 Evacuation Pause) (young), 0.0192957 secs]
2020-05-30T19:53:58.615+0000: 170497.491: [GC pause (G1 Evacuation Pause) (young), 0.0196635 secs]
2020-05-30T19:54:02.422+0000: 170501.298: [GC pause (G1 Evacuation Pause) (young), 0.0210456 secs]
2020-05-30T19:54:05.657+0000: 170504.533: [GC pause (G1 Evacuation Pause) (young), 0.0185793 secs]
2020-05-30T19:54:09.633+0000: 170508.509: [GC pause (G1 Evacuation Pause) (young), 0.0197715 secs]
We can see overall pause is GC is reduced, even though we keep the region size roughly same.