Editorial: Coding set 3

2020-05-16 00:00:00 +0000

System design: web scale chat systems

2020-05-12 00:00:00 +0000

Requirements

  • Multiple clients for a single user are allowed, and all messages should be synced between the clients
  • Messages are displayed in as close to the same order as possible on all clients. See the discussion below
  • Web scale
  • Use WebSocket to deliver messages
  • User Authentication needed
  • Can ignore friend list feature for now
  • Need to support group chat

What kind of order must be maintained?

  • Partial order within the same sender on the same client must be maintained. From a sender’s point of view, the messages it sends should appear in the same order on all clients, e.g., if the user sends messages in the order 1, 2, it is OK to delay the delivery of 2 until 1 is delivered, but it is not OK to display 2 on the receiver’s client and then insert 1 in front of 2.
  • Total order across senders within the same dialog should not be maintained. Otherwise, the whole group would be slowed down by the slowest participant - everyone would be fighting for the same distributed lock within the group

Capacity estimation

  • 1B user clients. Each sends 1 message every 2 min over 12 hours = 1B * 30 * 12 = 360B messages
    • We will assume they are in the same region instead of distributed across regions
  • Each message 1kb = 360 TB storage per day
  • Peak write traffic = 4 times the average, about 35 mil writes per sec
  • Because reads are pushed over WebSocket, read traffic will be higher than write traffic, but not by much
  • Each WebSocket server maintains 100k connections, so we need 10k nodes
  • Each DB node handles 10k requests per sec at peak, so we need roughly a 3,500-node cluster (see the quick check below)
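
A quick back-of-the-envelope check of the numbers above, as a sketch; the per-node capacities (100k connections per WebSocket server, 10k writes/sec per DB node) are the assumptions from the bullets:

// Back-of-the-envelope check of the capacity numbers above.
public class ChatCapacityEstimate {
    public static void main(String[] args) {
        long clients = 1_000_000_000L;                 // 1B user clients
        long msgsPerClientPerDay = 30 * 12;            // 1 msg / 2 min over 12 active hours
        long messagesPerDay = clients * msgsPerClientPerDay;                 // 360B
        long storagePerDayTb = messagesPerDay * 1_000 / 1_000_000_000_000L;  // 1 KB each -> ~360 TB

        long activeSeconds = 12 * 3600;                // writes spread over the 12 active hours
        long avgWritesPerSec = messagesPerDay / activeSeconds;               // ~8.3M
        long peakWritesPerSec = avgWritesPerSec * 4;                         // ~33M, call it 35M

        long wsNodes = clients / 100_000;              // 100k connections per WebSocket server
        long dbNodes = peakWritesPerSec / 10_000;      // 10k writes/sec per DB node -> ~3,300-3,500

        System.out.printf("msgs/day=%d, storage=%d TB/day, peak=%d w/s, ws nodes=%d, db nodes=%d%n",
                messagesPerDay, storagePerDayTb, peakWritesPerSec, wsNodes, dbNodes);
    }
}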

High level components breakdown

Excluding common services, we need

  • User authentication and authorization module (UAA) for concrete auth logic
  • User client service that manages 1-to-many user-client relationship
  • User domain service that stores detailed user info. Can be combined with user client service
  • Message domain service that manages the details and life cycle of an individual message
  • Dialog domain service that manages the dialog lifecycle, including the dialog -> message mapping and the dialog -> user mapping
  • Notification service to host websocket connections to clients

Why separate message domain and dialog domain?

Aside from conceptual reasons, the practical reasons are:

  • Message is subject to extension, e.g., video/voice messages.
  • Need a way to locate a message by its id for various purposes

Why separate notification domain and dialog domain

The two have different characteristics

  • Dialog: heavy state + short connections
  • Notification: light state + long connections/WebSocket

Otherwise, heavy state + long connection is a much harder problem

User authentication and authorization (UAA)

  • SSO such as OpenID gives the access_token and refresh_token to every single client. For each request, the user will provide
    • access_token
    • openid
    • login_type
  • When a request hits the API gateway, the API gateway will hit the UAA and get the corresponding JWT token before forwarding to the actual target service (a sketch follows after this list)
  • UAA makes sure (login_type, openid) exists, and returns a code to the client
  • The client will use the returned code to get the token
  • Need to maintain
    • (user_id, auth_type) -> (token, expiry, meta)
    • (user_id, auth_id) -> (name, salted hash, meta) - can be combined with above table
    • auth_id -> (user_id, openid, login_type, access_token)
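
A minimal sketch of the gateway-side exchange, assuming a hypothetical UaaService that checks the stored (login_type, openid, access_token) record and mints a short-lived internal JWT; the names and the in-memory store are illustrative, not a real library API:

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch of the API gateway -> UAA hop: the gateway trades the
// client-supplied (login_type, openid, access_token) for an internal JWT
// before forwarding the request to the target service.
interface UaaService {
    // Looks up (login_type, openid) and validates access_token against the stored record.
    Optional<String> exchangeForJwt(String loginType, String openid, String accessToken);
}

final class ApiGateway {
    private final UaaService uaa;

    ApiGateway(UaaService uaa) { this.uaa = uaa; }

    /** Returns the JWT to attach to the forwarded request, or empty if auth fails. */
    Optional<String> authenticate(String loginType, String openid, String accessToken) {
        return uaa.exchangeForJwt(loginType, openid, accessToken);
    }
}

// An in-memory UAA stub over the auth_id -> (user_id, openid, login_type, access_token) mapping above.
final class InMemoryUaa implements UaaService {
    record AuthRecord(String userId, String openid, String loginType, String accessToken, Instant expiry) {}

    private final java.util.Map<String, AuthRecord> byLoginAndOpenId = new java.util.concurrent.ConcurrentHashMap<>();

    void register(AuthRecord r) { byLoginAndOpenId.put(r.loginType() + ":" + r.openid(), r); }

    @Override
    public Optional<String> exchangeForJwt(String loginType, String openid, String accessToken) {
        AuthRecord r = byLoginAndOpenId.get(loginType + ":" + openid);
        if (r == null || !r.accessToken().equals(accessToken) || Instant.now().isAfter(r.expiry())) {
            return Optional.empty();
        }
        // Real code would sign a JWT (header.payload.signature); here we only fake the payload.
        String jwt = "jwt-for:" + r.userId() + ":exp=" + Instant.now().plus(Duration.ofMinutes(15));
        return Optional.of(jwt);
    }
}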

Message service

  • Maintains the mapping (msg_id) -> (sender_id, dialog_id, content, status, created_at, updated_at). msg_id is a Snowflake ID which is roughly sorted by processing time (see the sketch after this list)
  • Status is to mark if the message has been delivered to the dialog service. This is required as 2PC/TCC state
  • We need 2PC/TCC support between the message domain and dialog domain because messages from the same sender must display in the same relative order across all clients.
  • Only after we deliver to the dialog service do we return to the client that the message has been delivered. Again, if we did not have to maintain a consistent order for messages from the same sender, we could return to the client the moment the message is persisted to the message DB, and do the delivery asynchronously
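
Since the design leans on msg_id being roughly time-ordered, here is a minimal Snowflake-style generator sketch; the 41/10/12 bit layout and the custom epoch are common conventions assumed here, not something specified by this design:

// Minimal Snowflake-style ID generator: 41 bits of milliseconds since a custom
// epoch, 10 bits of worker id, 12 bits of per-millisecond sequence. IDs are
// therefore roughly ordered by processing time, which is what the message
// service relies on.
public class SnowflakeId {
    private static final long CUSTOM_EPOCH_MS = 1577836800000L; // 2020-01-01, arbitrary
    private final long workerId;            // 0..1023
    private long lastTimestamp = -1L;
    private long sequence = 0L;             // 0..4095 within one millisecond

    public SnowflakeId(long workerId) {
        if (workerId < 0 || workerId > 1023) throw new IllegalArgumentException("workerId out of range");
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) throw new IllegalStateException("clock moved backwards");
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF;
            if (sequence == 0) {                    // sequence exhausted, spin to the next millisecond
                while ((now = System.currentTimeMillis()) <= lastTimestamp) { /* busy wait */ }
            }
        } else {
            sequence = 0;
        }
        lastTimestamp = now;
        return ((now - CUSTOM_EPOCH_MS) << 22) | (workerId << 12) | sequence;
    }
}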

TCC vs MQ for transferring data between message domain and dialog domain

Basically a decision between sync and async communication. In this case TCC is preferred over MQ

  • Chat favors low latency over throughput. We can fail early and let the user try again
  • Reverting an operation in MQ is a lot trickier than with sync communication

This also means we accept the risk that we will lose the buffering capability of the MQ when there is a major outage across the cluster

Dialog service

Maintains the following mappings

  • (dialog_id, user_id) -> metadata - members in a dialog
  • (user_id, dialog_id) -> metadata - dialogs a user is in
  • (dialog_id, message_created_at) -> message_id - for the content of a dialog
  • Partitioned by dialog_id, with the ordering key as part of the PK so that transactions remain on the same node

  • Upon persisting a message from the message service, the dialog service sends a notification via the notification service to the client that new messages are available. The client will then poll from the dialog service (see the sketch below)
  • Can also push directly to the client as part of the notification instead of notifying the client to poll. In production it is most likely a hybrid, to be reactive
  • Delivered messages can be purged from the DB, since we have the message service as the source of truth
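
A sketch of the persist-then-notify step described above, assuming hypothetical DialogStore and NotificationService interfaces standing in for the real services:

import java.util.List;

// Hypothetical interfaces standing in for the real dialog storage and notification layer.
interface DialogStore {
    void appendMessage(long dialogId, long createdAtMs, long messageId); // (dialog_id, created_at) -> message_id
    List<Long> memberUserIds(long dialogId);                             // from the (dialog_id, user_id) mapping
}

interface NotificationService {
    // "New messages available in dialog X" -- the client polls the dialog service afterwards.
    void notifyNewMessages(long userId, long dialogId);
}

final class DialogService {
    private final DialogStore store;
    private final NotificationService notifications;

    DialogService(DialogStore store, NotificationService notifications) {
        this.store = store;
        this.notifications = notifications;
    }

    /** Called by the message service once the message row is persisted (the TCC confirm step). */
    void deliver(long dialogId, long messageId, long createdAtMs) {
        store.appendMessage(dialogId, createdAtMs, messageId);
        for (long userId : store.memberUserIds(dialogId)) {
            notifications.notifyNewMessages(userId, dialogId);   // clients then poll for the content
        }
    }
}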

User client service

  • Maintains (user_id, client_id) -> (notification node, metadata)
  • Acts as a router to route the incoming notification request to the real server
  • 1B clients * 1KB per entry = 1 TB storage
  • Simple ID based sharding + 10 nodes is enough to host the metadata

Where do we maintain the checkpoint on the messages a user polls

Either the local client side or the user client service side could work. In either case, the server side needs to defend against malicious poll requests

Local client

  • Local storage maintains received message ids and their content so that the client can do local dedup and reordering.
  • Client does not send out another message until the previous one has been acked by the server side.
    • Typing messages is a low-throughput activity, so such a delay is OK.
    • If we really want to support concurrent send, then we have to maintain a sequence number for each dialog, which further complicates the design
  • Note we can’t rely on client time because of the consistent-order requirement - the processing time in the message service is the source-of-truth time

What if I want to maintain the same relative order of the messages I sent across all clients

  • This means we need to introduce a sequence number for each (dialog_id, user_id), which acts as a logical distributed lock, and fetch and then increment it every time I send a message.
  • In theory we could still use the message creation time instead of a sequence number, but in practice it is hard to deal with time-drift issues

Notification service

  • This is the light-state layer that maintains the long-living client connections
  • The long connection gateway (LCG) connects multiple clients and multiple business backends (many-to-many). Between a client and the LCG there is only one long connection.
  • LCG supports a pub/sub model (see the sketch below).
  • LCG should provide a callback to the business backend to support domain-specific auth
  • OpenResty layer to keep sessions and do load balancing.
  • Long connection broker for protocol, auth, session, and pub/sub management
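
A minimal sketch of the in-memory pub/sub registry one LCG node might keep; WebSocket framing, the auth callback, and the OpenResty layer are out of scope, and ClientConnection is a placeholder for a live session:

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// In-memory pub/sub registry for one long-connection gateway (LCG) node:
// topic -> set of client connections subscribed on this node.
final class LcgPubSub {
    interface ClientConnection {           // stands in for a live WebSocket session
        void push(String payload);
    }

    private final Map<String, Set<ClientConnection>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String topic, ClientConnection conn) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArraySet<>()).add(conn);
    }

    void unsubscribe(String topic, ClientConnection conn) {
        Set<ClientConnection> conns = subscribers.get(topic);
        if (conns != null) conns.remove(conn);
    }

    // Called when a business backend publishes to a topic (e.g., "dialog:42").
    void publish(String topic, String payload) {
        for (ClientConnection conn : subscribers.getOrDefault(topic, Set.of())) {
            conn.push(payload);
        }
    }
}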

If we have to deploy it globally to serve global traffic, which areas need to change?

TODO: dive deep on this later

How PD schedules regions

2020-04-29 00:00:00 +0000

  • Store: each TiKV server maps to a store
  • Pending: a learner/follower whose log lags behind the leader by a large margin
  • PD uses standard revision + lease for leader failover

  • Scheduler: generates the operator, which encapsulates the steps related to rebalancing a single region, i.e., one region has multiple steps
    • balance-leader-scheduler
    • balance-region-scheduler
    • hot-region-scheduler

type Scheduler interface {
	GetName() string
	GetResourceKind() ResourceKind
	Schedule(cluster *clusterInfo) Operator
}

After an operator is generated, it will be put in a queue and executed step by step (multiple operators may run in parallel), by sending the next operator step to the region leader as a response to its heartbeat

  • PD will wait for the ack, with a timeout

type Operator interface {
	GetRegionID() uint64
	GetResourceKind() ResourceKind
	Do(region *regionInfo) (*pdpb.RegionHeartbeatResponse, bool)
}

Source code: How Kafka consumer commits offsets

2020-04-29 00:00:00 +0000

  • Source code version 2.4
  • What does asyncCommitFenced do?

In ConsumerCoordinator


    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        Node coordinator = checkAndGetCoordinator();

        // create the offset commit request
        Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            OffsetAndMetadata offsetAndMetadata = entry.getValue();

            OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap
                    .getOrDefault(topicPartition.topic(),
                            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                    .setName(topicPartition.topic())
                    );

            topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                    .setPartitionIndex(topicPartition.partition())
                    .setCommittedOffset(offsetAndMetadata.offset())
                    .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                    .setCommittedMetadata(offsetAndMetadata.metadata())
            );
            requestTopicDataMap.put(topicPartition.topic(), topic);
        }

        final Generation generation = generationIfStable();

        OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                new OffsetCommitRequestData()
                        .setGroupId(this.rebalanceConfig.groupId)
                        .setGenerationId(generation.generationId)
                        .setMemberId(generation.memberId)
                        .setGroupInstanceId(rebalanceConfig.groupInstanceId.orElse(null))
                        .setTopics(new ArrayList<>(requestTopicDataMap.values()))
        );

        log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

        return client.send(coordinator, builder)
                .compose(new OffsetCommitResponseHandler(offsets));
    }

On the server side, the operation dispatch is done by checking the value of request.header.apiKey, e.g.,


        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)

The response is


   public OffsetCommitResponse(int requestThrottleMs, Map<TopicPartition, Errors> responseData) {
        Map<String, OffsetCommitResponseTopic>
                responseTopicDataMap = new HashMap<>();

        for (Map.Entry<TopicPartition, Errors> entry : responseData.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            String topicName = topicPartition.topic();

            OffsetCommitResponseTopic topic = responseTopicDataMap.getOrDefault(
                topicName, new OffsetCommitResponseTopic().setName(topicName));

            topic.partitions().add(new OffsetCommitResponsePartition()
                                       .setErrorCode(entry.getValue().code())
                                       .setPartitionIndex(topicPartition.partition()));
            responseTopicDataMap.put(topicName, topic);
        }

        data = new OffsetCommitResponseData()
                .setTopics(new ArrayList<>(responseTopicDataMap.values()))
                .setThrottleTimeMs(requestThrottleMs);
    }

Coding set 2

2020-04-27 00:00:00 +0000

Source code: How Kafka GroupCoordinator handles membership

2020-04-26 00:00:00 +0000

  • Server side of the coordination logic. Corner cases and error handling removed/rewritten
  • Source code version 2.4
  • See kafka.coordinator.group.GroupState for the state transition

Started inside KafkaServer.startup


  /**
   * Startup logic executed at the same time when the server starts up.
   */
  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

How LEAVE_GROUP request is handled

In handleLeaveGroup. It will be triggered when

  • Consumer calls unsubscribe
  • Consumer heartbeat times out. Consumer will send LEAVE_GROUP request

// Group is locked for concurrent edit
val memberId = leavingMember.memberId
val groupInstanceId = Option(leavingMember.groupInstanceId)
if (group.isPendingMember(memberId)) {
  // if a pending member is leaving, it needs to be removed from the pending list, heartbeat cancelled
  // and if necessary, prompt a JoinGroup completion.
  info(s"Pending member $memberId is leaving group ${group.groupId}.")
  removePendingMemberAndUpdateGroup(group, memberId)
  heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, memberId))
} else {
  val member = if (group.hasStaticMember(groupInstanceId))
    group.get(group.getStaticMemberId(groupInstanceId))
  else
    group.get(memberId)

  member.isLeaving = true
  val memberKey = MemberKey(member.groupId, member.memberId)
  heartbeatPurgatory.checkAndComplete(memberKey)
  info(s"Member[group.instance.id ${member.groupInstanceId}, member.id ${member.memberId}] " +
    s"in group ${group.groupId} has left, removing it from the group")

  // New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
  // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
  // will retry the JoinGroup request if is still active.
  group.maybeInvokeJoinCallback(member, joinError(NoMemberId, Errors.UNKNOWN_MEMBER_ID))

  group.remove(member.memberId)
  group.removeStaticMember(member.groupInstanceId)

  group.currentState match {
    case Dead | Empty =>
    case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
    case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

How is heartbeat handled


// when the group is still loading, respond blindly
// the group mutex is locked during the whole process

group.currentState match {
  case Empty =>
    responseCallback(Errors.UNKNOWN_MEMBER_ID)

  case CompletingRebalance =>
    responseCallback(Errors.REBALANCE_IN_PROGRESS)

  case PreparingRebalance =>
    val member = group.get(memberId)
    completeAndScheduleNextHeartbeatExpiration(group, member)
    responseCallback(Errors.REBALANCE_IN_PROGRESS)

  case Stable =>
    val member = group.get(memberId)
    completeAndScheduleNextHeartbeatExpiration(group, member)
    responseCallback(Errors.NONE)

  case Dead =>
    throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
}


  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)

    // reschedule the next heartbeat expiration deadline
    val deadline = member.latestHeartbeat + timeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, deadline, timeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
  }

  def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean, heartbeatDeadline: Long): Unit = {
    group.inLock {
      if (group.is(Dead)) {
        info(s"Received notification of heartbeat expiration for member $memberId after group ${group.groupId} had already been unloaded or deleted.")
      } else if (isPending) {
        info(s"Pending member $memberId in group ${group.groupId} has been removed after session timeout expiration.")
        removePendingMemberAndUpdateGroup(group, memberId)
      } else if (!group.has(memberId)) {
        debug(s"Member $memberId has already been removed from the group.")
      } else {
        val member = group.get(memberId)
        if (!member.shouldKeepAlive(heartbeatDeadline)) {
          info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
          removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
        }
      }
    }
  }

Source code: How Kafka consumer decides which partitions to poll

2020-04-24 00:00:00 +0000

  • Client side of the coordination logic. Corner cases and error handling removed/rewritten
  • Source code version 2.4

In org.apache.kafka.clients.consumer.KafkaConsumer

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    coordinator.poll(timer); // note we poll the coordinator every time!
    fetcher.validateOffsetsIfNeeded();
    coordinator.refreshCommittedOffsetsIfNeeded(timer);
    subscriptions.resetMissingPositions();
    fetcher.resetOffsetsIfNeeded();
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

    if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
        client.transmitSends();

    return this.interceptors.onConsume(new ConsumerRecords<>(records));
}

The partition assignment refresh logic is in org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. Call it CC below

Data structure of CC

public final class ConsumerCoordinator extends AbstractCoordinator {
    private boolean isLeader = false;
    private final SubscriptionState subscriptions;
    private Set<String> joinedSubscription; // updated by subscriptions
    private MetadataSnapshot metadataSnapshot;
    private MetadataSnapshot assignmentSnapshot; // updated from metadataSnapshot but not vice-versa. Updated only if the current CC is the leader
    private boolean rejoinNeeded = true;
    private boolean needsJoinPrepare = true;
    private MemberState state = MemberState.UNJOINED;
    private HeartbeatThread heartbeatThread = null;

  • Note the heartbeat thread inside the coordinator does not update the internal state

How ConsumerCoordinator polls

   public boolean rejoinNeededOrPending() {
        if (!subscriptions.partitionsAutoAssigned())
            return false;

        // we need to rejoin if we performed the assignment and metadata has changed;
        // also for those owned-but-no-longer-existed partitions we should drop them as lost
        if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
            return true;

        // we need to join if our subscription has changed since the last join
        if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
            return true;
        }

	return rejoinNeeded || joinFuture != null; //rejoinNeeded defaults to true
    }

    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
                log.debug("Received successful JoinGroup response: {}", joinResponse);

                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        // if the consumer was woken up before a rebalance completes, we may have already left
                        // the group. In this case, we do not want to continue with the sync group.
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                                joinResponse.data().memberId(), joinResponse.data().protocolName());
                        if (joinResponse.isLeader()) {
                            onJoinLeader(joinResponse).chain(future);
                        } else {
                            SyncGroupRequest.Builder requestBuilder =
                                    new SyncGroupRequest.Builder(
                                            new SyncGroupRequestData()
                                                    .setGroupId(rebalanceConfig.groupId)
                                                    .setMemberId(generation.memberId)
                                                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                                    .setGenerationId(generation.generationId)
                                                    .setAssignments(Collections.emptyList())
                                    );
                            log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
                            syncFuture = client.send(coordinator, requestBuilder).compose(new SyncGroupResponseHandler());
                            syncFuture.chain(future);
                        }
                    }
                }
        }
    }

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        state = MemberState.REBALANCING;
        int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000);
        log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                new JoinGroupRequestData()
                        .setGroupId(rebalanceConfig.groupId)
                        .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
                        .setMemberId(this.generation.memberId)
                        .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                        .setProtocolType(protocolType())
                        .setProtocols(metadata()) // this one internally updates the joined subscription!
                        .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
        );

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
        joinFuture = client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                .compose(new JoinGroupResponseHandler());
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    state = MemberState.STABLE;
                    rejoinNeeded = false;
                    // record rebalance latency
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }
        });
        return joinFuture;
    }

    boolean joinGroupIfNeeded(final Timer timer) {
        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        if (needsJoinPrepare) {
            // need to set the flag before calling onJoinPrepare since the user callback may throw
            // exception, in which case upon retry we should not retry onJoinPrepare either.
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }

        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);

        Generation generationSnapshot;

        // Generation data may be concurrently cleared by the heartbeat thread.
        // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
        // and shouldn't block the heartbeat thread.
        // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
        synchronized (AbstractCoordinator.this) {
            generationSnapshot = this.generation;
        }

        // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
        ByteBuffer memberAssignment = future.value().duplicate();

        onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);

        // Generally speaking we should always resetJoinGroupFuture once the future is done, but here
        // we can only reset the join group future after the completion callback returns. This ensures
        // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
        // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
        resetJoinGroupFuture();
        needsJoinPrepare = true;
        return true;
    }

public boolean poll(Timer timer) {
    if (rejoinNeededOrPending()) { // the internal state is refreshed by a heartbeat thread; here we only check staleness
        joinGroupIfNeeded(timer);
    }
    return true;
}

How heartbeat works - HeartbeatResponseHandler


    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        Errors error = heartbeatResponse.error();
        if (error == Errors.NONE) {
            log.debug("Received successful Heartbeat response");
            future.complete(null);
        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
            log.info("Attempt to heartbeat failed since group is rebalancing");
            this.rejoinNeeded = true;
            future.raise(error);
        } else if (error == Errors.ILLEGAL_GENERATION) {
            log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
            this.generation = Generation.NO_GENERATION;
            rejoinNeeded = true;
            state = MemberState.UNJOINED;
            future.raise(error);
        } else {
            future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
        }
    }

On PermGen

2020-04-22 00:00:00 +0000

  • Class loader reads the class file and loads the interpreted data into the JVM
  • Static non-final variables are stored in the method area, but static final variables are stored in the runtime constant pool - still in the PermGen though. Also, final class variables (constants) are stored as part of the data for any type that uses them, whereas class variables are stored as part of the data for the type that declares them
  • For each type:
    • FQN of the type
    • FQN of super class
    • Is this type a class or interface?
    • Type modifier set - public, abstract, final?
    • FQN of direct superinterfaces it implements
    • Constant pool: an array holding literals and symbolic reference to types, fields, and methods
    • For each field:
      • name
      • type
      • modifier
    • For each method:
      • name
      • return type
      • parameter # and types
      • modifier set (public, private, abstract, final…)
      • If method is not abstract or native:
        • method byte codes
        • size of the operand stack and local variables
        • Exception table
    • Class variables, i.e., static but not final
    • Reference to ClassLoader to support user-defined ClassLoader
    • Reference to class Class
    • For a non-abstract class, a method table - an array of direct references to all instance methods that may be invoked on an instance

How is the method area involved when we execute the main()

class Volcano {

    public static void main(String[] args) {
        Lava lava = new Lava();
        lava.flow();
    }
}
  • The class loader loads the class that main() is in, and populates the method area with the type info listed above
  • The VM runs the bytecodes of main() from the method area, and maintains a pointer to the constant pool (in the method area) of main()’s class
  • Lava is a symbol in the constant pool, so the class loader loads the Lava type and populates the corresponding method area entry
  • The VM replaces the symbol with a reference to the Lava class’s class data
  • The VM allocates memory for a Lava instance, by looking up Lava in the method area to find out how much heap space it needs
  • The VM initializes all fields in lava to their default values
  • Pushes the reference to the new Lava object onto the (operand) stack
  • Uses the reference to initialize the private fields of Lava

How kafka manages consumers

2020-04-21 00:00:00 +0000

ConsumerCoordinator

  • Fetch thread to pull msgs.
  • Heartbeat thread to the coordinator. Default timeout at 10s (session.timeout.ms)
    • Will get the state of the coordinator, if it is rebalancing, send the JoinGroup request
  • Every consumer group has a coordinator on the broker. The coordinator manages the membership of consumers

Kafka group states

  • Empty: init state, no group member
  • PreparingBalance
  • InitialRebalance: delay entering the PreparingBalance state to reduce the number of rebalances
  • AwaitSync: waiting for the new assignment from the group leader
  • Stable
  • Dead: empty group. Metadata no longer available

When joining new group, the state is Empty -> PreparingBalance -> AwaitSync -> Stable

Rebalance consumer

When the consumer membership changes, coordinator will rebalance consumer to partition association

  • The coordinator replies to the heartbeat requests from all consumers with a rebalance notification. All consumers need to re-send the JoinGroup request
  • JoinGroup: when a new member joins the group
    • the coordinator will let the consumer group leader know the membership assignment and generation
    • Other consumers will know the leader, generation, and topic/partition info
  • SyncGroup: after the JoinGroup stage is over, the consumer leader will rebalance and return the assignment to the coordinator, which in turn broadcasts it to the other consumers as the response to their SyncGroup calls
    • Default assignment strategy is RangeAssignor
  • If the leader is lost during the rebalance, it will be detected by the coordinator and the leader will be removed

Coding set 1

2020-04-20 00:00:00 +0000

Design: delayed task

2020-04-19 00:00:00 +0000

The purpose of a delayed task is to execute some task in the future

Common solutions:

  • DelayQueue
  • ScheduledExecutorService
  • zset in Redis with the activation time as the score; need to check whether the deletion succeeded to detect multiple consumption
  • Use keyspace notifications (off by default in Redis)
  • Timer wheel-based implementation

What if we can have up to 1B tasks to be run in the future

For example, we want to send different notifications to user 1 day after performing different types of actions. How do you design this?

Assume we have 100 mil users, each user can get up to 10 different delayed tasks per day, and we have to execute the task within 12 hours of the scheduled execution time

Then we have to persist with (virtual shard, timestamp, seq id) as the key prefix, and use the timestamp as the checkpoint offset (see the sketch below)
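
A sketch of that key layout and the polling loop, assuming a hypothetical ordered store that can be scanned by (shard, fire_at) range; delete-after-execute gives at-least-once semantics here:

import java.util.List;

// Hypothetical ordered KV store scanned by key range, e.g., backed by a
// range-partitioned table with PK (shard, fire_at_ms, seq_id).
interface OrderedTaskStore {
    record TaskKey(int shard, long fireAtMs, long seqId) {}
    record Task(TaskKey key, String payload) {}

    List<Task> scan(int shard, long fromMsInclusive, long toMsExclusive, int limit);
    void delete(TaskKey key);
}

final class DelayedTaskPoller {
    private final OrderedTaskStore store;
    private final int shard;
    private long checkpointMs;     // timestamp checkpoint per virtual shard

    DelayedTaskPoller(OrderedTaskStore store, int shard, long startCheckpointMs) {
        this.store = store;
        this.shard = shard;
        this.checkpointMs = startCheckpointMs;
    }

    /** One polling round: execute everything due up to 'now', then advance the checkpoint. */
    void pollOnce(long nowMs) {
        List<OrderedTaskStore.Task> due;
        do {
            due = store.scan(shard, checkpointMs, nowMs, 1000);
            for (OrderedTaskStore.Task task : due) {
                execute(task);
                store.delete(task.key());   // delete-after-execute: at-least-once semantics
            }
        } while (due.size() == 1000);       // keep draining until the window is empty
        checkpointMs = nowMs;
    }

    private void execute(OrderedTaskStore.Task task) {
        System.out.println("running task " + task.key() + " -> " + task.payload());
    }
}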

System design: square cash

2020-04-13 00:00:00 +0000

Original discussion

High level components

  • API gateway
  • Auth module
  • Payment service records txns seen by the user
    • txn, status, may have a list of ledger entries
  • Bookkeeping ledger service, keeps track of internal states
    • use double-entry bookkeeping, i.e., need to differentiate debit and credit (see the sketch after this list)
    • Need to know which entry maps to which txn
  • Account balance service for users
    • May merge this as part of the ledger if distributed txn is not desirable
  • Proxy to 3rd party payment and settlement
  • MQ for the async notification from the main txn system
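
A sketch of what a double-entry posting could look like, where every transaction produces balancing debit and credit ledger entries that reference the same txn; the types are illustrative:

import java.math.BigDecimal;
import java.util.List;

// Illustrative double-entry posting: each txn produces a debit entry and a
// credit entry that reference the same txn_id, and the amounts must balance.
record LedgerEntry(long txnId, String account, BigDecimal debit, BigDecimal credit) {}

final class Ledger {
    /** A P2P payment: debit the payer's balance account, credit the payee's. */
    static List<LedgerEntry> p2pPayment(long txnId, String payerAccount, String payeeAccount, BigDecimal amount) {
        LedgerEntry debit  = new LedgerEntry(txnId, payerAccount, amount, BigDecimal.ZERO);
        LedgerEntry credit = new LedgerEntry(txnId, payeeAccount, BigDecimal.ZERO, amount);
        assertBalanced(List.of(debit, credit));
        return List.of(debit, credit);
    }

    /** The invariant double-entry bookkeeping buys us: total debits equal total credits. */
    static void assertBalanced(List<LedgerEntry> entries) {
        BigDecimal debits  = entries.stream().map(LedgerEntry::debit).reduce(BigDecimal.ZERO, BigDecimal::add);
        BigDecimal credits = entries.stream().map(LedgerEntry::credit).reduce(BigDecimal.ZERO, BigDecimal::add);
        if (debits.compareTo(credits) != 0) throw new IllegalStateException("unbalanced posting");
    }
}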

Requirement: our internal txn state should be consistent with external txn state

  • Recon job to scan pending txns after a period
  • provide webhook for 3rd party to ack success

Requirement: users may pay in different currencies

  • Introduce a centralized exchange service
  • Ledger keeps the currency and the original amount, with the rate recorded
  • Need extra ledger entry for internal txns

System design: Dapper

2020-04-11 00:00:00 +0000

Design a dapper-like system for tracing and monitoring

Requirements: drill down a certain request and see its sequence of actions

  • Embed in the header (see the sketch after this list)
    • Trace id: root id
    • Parent session id: who called this request
    • Current session id
    • Seq number to mark layer and de-dup
  • Do sampling to control no more than 1k writes per sec
  • Can just store them in a (trace_id, session_id) -> span data table
  • Upon viewing, just load all sessions in a trace on the fly and compute the tree (< 100 sessions per trace)
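
A sketch of the trace context carried in the header and how a service derives the context for its outgoing calls; the field names follow the bullets above, everything else (UUID ids, the local sequence counter) is an assumption:

import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Trace context propagated in request headers: trace id (root), parent session
// id (the caller), current session id, and a sequence number for layering/dedup.
record TraceContext(String traceId, String parentSessionId, String sessionId, int seq) {

    /** Start of a new trace at the edge: the root session is its own trace. */
    static TraceContext newRoot() {
        String traceId = UUID.randomUUID().toString();
        return new TraceContext(traceId, null, traceId, 0);
    }

    /** Context to attach to an outgoing call from this request: we become the parent. */
    TraceContext child(AtomicInteger localSeq) {
        return new TraceContext(traceId, sessionId, UUID.randomUUID().toString(), localSeq.incrementAndGet());
    }
}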

Requirements: calculate avg latency by node id and time

  • Timeseries db, similar to the design of prometheus
  • For each metric, we maintain (timestamp, machineid, sequence) -> labels, value
  • Suppose 100 metrics are sent per sec; 3 months of data is 100 * 86400 * ~100 days ≈ 1B data points, so if we want to plot the data over 3 months we need to scan at most ~1B rows.
    • It can be parallelized by each point we see on the graph

System design: OLAP

2020-04-10 00:00:00 +0000

Suppose we run an e-commerce web site

Requirement: top 10 in the last 24 hours for each store

  • Need to keep track of
    • For each store, maintains a top 10 list, probably in cache, since it is way more reads than writes
    • txn history with an index on (store_id, created_at), so we need a separate txn history service sharded by store_id
  • Each store has at most 10k txns per day, so we can afford to load them in memory and compute it at every refresh interval, e.g., 10 mins
    • In fact, 7 days means 1 mil txns, so that is fine too
  • The calculation for a store can be triggered on demand, i.e., during a visit, so we don’t have to pull the store catalog service, i.e., update once every 1-5 minutes
    • Either synchronously via API call or asynchronously via MQ

Requirement: complete item sales rank in the last week by category

  • Category is much more coarse-grained than store, especially the basic ones. Expect the total to be < 1 mil.
  • This also means the whole data can fit into the cache
  • Category is a nested, tree-like structure; we assume no more than 5 levels (20^5 ≈ 3.2 mil). Note that on Amazon it is more like a DAG with 2 paths. So we assume each item will update at most 10 categories
  • Because it is windowed, we have to keep re-calculating the whole batch to handle sales that fall out of the window, even for the top K.
    • Mini batches can speed up the calculation at the cost of additional complexity, but are needed if we care about the update frequency
    • The brute force calculation is OK if we don’t have a strong SLA on the freshness of the data (< 1 hour). It will scan and populate 10 mil per day * 7 days * 10 categories per item ≈ 1B intermediate rows

Suppose we want to see updates in < 1 min for popular categories, e.g., the top 100 hot categories

Populating the raw data

  • Keep track of
    • Last pulled checkpoint
    • (category_id, time, item_id) -> count
  • We periodically pull the txn services to get the txns since the last pulled checkpoint and populate (category_id, time, item_id) -> count
    • Can also be triggered by notifications from the main txn service, but we need to watch out for the frequent-update problem, so we still need to check the last pulled checkpoint
    • Can purge pulled data after 1 week
    • During the pull we need to clear all data after the checkpoint first, and then do a complete calculation

Calculating the final result

  • Keep track of
    • (category_id, item_id) -> count, last_calced_at
    • (category_id) -> (item id list, last updated_at)
  • Can store the result into cache since we have < 1 mil categories
  • At each refresh interval, for each item compute
    • (a) the count after last_calced_at
    • (b) the count that has fallen out of the window, i.e., before now - 7 days
    • (c) the last calculated window count
    • current window count = c + a - b; update it with the new updated_at (see the sketch after this list)
  • Scan all items in the category and compute the ranking in memory
    • we assume there are < 10 mil items per category
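
A sketch of the incremental window update (the c + a - b step), assuming a placeholder CountStore over the (category_id, time, item_id) -> count table:

import java.time.Duration;
import java.time.Instant;

// Incremental 7-day window update for one (category_id, item_id):
//   new window count = last window count (c) + counts added since last calc (a)
//                      - counts that have fallen out of the window (b)
final class WindowedSalesCount {
    interface CountStore {
        long sumCounts(long categoryId, long itemId, Instant fromInclusive, Instant toExclusive);
    }

    record WindowState(long windowCount, Instant lastCalcedAt) {}

    private static final Duration WINDOW = Duration.ofDays(7);
    private final CountStore store;

    WindowedSalesCount(CountStore store) { this.store = store; }

    WindowState refresh(long categoryId, long itemId, WindowState prev, Instant now) {
        long added   = store.sumCounts(categoryId, itemId, prev.lastCalcedAt(), now);          // (a)
        long expired = store.sumCounts(categoryId, itemId,
                prev.lastCalcedAt().minus(WINDOW), now.minus(WINDOW));                          // (b)
        long current = prev.windowCount() + added - expired;                                    // c + a - b
        return new WindowState(current, now);
    }
}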

Requirement: show an item’s sale numbers in the last day/week/month/lifetime

Common problems for huge data processing

2020-04-07 00:00:00 +0000

Find common URLs

Given 2 files, each with 5B URLs, each URL 64 bytes, and 4GB of memory

  • Memory can store 4 * 10^9 / 64 ≈ 60M records in total.
  • Each big file would have to be loaded about 80 times to read it fully in memory
  • So compute the hash of each URL mod 200, and output each URL to one of 200 bucket files
  • Load the two bucket files with the same hash into memory, one into a hash set; stream the other and look up each URL
    • This step can be parallelized
  • Pipe the results into the final file

1GB file with words, each word < 16 bytes. Find the most popular 100 words with 1MB of memory

  • Memory fits at least 50k words. File contains at least 100M words
  • Read the file sequentially and hash the words into 2k bucket files, about 0.5 MB per file. Note each file can’t be too big, to defend against the case of all-distinct words
  • Read each file to generate the word count for each bucket
  • After it is generated, compare and update with current result in memory
  • If the distinct words can fit into memory, we can also use a trie or suffix array to save space

Find all distinct numbers in a file

3B numbers in the file. Unable to fit into the memory

  • Idea 1: hash into bucket files and dedup within each file. Again, keep each file smaller than memory to defend against the all-distinct case
  • Idea 2: a bitmap will consume 3GB of RAM. Note a Bloom filter may still be too big because each element will consume about 10 bits
  • A similar idea can be used to check whether a number exists in the file

Top K in a list of N sorted arrays with same size

  • Maintain a size-N min-heap, and an index into each array
  • Populate the first element of each array into the heap, with a record of which array it came from
  • Pop the min of this heap, and push the next item from the same array (see the sketch after this list)
  • Stop when we have done this K times
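
A sketch of this heap-based merge, assuming the arrays are sorted ascending and we want the K smallest elements (flip the comparator for the K largest):

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Top-K smallest elements across N ascending-sorted arrays using a size-N min-heap.
final class TopKSortedArrays {
    private record Cursor(int value, int arrayIdx, int elementIdx) {}

    static List<Integer> topK(int[][] arrays, int k) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>((a, b) -> Integer.compare(a.value(), b.value()));
        for (int i = 0; i < arrays.length; i++) {
            if (arrays[i].length > 0) heap.add(new Cursor(arrays[i][0], i, 0));   // seed with each array's head
        }
        List<Integer> result = new ArrayList<>();
        while (result.size() < k && !heap.isEmpty()) {
            Cursor min = heap.poll();                       // global minimum among the current heads
            result.add(min.value());
            int next = min.elementIdx() + 1;
            if (next < arrays[min.arrayIdx()].length) {     // advance within the same array
                heap.add(new Cursor(arrays[min.arrayIdx()][next], min.arrayIdx(), next));
            }
        }
        return result;
    }
}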

Sort queries by frequency

10 files with 1G size

  • If duplication is high, just use a HashMap in memory to store the counts, and then sort the (query, count) entries
  • If duplication is low, use bucket hashing, and then merge sort

Find median of 5B numbers

  • Idea 1: If the data can fit into memory, use two heaps, one for numbers greater than the current median and one for numbers less than it (see the sketch after this list)
  • Invariant: the size difference of the two heaps is at most 1
  • Idea 2: Partition based on prefix. Based on the size of each file we know the counts, and thus precisely which file the median resides in and its rank within that file; then we do a precise lookup
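
A sketch of idea 1, the running median with two heaps: a max-heap for the lower half and a min-heap for the upper half, rebalanced so their sizes differ by at most 1:

import java.util.Collections;
import java.util.PriorityQueue;

// Running median with two heaps: 'low' is a max-heap of the smaller half,
// 'high' is a min-heap of the larger half; |low| - |high| is kept in {0, 1}.
final class RunningMedian {
    private final PriorityQueue<Long> low  = new PriorityQueue<>(Collections.reverseOrder());
    private final PriorityQueue<Long> high = new PriorityQueue<>();

    void add(long x) {
        if (low.isEmpty() || x <= low.peek()) low.add(x); else high.add(x);
        // Rebalance so the size invariant holds.
        if (low.size() > high.size() + 1) high.add(low.poll());
        else if (high.size() > low.size()) low.add(high.poll());
    }

    double median() {
        if (low.isEmpty()) throw new IllegalStateException("no data");
        if (low.size() > high.size()) return low.peek();
        return (low.peek() + high.peek()) / 2.0;
    }
}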

System design: Locate the delivery kiosk for user's order

2020-04-06 00:00:00 +0000

Note that capacity estimation, storage design, and computation design will keep triggering updates to each other

Requirements

  • Each kiosk has multiple delivery boxes, of different dimensions
  • When user places an order, the system should return up to 3 possible kiosks that can accept user’s delivery
  • Handle the NA traffic

Capacity estimation

  • 500 mil NA users, assume each places 1 order on Black Friday, so peak traffic is 500 mil / ~90k sec per day * 4 ≈ 25K TPS max
  • Each kiosk has 100 boxes, so it can support a population of about 1k, assuming 10% of people buy on a normal day. So 500K kiosks are more than enough
  • Each kiosk covers a 0.5 mile radius with little overlap

Storage layer

  • Kiosk needs to keep track of:
    • geohash - needs an index for range search
    • number of boxes
  • Box needs to keep track of:
    • dimensions (H, L, W)
    • parent kiosk
  • Status needs to keep track of:
    • box -> package ids, one to many
    • parent kiosk, if they are in separate services
  • Since kiosk info is rarely updated, kiosk and box info can be in read slaves or a cache to scale out reads.

Computation sequence

  • Fetch the nearest 10 kiosks from one of the DB’s read slaves by range searching on geohash
  • Fetch the empty boxes (< 1k total) in those kiosks, and compute in memory the ones the package fits into, i.e., whose W/L/H ≥ the package’s W/L/H (see the sketch after this list)
  • Return to the user the kiosks with suitable empty boxes
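
A sketch of the in-memory filtering step after the geohash range query; the types are placeholders, and rotation of the package is ignored, matching the simple dimension comparison above:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Given the ~10 nearest kiosks and their empty boxes, keep up to 3 kiosks that
// have at least one empty box the package fits into.
final class KioskSelector {
    record Dimensions(double h, double l, double w) {
        boolean fits(Dimensions pkg) { return pkg.h() <= h && pkg.l() <= l && pkg.w() <= w; }
    }

    static List<Long> candidateKiosks(Map<Long, List<Dimensions>> emptyBoxesByKiosk, Dimensions pkg) {
        return emptyBoxesByKiosk.entrySet().stream()
                .filter(e -> e.getValue().stream().anyMatch(box -> box.fits(pkg)))
                .map(Map.Entry::getKey)
                .limit(3)                       // return up to 3 possible kiosks
                .collect(Collectors.toList());
    }
}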

Productivity tips and tricks

2020-04-01 00:00:00 +0000

  • Estimate how much time you can allocate, and set aside continuous time blocks for it. Each block should be at least one hour
  • Structured procrastination: work on things that are important but not urgent
  • Checking phone every few minutes is a sign of fatigue, take a break
  • Stop when you feel good and know where the direction is going and come back later
  • Going through email or discussing on a forum is ineffective by nature. Pre-plan time slots for such activities, ideally at fixed times, and don’t worry about them later
  • Don’t think about money or dispute if possible. They are naturally attention sinks
  • Don’t think about damage other people have done to you. They don’t deserve space in your mind

How InnoDB handles deletion

2020-03-31 00:00:00 +0000

  • All user data in InnoDB tables is stored in pages comprising a B-tree index (the clustered index). In some other database systems, this type of index is called an “index-organized table”. Each row in the index node contains the values of the (user-specified or system-generated) primary key and all the other columns of the table.
    • Updates to rows usually rewrite the data within the same page
  • by default, innodb_file_per_table is on. This means each table will have an ibd file
    • each ibd file has multiple segments, each of which is associated with an index
    • each segment consists of multiple 1MB extents
    • each extent has multiple 16KB pages
  • On delete, the space of the deleted record is marked reusable; if the page fill factor falls below MERGE_THRESHOLD (default 50%), InnoDB will try merging it with a neighboring page, leaving the original page blank.
    • Check index_page_merge_successful in INFORMATION_SCHEMA.INNODB_METRICS

Optimize table

  • If you do sequential deletes instead of random deletes, most likely you don’t need to run optimize table, because sequential deletes tend to free whole pages rather than fragment them
  • The purpose of optimize table is to
    • reduce the data_free value in information_schema.tables
    • defrag index pages
  • data_free marks the reusable space, and is not accurate if the table has a variable-length column > 768 bytes, e.g., varchar, text
  • InnoDB implements optimize table via alter table force, using a temp table. This also means additional space is needed during the operation

On forward and backward compatibility

2020-03-26 00:00:00 +0000

Suppose A depends on B to function, i.e., A -> B

  • Forward compatibility means that if a new version of B is deployed, current A should still be able to handle it
    • This means we can upgrade B without breaking A
    • If not, then we first have to upgrade A so that it can process both the current B and the next version of B
  • Note this is what people normally mean when they talk about backward compatibility, which is not exactly the same as the definition. The real definition of backward compatibility is that a newer version of A still accepts the current B

How pt-archiver works

2020-03-25 00:00:00 +0000

Most likely we need the bulk insertion and deletion mode. Otherwise, the single-row insertion mode can barely break 2k rows per sec, i.e., at most ~200M rows per day

   $bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
         or die "Cannot open temp file: $OS_ERROR\n";

   while (                                 # Quit if:
      $row                                 # There is no data
      && $retries >= 0                     # or retries are exceeded
      && (!$o->get('run-time') || $now < $end) # or time is exceeded
      && !-f $sentinel                     # or the sentinel is set
      && $oktorun                          # or instructed to quit
      )
   {
 my $lastrow = $row;
      my $escaped_row;
            $escaped_row = escape([@{$row}[@sel_slice]], $fields_separated_by, $optionally_enclosed_by);
  print $bulkins_file $escaped_row, "\n"
                  or die "Cannot write to bulk file: $OS_ERROR\n";
      # Possibly flush the file and commit the insert and delete.
      commit($o) unless $commit_each;

  # Get the next row in this chunk.
      # First time through this loop $get_sth is set to $get_first.
      # For non-bulk operations this means that rows ($row) are archived
      # one-by-one in in the code block above ("row is archivable").  For
      # bulk operations, the 2nd to 2nd-to-last rows are ignored and
      # only the first row ($first_row) and the last row ($last_row) of
      # this chunk are used to do bulk INSERT or DELETE on the range of
      # rows between first and last.  After the bulk ops, $first_row and
      # $last_row are reset to the next chunk.
      if ( $get_sth->{Active} ) { # Fetch until exhausted
         $row = $get_sth->fetchrow_arrayref();
      }
  if ( !$row ) {
          $bulkins_file->close()
               or die "Cannot close bulk insert file: $OS_ERROR\n";

            my $ins_sth; # Let plugin change which sth is used for the INSERT.
            $ins_sth ||= $ins_row; # Default to the sth decided before.
            my $success = do_with_retries($o, 'bulk_inserting', sub {
               $ins_sth->execute($bulkins_file->filename());
               $src->{dbh}->do("SELECT 'pt-archiver keepalive'") if $src;
               PTDEBUG && _d('Bulk inserted', $del_row->rows, 'rows');
               $statistics{INSERT} += $ins_sth->rows;
            });

# Notice no checksum is performed; correctness is ensured by deleting only after the insertion
   my $success = do_with_retries($o, 'bulk_deleting', sub {
                  $del_row->execute(
                     @{$first_row}[@bulkdel_slice],
                     @{$lastrow}[@bulkdel_slice],
                  );
                  PTDEBUG && _d('Bulk deleted', $del_row->rows, 'rows');
                  $statistics{DELETE} += $del_row->rows;
               });

        commit($o, 1) if $commit_each;
         $get_sth = $get_next;
     PTDEBUG && _d('Fetching rows in next chunk');
         trace('select', sub {
            my $select_start = time;
            $get_sth->execute(@{$lastrow}[@asc_slice]);
            $last_select_time = time - $select_start;
            PTDEBUG && _d('Fetched', $get_sth->rows, 'rows');
            $statistics{SELECT} += $get_sth->rows;
         });
  @beginning_of_txn = @{$lastrow}[@asc_slice] unless $txn_cnt;
         $row              = $get_sth->fetchrow_arrayref();
         $first_row        = $row ? [ @$row ] : undef;
 $bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
               or die "Cannot open temp file: $OS_ERROR\n";
}
}

How is character escaped

# Formats a row the same way SELECT INTO OUTFILE does by default.  This is
# described in the LOAD DATA INFILE section of the MySQL manual,
# http://dev.mysql.com/doc/refman/5.0/en/load-data.html
sub escape {
   my ($row, $fields_separated_by, $optionally_enclosed_by) = @_;
   $fields_separated_by ||= "\t";
   $optionally_enclosed_by ||= '';

# Note that we don't try escaping the separator here. So we may have problems if the field contains the separator itself
   return join($fields_separated_by, map {
      s/([\t\n\\])/\\$1/g if defined $_;  # Escape tabs etc
      my $s = defined $_ ? $_ : '\N';             # NULL = \N
      # var & ~var will return 0 only for numbers
      if ($s !~ /^[0-9,.E]+$/  && $optionally_enclosed_by eq '"') {
          $s =~ s/([^\\])"/$1\\"/g;
          $s = $optionally_enclosed_by."$s".$optionally_enclosed_by;
      }
      # $_ =~ s/([^\\])"/$1\\"/g if ($_ !~ /^[0-9,.E]+$/  && $optionally_enclosed_by eq '"');
      # $_ = $optionally_enclosed_by && ($_ & ~$_) ? $optionally_enclosed_by."$_".$optionally_enclosed_by : $_;
      chomp $s;
      $s;
   } @$row);
}