On MySQL's async replication for production

2020-03-24 00:00:00 +0000

How async replication works in MySQL

  • Async replication is the default behavior. Semi-sync replication has been available (as a plugin) since 5.5, and 5.7 added the lossless AFTER_SYNC wait point
  • The master writes changes to a local binlog file. SELECT and SHOW statements are not written to the binlog, since they do not change data and therefore do not affect the slave's state
  • This binlog is different from InnoDB's undo and redo logs. Its main purposes are replication and recovery after restoring from a backup
  • On AWS RDS, the binlog is not turned on by default, and its retention can be configured up to a maximum of 7 days
  • The additional binlog write adds a performance penalty. On Aurora, we observed at least a 1/3 reduction in throughput after turning on the binlog
  • Binlog events of a transaction are first buffered in a cache sized by binlog_cache_size; a temporary file is opened when that buffer is not large enough
  • Since the binlog is just a file, it is subject to the same flush/fsync failures as any normal file. By default, the server shuts down upon detecting such issues
  • The slave pulls from the master. The master does NOT try to push to the slave
  • By default, the slave does not write the changes it replicates to its own binlog (see log_slave_updates)
  • Any detected corruption or data loss in the binlog means replication has to be restarted from scratch, and the current slave's data should be discarded

Complications from production setup

In production, MySQL normally runs as an active-standby pair with semi-sync or sync replication between them. This adds complications to async replication. We call the current active X, the standby Y, the X-Y cluster the source, and the async replica the sink

From the source's point of view:

  • To keep async replication working after Y is promoted to master (because X failed for some reason), at minimum GTID mode has to be turned on, but this caused write stalls on the master db, especially on Aurora
  • Even with GTID mode on, we cannot guarantee that binlog data already persisted on X is never lost, although the chance is greatly reduced

From the sink's point of view:

  • Suppose a failover between X and Y happened. The pulling connection to X may still be alive while the new active is Y. When the sink pulls the binlog through a connection pool (as most network applications do), it may end up pulling from both X and Y, i.e., a split brain.
  • The root cause is that during failover, by CAP, async replication should be killed and made unavailable to avoid the split brain, but the implementation does not do this
  • In practice, with log_slave_updates turned on, the split brain is unlikely to cause problems, but conceptually it is a critical situation that should be avoided
  • When such a split brain happens, either fix the data using domain knowledge, or discard the current sink and restart replication from scratch
  • This means async replication is good for cases where some slack in data consistency and availability is acceptable, e.g.,
    • Cross-region failover instances, where we accept the loss or corruption of a couple of rows so that we can recover if the main region loses its data
    • Read slaves for OLAP workloads/pipelines, which give us a mostly up-to-date data source with little additional load on the master
  • By the same token, async replication is not a good fit for mission-critical systems: they have to run on the master or on a sync-replicated replica

How uid generator generates snowflake IDs

2020-03-16 00:00:00 +0000

Default uid generator

    private long getCurrentSecond() {
        long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
            throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
        }

        return currentSecond;
    }

    protected synchronized long nextId() {
        long currentSecond = getCurrentSecond();

        // Clock moved backwards, refuse to generate uid
        if (currentSecond < lastSecond) {
            long refusedSeconds = lastSecond - currentSecond;
            throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
        }

        // At the same second, increase sequence
        if (currentSecond == lastSecond) {
            sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
            // Exceed the max sequence, we wait the next second to generate uid
            if (sequence == 0) {
                currentSecond = getNextSecond(lastSecond); //this one just self-spins
            }

        // At the different second, sequence restart from zero
        } else {
            sequence = 0L;
        }

        lastSecond = currentSecond;

        // Allocate bits for UID
        return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
    }
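The nextId logic above can be sketched in Go as follows. It assumes the bit layout commonly documented for uid-generator's defaults (1 sign bit, 28 bits of delta seconds, 22 worker bits, 13 sequence bits); that layout, the epoch used in the demo, and every name below are assumptions for illustration, not the library's API. The clock is injected so the behavior can be exercised deterministically.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed layout: 1 sign bit | 28 bits delta seconds | 22 bits worker id | 13 bits sequence.
const (
	timeBits     = 28
	workerBits   = 22
	seqBits      = 13
	maxSequence  = int64(1)<<seqBits - 1
	maxDeltaSecs = int64(1)<<timeBits - 1
)

// Generator is a hypothetical stand-in for DefaultUidGenerator.
type Generator struct {
	epochSeconds int64
	workerID     int64 // must fit in workerBits
	lastSecond   int64
	sequence     int64
	now          func() int64 // current Unix second; injectable so tests control the clock
}

func (g *Generator) currentSecond() (int64, error) {
	s := g.now()
	if s-g.epochSeconds > maxDeltaSecs {
		return 0, errors.New("timestamp bits exhausted")
	}
	return s, nil
}

// NextID mirrors nextId(); it is not goroutine-safe (the Java method is synchronized).
func (g *Generator) NextID() (int64, error) {
	cur, err := g.currentSecond()
	if err != nil {
		return 0, err
	}
	if cur < g.lastSecond { // clock moved backwards: refuse
		return 0, fmt.Errorf("clock moved backwards, refusing for %d seconds", g.lastSecond-cur)
	}
	if cur == g.lastSecond {
		g.sequence = (g.sequence + 1) & maxSequence
		for g.sequence == 0 && cur <= g.lastSecond { // sequence exhausted: spin to the next second
			if cur, err = g.currentSecond(); err != nil {
				return 0, err
			}
		}
	} else {
		g.sequence = 0 // a new second restarts the sequence
	}
	g.lastSecond = cur
	delta := cur - g.epochSeconds
	return delta<<(workerBits+seqBits) | g.workerID<<seqBits | g.sequence, nil
}

func main() {
	clock := int64(1_600_000_000)
	g := &Generator{epochSeconds: 1_577_836_800, workerID: 7, now: func() int64 { return clock }}
	a, _ := g.NextID()
	b, _ := g.NextID()
	fmt.Println(a < b) // same second, so the IDs differ only in the sequence bits
}
```

Because the delta seconds occupy the high bits, IDs from later seconds always sort after IDs from earlier seconds, regardless of worker or sequence.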

CachedUidGenerator

Data structure


public class BufferPaddingExecutor {

    /** Whether buffer padding is running */
    private final AtomicBoolean running;

    /** We can borrow UIDs from the future, here store the last second we have consumed */
    private final PaddedAtomicLong lastSecond;  // initialized with the system time in seconds; after that it becomes a logical second

    /** RingBuffer & BufferUidProvider */
    private final RingBuffer ringBuffer;
    private final BufferedUidProvider uidProvider;

    /** Padding immediately by the thread pool */
    private final ExecutorService bufferPadExecutors;
    /** Padding schedule thread */
    private final ScheduledExecutorService bufferPadSchedule;
}

Init CachedUidGenerator


    private void initRingBuffer() {
        // initialize RingBufferPaddingExecutor
        boolean usingSchedule = (scheduleInterval != null);
        this.bufferPaddingExecutor = new BufferPaddingExecutor(ringBuffer, this::nextIdsForOneSecond, usingSchedule);
        if (usingSchedule) {
            bufferPaddingExecutor.setScheduleInterval(scheduleInterval);
        }
    }

    // In the BufferPaddingExecutor constructor, lastSecond starts from the current wall-clock second:
    this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));

Actual population logic


    public void paddingBuffer() {
        boolean isFullRingBuffer = false;
        while (!isFullRingBuffer) {
            List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); // the only place lastSecond is mutated other than init
            for (Long uid : uidList) {
                isFullRingBuffer = !ringBuffer.put(uid);
                if (isFullRingBuffer) {
                    break;
                }
            }
        }
    }
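The padding loop can be modeled with a small Go sketch. It is a single-threaded simplification under stated assumptions: a slice-backed bounded buffer instead of the lock-free RingBuffer, and a hypothetical toy provider standing in for nextIdsForOneSecond. It shows why lastSecond becomes a logical second: each batch advances it by exactly one, regardless of the wall clock, so a fast consumer effectively borrows IDs from future seconds.

```go
package main

import "fmt"

// ringBuffer is a hypothetical bounded buffer; put reports false once it is full.
type ringBuffer struct {
	slots    []int64
	capacity int
}

func (r *ringBuffer) put(uid int64) bool {
	if len(r.slots) >= r.capacity {
		return false
	}
	r.slots = append(r.slots, uid)
	return true
}

// padder mirrors BufferPaddingExecutor: lastSecond starts from the wall clock
// and then only advances by one logical second per batch.
type padder struct {
	lastSecond   int64
	buf          *ringBuffer
	idsForSecond func(second int64) []int64 // hypothetical stand-in for nextIdsForOneSecond
}

func (p *padder) paddingBuffer() {
	full := false
	for !full {
		p.lastSecond++ // borrow the next (possibly future) second
		for _, uid := range p.idsForSecond(p.lastSecond) {
			if !p.buf.put(uid) {
				full = true // like the Java loop, the rest of this batch is dropped
				break
			}
		}
	}
}

// fourPerSecond is a toy provider: 4 IDs per second, encoded as second*10 + seq.
func fourPerSecond(s int64) []int64 {
	return []int64{s * 10, s*10 + 1, s*10 + 2, s*10 + 3}
}

func main() {
	p := &padder{lastSecond: 100, buf: &ringBuffer{capacity: 6}, idsForSecond: fourPerSecond}
	p.paddingBuffer()
	fmt.Println(p.buf.slots, p.lastSecond) // [1010 1011 1012 1013 1020 1021] 102
}
```

With a capacity of 6 and 4 IDs per second, two logical seconds are consumed, and the last two IDs of the second batch are discarded when the buffer fills.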

Query timeout for MySQL JDBC

2020-03-11 00:00:00 +0000

Query timeout implementation in MySQL JDBC

mysql-connector-j version 5.1.48

Data structures


/**
 * A Statement object is used for executing a static SQL statement and obtaining
 * the results produced by it.
 * 
 * Only one ResultSet per Statement can be open at any point in time. Therefore, if the reading of one ResultSet is interleaved with the reading of another,
 * each must have been generated by different Statements. All statement execute methods implicitly close a statement's current ResultSet if an open one exists.
 */
public class StatementImpl implements Statement {

    /** The physical connection used to effectively execute the statement */
    protected Reference<MySQLConnection> physicalConnection = null;

    /**
     * Thread used to implement query timeouts...Eventually we could be more
     * efficient and have one thread with timers, but this is a straightforward
     * and simple way to implement a feature that isn't used all that often. (?!!!)
     */
    class CancelTask extends TimerTask {
        long origConnId = 0; // id of the connection running the query; used by KILL QUERY below
        SQLException caughtWhileCancelling = null;
        StatementImpl toCancel;
        Properties origConnProps = null;
        String origConnURL = "";
    }

}

// Connection property, defined with the other connection properties; it defaults to false
    private BooleanConnectionProperty queryTimeoutKillsConnection = new BooleanConnectionProperty("queryTimeoutKillsConnection", false,
            Messages.getString("ConnectionProperties.queryTimeoutKillsConnection"), "5.1.9", MISC_CATEGORY, Integer.MIN_VALUE);

What does CancelTask do?

CancelTask.run() (the TimerTask callback) starts a new thread, and that thread's run() does the following.

Only the happy path with default property values is shown here, with my comments inline.

    Connection cancelConn = null;
    java.sql.Statement cancelStmt = null;

    try {
        MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
        synchronized (StatementImpl.this.cancelTimeoutMutex) {
            if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
                // All's fine
                cancelConn = physicalConn.duplicate(); // this one will create a new connection
                cancelStmt = cancelConn.createStatement();
                // KILL QUERY stops the running statement but leaves its connection open
                cancelStmt.execute("KILL QUERY " + physicalConn.getId());
            } else {
                try {
                    cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
                    cancelStmt = cancelConn.createStatement();
                    cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
                } catch (NullPointerException npe) {
                    // Log this? "Failed to connect to " + origConnURL + " and KILL query"
                }
            }
            CancelTask.this.toCancel.wasCancelled = true;
            CancelTask.this.toCancel.wasCancelledByTimeout = true;
        }
    } catch (SQLException sqlEx) {
        CancelTask.this.caughtWhileCancelling = sqlEx;
    } catch (NullPointerException npe) {
        // Case when connection closed while starting to cancel.
        // We can't easily synchronize this, because then one thread can't cancel() a running query.
        // Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
    } finally {
        if (cancelStmt != null) {
            try {
                cancelStmt.close();
            } catch (SQLException sqlEx) {
                throw new RuntimeException(sqlEx.toString());
            }
        }

        if (cancelConn != null) {
            try {
                cancelConn.close();
            } catch (SQLException sqlEx) {
                throw new RuntimeException(sqlEx.toString());
            }
        }

        CancelTask.this.toCancel = null;
        CancelTask.this.origConnProps = null;
        CancelTask.this.origConnURL = null;
    }


How is CancelTask used?


    public java.sql.ResultSet executeQuery(String sql) throws SQLException {
        // ... (setup elided; locallyScopedConn is the connection this statement runs on)
        CancelTask timeoutTask = null;
        try {
            if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
                timeoutTask = new CancelTask(this);
                // CancelTask fires on the connection's cancel Timer after timeoutInMillis
                locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
            }

            this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
                    createStreamingResultSet(), this.currentCatalog, cachedFields);

            if (timeoutTask != null) {
                if (timeoutTask.caughtWhileCancelling != null) {
                    throw timeoutTask.caughtWhileCancelling;
                }

                timeoutTask.cancel();

                locallyScopedConn.getCancelTimer().purge();

                timeoutTask = null;
            }

            synchronized (this.cancelTimeoutMutex) {
                if (this.wasCancelled) {
                    SQLException cause = null;

                    if (this.wasCancelledByTimeout) {
                        cause = new MySQLTimeoutException();
                    } else {
                        cause = new MySQLStatementCancelledException();
                    }

                    resetCancelledState();

                    throw cause;
                }
            }
            // ...
        } finally {
            if (timeoutTask != null) {
                timeoutTask.cancel();

                locallyScopedConn.getCancelTimer().purge();
            }
        }
    }

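Implications of this implementation

  • By default (queryTimeoutKillsConnection is false), the connection is not closed when a query times out
  • Sending KILL QUERY is a best-effort attempt: it needs a brand new connection, and a failure there is merely recorded or ignored. A more reliable way is MySQL's MAX_EXECUTION_TIME optimizer hint (SELECT statements only). A socket timeout can only mitigate, not solve, the slow-query problem, since the server keeps executing the query after the client gives up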

Reading Notes: Effective Executive

2020-03-10 00:00:00 +0000

Chapter 1 Effectiveness Can Be Learned

  • Manual work is about doing things right. Knowledge work is about doing the right thing
  • A knowledge worker (KWer) cannot be supervised in detail. He has to direct himself
  • A KWer must make decisions and live by their results. He should be the best person to make the call
  • KW is defined by its results, NOT by its costs or quantity
  • A KWer can't let events direct his actions for too long, or he will just be operating
  • Inside the organization, everything is effort and cost, even in a profit center
  • One tends to be drawn into the challenges inside the organization instead of the events outside it
  • By the time relevant outside events become measurable, it is too late, because measuring an event requires building a concept for it first
  • Changes in trends are more important than the trends themselves, but they can only be perceived rather than counted
  • You don't need to know the details of your collaborators' work, but you need to know why it is there and what is happening
  • Ask for the expected results instead of the work to be done
  • Effective decisions are based on dissenting opinions rather than on consensus about the facts

Chapter 2 Know Thy Time

  • Plan with time rather than with work. Consolidate time into the largest possible chunks
  • It takes a lot of time with people to get ideas across
  • Fast personnel decisions are likely to be wrong. Make them several times before committing, which needs hours of continuous and uninterrupted thought
  • Cutting important things by mistake is easy to correct
  • 90 minutes of continuous work plus 30 minutes of interruptible work
  • Schedule a morning work period at home. This is better than after dinner, when people are too tired to do a good job

Chapter 3 What Can I Contribute?

  • Contribution is important in all 3 areas: direct results, building people, and building values
  • Justify the payroll by contribution rather than by effort
  • Doing a job well means being specialized, which also means retooling when moving to a new position

Chapter 4 Making Strength Productive

  • Fill positions by strength rather than by weakness. If a weakness is blocking a strength, fix it through work or career opportunities
  • Build jobs around tasks rather than personalities, i.e., don't create a job for a person
  • An indispensable man means at least one of the following:
    • he is incompetent and shields himself from demands
    • his superior is weak
    • his strength is misused to delay tackling a serious problem, if not to hide it
  • A rapidly promoted superior makes it easier for his people to succeed. An underperforming superior only means a new one will be brought in from outside
  • Help the boss overcome his limitations and use his strengths. Present the areas related to his strengths first, to help him understand the situation
  • Listener or reader: people have different preferences in receiving information
  • It is easier to raise the performance of one leader than of the mass. He should be put into a performance-making, standard-setting position
  • Claiming a lack of authority/access is often a cover-up for inertia

Chapter 5 First Things First

  • Double buffering of work can work, provided each piece is given a minimum amount of continuous time
  • Yesterday's success always lingers longer than its productive life
  • Pressure-driven prioritization means focusing on yesterday instead of tomorrow and on the inside rather than the outside, i.e., urgent tasks push back important ones
  • Reprioritize at the end of each task

Chapter 6 The Elements of Decision-making

  • Focus on what is “right”, i.e., what satisfies the specs, before worrying about compromises and concessions. Stakeholders know how to make compromises better than you, but they need your help figuring out what is right
  • An incomplete explanation is often more dangerous than a totally wrong one
  • An executive who makes many decisions is both lazy and ineffectual, because he fails to see the truly generic problems

Chapter 7 Effective Decisions

  • A decision is most likely a choice between two courses of action, neither of which is provably more nearly right than the other
  • Facts require criteria of relevance
  • Effective decisions grow out of a clash of divergent opinions and competing alternatives. In fact, one does not make a decision unless there is disagreement
  • Whoever voices an opinion should be responsible for the factual findings
  • Finding the right measurement is itself a risk-taking judgment; since it is a judgment, alternative measurements must be considered too
  • A measurement that sounds good on paper is still meaningless if you can't convert it into a step-by-step guide a computer could follow
  • Act if, on balance, the benefits greatly outweigh the cost and risk; and act or do not act, but do not “hedge” or compromise

What triggers the txnLockFast failure?

2020-03-09 00:00:00 +0000

Source version 2.1

What is the txnLockFast error?

This is triggered when a read txn whose read_start_ts falls between a write txn's write_start_ts and write_commit_ts runs into the write txn's lock. The reader cannot yet tell whether the writer will commit before or after read_start_ts, so it backs off and retries later with the same read_start_ts, which preserves snapshot isolation. The effect of txnLockFast is similar to waiting on a pessimistic lock, but with higher latency
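To see why the reader has to wait instead of reading the older version, here is a toy MVCC model in Go. It is a hypothetical sketch, not tikv's storage format: one key, a list of committed versions, and at most one prewrite lock identified by the writer's start ts.

```go
package main

import (
	"errors"
	"fmt"
)

// errLocked is a hypothetical stand-in for the lock error that leads to a txnLockFast backoff.
var errLocked = errors.New("key is locked by an uncommitted txn")

type version struct {
	commitTS uint64
	value    string
}

// mvccKey is a hypothetical in-memory model of a single key in tikv.
type mvccKey struct {
	versions []version // ascending commitTS
	lockTS   uint64    // start ts of the txn holding the prewrite lock; 0 means unlocked
}

func (k *mvccKey) prewrite(startTS uint64) { k.lockTS = startTS }

func (k *mvccKey) commit(commitTS uint64, value string) {
	k.versions = append(k.versions, version{commitTS, value})
	k.lockTS = 0
}

// read returns the latest value committed at or before readTS.
func (k *mvccKey) read(readTS uint64) (string, error) {
	// The lock holder started before us, so it may still commit with commitTS < readTS.
	// Until it does, we cannot tell which version is visible: back off and retry.
	if k.lockTS != 0 && k.lockTS <= readTS {
		return "", errLocked
	}
	val := ""
	for _, v := range k.versions {
		if v.commitTS <= readTS {
			val = v.value
		}
	}
	return val, nil
}

func main() {
	k := &mvccKey{versions: []version{{commitTS: 5, value: "old"}}}
	k.prewrite(10)       // writer: write_start_ts = 10
	_, err := k.read(12) // reader: read_start_ts = 12, inside the writer's window
	fmt.Println(err)
	k.commit(11, "new") // writer: write_commit_ts = 11 < 12
	v, _ := k.read(12)  // retry with the SAME read_start_ts
	fmt.Println(v)      // "new": returning "old" earlier would have broken snapshot isolation
}
```

Had the first read ignored the lock and returned "old", a later read at the same timestamp would return "new", which is exactly the inconsistency the backoff avoids.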

tidb_tikvclient_backoff_seconds_count

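The alert is triggered by the expression increase(tidb_tikvclient_backoff_seconds_count[10m]) > 10

tidb_tikvclient_backoff_seconds_count is a counter (the _count series of the backoff-duration histogram), and tidb_tikvclient_backoff_seconds_count[10m] is a range vector holding its samples from the last 10 minutes. increase() returns how much the counter grew over that window, so the alert fires when more than 10 backoffs happened in the last 10 minutes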
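To make the semantics concrete, here is a simplified Go version of what increase() computes over one series' samples in the window: the sum of the positive deltas, treating a drop as a counter reset. This is a simplification under stated assumptions; Prometheus additionally extrapolates to the window boundaries, so real values can be non-integers.

```go
package main

import "fmt"

// increase approximates PromQL increase() for the samples of one series in a
// range-vector window. Unlike Prometheus, it does not extrapolate to the window edges.
func increase(samples []float64) float64 {
	total := 0.0
	for i := 1; i < len(samples); i++ {
		delta := samples[i] - samples[i-1]
		if delta < 0 { // counter reset (e.g., tidb restarted): the counter restarted from 0
			delta = samples[i]
		}
		total += delta
	}
	return total
}

func main() {
	// Hypothetical backoff counter scraped every 2 minutes over a 10-minute window.
	window := []float64{100, 103, 103, 109, 2, 4}
	fmt.Println(increase(window), increase(window) > 10) // 13 true
}
```

The alert therefore counts backoff events (3 + 0 + 6 + 2 + 2 = 13 here), not the number of seconds in which a backoff happened.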

Data structures

Retriever is the interface that wraps the basic Get and Seek methods. Retriever is implemented by BufferStore -> unionStore -> tikvSnapshot -> tikvTxn

Transaction defines the interface for operations inside a Transaction. Transaction is implemented by tikvTxn -> TxnState

// Scanner support tikv scan
type Scanner struct {
	snapshot     *tikvSnapshot
	batchSize    int
	valid        bool
	cache        []*pb.KvPair
	idx          int
	nextStartKey []byte
	endKey       []byte
	eof          bool

	// Use for reverse scan.
	reverse    bool
	nextEndKey []byte
}

What triggers txnLockFast?

  • Commit check
    • TxnState also implements BatchGet, but it seems unused
    • tikvTxn also implements Get, but it is not used except in tests

// part of the Retriever interface
func (us *unionStore) Get(k Key) ([]byte, error) {
	// ... (elided) instead of checking k now, remember the condition; it is verified in batch at commit
	us.markLazyConditionPair(k, nil, e.(error))
	// ...
}

// part of the Transaction interface
func (txn *tikvTxn) Commit(ctx context.Context) error {
	txn.us.CheckLazyConditionPairs()
	// ...
}

func (us *unionStore) CheckLazyConditionPairs() error {
	us.snapshot.BatchGet(keys) // this may trigger txnLockFast
	// ...
}

  • Scan keys in tikv

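func (s *Scanner) Next() error {
	// ... (elided) when the current kv pair comes back with a lock error:
	s.resolveCurrentLock(bo, current)
	// ...
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
	s.snapshot.get(bo, kv.Key(current.Key)) // may back off with txnLockFast
	// ...
}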

  • Coprocessor

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
	// ... (elided) the lines below run when the coprocessor response reports a lock, lockErr
	logutil.Logger(context.Background()).Debug("coprocessor encounters",
		zap.Stringer("lock", lockErr))
	ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
	if err1 != nil {
		return nil, errors.Trace(err1)
	}
	// ok is false when the lock could not be resolved yet (e.g., its TTL has not expired)
	if !ok {
		if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
			return nil, errors.Trace(err)
		}
	}
	// ...
}

How tidb implements 2PC

2020-03-05 00:00:00 +0000

Source version 2.1

Data structures


// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
	store        *tikvStore
	version      kv.Version
	priority     pb.CommandPri
	notFillCache bool
	syncLog      bool
	keyOnly      bool
	vars         *kv.Variables
}

// store/tikv/txn.go, seems to represent the txn on the server (tikv) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct {
	snapshot  *tikvSnapshot
	us        kv.UnionStore
	store     *tikvStore // for connection to region.
	startTS   uint64
	startTime time.Time // Monotonic timestamp for recording txn time consuming.
	commitTS  uint64
	valid     bool
	lockKeys  [][]byte
	mu        sync.Mutex // For thread-safe LockKeys function.
	dirty     bool
	setCnt    int64
	vars      *kv.Variables
}

// session/txn.go, seems to represent the txn on the client (tidb) side

// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement related modification in the buffer before flush to the txn,
// so if execute statement meets error, the txn won't be made dirty.
// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need.
type TxnState struct { 
	// States of a TxnState should be one of the followings:
	// Invalid: kv.Transaction == nil && txnFuture == nil
	// Pending: kv.Transaction == nil && txnFuture != nil
	// Valid:	kv.Transaction != nil && txnFuture == nil
	kv.Transaction // embedded type: TxnState gets all methods of kv.Transaction, so it implements that interface
	txnFuture *txnFuture

	buf          kv.MemBuffer
	mutations    map[int64]*binlog.TableMutation
	dirtyTableOP []dirtyTableOperation

	// If doNotCommit is not nil, Commit() will not commit the transaction.
	// doNotCommit flag may be set when StmtCommit fail.
	doNotCommit error
}

type session struct {
	txn         TxnState

	mu struct {
		sync.RWMutex
		values map[fmt.Stringer]interface{}
	}

	store kv.Storage

	sessionVars    *variable.SessionVars
	sessionManager util.SessionManager
}

// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
	store     *tikvStore // the same store as txn.store
	txn       *tikvTxn
	startTS   uint64
	keys      [][]byte
	mutations map[string]*pb.Mutation
	lockTTL   uint64
	commitTS  uint64
	mu        struct {
		sync.RWMutex
		committed       bool
		undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
	}
	priority pb.CommandPri
	syncLog  bool
	connID   uint64 // connID is used for log.
	cleanWg  sync.WaitGroup
	detail   *execdetails.CommitDetails

	// The max time a Txn may use (in ms) from its startTS to commitTS.
	// We use it to guarantee GC worker will not influence any active txn. The value
	// should be less than GC life time.
	maxTxnTimeUse uint64
}

When a session commits a single table, DML-only transaction

  • Session.doCommit
    • TxnState.Commit
      • TxnState.Transaction.Commit(ctx), which actually calls
func (txn *tikvTxn) Commit(ctx context.Context) error {
	defer txn.close()
	var connID uint64
	val := ctx.Value(sessionctx.ConnID)
	if val != nil {
		connID = val.(uint64)
	}
	committer, err := newTwoPhaseCommitter(txn, connID)
	if err != nil {
		return errors.Trace(err)
	}
	// latches disabled
	err = committer.executeAndWriteFinishBinlog(ctx)
	return errors.Trace(err)
}
  • newTwoPhaseCommitter
    • iterates through tikvTxn.UnionStore to collect the keys to Put and Del into the mutations collection
    • adds txn.lockKeys to mutations with the Op_Lock type
  • executeAndWriteFinishBinlog
	err := c.execute(ctx)
	if err != nil {
		c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
	} else {
		c.txn.commitTS = c.commitTS
		c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
	}

// Happy path only: cleanup on failure and binlog writing are elided
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
	prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
	err := c.prewriteKeys(prewriteBo, c.keys)
	if err != nil {
		return errors.Trace(err)
	}

	commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
	if err != nil {
		return errors.Trace(err)
	}
	c.commitTS = commitTS
	commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
	err = c.commitKeys(commitBo, c.keys)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

  • Prewrite, commit, and cleanup (in the failure case) are just different actions passed to doActionOnKeys

twoPhaseCommitter.doActionOnKeys

//happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
	mutations := make([]*pb.Mutation, len(batch.keys))
	for i, k := range batch.keys {
		mutations[i] = c.mutations[string(k)]
	}

	req := &tikvrpc.Request{
		Type: tikvrpc.CmdPrewrite,
		Prewrite: &pb.PrewriteRequest{
			Mutations:    mutations,
			PrimaryLock:  c.primary(),
			StartVersion: c.startTS,
			LockTtl:      c.lockTTL,
			TxnSize:      uint64(len(batch.keys)),
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}
	resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
	// ... (elided) handling of err and of region/key errors in resp
}

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
	req := &tikvrpc.Request{
		Type: tikvrpc.CmdCommit,
		Commit: &pb.CommitRequest{
			StartVersion:  c.startTS,
			Keys:          batch.keys,
			CommitVersion: c.commitTS,
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}
	req.Context.Priority = c.priority

	sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
	resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
	// ... (elided) handling of err and of region/key errors in resp
}

func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
	groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)

	var batches []batchKeys
	var sizeFunc = c.keySize
	if action == actionPrewrite {
		sizeFunc = c.keyValueSize
	}
	// Make sure the group that contains primary lock key goes first.
	batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
	delete(groups, firstRegion)
	for id, g := range groups { // the order of the other regions does not matter
		batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
	}

	firstIsPrimary := bytes.Equal(keys[0], c.primary())
	if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
		// primary should be committed/cleanup first
		err = c.doActionOnBatches(bo, action, batches[:1])
		batches = batches[1:]
	}
	if action == actionCommit {
		// Commit secondary batches in background goroutine to reduce latency.
		// The backoffer instance is created outside of the goroutine to avoid
		// potential data race in unit test since `CommitMaxBackoff` will be updated
		// by test suites.
		secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
		go func() {
			e := c.doActionOnBatches(secondaryBo, action, batches)
			// ... (elided) e is only logged: once the primary is committed, the txn is committed
		}()
	} else {
		err = c.doActionOnBatches(bo, action, batches)
	}
	return errors.Trace(err)
}

How tidb retries requests to tikv

2020-03-03 00:00:00 +0000

Source code version v2.1.18

Data structures

type Backoffer struct {
	ctx context.Context

	fn         map[backoffType]func(context.Context) int
	maxSleep   int
	totalSleep int
	errors     []error
	types      []backoffType
	vars       *kv.Variables
}
  • Sleep settings for each retry type
    • Times are in ms
    • EqualJitter means half of the duration is randomized
NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int


	case boTiKVRPC:
		return NewBackoffFn(100, 2000, EqualJitter)
	case BoTxnLock:
		return NewBackoffFn(200, 3000, EqualJitter)
	case boTxnLockFast:
		return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter)
	case boPDRPC:
		return NewBackoffFn(500, 3000, EqualJitter)
	case BoRegionMiss:
		return NewBackoffFn(100, 500, NoJitter)
	case BoUpdateLeader:
		return NewBackoffFn(1, 10, NoJitter)
	case boServerBusy:
		return NewBackoffFn(2000, 10000, EqualJitter)

Default max total sleep time in each case


// Maximum total sleep time(in ms) for kv/cop commands.
const (
	copBuildTaskMaxBackoff         = 5000
	tsoMaxBackoff                  = 15000
	scannerNextMaxBackoff          = 20000
	batchGetMaxBackoff             = 20000
	copNextMaxBackoff              = 20000
	getMaxBackoff                  = 20000
	prewriteMaxBackoff             = 20000
	cleanupMaxBackoff              = 20000
	GcOneRegionMaxBackoff          = 20000
	GcResolveLockMaxBackoff        = 100000
	deleteRangeOneRegionMaxBackoff = 100000
	rawkvMaxBackoff                = 20000
	splitRegionBackoff             = 20000
	scatterRegionBackoff           = 20000
	waitScatterRegionFinishBackoff = 120000
)

When do we stop retrying?

Once the total sleep time across backoff attempts reaches the configured maxSleep, we return a MySQL error (derived from the first backoff type) that signals the caller to stop retrying

	realSleep := f(b.ctx)
	backoffDuration.Observe(float64(realSleep) / 1000)
	b.totalSleep += realSleep
	b.types = append(b.types, typ)

	var startTs interface{} = ""
	if ts := b.ctx.Value(txnStartKey); ts != nil {
		startTs = ts
	}
	logutil.Logger(context.Background()).Debug("retry later",
		zap.Error(err),
		zap.Int("totalSleep", b.totalSleep),
		zap.Int("maxSleep", b.maxSleep),
		zap.Stringer("type", typ),
		zap.Reflect("txnStartTS", startTs))

	b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
	if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
		errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
		for i, err := range b.errors {
			// Print only last 3 errors for non-DEBUG log levels.
			if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
				errMsg += "\n" + err.Error()
			}
		}
		logutil.Logger(context.Background()).Warn(errMsg)
		// Use the first backoff type to generate a MySQL error.
		return b.types[0].TError()
	}

Max sleep used at a few call sites

func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error) {
	maxSleep := 10000 // ms
	bo := tikv.NewBackoffer(context.Background(), maxSleep)
	// ...
}
func getPhysicalTableRegions(physicalTableID int64, tableInfo *model.TableInfo, tikvStore tikv.Storage, s kv.SplitableStore, uniqueRegionMap map[uint64]struct{}) ([]regionMeta, error) {
	recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackoffer(context.Background(), 20000), startKey, endKey)
	// ...
}
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
	prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars) // defaults to 20 seconds
	commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)     // defaults to 41 seconds
	// ...
}

Health check and availability

2020-02-26 00:00:00 +0000

the morning paper

  • ELB's default health check timeout is 10 sec, which is overly generous within the same region. Normally a 2 second timeout with 3 failed attempts to declare failure is enough
  • Otherwise, with the defaults, the instance is taken offline at least 30 seconds after the failure happens, and traffic is still directed to the faulty instance in the meantime.
  • Problem with time-based availability metrics: MTTF / (MTTF + MTTR)
    • In distributed systems, it is common for some part to have failed somewhere, so how do you define “down”?
    • How do you differentiate the impact of downtime during off-peak hours versus peak hours?
  • Problem with count-based availability metrics: % of successful requests
    • High volume users have a higher impact on this metric
    • Less traffic comes in when users perceive the system is down, which makes the metric look better than it actually is
    • Does not show how long a system is down
  • Both metrics above do not capture the down time pattern and different durations of outages
  • The longer the time window, the “better” availability metric appears
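To make the count-based bias concrete, here is a small Python sketch of both metrics. The traffic numbers (a one-hour window with a 10-minute outage, 100 rps normally and 20 rps while users perceive the outage) are made up for illustration, and the sketch assumes every request during the outage fails.

```python
def time_based(mttf, mttr):
    # MTTF / (MTTF + MTTR)
    return mttf / (mttf + mttr)


def count_based(up_seconds, down_seconds, up_rps, down_rps):
    # % of successful requests; every request during the outage is assumed to fail
    ok = up_seconds * up_rps
    return ok / (ok + down_seconds * down_rps)


print(time_based(50, 10))               # 1-hour window, 10 min down: ~0.833
print(count_based(3000, 600, 100, 20))  # same outage, traffic drops to 20 rps: ~0.962
print(time_based(1430, 10))             # same outage in a 24-hour window: ~0.993
```

The same 10 minutes of downtime looks better both when users back off and when the window is longer.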

How micrometer keeps percentiles and publishes to datadog

2020-02-25 00:00:00 +0000

Data structures (bottom up)

  • StepDouble: threadsafe
    • By default, each step represents 1 min
    • Value operations are done by a DoubleAdder
    • Within the same step, the value returned by poll() remains the same. The value is reset at each step
  • Histogram: either
    • HDR-based histogram TimeWindowPercentileHistogram
    • fixed-boundary histogram TimeWindowFixedBoundaryHistogram
  • Timer: has an instance of Histogram
  • Meter: A named and dimensioned producer of one or more measurements. All the metric constructs we create in application code, e.g., Counter and Timer, implement Meter
  • DatadogMeterRegistry: implements MeterRegistry -> StepMeterRegistry.
    • Maintains the list of meters
    • Associates an instance of Timer with a HistogramGauges, which translates the histogram into the percentile gauges (with the phi tag) we observe on DD
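The step semantics of StepDouble can be modeled in a few lines: poll() returns the total of the last completed step, so it stays constant within a step and changes at each step boundary. This is a simplified Python model, not Micrometer's code; the StepValue class, the injected millisecond clock, and the rule that a skipped step reports 0 are assumptions of this sketch.

```python
import threading


class StepValue:
    """Hypothetical model of a step-based accumulator."""

    def __init__(self, step_millis, clock):
        self.step = step_millis
        self.clock = clock  # zero-arg callable returning the current time in ms
        self.lock = threading.Lock()
        self.last_step = clock() // step_millis
        self.current = 0.0   # accumulating in the ongoing step
        self.previous = 0.0  # total of the last completed step

    def _rollover(self):
        now_step = self.clock() // self.step
        if now_step > self.last_step:
            # If more than one step passed without updates, the last completed step was empty
            self.previous = self.current if now_step == self.last_step + 1 else 0.0
            self.current = 0.0
            self.last_step = now_step

    def add(self, amount):
        with self.lock:
            self._rollover()
            self.current += amount

    def poll(self):
        with self.lock:
            self._rollover()
            return self.previous
```

Injecting the clock keeps the model deterministic, which is also how you would unit test step behavior without waiting a real minute.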

DatadogMeterRegistry.publish()

  • Gets the API URI: by default it is the public API. Normally we send to the dogstatsd URL on 127.0.0.1
  • Partitions the Meters in the registry into a nested list of Meters. By default, the partition size is 10k
  • All data in these 10k meters will be published in a single POST call
  • By default, this method is triggered once per minute. See PushMeterRegistry.start()

On working with distributed teams

2020-02-07 00:00:00 +0000

  • Need to run with more process, as if you were a bigger team/org
  • Massive, if not excessive, communication is needed between offices if teams are not autonomous enough
    • Decompose tasks at a finer grain than when everyone is in the same location
    • Massive communication load when setting OKRs for each task
  • Arrive 5 mins early to ensure the tech is ready, so the real meeting starts on time
  • On the host side, have one person responsible for identifying remote people who want to speak
  • Make overlapping business hours explicit
  • Make sure results are visible to everyone
    • Daily and weekly reviews become a must
  • Hire ONLY self-driven people
  • Project owner needs to have one go-to doc for
    • resources and their owners
    • links to detailed docs
    • current progress and risks
  • One wiki parent page for each team
  • Each team sends out weekly report, summarized and distilled at each level
  • Centralized test logbook, including PoC runs, so that ideas/data can be reused in the future
  • Team should meet face to face at least once per OKR period
  • Need a doc that holds context for all stakeholders
  • Major tasks with owners, progress, impact on the overall progress
  • Fine grained enough to ensure daily update

Traps in Chaos Engineering

2020-02-03 00:00:00 +0000

Notes on this post

  • Goal is resilience instead of finding risks
  • Don’t measure KRs by errors discovered - they show only ignorance instead of risk
  • Humans under pressure ARE part of the system to test. Need to optimize how they react
  • Focus on things done right rather than pointing out risks. Don’t chase the fixes
  • Provide context and let the service owner decide what to do with the vulnerability
  • Automate the discovery of what is wrong, but not what should be
  • Manual gamedays and chaos experiments in non-prod envs are the prerequisite of prod testing
  • Creating experiments often has more value than running them
  • Define “steady state” first

When will tidb binlog replication lose data

2020-01-30 00:00:00 +0000

Source code

How tidb writes the binlog

  • The pump we deployed is actually the pump server (PS) process. The TiDB process has a pump client (PC) that writes to PS during a txn
    • PC uses PS’s interface WriteBinlog(context.Context, *WriteBinlogReq) (*WriteBinlogResp, error)
    • PC implementation: func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error
  • PC maintains a list of available PS and a list of unavailable PS
    • Subscribes to PD and listens for its updates on PS status
    • Regularly heartbeats to PS on the unavailable list, and tries to add them back to the available list
    • Keep an ErrNum for each PS. Increment it on failed binlog writes, and reset it on success. If a PS’s ErrNum is too high (default 10), add it to the unavailable list
  • If PC failed to write the prewrite binlog:
    • If err is ResourceExhausted error from grpc, stop retrying
    • Otherwise, try every single available pump once
    • If still no luck, try every single unavailable pump once
    • If still no luck, declare that we failed to write to binlog
  • If PC failed to write the commit binlog, PC will not retry, because PS can recover it from Tikv after 10 mins (default max txn timeout)
    • Note this is a common source of replication lag
  • When a pump shuts down gracefully, e.g., via pdctl, it first enters read-only mode, waits until all drainers have read its binlog, and then shuts down
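The prewrite retry order above can be sketched in Python. This is a model of the described behavior, not TiDB's Go code: ResourceExhausted stands in for the gRPC status, pumps are hypothetical objects with a write(binlog) method, and moving a pump once its ErrNum reaches the threshold (rather than exceeds it) is an assumption.

```python
class ResourceExhausted(Exception):
    """Stand-in for gRPC's RESOURCE_EXHAUSTED status."""


class PumpsClient:
    def __init__(self, pumps, max_err_num=10):
        self.available = list(pumps)
        self.unavailable = []
        self.err_num = {p: 0 for p in pumps}
        self.max_err_num = max_err_num

    def _on_result(self, pump, ok):
        if ok:
            self.err_num[pump] = 0  # reset on success
            return
        self.err_num[pump] += 1
        # Too many failures: move the pump to the unavailable list
        if self.err_num[pump] >= self.max_err_num and pump in self.available:
            self.available.remove(pump)
            self.unavailable.append(pump)

    def write_prewrite_binlog(self, binlog):
        # Try every available pump once, then every unavailable pump once.
        # The concatenation is a snapshot, so moving pumps mid-loop is safe.
        for pump in self.available + self.unavailable:
            try:
                pump.write(binlog)
            except ResourceExhausted:
                raise  # stop retrying on ResourceExhausted
            except Exception:
                self._on_result(pump, ok=False)
                continue
            self._on_result(pump, ok=True)
            return pump
        raise RuntimeError("failed to write binlog to any pump")
```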

When do we lose binlog

Assuming we turn on ignore-error

  • Binlog already stored on a PS is lost forever
  • All PSs are down
  • PC encounters gRPC’s ResourceExhausted error. Note that in this case we very likely don’t lose any binlog, because the main txn commit will most likely fail too

Default settings

	// RetryInterval is the interval of retrying to write binlog.
	RetryInterval = 100 * time.Millisecond

	// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
	DefaultBinlogWriteTimeout = 15 * time.Second

How druid CP recycles connections

2020-01-23 00:00:00 +0000

Selected data structures

public final class DruidConnectionHolder {

    protected final DruidAbstractDataSource       dataSource;
    protected final long                          connectionId;
    protected final Connection                    conn;
    protected final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();
    protected final List<StatementEventListener>  statementEventListeners  = new CopyOnWriteArrayList<StatementEventListener>();
    protected final long                          connectTimeMillis;
    protected volatile long                       lastActiveTimeMillis;
    protected volatile long                       lastExecTimeMillis;
    protected volatile long                       lastKeepTimeMillis;
    protected volatile long                       lastValidTimeMillis;
    protected long                                useCount                 = 0;
    private long                                  keepAliveCheckCount      = 0;
    private long                                  lastNotEmptyWaitNanos;
    private final long                            createNanoSpan;
    protected PreparedStatementPool               statementPool;
    protected final List<Statement>               statementTrace           = new ArrayList<Statement>(2);
    protected final boolean                       defaultReadOnly;
    protected final int                           defaultHoldability;
    protected final int                           defaultTransactionIsolation;
    protected final boolean                       defaultAutoCommit;
    protected boolean                             underlyingReadOnly;
    protected int                                 underlyingHoldability;
    protected int                                 underlyingTransactionIsolation;
    protected boolean                             underlyingAutoCommit;
    protected volatile boolean                    discard                  = false;
    protected volatile boolean                    active                   = false;
    protected final Map<String, Object>           variables;
    protected final Map<String, Object>           globleVariables;
    final ReentrantLock                           lock                     = new ReentrantLock();
}

How shrink() works

Assume we turn on time check and keep-alive


// connections[] is ordered from longest idle to most recently returned
for (int i = 0; i < poolingCount; ++i) {
	DruidConnectionHolder connection = connections[i];
	long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

	if (idleMillis < minEvictableIdleTimeMillis
			&& idleMillis < keepAliveBetweenTimeMillis) {
		break; // the remaining connections are even fresher: stop shrinking altogether
	}

	if (idleMillis >= minEvictableIdleTimeMillis) {
		if (checkTime && i < checkCount) { // checkCount = poolingCount - minIdle
			evictConnections[evictCount++] = connection;
			continue;
		} else if (idleMillis > maxEvictableIdleTimeMillis) {
			evictConnections[evictCount++] = connection;
			continue;
		}
	}

	if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
		keepAliveConnections[keepAliveCount++] = connection;
	}
}

// drop the first removeCount entries and shift the rest to the front
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
	System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
	Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
	poolingCount -= removeCount;
}

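  • After this check, it validates every connection that requires keep-alive
  • Finally, it signals for enough new connections to bring the pool back up to minIdle:

int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
	emptySignal(); // ask for one more connection to be created
}

  • Validation is done via MySqlValidConnectionChecker; if validateQuery is not set, it falls back to DEFAULT_VALIDATION_QUERY: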

String query = validateQuery;
if (validateQuery == null || validateQuery.isEmpty()) {
	query = DEFAULT_VALIDATION_QUERY;
}

Statement stmt = null;
ResultSet rs = null;
try {
	stmt = conn.createStatement();
	if (validationQueryTimeout > 0) {
		stmt.setQueryTimeout(validationQueryTimeout);
	}
	rs = stmt.executeQuery(query);
	return true;
} finally {
	JdbcUtils.close(rs);
	JdbcUtils.close(stmt);
}
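The eviction and keep-alive rules in shrink() can be modeled as a pure function, which makes it easy to check which connections survive for given settings. This is a hypothetical Python model assuming both time check and keep-alive are on; the function name and the millisecond values in the usage line are made up, not Druid defaults.

```python
def shrink_plan(idle_millis, min_idle, min_evictable_idle_millis,
                max_evictable_idle_millis, keep_alive_between_millis):
    """Return (evict, keep_alive) index lists for pooled connections.

    idle_millis is ordered like connections[]: longest idle first.
    """
    check_count = len(idle_millis) - min_idle  # only connections beyond minIdle are evicted early
    evict, keep_alive = [], []
    for i, idle in enumerate(idle_millis):
        if idle < min_evictable_idle_millis and idle < keep_alive_between_millis:
            break  # the rest are even fresher
        if idle >= min_evictable_idle_millis:
            if i < check_count or idle > max_evictable_idle_millis:
                evict.append(i)
                continue
        if idle >= keep_alive_between_millis:
            keep_alive.append(i)
    return evict, keep_alive


print(shrink_plan([100_000, 50_000, 20_000, 1_000], 1, 30_000, 25_200_000, 15_000))
```

With a single pooled connection and minIdle = 1, the same idle time lands in keep-alive instead of eviction, which is how minIdle protects the floor of the pool.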

Atcoder notes

2020-01-14 00:00:00 +0000

B - Voting Judges

Simple but good for coding exercise

D - Semi Common Multiple

  • By subtraction we know the delta between valid X is the LCM of A
  • By factoring we know the initial X must be LCM/2; we just need to verify that every number satisfies the condition
  • What got me is how the LCM is calculated - just use the lcm formula rolling through A. Other approaches couldn’t handle the catches
  • The count formula can be simplified to M // (lcm // 2) - M // lcm
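A Python sketch of the approach above. It works with L = lcm(A/2), which equals lcm(A)/2, so the final line is the same M // (lcm // 2) - M // lcm count of odd multiples of L. The function name is my own.

```python
from math import gcd


def semi_common_multiple_count(a, m):
    half = [x // 2 for x in a]  # X must be an odd multiple of every a_k / 2
    l = 1
    for h in half:
        l = l * h // gcd(l, h)  # rolling lcm
        if l > m:
            return 0  # no candidate fits in [1, m]
    # L / (a_k / 2) must be odd for every k, otherwise no odd multiple works
    if any((l // h) % 2 == 0 for h in half):
        return 0
    return m // l - m // (2 * l)  # odd multiples of L in [1, m]
```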

C - Successive Subtraction

E - Max-Min Sums

E - Friendships

F - Sugoroku

F - Interval Running

C - GCD on Blackboard

B - LRUD Game

E - Sequence Decomposing

D - Maximum Sum of Minimum

Code reading notes: sync-diff inspector

2020-01-10 00:00:00 +0000

Major data structures


// TableConfig is the config of table.
type TableConfig struct {
        // table's origin information
        TableInstance
        // columns be ignored, will not check this column's data
        IgnoreColumns []string `toml:"ignore-columns"`
        // field should be the primary key, unique key or field with index
        Fields string `toml:"index-fields"`
        // select range, for example: "age > 10 AND age < 20"
        Range string `toml:"range"`
        // set true if comparing sharding tables with target table, should have more than one source tables.
        IsSharding bool `toml:"is-sharding"`
        // saves the source tables's info.
        // may have more than one source for sharding tables.
        // or you want to compare table with different schema and table name.
        // SourceTables can be nil when source and target is one-to-one correspondence.
        SourceTables    []TableInstance `toml:"source-tables"`
        TargetTableInfo *model.TableInfo

        // collation config in mysql/tidb
        Collation string `toml:"collation"`
}

type Diff struct {
        sourceDBs         map[string]DBConfig
        targetDB          DBConfig
        chunkSize         int
        sample            int
        checkThreadCount  int
        useChecksum       bool
        useCheckpoint     bool
        onlyUseChecksum   bool
        ignoreDataCheck   bool
        ignoreStructCheck bool
        tables            map[string]map[string]*TableConfig
        fixSQLFile        *os.File

        report         *Report
        tidbInstanceID string
        tableRouter    *router.Table
        cpDB           *sql.DB

        ctx context.Context
}

// Bound represents a bound for a column
type Bound struct {
        Column string `json:"column"`
        Lower  string `json:"lower"`
        Upper  string `json:"upper"`

        HasLower bool `json:"has-lower"`
        HasUpper bool `json:"has-upper"`
}

// ChunkRange represents chunk range
type ChunkRange struct {
        ID     int      `json:"id"`
        Bounds []*Bound `json:"bounds"`

        Where string   `json:"where"`
        Args  []string `json:"args"`

        State string `json:"state"`

        columnOffset map[string]int
}

How is diff done

Source

  • CheckTableData loads the checkpointed chunk data from the context. On the first run, it will SplitChunks (see below)
  • Spawns multiple channels to check chunks - checkChunkDataEqual
  • UseChecksum decides whether we go for checksum mode or row-by-row mode
  • Checksum is computed by dbutil.GetCRC32Checksum bounded by chunk.Where, with SQL formatted by

fmt.Sprintf(
"SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
		strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)

  • An example checksum sql would be

 SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) AS checksum 
 FROM test.test WHERE id > 0 AND id < 10;
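The checksum SQL can be mirrored in Python to see why the ISNULL flags are there and why row order doesn't matter. This is a loose model with made-up function names; it uses zlib.crc32 and does not claim byte-for-byte agreement with MySQL's output.

```python
import zlib
from functools import reduce


def row_string(row):
    # CONCAT_WS(',', c1, c2, ..., CONCAT(ISNULL(c1), ISNULL(c2), ...))
    # CONCAT_WS skips NULLs, so the ISNULL flags keep NULL and '' apart
    values = [str(v) for v in row if v is not None]
    flags = "".join("1" if v is None else "0" for v in row)
    return ",".join(values + [flags])


def row_crc(row):
    return zlib.crc32(row_string(row).encode("utf-8"))


def chunk_checksum(rows):
    # BIT_XOR over all rows: the result does not depend on row order
    return reduce(lambda acc, row: acc ^ row_crc(row), rows, 0)
```

One consequence of XOR worth keeping in mind: two identical rows cancel each other out, so duplicated rows are invisible to this checksum.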

How are chunks split

Source

  • The Range config in the toml is passed to getChunksForTable
  • Two ways of splitting - bucket splitting and random splitting. The default config goes with random splitting

Random splitting


cnt, err := dbutil.GetRowCount(context.Background(), table.Conn, table.Schema, table.Table, limits, nil)
chunkCnt := (int(cnt) + chunkSize - 1) / chunkSize
chunks, err := splitRangeByRandom(table.Conn, NewChunkRange(), chunkCnt, table.Schema, table.Table, columns, s.limits, s.collation)

  • Randomly pick chunkCnt ids as the split points

SELECT `id` FROM (
	SELECT `id`, rand() rand_value FROM `test`.`test`  
	WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 
	ORDER BY rand_value LIMIT 5) rand_tmp 
ORDER BY `id` COLLATE "latin1_bin";

  • and create new chunks bounded by these split id values
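The random splitting steps can be sketched in Python over an in-memory id list. The function names, the bound convention (lower exclusive, upper inclusive) and using None for a missing bound (loosely mirroring Bound's HasLower/HasUpper) are assumptions of this sketch; it assumes the ids are distinct.

```python
import random


def split_by_random(ids, chunk_size, rng=random):
    cnt = len(ids)
    chunk_cnt = (cnt + chunk_size - 1) // chunk_size  # same ceiling division as the Go code
    if chunk_cnt <= 1:
        return [(None, None)]  # a single unbounded chunk
    # Like ORDER BY rand() LIMIT chunkCnt, then ORDER BY id
    splits = sorted(rng.sample(ids, chunk_cnt))
    chunks, lower = [], None
    for s in splits:
        chunks.append((lower, s))  # lower < id <= s, None means unbounded
        lower = s
    chunks.append((lower, None))
    return chunks


def in_chunk(v, chunk):
    lo, hi = chunk
    return (lo is None or v > lo) and (hi is None or v <= hi)
```

Because the split points are random rows rather than quantiles, chunk sizes are only roughly equal to chunk_size.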

Summary Info

Source


CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`summary`(
	`schema` varchar(64), `table` varchar(64),
	`chunk_num` int not null default 0,
	`check_success_num` int not null default 0,
	`check_failed_num` int not null default 0,
	`check_ignore_num` int not null default 0,
	`state` enum('not_checked', 'checking', 'success', 'failed') DEFAULT 'not_checked',
	`config_hash` varchar(50),
	`update_time` datetime ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY(`schema`, `table`));
  • Updates every 10 secs

SELECT `state`, COUNT(*) FROM @checkpointSchemaName.@chunkTableName  
WHERE `instance_id` = ? AND `schema` = ? AND `table` = ? 
GROUP BY `state`;

UPDATE `@checkpointSchemaName`.`@summaryTableName` 
SET `chunk_num` = ?, `check_success_num` = ?, `check_failed_num` = ?, `check_ignore_num` = ?, `state` = ? 
WHERE `schema` = ? AND `table` = ?

Chunk Info


CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`chunk`(
	`chunk_id` int,
	`instance_id` varchar(64),
	`schema` varchar(64),
	`table` varchar(64),
	`range` text,
	`checksum` varchar(20),
	`chunk_str` text,
	`state` enum('not_checked', 'checking', 'success', 'failed', 'ignore', 'error') DEFAULT 'not_checked',
	`update_time` datetime ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY(`schema`, `table`, `instance_id`, `chunk_id`));

Atcoder notes

2020-01-05 00:00:00 +0000

C - Candles

ans = min over every window of K consecutive candles of (distance between the window's two ends + distance from 0 to the nearer end)
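A direct Python version of that formula, assuming the candle coordinates are already sorted as in the problem input:

```python
def candles(xs, k):
    best = float("inf")
    for i in range(len(xs) - k + 1):
        l, r = xs[i], xs[i + k - 1]
        # walk to the nearer end first, then sweep to the other end
        best = min(best, (r - l) + min(abs(l), abs(r)))
    return best
```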

C - Strange Bank

  • No need for memoization. Note that in the knapsack solution, we just need >= and A[0] instead of checking >
  • Note that the 6-coins and 9-coins don’t interact, so we just brute force on the amount paid in 6s, and the remainder is paid in 9s, followed by 1s
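The brute force on the 6s amount can be sketched in Python. For coins that are powers of a single base (plus 1), the greedy count equals the digit sum in that base, so each side is cheap to evaluate; the helper names are my own.

```python
def digit_cost(n, base):
    # min coins from {1, base, base^2, ...} = digit sum of n in that base
    cost = 0
    while n:
        cost += n % base
        n //= base
    return cost


def strange_bank(n):
    # pay i yen with 1s and 6^k coins, the rest with 1s and 9^k coins
    return min(digit_cost(i, 6) + digit_cost(n - i, 9) for i in range(n + 1))
```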

C - String Transformation

We can prove that the transformation preserves the isomorphism if it starts with one, so we can just verify that the final mapping is an isomorphism
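Checking that the final mapping is an isomorphism means checking it is consistent in both directions. A Python sketch (the function name is my own):

```python
def can_transform(s, t):
    fwd, bwd = {}, {}
    for a, b in zip(s, t):
        # each char of s maps to one char of t, and vice versa
        if fwd.setdefault(a, b) != b or bwd.setdefault(b, a) != a:
            return False
    return True
```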

C - K-th Substring

  • Brute force won’t work because there are |s| ** 2 substrings
  • K is at most 5, so the K-th smallest substring is at most 5 characters long. Otherwise, we could shorten the answer to one of its prefixes and improve it
  • So we just sort all 25000 candidate strings, each of length at most 5
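The candidate generation above in Python: only substrings of length at most K are collected, deduplicated with a set, then sorted.

```python
def kth_substring(s, k):
    subs = {s[i:j] for i in range(len(s)) for j in range(i + 1, min(i + k, len(s)) + 1)}
    return sorted(subs)[k - 1]
```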

C - Factors of Factorial

Calculate the prime factorization of each number from 1 to N and accumulate the exponents; the number of divisors of N! is the PRODUCT of (exponent + 1) over all primes
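A Python sketch of that idea, using trial division for each number (fine for N up to 1000) and the modulus 10^9 + 7 from the problem:

```python
def factorial_divisors(n, mod=10**9 + 7):
    exps = {}
    for x in range(2, n + 1):
        p = 2
        while p * p <= x:
            while x % p == 0:
                exps[p] = exps.get(p, 0) + 1
                x //= p
            p += 1
        if x > 1:
            exps[x] = exps.get(x, 0) + 1  # leftover prime factor
    ans = 1
    for e in exps.values():
        ans = ans * (e + 1) % mod  # each prime contributes (exponent + 1) choices
    return ans
```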

D - Rain Flows into Dams

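Since N is odd, we can do the odd-even (alternating sign) trick: summing the dam equations with alternating signs isolates the first mountain, and the rest follows one by one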
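Writing x_i for the rain on mountain i, dam i gives x_i + x_(i+1) = 2·A_i. With N odd, the alternating sum A_1 - A_2 + A_3 - ... + A_N telescopes to x_1, and each next mountain follows from x_(i+1) = 2·A_i - x_i. A Python sketch:

```python
def rain_per_mountain(a):
    # x1 = A1 - A2 + A3 - ... + AN (N is odd)
    x = [sum(v if i % 2 == 0 else -v for i, v in enumerate(a))]
    for ai in a[:-1]:
        x.append(2 * ai - x[-1])  # x(i+1) = 2 * A(i) - x(i)
    return x
```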

C - AB Substrings

Corner cases:

  • no BA
  • no BX and AX

    The general theme: every time you see a counting formula with a minus component, discuss the cases where the sum could be negative and where the positive term could be 0
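A Python sketch of the counting with both corner cases handled explicitly. I classify strings as BA (starts with B and ends with A), XA (ends with A only) and BX (starts with B only); these labels are my own.

```python
def max_ab(strings):
    inner = sum(s.count("AB") for s in strings)
    ba = sum(1 for s in strings if s[0] == "B" and s[-1] == "A")
    xa = sum(1 for s in strings if s[0] != "B" and s[-1] == "A")
    bx = sum(1 for s in strings if s[0] == "B" and s[-1] != "A")
    if ba == 0:
        return inner + min(xa, bx)  # corner case: no BA
    if xa + bx == 0:
        return inner + ba - 1       # corner case: only BA strings chain together
    return inner + ba + min(xa, bx)
```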

C - String Transformation

Every swap keeps the isomorphism, and we can transform into any isomorphism by swapping. Therefore, we just need to ensure the final isomorphism is good, because it is always reachable

C - Snuke Festival

Good for coding exercise

D - Flipping Signs

Simple problem, but similar to two segment insight

D - DivRem Number

N = q * m + q = q (m + 1), with q < m. Just need to find all divisor pairs
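The divisor-pair enumeration in Python: every divisor d of N is a candidate m + 1, and the q < m condition is checked directly by comparing N // m with N % m.

```python
def divrem_sum(n):
    total = 0
    d = 1
    while d * d <= n:
        if n % d == 0:
            for m_plus_1 in {d, n // d}:  # set avoids double counting when d * d == n
                m = m_plus_1 - 1
                if m > 0 and n // m == n % m:
                    total += m
        d += 1
    return total
```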

D - Remainder Reminder

Similar to divisor finding: iterate over all possible divisors b >= max(2, K + 1) to generate a, check how many full b “bands” we have, and count mod >= K in the last partial band. Note that the mod in the partial band starts at 1! So the partial band contributes N % b - K + 1 pairs (when positive)
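A Python sketch of the band counting. K = 0 is handled separately because then every pair qualifies and the "+ 1" in the partial-band formula would overcount.

```python
def remainder_pairs(n, k):
    if k == 0:
        return n * n  # every pair qualifies
    total = 0
    for b in range(k + 1, n + 1):
        total += (n // b) * (b - k)         # each full band has residues 0..b-1 once
        total += max(0, n % b - k + 1)      # partial band has residues 1..n % b
    return total
```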

Atcoder notes

2020-01-01 00:00:00 +0000

D - Harlequin

  • One state is “some have odd”, its counterpart is “all have even”, and they map to the win/lose conditions

C - Align

  • It is obvious the solution will zigzag. Therefore, we can derive a formula for the final answer
  • We can store and sort the coefficients, and apply them to the sorted numbers - way cleaner than my classification solution
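A Python sketch of the coefficient approach: in a zigzag, interior positions contribute ±2 and the two ends ±1, with signs alternating. Trying both sign patterns and pairing sorted coefficients with sorted values (rearrangement inequality) gives the maximum. It assumes N >= 2, as in the problem.

```python
def align(a):
    n = len(a)
    if n < 2:
        return 0
    vals = sorted(a)
    best = 0
    for first in (1, -1):  # start high or start low
        coef = []
        for i in range(n):
            sign = first if i % 2 == 0 else -first
            weight = 1 if i in (0, n - 1) else 2
            coef.append(sign * weight)
        coef.sort()
        best = max(best, sum(c * v for c, v in zip(coef, vals)))
    return best
```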

B - Simplified mahjong

  • When no A[i] is 0, it is obvious the answer will be floor(S/2) - pairing cards in sorted order reaches this upper bound
  • When some A[i] == 0, we know the problem can be decomposed into segments
  • So we just go through the cards in order, calculate the size of each 0-separated segment, and sum up the results from (1)
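The segment decomposition in Python: accumulate card counts until a zero, add floor(segment / 2), and reset.

```python
def max_pairs(a):
    total, seg = 0, 0
    for x in a:
        if x == 0:
            total += seg // 2  # a 0 splits the cards into independent segments
            seg = 0
        else:
            seg += x
    return total + seg // 2
```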

D - Coloring Dominoes

  • first col needs classification
  • I miscounted the 2 * 2 followed by 2 * 2 case: it should have 3 choices instead of 2
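A Python sketch of the transition counting, with the first column as its own case and 3 choices for a horizontal pair followed by another horizontal pair. The 'V'/'H' labels (vertical domino / horizontal pair) are my own.

```python
def color_dominoes(s1, s2, mod=10**9 + 7):
    ans, prev, i = 1, None, 0
    while i < len(s1):
        vertical = s1[i] == s2[i]
        if prev is None:
            ans = 3 if vertical else 6  # first column needs its own case
        elif prev == "V":
            ans *= 2  # after a vertical domino: 2 ways for either kind
        else:
            ans *= 1 if vertical else 3  # after a horizontal pair: forced, or 3 ways
        ans %= mod
        prev = "V" if vertical else "H"
        i += 1 if vertical else 2
    return ans
```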

C - Base -2 Number

  • Classify based on the mod 4, 00, 01, 10, 11, add 4 to the original number before shifting
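For comparison, here is a simpler digit-by-digit conversion in Python. It is a different technique from the mod-4 classification above: take the lowest bit (Python's % always returns 0 or 1 here), subtract it, and divide by -2.

```python
def to_base_neg2(n):
    if n == 0:
        return "0"
    digits = []
    while n != 0:
        r = n % 2  # 0 or 1, even for negative n
        digits.append(str(r))
        n = (n - r) // -2  # exact, since n - r is even
    return "".join(reversed(digits))
```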

B - ABC

  • Replace BC with D and form a list of new strings
  • The inversion cost can be calculated by a scan, as in bubble sort: every time we find a needed swap, the cost increases by the number of to-be-swapped items
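A Python sketch of the scan: after replacing BC with D, every D swaps once with each A in the run before it, and any other character breaks the run.

```python
def count_abc_ops(s):
    t = s.replace("BC", "D")
    ans, a_run = 0, 0  # a_run: A's that can still move right past a D
    for ch in t:
        if ch == "A":
            a_run += 1
        elif ch == "D":
            ans += a_run  # every A before this D swaps with it once
        else:
            a_run = 0  # a lone B or C blocks the A's
    return ans
```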

D - Blue and Red Balls

  • Stars and bars. Separating K into i buckets (possibly empty) has choose(K + i - 1, K) ways; in the non-empty case it becomes choose(K - 1, K - i). Note it is a direct application of the formula
  • My first guess was incorrect. It was too high, and didn’t consider the combination case
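In Python, with math.comb (Python 3.8+): split the K blue balls into i non-empty runs, choose(K - 1, i - 1) = choose(K - 1, K - i), and place those runs into i of the N - K + 1 gaps around the red balls.

```python
from math import comb


def blue_red(n, k, mod=10**9 + 7):
    # comb returns 0 when i exceeds the number of gaps
    return [comb(k - 1, i - 1) * comb(n - k + 1, i) % mod for i in range(1, k + 1)]
```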

D - Coloring Edges on Tree

The official approach is cleaner than mine

  • We know that answer >= deg(v). In fact, max(deg(v)) is sufficient to give a solution
  • We can just BFS, and keep assigning colors from 1 to K while skipping the color of the edge from the grandparent to the parent. Such a construction ensures the conditions are satisfied
  • Note that DFS is not feasible here because we lose the coloring information on siblings
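A Python sketch of the BFS construction. Edges are 1-indexed vertex pairs as in the input, and parent_color holds the color of the edge into each vertex (0 for the root).

```python
from collections import deque


def color_tree_edges(n, edges):
    adj = [[] for _ in range(n + 1)]
    for idx, (a, b) in enumerate(edges):
        adj[a].append((b, idx))
        adj[b].append((a, idx))
    k = max(len(x) for x in adj)  # max degree is enough
    color = [0] * len(edges)
    parent_color = [0] * (n + 1)
    seen = [False] * (n + 1)
    seen[1] = True
    q = deque([1])
    while q:
        v = q.popleft()
        c = 1
        for u, idx in adj[v]:
            if seen[u]:
                continue
            if c == parent_color[v]:
                c += 1  # skip the color already used on the parent edge
            color[idx] = c
            parent_color[u] = c
            seen[u] = True
            q.append(u)
            c += 1
    return k, color
```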

E - Colorful Hats 2

The key insight is that given the colors of 1…i, the color of i+1 is uniquely determined once we pick which of the colors whose count so far equals A[i+1] it matches. The multiplication factor acts as the “pick”

Index 0 acts as the sentinel value

I got WA in the formula - I tried to classify the 2-color and 3-color cases. This implies that at each index the pick is not unique even with the multiplier - a classic anti-pattern
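A Python sketch of the "pick" multiplication. Instead of an index-0 sentinel it keeps a count per color, starting at [0, 0, 0], and multiplies by how many colors currently have count A[i].

```python
def colorful_hats(a, mod=10**9 + 7):
    cnt = [0, 0, 0]  # people so far wearing each of the 3 colors
    ans = 1
    for x in a:
        ways = cnt.count(x)  # colors whose current count equals A[i]
        if ways == 0:
            return 0
        ans = ans * ways % mod
        cnt[cnt.index(x)] += 1  # which equal color we bump doesn't affect later counts
    return ans
```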

D - Lucky PIN

The official brute force solution is cleaner than mine


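ans = 0
for pin in range(1000):  # every candidate PIN 000..999
	ds = [pin // 100, (pin // 10) % 10, pin % 10]
	find = 0
	for ch in S:  # S is the N-digit input string
		# greedily match the next wanted digit of the PIN
		if find < 3 and int(ch) == ds[find]:
			find += 1
	if find == 3:
		ans += 1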

It also has a DP approach with 240M ops and 240M memory

C - HonestOrUnkind2

The official solution is cleaner than mine, mainly because even though I used a bitmask, I still used a second array to track correctness, which is not needed thanks to the bitmask
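A Python sketch of the bitmask-only check: a mask is consistent if every testimony from an honest person in the mask agrees with the mask itself. Testimonies are 0-indexed (x, y) pairs here, which is my own input format.

```python
def max_honest(testimonies):
    # testimonies[i]: list of (x, y), person i says person x is honest (y=1) or unkind (y=0)
    n = len(testimonies)
    best = 0
    for mask in range(1 << n):
        ok = all(((mask >> x) & 1) == y
                 for i in range(n) if (mask >> i) & 1
                 for x, y in testimonies[i])
        if ok:
            best = max(best, bin(mask).count("1"))
    return best
```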

B. Counting of Trees

For reasons unknown, I implemented the count in a very verbose way… Just a simple dict and update is enough. However, I missed the check that D[0] has to be 0, as required by the problem
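A Python sketch with a Counter: each vertex at depth d picks a parent at depth d - 1. The modulus 998244353 is my recollection of the problem statement, so treat it as an assumption; it is a parameter here.

```python
from collections import Counter


def count_trees(d, mod=998244353):
    if d[0] != 0 or d.count(0) != 1:
        return 0  # vertex 1 must be the only vertex at distance 0
    cnt = Counter(d)
    ans = 1
    for depth in range(1, max(d) + 1):
        ans = ans * pow(cnt[depth - 1], cnt[depth], mod) % mod
    return ans
```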

D. Disjoint Set of Common Divisors

The answer comes from the prime factorization of the GCD: the number of distinct primes, plus one for 1. Note that prime factorization can run in sqrt(N), and after sqrt(N), if the number is not 1, then we know that number is the only divisor left, and it is prime.

Proof: by contradiction

  • Suppose the remaining number is not prime; then we would have divided it in a previous step, where all divisors no more than sqrt(N) have been discovered
  • Because it is prime, we know it is the only possible divisor left; otherwise, it would be a product of two divisors > sqrt(N) => contradiction
  • Note that by this proof, during coding we don’t have to limit the upper bound to a tight sqrt(N); as long as we exhaust all divisors under that upper bound, it still holds
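A Python sketch of the sqrt(N) factorization and the final count; the function names are my own.

```python
from math import gcd


def distinct_prime_factors(n):
    primes = []
    p = 2
    while p * p <= n:
        if n % p == 0:
            primes.append(p)
            while n % p == 0:
                n //= p
        p += 1
    if n > 1:
        primes.append(n)  # the leftover must be prime, per the proof above
    return primes


def max_coprime_divisors(a, b):
    return len(distinct_prime_factors(gcd(a, b))) + 1  # +1 for the divisor 1
```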

A - 01 Matrix

A construction solution that I was not able to come up with. One lesson learned is that in matrix solutions, at least one of row or column is “nicely” cut into bands (as few as possible), and then we twist the other dimension to fit the requirements

C - Strawberry Cakes

The insight is similar to the one above: cut the matrix into bands and then twist it in the other dimension. Otherwise, the implementation will be messy

D - Powerful Discount Tickets

  • I used the bsearch approach. One case to cover is when the remaining tickets are not enough to cover all items
  • Official solution uses PQ, the key insight is that floor(X/2 ** Y) = floor(floor(X/2 ** (Y-1))/ 2)
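A Python sketch of the PQ approach with heapq (a min-heap, so prices are stored negated). Halving the current max one ticket at a time is valid because of the floor identity above.

```python
import heapq


def min_total_price(prices, m):
    heap = [-p for p in prices]
    heapq.heapify(heap)
    for _ in range(m):
        top = -heapq.heappop(heap)
        heapq.heappush(heap, -(top // 2))  # use one ticket on the most expensive item
    return -sum(heap)
```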

D - Ki

  • Python version gives TLE. C++ version is fine
  • Note that when we traverse the tree, after visiting a node, we init its children's properties on the spot, i.e., every time we reach a node, all of its parent/parent-edge info is already known
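A Python sketch of that traversal: add each operation only at its subtree root, then push totals down with an iterative BFS so a parent's total is final before its children are reached. Given the TLE noted above, this may still be slow in CPython at the maximum constraints.

```python
from collections import deque


def ki(n, edges, ops):
    adj = [[] for _ in range(n + 1)]
    for a, b in edges:
        adj[a].append(b)
        adj[b].append(a)
    counter = [0] * (n + 1)
    for p, x in ops:
        counter[p] += x  # add at the subtree root only
    seen = [False] * (n + 1)
    seen[1] = True
    q = deque([1])
    while q:
        v = q.popleft()
        for u in adj[v]:
            if not seen[u]:
                seen[u] = True
                counter[u] += counter[v]  # parent's total is already final
                q.append(u)
    return counter[1:]
```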

A - Triangle

  • The key insight that I missed is that suppose one vertex is at (0, 0), then the area of the triangle is abs(x2 * y3 - x3 * y2)/2
  • Another key insight is to set ONE of the other coordinates to 1 and another to 10^9 to cover the max value case, which converts the problem to the Euclidean division formula:
      a = bq + r = b(q + 1) - (b - r)

However, I still don’t understand why I need an additional %b to pass the last test case
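A Python sketch of the construction with vertices (0, 0), (b, 1) and (x3, y3), b = 10^9, so twice the area is b·y3 - x3 = S. The r = 0 case is a likely suspect for the extra % b: there q + 1 can exceed 10^9 (e.g. S = 10^18). Ceiling division sidesteps it, giving y3 = q + 1 when r > 0 and y3 = q, x3 = 0 when r = 0.

```python
def triangle(s, b=10**9):
    y3 = (s + b - 1) // b  # ceil(S / b)
    x3 = b * y3 - s        # the "b - r" term, or 0 when r == 0
    return (0, 0, b, 1, x3, y3)
```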

D - Enough Array

  • Idea 1: prefix sum with bsearch
  • Idea 2: two pointers. Note that when you move the head pointer, the checked index should be i instead of i + 1
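A Python sketch of idea 2, assuming all A[i] are positive as in the problem: for each left end, extend right until the sum reaches K, then every longer extension also qualifies.

```python
def enough_array(a, k):
    n = len(a)
    ans, total, right = 0, 0, 0
    for left in range(n):
        while right < n and total < k:
            total += a[right]
            right += 1
        if total < k:
            break  # even a[left:] is too small, so every later left is too
        ans += n - right + 1  # a[left:right] and every extension qualify
        total -= a[left]
    return ans
```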

System design: service registry

2019-12-30 00:00:00 +0000

Requirements

  • Data consistency
  • How long it takes to discover offline service?
  • How many service instances can it support?
  • How many machines you need to run this registry?

Eureka

  • P2P: every node serves outside services; when a service is registered on one node, it is gossiped to the other nodes
  • The default sync interval of the read-only cache is 30 sec. Services refresh on a 30 sec interval
  • Default: the server checks for offline services every 60 sec. A heartbeat expires after 90 sec
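A back-of-the-envelope answer to "how long it takes to discover an offline service", using only the defaults listed above. It assumes the worst case where each stage waits out its full interval one after another; real timings can differ (e.g. extra client-side caches are not counted).

```python
# Defaults quoted above, in seconds
LEASE_EXPIRATION = 90     # heartbeat considered expired after this
EVICTION_INTERVAL = 60    # server scans for expired leases this often
READONLY_CACHE_SYNC = 30  # read-only cache sync interval
CLIENT_REFRESH = 30       # service-side registry refresh


def worst_case_discovery(lease=LEASE_EXPIRATION, evict=EVICTION_INTERVAL,
                         cache=READONLY_CACHE_SYNC, refresh=CLIENT_REFRESH):
    # Assumes the stages happen strictly one after another
    return lease + evict + cache + refresh


print(worst_case_discovery())  # 210
```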

My Design

How Spring resolves circular dependency between beans

2019-12-29 00:00:00 +0000

  • An object is instantiated via reflection in ApplicationContext.getBean(), and after that its properties are populated during its init
  • When object A needs another object B, we recursively use ApplicationContext.getBean() to get B, and then inject it into A. At this point, the objects are created, but their properties are not set.
  • Suppose A has B, and B has A as a member; on init
    • A is instantiated first
    • A finds that B is needed, getBean() on B
      • B is instantiated
      • B finds that A is needed, getBean() on A. At this point, A’s early reference is registered already, so Spring returns the reference to the A from the first step
      • B sets its member reference to A, finishes its init, and returns
    • A now gets the reference to B, sets its member reference to B, and returns

Implementation


	protected Object getSingleton(String beanName, boolean allowEarlyReference) {
		// Level 1: singletonObjects holds fully initialized beans
		Object singletonObject = this.singletonObjects.get(beanName);
		if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) {
			// Not finished but already in creation: we hit a circular reference
			synchronized (this.singletonObjects) {
				// Level 2: early references that were handed out before
				singletonObject = this.earlySingletonObjects.get(beanName);
				if (singletonObject == null && allowEarlyReference) {
					// Level 3: { bean_name : ObjectFactory } map, registered right after instantiation
					ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName);
					if (singletonFactory != null) {
						singletonObject = singletonFactory.getObject();
						this.earlySingletonObjects.put(beanName, singletonObject);
						this.singletonFactories.remove(beanName); // make sure we create the early reference only once
					}
				}
			}
		}
		return (singletonObject != NULL_OBJECT ? singletonObject : null);
	}
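To see the three-level cache resolve the A <-> B cycle end to end, here is a toy Python model. The Container class, its definitions map (bean name to dependency names), and using plain dicts as beans are all hypothetical; it only models property-style injection and ignores proxies and scopes.

```python
class Container:
    def __init__(self, definitions):
        self.definitions = definitions  # name -> list of dependency names
        self.singletons = {}            # level 1: fully initialized beans
        self.early = {}                 # level 2: early references
        self.factories = {}             # level 3: name -> factory for the raw instance
        self.in_creation = set()

    def get_singleton(self, name):
        obj = self.singletons.get(name)
        if obj is None and name in self.in_creation:
            obj = self.early.get(name)
            if obj is None and name in self.factories:
                obj = self.factories.pop(name)()  # create the early reference once
                self.early[name] = obj
        return obj

    def get_bean(self, name):
        obj = self.get_singleton(name)
        if obj is not None:
            return obj
        self.in_creation.add(name)
        raw = {"name": name}                # "instantiate", properties not set yet
        self.factories[name] = lambda: raw  # expose the early reference
        for dep in self.definitions[name]:
            raw[dep] = self.get_bean(dep)   # populate properties, may recurse
        self.in_creation.discard(name)
        self.early.pop(name, None)
        self.factories.pop(name, None)
        self.singletons[name] = raw
        return raw


c = Container({"A": ["B"], "B": ["A"]})
a = c.get_bean("A")
print(a["B"]["A"] is a)  # True: B holds the same A instance
```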

RocksDB in the context of TiKV

2019-12-27 00:00:00 +0000

RocksDB concepts

  • Column family: Each k-v belongs to only 1 CF. CFs share the same WAL (to support atomic writes), but different memtable and table files. Note classic CF mapping would be (row_key, a sorted list of (column, value)), but in RocksDB it is just (cf, key, value)
  • Write buffer stores the memtable. Block cache is where RocksDB caches data in memory for reads
  • Memtable flushes are by default scheduled on the HIGH thread pool, while compactions are on the LOW thread pool. Stalling memtable flushes can stall writes, increasing p99 latency. Size the thread pools with max_background_jobs

Compaction

  • Size-tiered compaction strategy: small SSTables are merged into medium SSTables. Trades read and space amplification for write amplification. Compaction merges all sorted runs in one level to create a new sorted run in the next level.
    • A common approach for tiered is to merge sorted runs of similar size, without having the notion of levels
  • Level-based compaction strategy: each level has at most 1 sorted run. Some data moves to the next level when the current level approaches its limit. Trades read/write amplification for space amplification. In the original paper it is an all-to-all merge, but in RocksDB it is some-to-some
    • Leveled compaction in RocksDB is also tiered+leveled. There can be N sorted runs at the memtable level thanks to the max_write_buffer_number option: only one is active for writes, and the rest are read-only, waiting to be flushed. A memtable flush is similar to tiered compaction: the memtable output creates a new sorted run in L0 and doesn’t read or rewrite existing sorted runs in L0.
  • Subcompaction speeds up a compaction job by partitioning it among multiple threads.

Read & write amplification

  • RA counts the number of disk reads to perform a query. It is defined separately for point query and range queries
  • Flash-based storage can be written to only a finite number of times, so write amplification will decrease the flash lifetime
  • LSM with level-based compaction has better write amplification than B-tree

How TiKV uses RocksDB

  • All data in a TiKV node shares two RocksDB instances. One is for data, and the other is for Raft log.
    • The default RocksDB instance stores KV data in the default, write and lock CFs
  • raft logs are stored as region_id/log_id -> value
  • Data is stored as key + bit-inverted ts -> value. The TS is inverted so that the newest version (highest TS) of a key sorts first
    • Tikv uses prefix bloom filter, i.e., it predicts on the prefix instead of the whole key
  • Use TableProperties in Split Check, MVCC GC, and compact region range
  • When adding a replica to a new server, generates a SST snapshot and sends to the new server directly
  • Disk space will be released only when tombstones have been compacted
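The effect of the inverted TS on sort order can be shown in a few lines of Python. This is a simplified sketch: real TiKV first memcomparable-encodes the user key (so keys that are prefixes of each other still sort correctly) before appending the inverted timestamp, which is skipped here; the function names are my own.

```python
MASK64 = (1 << 64) - 1


def encode_key(user_key: bytes, ts: int) -> bytes:
    # Append the bit-inverted ts, big-endian: a larger ts gives a smaller suffix
    return user_key + ((~ts) & MASK64).to_bytes(8, "big")


def decode_ts(encoded: bytes) -> int:
    return (~int.from_bytes(encoded[-8:], "big")) & MASK64


keys = sorted(encode_key(b"k", ts) for ts in (5, 100, 42))
print([decode_ts(k) for k in keys])  # [100, 42, 5]: newest version first
```

With this layout, a seek to the user key lands on its latest version first, which is what MVCC reads want.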