On MySQL's async replication in production
How async replication works in MySQL
- Async replication is the default behavior; semi-sync replication is available as a plugin (lossless semi-sync was introduced in 5.7)
- The master writes changes to a local binlog file. Note that SELECT or SHOW statements are not written to the binlog since they don't affect slave state
- Note that this binlog is different from InnoDB's undo and redo logs. Its main purpose is replication and recovery after restoring from a backup
- On AWS RDS, the binlog is not turned on by default, and its retention can be configured for at most 7 days
- This additional binlog write introduces a performance penalty. On Aurora, we observed at least a 1/3 reduction in throughput after turning on the binlog
- The binlog write goes through a buffer controlled by binlog_cache_size. A temporary file is opened if the buffer is not large enough
- Since the binlog is just a file, it is subject to the same flush/fsync failures as any normal file. The default behavior is that the server shuts down upon detecting such issues
- The slave pulls from the master; the master does NOT push to the slave (a lag-check sketch follows this list)
- By default, the slave does not write its own binlog
- Any detected corruption/data loss in the binlog means that replication should be restarted from scratch; the current slave's data should be discarded
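To make the pull model above concrete, replica health and lag can be checked from the slave side with SHOW SLAVE STATUS. A minimal JDBC sketch, assuming MySQL Connector/J on the classpath and a 5.7-style replica; the host, credentials, and error handling are placeholders:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplicaLagCheck {
    public static void main(String[] args) throws Exception {
        // Connect to the async replica (the sink), not the master; placeholders below.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://replica-host:3306/", "monitor", "secret");
             Statement stmt = conn.createStatement();
             // SHOW SLAVE STATUS reports the IO/SQL thread state and the replication lag.
             ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
            if (rs.next()) {
                // Seconds_Behind_Master is NULL (read as 0 here) when the SQL thread is stopped.
                long lagSeconds = rs.getLong("Seconds_Behind_Master");
                boolean ioRunning = "Yes".equals(rs.getString("Slave_IO_Running"));
                boolean sqlRunning = "Yes".equals(rs.getString("Slave_SQL_Running"));
                System.out.printf("lag=%ds io=%b sql=%b%n", lagSeconds, ioRunning, sqlRunning);
            } else {
                System.out.println("Not configured as a replica");
            }
        }
    }
}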
Complications from the production setup
In production, MySQL is normally deployed active-standby with semi-sync or sync replication between the pair. This introduces additional complications for async replication. We call the current active X, the standby Y, the X-Y cluster the source, and the async replica the sink
From the source pov:
- To ensure async replication keeps working after Y is promoted to master when X fails for some reason, at minimum GTID mode has to be turned on, but this causes write stalls on the master DB, especially on Aurora
- Note that even with GTID mode on, we cannot guarantee that no binlog data persisted on X is lost, although the chance is greatly reduced
From the sink pov:
- Suppose a failover between X and Y happened; the pulling connection to X may still be active and alive while the new active is Y. This means that when the sink pulls binlog through a connection pool (most likely in network applications), it may be pulling from both X and Y, i.e., a split-brain problem
- The root cause is that during failover, by CAP, async replication should be killed and made unavailable to avoid the split brain, but the implementation does not obey this
- In practice, the split-brain problem is unlikely to cause damage if log_slave_updates is turned on, but split brain is a critical situation that should be avoided conceptually
- When such split-brain replication happens, either fix it through domain knowledge, or discard the current sink and restart the replication from scratch
- This means async replication is a good fit for cases where some slack is allowed in data consistency and availability, e.g.,
- Cross-region failover instances, where we accept the loss/corruption of a couple of rows so that we can recover in case of main-region data loss
- A read slave for OLAP workloads/pipelines, so that we have a mostly up-to-date data source with little additional load on the master
- Similarly, async replication is not a good fit for mission-critical systems - they will have to read from the master or from a sync-replicated replica
How the uid generator generates snowflake IDs
Default uid generator
private long getCurrentSecond() {
long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
}
return currentSecond;
}
protected synchronized long nextId() {
long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond); //this one just self-spins
}
// At the different second, sequence restart from zero
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
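For reference, the allocate call at the end packs the three fields into one long. Below is a minimal sketch of that bit arithmetic, assuming the commonly documented default layout of 1 sign + 28 delta-second + 22 worker + 13 sequence bits; the widths are configurable in the real BitsAllocator, so treat the numbers as an assumption:
public class SnowflakeBits {
    // Assumed default layout: 1 sign + 28 timestamp + 22 worker + 13 sequence = 64 bits
    static final int TIMESTAMP_BITS = 28;
    static final int WORKER_BITS = 22;
    static final int SEQUENCE_BITS = 13;

    static long allocate(long deltaSeconds, long workerId, long sequence) {
        // Higher bits hold the timestamp so generated IDs stay roughly time-ordered.
        return (deltaSeconds << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }

    public static void main(String[] args) {
        long maxSequence = ~(-1L << SEQUENCE_BITS); // 8191 -> 8192 ids per worker per second
        System.out.println(allocate(1L, 42L, 0L));
        System.out.println("max sequence = " + maxSequence);
    }
}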
CachedUidGenerator
Data structure
public class BufferPaddingExecutor {
/** Whether buffer padding is running */
private final AtomicBoolean running;
/** We can borrow UIDs from the future, here store the last second we have consumed */
private final PaddedAtomicLong lastSecond; // this is inited with system timestamp, after that it becomes the logical second
/** RingBuffer & BufferUidProvider */
private final RingBuffer ringBuffer;
private final BufferedUidProvider uidProvider;
/** Padding immediately by the thread pool */
private final ExecutorService bufferPadExecutors;
/** Padding schedule thread */
private final ScheduledExecutorService bufferPadSchedule;
}
Init CachedUidGenerator
private void initRingBuffer() {
// initialize RingBufferPaddingExecutor
boolean usingSchedule = (scheduleInterval != null);
this.bufferPaddingExecutor = new BufferPaddingExecutor(ringBuffer, this::nextIdsForOneSecond, usingSchedule);
if (usingSchedule) {
bufferPaddingExecutor.setScheduleInterval(scheduleInterval);
}
}
this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
Actual population logic
public void paddingBuffer() {
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); //the only place lastSecond is mutated other than init
for (Long uid : uidList) {
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
}
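The provide call corresponds to generating every sequence value for one logical second, which is how the cache "borrows from the future" by bumping lastSecond instead of reading the wall clock. A rough, illustrative sketch reusing the bit layout assumed in the earlier example; the class and method names here are made up, not the library's:
import java.util.ArrayList;
import java.util.List;

public class OneSecondBatch {
    static final int SEQUENCE_BITS = 13;                 // assumed default layout
    static final int WORKER_SHIFT = SEQUENCE_BITS;
    static final int TIMESTAMP_SHIFT = 22 + SEQUENCE_BITS;

    // Emit all UIDs for one logical second, the way the padding executor does per lastSecond bump.
    static List<Long> uidsForSecond(long deltaSeconds, long workerId) {
        long maxSequence = ~(-1L << SEQUENCE_BITS);
        List<Long> uids = new ArrayList<>((int) maxSequence + 1);
        for (long seq = 0; seq <= maxSequence; seq++) {
            uids.add((deltaSeconds << TIMESTAMP_SHIFT) | (workerId << WORKER_SHIFT) | seq);
        }
        return uids;
    }

    public static void main(String[] args) {
        System.out.println(uidsForSecond(1, 42).size()); // 8192 ids per logical second
    }
}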
Query timeout for Mysql jdbc
Query timeout implementation in mysql jdbc
mysql-connector-j version 5.1.48
Data structures
/**
* A Statement object is used for executing a static SQL statement and obtaining
* the results produced by it.
*
* Only one ResultSet per Statement can be open at any point in time. Therefore, if the reading of one ResultSet is interleaved with the reading of another,
* each must have been generated by different Statements. All statement execute methods implicitly close a statement's current ResultSet if an open one exists.
*/
public class StatementImpl implements Statement {
/**
* Thread used to implement query timeouts...Eventually we could be more
* efficient and have one thread with timers, but this is a straightforward
* and simple way to implement a feature that isn't used all that often. (?!!!)
*/
/** The physical connection used to effectively execute the statement */
protected Reference<MySQLConnection> physicalConnection = null;
class CancelTask extends TimerTask {
SQLException caughtWhileCancelling = null;
StatementImpl toCancel;
Properties origConnProps = null;
String origConnURL = "";
}
}
private BooleanConnectionProperty queryTimeoutKillsConnection = new BooleanConnectionProperty("queryTimeoutKillsConnection", false,
Messages.getString("ConnectionProperties.queryTimeoutKillsConnection"), "5.1.9", MISC_CATEGORY, Integer.MIN_VALUE);
What does CancelTask do
Inside TimerTask.run(), it starts a new thread. Inside Thread.run()
Only the happy path with default settings is shown here; comments are inline
Connection cancelConn = null;
java.sql.Statement cancelStmt = null;
try {
MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
synchronized (StatementImpl.this.cancelTimeoutMutex) {
if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
// All's fine
cancelConn = physicalConn.duplicate(); //this one will create a new connection
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + physicalConn.getId());
} else {
try {
cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
} catch (NullPointerException npe) {
// Log this? "Failed to connect to " + origConnURL + " and KILL query"
}
}
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
}
} catch (SQLException sqlEx) {
CancelTask.this.caughtWhileCancelling = sqlEx;
} catch (NullPointerException npe) {
// Case when connection closed while starting to cancel.
// We can't easily synchronize this, because then one thread can't cancel() a running query.
// Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
} finally {
if (cancelStmt != null) {
try {
cancelStmt.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
if (cancelConn != null) {
try {
cancelConn.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
CancelTask.this.toCancel = null;
CancelTask.this.origConnProps = null;
CancelTask.this.origConnURL = null;
}
How is CancelTask used
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
CancelTask timeoutTask = null;
if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
}
this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
createStreamingResultSet(), this.currentCatalog, cachedFields);
if (timeoutTask != null) {
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;
}
timeoutTask.cancel();
locallyScopedConn.getCancelTimer().purge();
timeoutTask = null;
}
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;
if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}
resetCancelledState();
throw cause;
}
}
//inside finally
if (timeoutTask != null) {
timeoutTask.cancel();
locallyScopedConn.getCancelTimer().purge();
}
}
Implications of this implementation
- By default, if the query times out, the connection will not be closed
- Sending the KILL query is a best-effort attempt. The more reliable way is to use MySQL's max_execution_time hint (see the sketch below). A socket timeout can only mitigate, not solve, the slow-query problem
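A short sketch combining both defenses, assuming Connector/J and MySQL 5.7+ where the MAX_EXECUTION_TIME optimizer hint is available (it applies to SELECT only); the connection details and table name are placeholders:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class SlowQueryDefense {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://db-host:3306/test", "app", "secret");
             Statement stmt = conn.createStatement()) {
            // Client side: schedules the CancelTask described above; a best-effort KILL QUERY.
            stmt.setQueryTimeout(5); // seconds
            // Server side: MySQL aborts the SELECT itself after 5000 ms, no extra connection needed.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT /*+ MAX_EXECUTION_TIME(5000) */ * FROM big_table")) {
                while (rs.next()) {
                    // process rows
                }
            }
        }
    }
}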
Reading Notes: Effective Executive
Chapter 1 Effectiveness Can Be Learned
- Manual work is about doing things right. Knowledge work is about doing the right thing
- Knowledge worker (KWer) can not be supervised in detail. He has to direct himself
- A KWer must make decisions and live with their results. He should be the best person to make the call
- KW is defined by its results, NOT costs or quantity
- A KWer can't let events direct his actions for too long - otherwise he is merely operating
- Inside the org it is all about effort and cost - even for a profit center
- One tends to be drawn into the challenges inside the org, instead of the events outside
- By the time relevant outside events become measurable it is too late, because being able to measure an event means the concept has to be built first
- A change in trend is more important than the trend itself, but it can only be perceived rather than counted
- No need to know the details of your collaborator's work, but you need to know why it is there and what is happening
- Ask the expected result instead of work to be done
- Effective decisions are based on dissenting opinions rather than consensus on the facts
Chapter 2 Know Thy Time
- Plan with time rather than work. Consolidate time into the largest possible chunks
- Need to spend much time with people to get idea across
- Fast personnel decisions are likely to be wrong decisions. Go over them several times before committing - this needs hours of continuous and uninterrupted thought
- Cutting important things by mistake is easy to correct
- 90 mins of continuous work + 30 mins of interrupt-driven work
- Schedule a morning work period at home. This is better than after dinner, because people are too tired by then to do a good job
Chapter 3 What Can I Contribute?
- Contribution is important in all 3 areas: direct results, building of people, and building of values
- Justify the payroll based on contribution rather than effort
- Doing a job well means he is specialized, which also means retooling when moving to a new position
Chapter 4 Making Strength Productive
- Fill the position by strength rather than weakness. If a weakness is blocking the strength, fix it through work or career opportunities
- Build the job by task rather than personality, i.e., don't create a job for a person
- An indispensable man means at least one of the following:
- he is incompetent, so he shields himself from demands
- the superior is weak
- his strength is misused to delay tackling a serious problem, if not to hide it
- A rapidly promoted superior is more likely to carry you to success. An under-performing superior only means a new one will be brought in from outside
- Help the boss overcome his limitations and use his strengths. Present the areas related to his strengths first, to help him understand the situation
- Listening or reading - people have different preferences for receiving info
- It is easier to raise the performance of one leader than of the masses. He should be put into a performance-making, standard-setting position
- Claiming the lack of authority/access is often a cover-up for inertia
Chapter 5 First Things First
- Double-buffering of work can succeed on the condition that each piece is given a minimum amount of continuous time
- Yesterday's success always lingers longer than its productive life
- Pressure-driven prioritization means focusing on yesterday instead of tomorrow, on the inside rather than the outside, i.e., urgent tasks push back important ones
- Reprioritization at the end of each task
Chapter 6 The Elements of Decision-making
- Focus on what is “right”, i.e., satisfies specs, before worrying about compromises and concessions. Stakeholders know how to make compromise better than you, but they need your help figuring out what is right
- Incomplete explanation is often more dangerous than the totally wrong explanation
- An executive who makes many decisions is both lazy and ineffectual, because he fails to see the truly generic problems
Chapter 7 Effective Decisions
- Most likely it is a choice between two courses of action, neither of which is provably more nearly right than the other
- Fact requires criteria for relevance
- Effective decisions grow out of clash of divergent opinions and competing alternatives. In fact, one does not make a decision unless there is disagreement
- whoever voices the opinion should be responsible for the factual findings
- Finding the measurement is more about risk-taking judgment; since it is a judgment, alternative measurements must be considered too
- A measurement that sounds good on paper is still meaningless if you can't convert it into a step-by-step guide for a computer
- Act if on balance the benefits greatly outweigh cost and risk; and Act or do not act; but do not “hedge” or compromise.
What triggers the txnLockFast failure?
Source version 2.1
What is txnLockFast error
This is triggered when a read txn with read_start_ts happens between a write txn's write_start_ts and write_commit_ts. Upon seeing this, the read txn will back off and retry later with the same read_start_ts, so that we preserve snapshot isolation. The effect of txnLockFast is similar to a pessimistic lock but with higher latency
tidb_tikvclient_backoff_seconds_count
Triggered by the expression increase(tidb_tikvclient_backoff_seconds_count[10m]) > 10
tidb_tikvclient_backoff_seconds_count is a range vector of counters. The expression means more than 10 backoffs were recorded in the last 10 minutes
Data structures
Retriever is the interface that wraps the basic Get and Seek methods. Retriever is implemented by BufferStore -> unionStore -> tikvSnapshot -> tikvTxn
Transaction defines the interface for operations inside a Transaction. Transaction is implemented by tikvTxn -> TxnState
// Scanner support tikv scan
type Scanner struct {
snapshot *tikvSnapshot
batchSize int
valid bool
cache []*pb.KvPair
idx int
nextStartKey []byte
endKey []byte
eof bool
// Use for reverse scan.
reverse bool
nextEndKey []byte
}
What triggers txnLockFast?
- Commit check
TxnState also implements BatchGet, but it seems unused. tikvTxn also implements Get, but it is not used except for testing
//part of Retriever interface
func (us *unionStore) Get(k Key) ([]byte, error) {
us.markLazyConditionPair(k, nil, e.(error))
}
//part of Transaction interface
func (txn *tikvTxn) Commit(ctx context.Context) error {
txn.us.CheckLazyConditionPairs()
}
func (us *unionStore) CheckLazyConditionPairs() error {
us.snapshot.BatchGet(keys)//this may trigger txnLockFast
}
- Scan keys in tikv
func (s *Scanner) Next() error {
s.resolveCurrentLock(bo, current)
}
func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
s.snapshot.get(bo, kv.Key(current.Key))
}
- Coprocessor
// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
logutil.Logger(context.Background()).Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
if !ok {
if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
}
How TiDB implements 2PC
Source version 2.1
Data structures
// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
store *tikvStore
version kv.Version
priority pb.CommandPri
notFillCache bool
syncLog bool
keyOnly bool
vars *kv.Variables
}
//store/tikv/txn.go, seems to represent txn actions on the server (tikv) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct {
snapshot *tikvSnapshot
us kv.UnionStore
store *tikvStore // for connection to region.
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
valid bool
lockKeys [][]byte
mu sync.Mutex // For thread-safe LockKeys function.
dirty bool
setCnt int64
vars *kv.Variables
}
//session/txn.go, seems to represent txn on the client(tidb) side
// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement related modification in the buffer before flush to the txn,
// so if execute statement meets error, the txn won't be made dirty.
// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need.
type TxnState struct {
// States of a TxnState should be one of the followings:
// Invalid: kv.Transaction == nil && txnFuture == nil
// Pending: kv.Transaction == nil && txnFuture != nil
// Valid: kv.Transaction != nil && txnFuture == nil
kv.Transaction //embeded type, means TxnState now implements all interface of Transaction
txnFuture *txnFuture
buf kv.MemBuffer
mutations map[int64]*binlog.TableMutation
dirtyTableOP []dirtyTableOperation
// If doNotCommit is not nil, Commit() will not commit the transaction.
// doNotCommit flag may be set when StmtCommit fail.
doNotCommit error
}
type session struct {
txn TxnState
mu struct {
sync.RWMutex
values map[fmt.Stringer]interface{}
}
store kv.Storage
sessionVars *variable.SessionVars
sessionManager util.SessionManager
}
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
store *tikvStore //it is the same store in txn.store
txn *tikvTxn
startTS uint64
keys [][]byte
mutations map[string]*pb.Mutation
lockTTL uint64
commitTS uint64
mu struct {
sync.RWMutex
committed bool
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
}
priority pb.CommandPri
syncLog bool
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
detail *execdetails.CommitDetails
// The max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
}
When a session commits a single table, DML-only transaction
- Session.doCommit
- TxnState.Commit
- TxnState.Transaction.Commit(ctx), which actually calls
- tikvTxn.Commit
func (txn *tikvTxn) Commit(ctx context.Context) error {
defer txn.close()
var connID uint64
val := ctx.Value(sessionctx.ConnID)
if val != nil {
connID = val.(uint64)
}
committer, err := newTwoPhaseCommitter(txn, connID)
// latches disabled
err = committer.executeAndWriteFinishBinlog(ctx)
}
- newTwoPhaseCommitter
- iterate through tikvTxn.UnionStore to collect the keys to Put and Del into the mutations collection
- add txn.lockKeys to mutations with the Op_lock type
- executeAndWriteFinishBinlog
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
//Only the happy path, and no binlog
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
err := c.prewriteKeys(prewriteBo, c.keys)
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
c.commitTS = commitTS
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
return nil
}
- Prewrite, commit, and cleanup (in the failure case) are just different actions passed to doActionOnKeys
twoPhaseCommitter.doActionOnKeys
//happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
TxnSize: uint64(len(batch.keys)),
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
}
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
req := &tikvrpc.Request{
Type: tikvrpc.CmdCommit,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
req.Context.Priority = c.priority
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
}
func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
sizeFunc = c.keyValueSize
}
// Make sure the group that contains primary lock key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
delete(groups, firstRegion)
for id, g := range groups { //other regions we don't care the order
batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
// primary should be committed/cleanup first
err = c.doActionOnBatches(bo, action, batches[:1])
batches = batches[1:]
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potencial data race in unit test since `CommitMaxBackoff` will be updated
// by test suites.
secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return errors.Trace(err)
}
How TiDB retries requests to TiKV
Source code version v2.1.18
Data structures
type Backoffer struct {
ctx context.Context
fn map[backoffType]func(context.Context) int
maxSleep int
totalSleep int
errors []error
types []backoffType
vars *kv.Variables
}
- Sleep settings for each retry type
- Times are in ms
- EqualJitter means half of the duration is randomized (a rough sketch of this follows the case list below)
NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int
case boTiKVRPC:
return NewBackoffFn(100, 2000, EqualJitter)
case BoTxnLock:
return NewBackoffFn(200, 3000, EqualJitter)
case boTxnLockFast:
return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter)
case boPDRPC:
return NewBackoffFn(500, 3000, EqualJitter)
case BoRegionMiss:
return NewBackoffFn(100, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
return NewBackoffFn(2000, 10000, EqualJitter)
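Based on the note above that EqualJitter randomizes half of the duration, here is a rough Java approximation of how such a backoff function behaves; this is a sketch of the semantics, not the TiDB source:
import java.util.concurrent.ThreadLocalRandom;

// Approximate sketch of exponential backoff with "equal jitter":
// the sleep doubles each attempt up to a cap, and half of it is randomized.
public class EqualJitterBackoff {
    private final int baseMs;
    private final int capMs;
    private int attempt = 0;

    EqualJitterBackoff(int baseMs, int capMs) {
        this.baseMs = baseMs;
        this.capMs = capMs;
    }

    int nextSleepMs() {
        long exp = (long) baseMs << Math.min(attempt++, 20); // cap the shift to avoid overflow
        int sleep = (int) Math.min(exp, capMs);
        return sleep / 2 + ThreadLocalRandom.current().nextInt(sleep / 2 + 1);
    }

    public static void main(String[] args) {
        EqualJitterBackoff bo = new EqualJitterBackoff(200, 3000); // e.g. BoTxnLock's 200/3000
        for (int i = 0; i < 6; i++) {
            System.out.println(bo.nextSleepMs());
        }
    }
}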
Default max total sleep time in each case
// Maximum total sleep time(in ms) for kv/cop commands.
const (
copBuildTaskMaxBackoff = 5000
tsoMaxBackoff = 15000
scannerNextMaxBackoff = 20000
batchGetMaxBackoff = 20000
copNextMaxBackoff = 20000
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
)
When do we stop trying
If the backoff attempts have slept more than the configured duration, we return a MySQL error that signals not to retry anymore
realSleep := f(b.ctx)
backoffDuration.Observe(float64(realSleep) / 1000)
b.totalSleep += realSleep
b.types = append(b.types, typ)
var startTs interface{} = ""
if ts := b.ctx.Value(txnStartKey); ts != nil {
startTs = ts
}
logutil.Logger(context.Background()).Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", typ),
zap.Reflect("txnStartTS", startTs))
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
for i, err := range b.errors {
// Print only last 3 errors for non-DEBUG log levels.
if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
errMsg += "\n" + err.Error()
}
}
logutil.Logger(context.Background()).Warn(errMsg)
// Use the first backoff type to generate a MySQL error.
return b.types[0].TError()
}
Max sleep for each case
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error)
maxSleep := 10000 // ms
bo := tikv.NewBackoffer(context.Background(), maxSleep)
}
func getPhysicalTableRegions(physicalTableID int64, tableInfo *model.TableInfo, tikvStore tikv.Storage, s kv.SplitableStore, uniqueRegionMap map[uint64]struct{}) ([]regionMeta, error) {
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackoffer(context.Background(), 20000), startKey, endKey)
}
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)//defaults 20 seconds
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)//defaults 41 seconds
}
Health check and availability
- The ELB default health check timeout is 10 sec, which is overly generous for the same-region case. Normally a 2-second timeout with 3 attempts to declare failure is enough
- Otherwise, by default, the instance will be taken offline at least 30 seconds after the failure happens, with traffic still directed to the faulty instance.
- Problem with time-based availability metrics: MTTF / (MTTF + MTTR)
- In distributed systems, common to have part of it failed somewhere, how do you define “down”?
- How do you differentiate the impact of downtime during off hours versus peak hours?
- Problem with count-based availability metrics: % of successful requests
- High volume user has higher impact on this metric
- Less traffic comes in when users perceive the system as down, which makes the metric look better than it actually is
- Not showing how long a system is down
- Both metrics above do not capture the down time pattern and different durations of outages
- The longer the time window, the "better" the availability metric appears (see the worked example below)
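A tiny worked example of the two metric styles above; all numbers are made up purely to show what each formula does and does not capture:
public class AvailabilityMetrics {
    public static void main(String[] args) {
        // Time-based: MTTF / (MTTF + MTTR), blind to when the outage happened.
        double mttfMinutes = 43_170; // roughly 30 days between failures (made-up)
        double mttrMinutes = 30;     // a 30-minute outage (made-up)
        double timeBased = mttfMinutes / (mttfMinutes + mttrMinutes);
        System.out.printf("time-based availability  = %.5f%n", timeBased);

        // Count-based: successful requests / total requests, dominated by high-volume users
        // and by the traffic drop that follows a perceived outage.
        long totalRequests = 10_000_000;
        long failedRequests = 70_000; // made-up
        double countBased = (double) (totalRequests - failedRequests) / totalRequests;
        System.out.printf("count-based availability = %.5f%n", countBased);
    }
}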
How Micrometer keeps percentiles and publishes them to Datadog
Data structures (bottom up)
- StepDouble: thread-safe
- By default, each step represents 1 min
- value operations are done by DoubleWriter
- Within the same step, the returned value of poll() remains the same. The value is reset at each step
- Histogram: either
- an hdr-based histogram, TimeWindowPercentileHistogram
- or a fixed-boundary histogram, TimeWindowFixedBoundaryHistogram
- Timer: has an instance of Histogram
- Meter: a named and dimensioned producer of one or more measurements. All the metric constructs we create in application code, e.g., Counter and Timer, implement Meter
- DatadogMeterRegistry: implements MeterRegistry -> StepMeterRegistry
- Maintains the list of meters
- Associates an instance of Timer with a HistogramGauges, which translates the histogram into the percentile gauges (with the phi tag) we observe on DD
DatadogMeterRegistry.publish()
- Gets the API URI: by default it is the public API. Normally we send to the dogstatsd URL on 127.0.0.1
- Partitions the Meters in the registry into a nested list of Meters. By default, the partition size is 10k
- All data in these 10k meters is published in a single POST call
- By default, this method is triggered once per minute. See PushMeterRegistry.start(). A small end-to-end example follows
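For context, this is roughly how a Timer ends up with percentile values to publish in the first place. A minimal sketch using SimpleMeterRegistry as a stand-in; a DatadogMeterRegistry would be configured with API/statsd details instead and would push these percentiles as phi-tagged gauges on each publish step:
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.ValueAtPercentile;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

public class PercentileTimerDemo {
    public static void main(String[] args) {
        // Any MeterRegistry works here; the registry owns the meters it creates.
        SimpleMeterRegistry registry = new SimpleMeterRegistry();
        Timer timer = Timer.builder("demo.latency")
                .publishPercentiles(0.5, 0.95, 0.99) // client-side percentiles -> gauges
                .register(registry);

        for (int i = 1; i <= 100; i++) {
            timer.record(Duration.ofMillis(i));
        }

        // Read back the percentile values the registry would publish.
        for (ValueAtPercentile v : timer.takeSnapshot().percentileValues()) {
            System.out.printf("p%.0f = %.1f ms%n", v.percentile() * 100, v.value(TimeUnit.MILLISECONDS));
        }
    }
}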
On working with distributed teams
- Need to run with more process, as if you were a bigger team/org
- Massive, if not too much, communication need between offices if teams are not autonomous enough
- Decompose tasks more fine-grained than in the same location
- Massive communication load during OKR setting for each task
- Arrive 5 mins early to ensure the tech is ready and thus the real meeting starts on time
- On the host side, have one person responsible for identifying remote people who want to speak
- Make explicit overlapping business hours
- Make sure results are visible to everyone
- daily and weekly review become a must
- Hire ONLY self-driven people
- Project owner needs to have one go-to doc for
- resources and their owners
- links to detailed docs
- current progress and risks
- One wiki parent page for each team
- Each team sends out weekly report, summarized and distilled at each level
- Centralized test logbook, including PoC runs, so that ideas/data can be reused in the future
- Team should meet face to face at least once per OKR period
- Need a doc that holds context for all stake holders
- Major tasks with owners, progress, impact on the overall progress
- Fine grained enough to ensure daily update
Traps in Chaos Engineering
Notes on this post
- Goal is resilience instead of finding risks
- Don't measure KRs by errors discovered - they only show ignorance instead of risk
- Human under pressure IS part of the system to test. Need to optimize how they react
- Focus on things done right rather than pointing out risks. Don’t chase the fixes
- Providing context and let service owner decide what to do with the vulnerability
- Automate the discovery of what is wrong, but not what should be
- Manual gamedays and chaos experiments in non-prod environments are the prerequisite for prod testing
- Creating experiments often has more value than running them
- Define “steady state” at first
When will TiDB binlog replication lose data
How tidb writes the binlog
- The pump we deploy is actually the pump server (PS) process. The TiDB process has a pump client (PC) that writes to the PS during a txn
- PC uses PS's interface WriteBinlog(context.Context, *WriteBinlogReq) (*WriteBinlogResp, error)
- PC implementation: func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error
- PC maintains a list of available PS and a list of unavailable PS
- Subscribe to PD and listen for its updates on PS status
- Regularly heartbeat the PSs on the unavailable list, and try to add them back to the available list
- Keep an ErrNum for each PS. Increment it on failed binlog writes, and reset it on success. If a PS's ErrNum is too high (default 10), add it to the unavailable list
- If PC fails to write the prewrite binlog:
- If the error is a ResourceExhausted error from gRPC, stop retrying
- Otherwise, try every single available pump once
- If still no luck, try every single unavailable pump once
- If still no luck, declare that we failed to write the binlog
- If PC fails to write the commit binlog, it will not retry, because PS can recover it from TiKV after 10 mins (the default max txn timeout)
- Note this is a common source of replication lag
- When a pump is gracefully shut down, e.g., via pdctl, it will enter read-only mode first, wait until all drainers have read its binlog, and then shut down
When do we lose binlog
Assuming we turn on ignore-error
- binlog stored on PS is lost forever
- All PSs are down
- PC encounters gRPC's ResourceExhausted error. Note that in this case we very likely do not lose any binlog, because the main txn commit will most likely fail too
Default settings
// RetryInterval is the interval of retrying to write binlog.
RetryInterval = 100 * time.Millisecond
// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
DefaultBinlogWriteTimeout = 15 * time.Second
How the Druid connection pool recycles connections
Selected data structures
public final class DruidConnectionHolder {
protected final DruidAbstractDataSource dataSource;
protected final long connectionId;
protected final Connection conn;
protected final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();
protected final List<StatementEventListener> statementEventListeners = new CopyOnWriteArrayList<StatementEventListener>();
protected final long connectTimeMillis;
protected volatile long lastActiveTimeMillis;
protected volatile long lastExecTimeMillis;
protected volatile long lastKeepTimeMillis;
protected volatile long lastValidTimeMillis;
protected long useCount = 0;
private long keepAliveCheckCount = 0;
private long lastNotEmptyWaitNanos;
private final long createNanoSpan;
protected PreparedStatementPool statementPool;
protected final List<Statement> statementTrace = new ArrayList<Statement>(2);
protected final boolean defaultReadOnly;
protected final int defaultHoldability;
protected final int defaultTransactionIsolation;
protected final boolean defaultAutoCommit;
protected boolean underlyingReadOnly;
protected int underlyingHoldability;
protected int underlyingTransactionIsolation;
protected boolean underlyingAutoCommit;
protected volatile boolean discard = false;
protected volatile boolean active = false;
protected final Map<String, Object> variables;
protected final Map<String, Object> globleVariables;
final ReentrantLock lock = new ReentrantLock();
}
How shrink() works
Assume we turn on time check and keep-alive
DruidConnectionHolder connection = connections[i];
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis) {
break; //stop shrinking all together
}
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
- After this check, it will validate all connections that require keep-alive
- Finally
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount); for (int i = 0; i < fillCount; ++i) { emptySignal(); }
- Validation is done via MySqlValidConnectionChecker; if the validation query is not set, a default is used (a pool configuration sketch follows the snippet below)
String query = validateQuery;
if (validateQuery == null || validateQuery.isEmpty()) {
query = DEFAULT_VALIDATION_QUERY;
}
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rs = stmt.executeQuery(query);
return true;
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
Atcoder notes
B - Voting Judges
Simple but good for a coding exercise
D - Semi Common Multiple
- By subtraction we know the delta is the LCM of A
- By factoring we know the init must be LCM/2; we just need to verify that every number satisfies the condition
- What got me is how the LCM is calculated - just apply the lcm formula rolling through A. Other approaches couldn't handle the corner cases
- The count formula can be simplified to M // (lcm // 2) - M // lcm
C - Successive Subtraction
E - Max-Min Sums
E - Friendships
F - Sugoroku
F - Interval Running
C - GCD on Blackboard
B - LRUD Game
E - Sequence Decomposing
D - Maximum Sum of Minimum
Code reading notes: sync-diff inspector
Major data structures
// TableConfig is the config of table.
type TableConfig struct {
// table's origin information
TableInstance
// columns be ignored, will not check this column's data
IgnoreColumns []string `toml:"ignore-columns"`
// field should be the primary key, unique key or field with index
Fields string `toml:"index-fields"`
// select range, for example: "age > 10 AND age < 20"
Range string `toml:"range"`
// set true if comparing sharding tables with target table, should have more than one source tables.
IsSharding bool `toml:"is-sharding"`
// saves the source tables's info.
// may have more than one source for sharding tables.
// or you want to compare table with different schema and table name.
// SourceTables can be nil when source and target is one-to-one correspondence.
SourceTables []TableInstance `toml:"source-tables"`
TargetTableInfo *model.TableInfo
// collation config in mysql/tidb
Collation string `toml:"collation"`
}
type Diff struct {
sourceDBs map[string]DBConfig
targetDB DBConfig
chunkSize int
sample int
checkThreadCount int
useChecksum bool
useCheckpoint bool
onlyUseChecksum bool
ignoreDataCheck bool
ignoreStructCheck bool
tables map[string]map[string]*TableConfig
fixSQLFile *os.File
report *Report
tidbInstanceID string
tableRouter *router.Table
cpDB *sql.DB
ctx context.Context
}
// Bound represents a bound for a column
type Bound struct {
Column string `json:"column"`
Lower string `json:"lower"`
Upper string `json:"upper"`
HasLower bool `json:"has-lower"`
HasUpper bool `json:"has-upper"`
}
// ChunkRange represents chunk range
type ChunkRange struct {
ID int `json:"id"`
Bounds []*Bound `json:"bounds"`
Where string `json:"where"`
Args []string `json:"args"`
State string `json:"state"`
columnOffset map[string]int
}
How is diff done
- CheckTableData loads the checkpointed chunk data from the context. For a first-time run, it will SplitChunks (see below)
- Spawns multiple channels to check chunks via checkChunkDataEqual
- UseChecksum decides whether we go for checksum mode or row-by-row mode
- The checksum is computed by dbutil.GetCRC32Checksum bounded by chunk.Where, which is a SQL statement formatted by
fmt.Sprintf(
"SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
- An example checksum sql would be
SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) AS checksum
FROM test.test WHERE id > 0 AND id < 10;
How are chunks split
- The Range config in the toml is passed to getChunksForTable
- Two ways of splitting - bucket splitting and random splitting. The default config goes with random splitting
Random splitting
cnt, err := dbutil.GetRowCount(context.Background(), table.Conn, table.Schema, table.Table, limits, nil)
chunkCnt := (int(cnt) + chunkSize - 1) / chunkSize
chunks, err := splitRangeByRandom(table.Conn, NewChunkRange(), chunkCnt, table.Schema, table.Table, columns, s.limits, s.collation)
- Randomly pick chunkCnt ids as the splitting IDs
SELECT `id` FROM (
SELECT `id`, rand() rand_value FROM `test`.`test`
WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100
ORDER BY rand_value LIMIT 5) rand_tmp
ORDER BY `id` COLLATE "latin1_bin";
- and create new chunks based on these separator id values
Summary Info
CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`summary`(" +
"`schema` varchar(64), `table` varchar(64)," +
"`chunk_num` int not null default 0," +
"`check_success_num` int not null default 0," +
"`check_failed_num` int not null default 0," +
"`check_ignore_num` int not null default 0," +
"`state` enum('not_checked', 'checking', 'success', 'failed') DEFAULT 'not_checked'," +
"`config_hash` varchar(50)," +
"`update_time` datetime ON UPDATE CURRENT_TIMESTAMP," +
"PRIMARY KEY(`schema`, `table`));
- Updates every 10 secs
SELECT `state`, COUNT(*) FROM @checkpointSchemaName.@chunkTableName
WHERE `instance_id` = ? AND `schema` = ? AND `table` = ?
GROUP BY `state`;
UPDATE `@checkpointSchemaName`.`@summaryTableName`
SET `chunk_num` = ?, `check_success_num` = ?, `check_failed_num` = ?, `check_ignore_num` = ?, `state` = ?
WHERE `schema` = ? AND `table` = ?
Chunk Info
CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`chunk`(" +
"`chunk_id` int," +
"`instance_id` varchar(64)," +
"`schema` varchar(64)," +
"`table` varchar(64)," +
"`range` text," +
"`checksum` varchar(20)," +
"`chunk_str` text," +
"`state` enum('not_checked', 'checking', 'success', 'failed', 'ignore', 'error') DEFAULT 'not_checked'," +
"`update_time` datetime ON UPDATE CURRENT_TIMESTAMP," +
"PRIMARY KEY(`schema`, `table`, `instance_id`, `chunk_id`));
Atcoder notes
C - Candles
ans = dist between ends + min distance from either end
C - Strange Bank
- No need for memoization. Note that in the knapsack solution, we just need >= and A[0] instead of checking >
- Note that the 6-power and 9-power coins don't intersect, so we just brute force on the amount paid in 6s, and the remainder is covered by 9s, followed by 1s
C - String Transformation
We can prove that the transformation retains the isomorphism if it starts with one. so we can just verify that final mapping is an isomorphism
C - K-th Substring
- Brute force won’t work because it will be s ** 2 substrings
- K no more than 5 means that the top K answer will not be longer than 5 anyway. Otherwise, we can reduce the length of the answer and improve it
- So we just sort all 25000 strings, each with length 5
C - Factors of Factorial
Calculate the prime factorization of each number; the number of divisors is then the PRODUCT of (exponent + 1) over the combined factorization
D - Rain Flows into Dams
Since it is odd, we can do the odd - even trick and sum up together
C - AB Substrings
Corner cases:
- no BA
- no BX and AX
The general theme is that every time you see a counting formula with a minus component, discuss the cases where the sum could be negative and where the positive term could be 0
C - String Transformation
Every swap keeps the isomorphism, and we can transform to any isomorphism by swapping. Therefore, we just need to ensure the final isomorphism is good, because it is always reachable
C - Snuke Festival
Good for a coding exercise
D - Flipping Signs
Simple problem, but similar to two segment insight
D - DivRem Number
N = q * m + q = q * (m + 1), with q < m. We just need to find all divisor pairs
D - Remainder Reminder
Similar to divisor finding: iterate over all possible divisors b >= max(2, K + 1) to generate a, check how many full b "bands" we have, and count mod >= K in the last partial band. Note that the mod band starts at 1! So the partial band contributes N % b - K + 1 pairs
Atcoder notes
D - Harlequin
- One state is "some has odd"; its counterpart is "all have even", and it maps to the win/lose conditions
C - Align
- It is obvious the solution will be zigzagged. Therefore, we can derive a formula for the final answer
- We can store and sort the coefficients, and apply them to the sorted factors - way cleaner than my classification solution
B - Simplified mahjong
- When A[i] != 0, it is obvious the answer will be floor(S/2) - by sorting we will reach the upper bound
- When A[i] == 0, we know the problem can be decomposed into segments
- So we just sort the cards, calculate the size of the 0-separated segments, and sum up the results of applying (1)
D - Coloring Dominoes
- first col needs classification
- I miscounted 2 * 2 + 2 * 2 case, should have 3 choices instead of 2
C - Base -2 Number
- Classify based on the mod 4, 00, 01, 10, 11, add 4 to the original number before shifting
B - ABC
- Replace BC with D to form a list of new strings
- The inverse cost can be calculated by scan/bubble sort, every time we find a need to swap, the cost is increased by the number of to-be-swapped items
D - Blue and Red Balls
- Stars and bars. Separating K into i buckets (which could be empty) has choose(K + i - 1, K) ways; in the non-empty case it becomes choose(K - 1, K - i). Note it is a direct application of the formula
- My first guess was incorrect. It was too high and didn't consider the combination case
D - Coloring Edges on Tree
The official approach is cleaner than mine
- We know that answer >= deg(v). In fact, max(deg(v)) is sufficient to give a solution
- We can just BFS, and keep assigning colors from 1 to K while skipping the color of the edge from grandparent to parent. Such a construction ensures the conditions are satisfied
- Note that DFS is not feasible here because we lose the coloring information on siblings
E - Colorful Hats 2
The key insight is that given the colors of 1…i, the color of i+1 can be uniquely determined by picking one color the same as its predecessor. The multiplication factor acts as the "pick"
Index 0 acts as the sentinel value
I got WA on the formula - I tried to classify the 2-color and 3-color cases. This implies that at each index the pick is not unique even with the multiplier - a classic anti-pattern
D - Lucky PIN
The official brute force solution is cleaner than mine
ans = 0
for pin in range(1000):
    ds = [pin // 100, (pin // 10) % 10, pin % 10]
    found = 0
    for ch in S:
        if found < 3 and int(ch) == ds[found]:
            found += 1
    if found == 3:
        ans += 1
It also has a DP approach with 240M ops and 240M memory
C - HonestOrUnkind2
The official solution is cleaner than mine, mainly because even though I used a bitmask, I still used a second array to track correctness, which is not needed given the bitmask
B. Counting of Trees
For reasons unknown, I implemented the count in a very verbose way… A simple dict with update is enough. Also, I missed the check that D[0] has to be 0 due to the condition in the problem
D. Disjoint Set of Common Divisors
The answer is the prime factorization of the GCD. Note that prime factorization can run in sqrt(N), and after sqrt(N), if the number is not 1, then we know that number is the only divisor left, and it is prime.
Proof: by contradiction
- Suppose the remaining number is not prime; then we should have divided it in a previous step, where all divisors no more than sqrt(N) have been discovered
- Because it is prime, we know it is the only possible divisor left; otherwise, it would be a product of two divisors > sqrt(N) => contradiction
- Note that by this proof, during coding we don't have to limit the upper bound to a tight sqrt(N); as long as we exhaust all divisors under some upper bound it still holds (see the sketch below)
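A small Java sketch of the trial-division argument above (Java used here purely for illustration):
import java.util.LinkedHashMap;
import java.util.Map;

public class Factorize {
    // Trial division up to sqrt(n); whatever remains (> 1) must itself be prime.
    static Map<Long, Integer> primeFactors(long n) {
        Map<Long, Integer> factors = new LinkedHashMap<>();
        for (long p = 2; p * p <= n; p++) {
            while (n % p == 0) {
                factors.merge(p, 1, Integer::sum);
                n /= p;
            }
        }
        if (n > 1) {
            factors.merge(n, 1, Integer::sum); // the leftover prime from the proof above
        }
        return factors;
    }

    public static void main(String[] args) {
        System.out.println(primeFactors(1_000_000_007L * 2 * 3)); // {2=1, 3=1, 1000000007=1}
    }
}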
A - 01 Matrix
A construction solution that I was not able to come up with. One lesson learned is that in matrix solutions, at least one of the rows or columns is "nicely" cut into bands (with as few bands as possible), and then we twist the other dimension to fit the requirements
C - Strawberry Cakes
The insight is similar to above, cut the matrix into bands and then twist it in the other dimension. Otherwise, implementation will be messy
D - Powerful Discount Tickets
- I used the bsearch approach. One case to cover is when the remaining tickets are not enough to cover all items
- Official solution uses PQ, the key insight is that floor(X/2 ** Y) = floor(floor(X/2 ** (Y-1))/ 2)
D - Ki
- Python version gives TLE. C++ version is fine
- Note that when we traverse the tree, after visiting a node we initialize the children's properties on the spot, i.e., every time we reach a node, all of its parent/parent-edge info is already known
A - Triangle
- The key insight that I missed is that suppose one vertex is at (0, 0), then the area of the triangle is abs(x2 * y3 - x3 * y2)/2
- Another key insight is to set one other coordinate to 1 and the other to 10^9 to cover the max-value case, so we convert it to the Euclidean division formula: a = bq + r = b(q + 1) - (b - r)
However, I still don't understand why I need an additional % b to pass the last test case
D - Enough Array
- Idea 1: prefix sum with bsearch
- Idea 2: two pointers. Note that when you move the head pointer, the checked index should be i instead of i + 1
System design: service registry
Requirements
- Data consistency
- How long does it take to discover an offline service?
- How many service instances can it support?
- How many machines do you need to run this registry?
Eureka
- P2P; every node serves outside requests, and when a service registers on one node, the registration is gossiped to the other nodes
- The read-only cache's default sync time is 30 sec. Services refresh on a 30-sec interval
- Default: offline services are detected every 60 sec. Heartbeats expire after 90 sec
My Design
How Spring resolves circular dependency between beans
- The object's instantiation is done via reflection in ApplicationContext.getBean(), and after that the object's properties are set via the object's init
- When object A needs another object B, we recursively use ApplicationContext.getBean() to get B and then inject it into A. At this time, the objects are created, but their properties are not set.
- Suppose A has B, and B has A as a member; on init (a runnable sketch follows this walkthrough):
- A is instantiated first
- A finds that B is needed, so getBean() is called on B
- B is instantiated
- B finds that A is needed, so getBean() is called on A. At this time, A's reference is already registered, so Spring returns the reference to the A created in the first step
- B sets its member reference to A, finishes its init, and returns
- A now gets the reference to B, sets its member reference to B, and returns
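A minimal runnable sketch of the walkthrough above, using field injection on a plain Spring context (note that Spring Boot 2.6+ rejects circular references by default); the class names are made up:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CircularDemo {
    static class A {
        @Autowired B b; // injected after A's early reference is exposed
    }

    static class B {
        @Autowired A a; // receives the early reference to the half-initialized A
    }

    public static void main(String[] args) {
        // Register both classes directly as singleton bean definitions.
        AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(A.class, B.class);
        A a = ctx.getBean(A.class);
        B b = ctx.getBean(B.class);
        System.out.println(a.b == b && b.a == a); // true: the cycle is resolved
        ctx.close();
    }
}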
Implementation
protected Object getSingleton(String beanName, boolean allowEarlyReference) {
Object singletonObject = this.singletonObjects.get(beanName);
if (singletonObject == null && isSingletonCurrentlyInCreation(beanName)) {
//the bean is currently being created - the circular-reference case
synchronized (this.singletonObjects) {
//singletonObjects holds fully created beans; singletonFactories below is a { bean_name : ObjectFactory } map
singletonObject = this.earlySingletonObjects.get(beanName);
if (singletonObject == null && allowEarlyReference) {
//bean is marked in creation, but not created, do it now
ObjectFactory<?> singletonFactory = this.singletonFactories.get(beanName);
if (singletonFactory != null) {
singletonObject = singletonFactory.getObject();
this.earlySingletonObjects.put(beanName, singletonObject);
this.singletonFactories.remove(beanName); //make sure we init only once
}
}
}
}
return (singletonObject != NULL_OBJECT ? singletonObject : null);
}
RocksDB in the context of TiKV
RocksDB concepts
- Column family: Each k-v belongs to only 1 CF. CFs share the same WAL (to support atomic writes), but different memtable and table files. Note classic CF mapping would be (row_key, a sorted list of (column, value)), but in RocksDB it is just (cf, key, value)
- Write buffer stores the memtable. Block cache is where RocksDB caches data in memory for reads
- Memtable flushes are by default scheduled on the HIGH-priority thread pool, while compactions run on the LOW-priority pool. A stalled memtable flush can stall writes, increasing p99 latency. Size the thread pools with max_background_jobs (see the sketch below)
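As a small illustration of the CF layout and the thread-pool knob above, a sketch using RocksDB's Java binding (RocksJava); the path, CF names, and pool size are placeholders mirroring the TiKV description, not TiKV's actual options:
import org.rocksdb.*;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TikvLikeRocksDb {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Loosely mirror TiKV's layout: default, write and lock CFs in one instance.
        List<ColumnFamilyDescriptor> cfs = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                new ColumnFamilyDescriptor("write".getBytes(StandardCharsets.UTF_8)),
                new ColumnFamilyDescriptor("lock".getBytes(StandardCharsets.UTF_8)));
        List<ColumnFamilyHandle> handles = new ArrayList<>();

        DBOptions opts = new DBOptions()
                .setCreateIfMissing(true)
                .setCreateMissingColumnFamilies(true)
                .setMaxBackgroundJobs(4); // shared budget for flushes + compactions

        try (RocksDB db = RocksDB.open(opts, "/tmp/tikv-like-db", cfs, handles)) {
            ColumnFamilyHandle writeCf = handles.get(1);
            db.put(writeCf, "key".getBytes(StandardCharsets.UTF_8),
                    "value".getBytes(StandardCharsets.UTF_8));
            System.out.println(new String(db.get(writeCf,
                    "key".getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8));
        }
    }
}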
Compaction
- Size-tiered compaction strategy: small SSTable to medium SSTable. trade read and space for write amplification. Compaction merges all sorted runs in one level to create a new sorted run in the next level.
- A common approach for tiered is to merge sorted runs of similar size, without having the notion of levels
- Level-based compaction strategy: each level has at most 1 sorted run. Some data goes to the next level if the current level is approaching its limit. Trades read/write amplification for space amplification. In the paper it is an all-to-all merge, but in RocksDB it is some-to-some
- Leveled compaction in RocksDB is also tiered+leveled. There can be N sorted runs at the memtable level courtesy of the max_write_buffer_number option– only one is active for writes, the rest are read-only waiting to be flushed. A memtable flush is similar to tiered compaction – the memtable output creates a new sorted run in L0 and doesn’t read/rewrite existing sorted runs in L0.
- Sub-compactions speed up a compaction job by partitioning it among multiple threads
Read & write amplification
- RA counts the number of disk reads to perform a query. It is defined separately for point query and range queries
- Flash-based storage can be written to only a finite number of times, so write amplification will decrease the flash lifetime
- LSM with level-based compaction has better write amplification than B-tree
How TiKV uses RocksDB
- All data in a TiKV node shares two RocksDB instances. One is for data, and the other is for Raft log.
- The default RocksDB instance stores KV data in the default, write and lock CFs
- raft logs are stored as region_id/log_id -> value
- data is stored as key + bit-inverted ts -> value. The ts is bit-inverted so that the newest version sorts first
- TiKV uses a prefix bloom filter, i.e., it filters on the key prefix instead of the whole key
- Use TableProperties in Split Check, MVCC GC, and compact region range
- When adding a replica to a new server, generates a SST snapshot and sends to the new server directly
- Disk space will be released only when tombstones have been compacted