Common problems in huge data processing
Find common URLs
Given 2 files, each with 5B URL, each URL has 64B. 4G memory size
- Memory can hold roughly 4 * 10^9 / 64 ≈ 60M URLs in total
- Each big file is about 5 * 10^9 * 64 B = 320 GB, i.e., roughly 80 memory-loads to read it entirely, so neither file fits in memory
- So compute the hash of each URL mod 200, and write each URL into one of 200 bucket files (done separately for each input file, about 1.6 GB per bucket)
- Load the pair of bucket files that share the same hash together: build a hash set from one and stream the other through it for lookups
- This step can be parallelized across buckets
- Append the matches from each bucket pair into the final result file (see the sketch below)
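A minimal Python sketch of this partition-then-intersect approach; the bucket count, file names, and hash choice are illustrative assumptions, not from the notes.

import hashlib
import os

BUCKETS = 200

def partition(path, out_dir):
    # Split one huge URL file into BUCKETS smaller files by hash(url) % BUCKETS,
    # so matching URLs from the two inputs always land in buckets with the same index.
    outs = [open(os.path.join(out_dir, f"bucket_{i}"), "w") for i in range(BUCKETS)]
    with open(path) as f:
        for url in f:
            url = url.strip()
            i = int(hashlib.md5(url.encode()).hexdigest(), 16) % BUCKETS
            outs[i].write(url + "\n")
    for o in outs:
        o.close()

def common_urls(bucket_a, bucket_b, result_path):
    # Load one bucket into a set, stream the matching bucket against it.
    with open(bucket_a) as fa:
        seen = {line.strip() for line in fa}
    with open(bucket_b) as fb, open(result_path, "a") as out:
        for line in fb:
            if line.strip() in seen:
                out.write(line)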
Top 100 popular words
A 1 GB file of words, each word < 16 B. Find the 100 most frequent words with 1 MB of memory
- Memory fits on the order of 60k words; the file contains roughly 60-100M words depending on the average word length
- Read the file sequentially and hash the words into ~2k bucket files, about 0.5 MB per bucket. Keep each bucket small enough to fit in memory even in the worst case where all words are distinct
- Read each bucket file and generate the word counts for that bucket
- After each bucket's counts are generated, compare and update the current top-100 result kept in memory
- If the distinct words fit into memory, a trie or suffix array can also be used to save space (a sketch of the bucket approach follows)
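A Python sketch of the bucket-then-count approach with a min-heap for the running top 100; the bucket count and file layout are assumptions.

import hashlib
import heapq
import os
from collections import Counter

BUCKETS = 2000

def bucketize(path, out_dir):
    # Hash each word into one of BUCKETS files so every bucket fits in memory.
    outs = [open(os.path.join(out_dir, f"words_{i}"), "w") for i in range(BUCKETS)]
    with open(path) as f:
        for line in f:
            for word in line.split():
                i = int(hashlib.md5(word.encode()).hexdigest(), 16) % BUCKETS
                outs[i].write(word + "\n")
    for o in outs:
        o.close()

def top_k(out_dir, k=100):
    heap = []  # min-heap of (count, word), size <= k, kept across all buckets
    for i in range(BUCKETS):
        with open(os.path.join(out_dir, f"words_{i}")) as f:
            counts = Counter(w.strip() for w in f)  # a single bucket fits in memory
        for word, cnt in counts.items():
            if len(heap) < k:
                heapq.heappush(heap, (cnt, word))
            elif cnt > heap[0][0]:
                heapq.heapreplace(heap, (cnt, word))
    return sorted(heap, reverse=True)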
Find all distinct numbers in a file
The file has 3B numbers and cannot fit in memory
- Idea 1: hash into bucket files and dedup each bucket. Again, keep each bucket smaller than memory to defend against the all-distinct case
- Idea 2: a bitmap sized to the value range may consume around 3 GB of RAM. Note a Bloom filter may still be too large, because it needs about 10 bits per element (~3.75 GB for 3B numbers)
- The same idea can be used to check whether a given number exists in the file (see the bitmap sketch below)
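A minimal Python bitmap sketch, assuming the values are non-negative 32-bit integers; the actual memory cost scales with the value range (about 512 MB for the full 32-bit range, proportionally more for wider ranges).

class Bitmap:
    def __init__(self, max_value):
        self.bits = bytearray(max_value // 8 + 1)  # one bit per possible value

    def test_and_set(self, n):
        byte, mask = n >> 3, 1 << (n & 7)
        seen = bool(self.bits[byte] & mask)
        self.bits[byte] |= mask
        return seen

def distinct_numbers(path):
    bm = Bitmap(2**32 - 1)
    with open(path) as f:
        for line in f:
            n = int(line)
            if not bm.test_and_set(n):
                yield n  # first occurrence of n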
Top K in a list of N sorted arrays with same size
- Maintain a size-N min-heap and a cursor index for each array
- Push the first element of each array into the heap, recording which array it came from
- Pop the min of the heap, then push the next item from the same array
- Stop after K pops (see the sketch below)
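A Python sketch of the heap merge; this version returns the K smallest values (for the K largest, negate the values or mirror the comparison).

import heapq

def k_smallest(arrays, k):
    # One (value, array_index, element_index) entry per array; heap size stays <= N.
    heap = [(arr[0], i, 0) for i, arr in enumerate(arrays) if arr]
    heapq.heapify(heap)
    out = []
    while heap and len(out) < k:
        val, i, j = heapq.heappop(heap)
        out.append(val)
        if j + 1 < len(arrays[i]):
            heapq.heappush(heap, (arrays[i][j + 1], i, j + 1))
    return out

print(k_smallest([[1, 4, 9], [2, 3, 8], [0, 5, 7]], 4))  # [0, 1, 2, 3]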
Sort queries by frequency
10 files, 1 GB each
- If duplication is high, use an in-memory hash map to store the counts, then sort the keys by count
- If duplication is low, hash into bucket files, count within each bucket, then merge-sort the per-bucket results
Find median of 5B numbers
- Idea 1: if the numbers fit into memory, use two heaps: a max-heap for values below the current median and a min-heap for values above it (see the sketch below)
- Invariant: the size difference of the two heaps is at most 1
- Idea 2: partition into files by numeric prefix. From the size of each file we know the counts, hence exactly which file the median falls into and its rank within that file, so we can then locate it precisely
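A Python sketch of the two-heap running median for the in-memory case (Idea 1); Python's heapq is a min-heap, so the lower half is stored negated.

import heapq

class RunningMedian:
    def __init__(self):
        self.lo = []  # max-heap (negated) for the lower half
        self.hi = []  # min-heap for the upper half

    def add(self, x):
        heapq.heappush(self.lo, -x)
        heapq.heappush(self.hi, -heapq.heappop(self.lo))
        if len(self.hi) > len(self.lo):  # keep the size difference <= 1
            heapq.heappush(self.lo, -heapq.heappop(self.hi))

    def median(self):
        if len(self.lo) > len(self.hi):
            return -self.lo[0]
        return (-self.lo[0] + self.hi[0]) / 2

rm = RunningMedian()
for n in [5, 1, 9, 2, 7]:
    rm.add(n)
print(rm.median())  # 5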
System design: Locate the delivery kiosk for user's order
Note that capacity estimation, storage design, and computation design will keep triggering updates to each other
Requirements
- Each kiosk has multiple delivery boxes, of different dimensions
- When user places an order, the system should return up to 3 possible kiosks that can accept user’s delivery
- Handle the NA traffic
Capacity estimation
- 500M NA users; assume each places 1 order on Black Friday, so peak traffic is about 500M / ~90k sec per day * 4 ≈ 25K TPS
- Each kiosk has 100 boxes, so it can serve a population of about 1k, assuming ~10% of people place an order on a normal day. So 500K kiosks are more than enough
- Each kiosk covers a 0.5 mile radius with little overlap
Storage layer
- Kiosk needs to keep track of:
- geohash - needs index to range search
- number of boxes
- Box needs to keep track of
- dimension (H, L, W)
- parent kiosk
- Status needs to keep track of
- the mapping from box to package IDs (one to many)
- the parent kiosk, if status lives in a separate service
- Since kiosk info is rarely updated, kiosk and box info can live in read slaves or a cache to scale out reads
Computation sequence
- Fetch the nearest 10 kiosks from one of the DB's read slaves by range-searching on geohash
- Fetch the empty boxes (< 1k total) in those kiosks, and filter in memory for boxes the package fits into, i.e., the box's W/L/H each >= the package's W/L/H
- Return to the user the kiosks that have a fitting empty box (see the sketch below)
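A hypothetical Python sketch of the in-memory filtering step; the record shapes and names (nearest_kiosks, empty_boxes_by_kiosk) are assumptions, not from the notes.

def box_fits(box_dims, pkg_dims):
    # A package fits if each of its sorted dimensions is <= the box's sorted dimensions.
    return all(p <= b for p, b in zip(sorted(pkg_dims), sorted(box_dims)))

def candidate_kiosks(nearest_kiosks, empty_boxes_by_kiosk, pkg_dims, limit=3):
    # nearest_kiosks: kiosk ids already ordered by the geohash range scan
    # empty_boxes_by_kiosk: kiosk id -> list of (h, l, w) tuples for its empty boxes
    result = []
    for kiosk_id in nearest_kiosks:
        boxes = empty_boxes_by_kiosk.get(kiosk_id, [])
        if any(box_fits(b, pkg_dims) for b in boxes):
            result.append(kiosk_id)
        if len(result) == limit:
            break
    return result

# Example: two nearby kiosks, only one has a box big enough for a 10x20x30 package.
print(candidate_kiosks(
    ["kiosk_a", "kiosk_b"],
    {"kiosk_a": [(5, 5, 5)], "kiosk_b": [(40, 40, 15)]},
    (10, 20, 30),
))  # -> ['kiosk_b']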
Productivity tips and tricks
- Estimate how much time you can allocate, and set aside continuous time blocks for it. Each block should be at least one hour
- Practice structured procrastination by working on things that are important but not urgent
- Checking your phone every few minutes is a sign of fatigue; take a break
- Stop when you feel good and know where the work is heading, and come back later
- Going through email or discussing on a forum is ineffective by nature. Pre-plan time slots for such activities, ideally at fixed times, and don't worry about them otherwise
- Don't think about money or disputes if possible; they are natural attention sinks
- Don't think about the damage other people have done to you; they don't deserve space in your mind
How InnoDB handles deletion
- All user data in InnoDB tables is stored in pages comprising a B-tree index (the clustered index). In some other database systems, this type of index is called an “index-organized table”. Each row in the index node contains the values of the (user-specified or system-generated) primary key and all the other columns of the table.
- Updates to rows usually rewrite the data within the same page
- By default, innodb_file_per_table is on. This means each table has its own .ibd file
- Each .ibd file has multiple segments, each of which is associated with an index
- Each segment consists of multiple 1 MB extents
- Each extent has multiple 16 KB pages
- On delete, the space of the deleted record is marked reusable. If the amount of data in the page falls below MERGE_THRESHOLD (default 50% of the page), InnoDB tries to merge it with a neighboring page, leaving the original page empty
- Check index_page_merge_successful in INFORMATION_SCHEMA.INNODB_METRICS
- Check whether OPTIMIZE TABLE is needed (see below)
- If you do sequential deletes instead of random deletes, you most likely don't need to run OPTIMIZE TABLE, because newly inserted rows will use the space marked for reuse, and InnoDB automatically merges empty index pages
- The purpose of OPTIMIZE TABLE is to reduce the data_free value in information_schema.tables and to defragment index pages
- data_free marks the reusable space; it is not accurate if the table has variable-length columns > 768 bytes, e.g., varchar or text
- InnoDB implements OPTIMIZE TABLE as ALTER TABLE ... FORCE, which rebuilds into a temporary table. This also means additional space is needed during the operation
On forward and backward compatibility
Suppose A depends on B to function, i.e., A -> B
- Forward compatibility means that if a new version of B is deployed, current A should still be able to handle it
- This means we can upgrade B without breaking A
- If not, then we have to first upgrade A so that it can process both the current and the next version of B
- Note this is what people usually mean when they talk about backward compatibility, which is not exactly the textbook definition. Strictly, backward compatibility means a newer version of A still accepts the current B (a toy illustration follows)
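A toy Python illustration of the forward-compatibility case above; the message shape and field names are made up for the example.

# A consumes messages produced by B. A is forward compatible with B's changes
# if it keeps working when a newer B adds fields that A does not know about.
def handle_order(msg: dict) -> str:
    # Read only the fields this version of A understands and ignore the rest.
    return f"order {msg['id']} for {msg['user']}"

old_b_msg = {"id": 1, "user": "alice"}
new_b_msg = {"id": 2, "user": "bob", "priority": "high"}  # field added by a newer B
print(handle_order(old_b_msg))
print(handle_order(new_b_msg))  # still works: the unknown field is ignored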
How pt-archiver works
Most likely we need the bulk insert and bulk delete mode. Otherwise, single-row insertion mode can barely break 2k rows per second, i.e., at most about 170M rows per day
$bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
or die "Cannot open temp file: $OS_ERROR\n";
while ( # Quit if:
$row # There is no data
&& $retries >= 0 # or retries are exceeded
&& (!$o->get('run-time') || $now < $end) # or time is exceeded
&& !-f $sentinel # or the sentinel is set
&& $oktorun # or instructed to quit
)
{
my $lastrow = $row;
my $escaped_row;
$escaped_row = escape([@{$row}[@sel_slice]], $fields_separated_by, $optionally_enclosed_by);
print $bulkins_file $escaped_row, "\n"
or die "Cannot write to bulk file: $OS_ERROR\n";
# Possibly flush the file and commit the insert and delete.
commit($o) unless $commit_each;
# Get the next row in this chunk.
# First time through this loop $get_sth is set to $get_first.
# For non-bulk operations this means that rows ($row) are archived
# one-by-one in the code block above ("row is archivable"). For
# bulk operations, the 2nd to 2nd-to-last rows are ignored and
# only the first row ($first_row) and the last row ($last_row) of
# this chunk are used to do bulk INSERT or DELETE on the range of
# rows between first and last. After the bulk ops, $first_row and
# $last_row are reset to the next chunk.
if ( $get_sth->{Active} ) { # Fetch until exhausted
$row = $get_sth->fetchrow_arrayref();
}
if ( !$row ) {
$bulkins_file->close()
or die "Cannot close bulk insert file: $OS_ERROR\n";
my $ins_sth; # Let plugin change which sth is used for the INSERT.
$ins_sth ||= $ins_row; # Default to the sth decided before.
my $success = do_with_retries($o, 'bulk_inserting', sub {
$ins_sth->execute($bulkins_file->filename());
$src->{dbh}->do("SELECT 'pt-archiver keepalive'") if $src;
PTDEBUG && _d('Bulk inserted', $del_row->rows, 'rows');
$statistics{INSERT} += $ins_sth->rows;
});
# Notice no checksum is performed; correctness relies on the delete running only after the insert has succeeded
my $success = do_with_retries($o, 'bulk_deleting', sub {
$del_row->execute(
@{$first_row}[@bulkdel_slice],
@{$lastrow}[@bulkdel_slice],
);
PTDEBUG && _d('Bulk deleted', $del_row->rows, 'rows');
$statistics{DELETE} += $del_row->rows;
});
commit($o, 1) if $commit_each;
$get_sth = $get_next;
PTDEBUG && _d('Fetching rows in next chunk');
trace('select', sub {
my $select_start = time;
$get_sth->execute(@{$lastrow}[@asc_slice]);
$last_select_time = time - $select_start;
PTDEBUG && _d('Fetched', $get_sth->rows, 'rows');
$statistics{SELECT} += $get_sth->rows;
});
@beginning_of_txn = @{$lastrow}[@asc_slice] unless $txn_cnt;
$row = $get_sth->fetchrow_arrayref();
$first_row = $row ? [ @$row ] : undef;
$bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
or die "Cannot open temp file: $OS_ERROR\n";
}
}
How is character escaped
# Formats a row the same way SELECT INTO OUTFILE does by default. This is
# described in the LOAD DATA INFILE section of the MySQL manual,
# http://dev.mysql.com/doc/refman/5.0/en/load-data.html
sub escape {
my ($row, $fields_separated_by, $optionally_enclosed_by) = @_;
$fields_separated_by ||= "\t";
$optionally_enclosed_by ||= '';
# Note that we don't escape the separator here, so fields that contain the separator itself may be a problem
return join($fields_separated_by, map {
s/([\t\n\\])/\\$1/g if defined $_; # Escape tabs etc
my $s = defined $_ ? $_ : '\N'; # NULL = \N
# var & ~var will return 0 only for numbers
if ($s !~ /^[0-9,.E]+$/ && $optionally_enclosed_by eq '"') {
$s =~ s/([^\\])"/$1\\"/g;
$s = $optionally_enclosed_by."$s".$optionally_enclosed_by;
}
# $_ =~ s/([^\\])"/$1\\"/g if ($_ !~ /^[0-9,.E]+$/ && $optionally_enclosed_by eq '"');
# $_ = $optionally_enclosed_by && ($_ & ~$_) ? $optionally_enclosed_by."$_".$optionally_enclosed_by : $_;
chomp $s;
$s;
} @$row);
}
On MySQL's async replication in production
How async replication works in mysql
- Async replication is the default behavior. Semi-sync replication is available as a plugin (lossless semi-sync since 5.7)
- Master writes changes to a local binlog file. Note SELECT or SHOW will not be written to the binlog since they don’t affect slave state
- Note that this binlog is different from InnoDB's undo and redo logs. Its main purpose is replication and point-in-time recovery after restoring from a backup
- On AWS RDS, the binlog is not turned on by default and can be configured to be retained for at most 7 days
- The extra binlog write introduces a performance penalty. On Aurora, we observed at least a 1/3 reduction in throughput with binlog turned on
- The binlog has a buffer controlled by binlog_cache_size; a temp file is opened if the buffer is not enough
- Since the binlog is just a file, it is subject to the same flush/fsync failures as normal files. The default behavior is that the server shuts down upon detecting such issues
- Slave pulls from the master. Master does NOT try pushing to the slave
- By default, a slave does not write its own binlog
- Any detected corruption/data loss in the binlog means that replication should be restarted from scratch; the current slave's data should be discarded
Complications from production setup
In production, MySQL normally runs as an active-standby pair with semi-sync or sync replication between them. This introduces additional complications for async replication. Call the current active X, the standby Y, the X-Y cluster the source, and the async replica the sink
From the source pov:
- To ensure async replication keeps working after Y is promoted to master when X fails, at minimum GTID mode has to be turned on, but this can cause write stalls on the master DB, especially on Aurora
- Note that even with GTID mode on, we cannot guarantee that binlog data already persisted on X is never lost, although the chance is greatly reduced
From the sink pov:
- Suppose a failover between X and Y happened; the pulling connection to X may still be active and alive while the new active is Y. This means that when the sink pulls binlog through a connection pool (as networked applications usually do), it may be pulling from both X and Y, i.e., a split-brain problem.
- The root cause is that during failover, by CAP, the async replication should be killed and made unavailable to avoid the split brain, but the implementation does not obey this
- In practice, the split-brain problem is unlikely to cause damage if log_slave_updates is turned on, but split brain is a critical situation that should be avoided conceptually
- When such split-brain replication happens, either fix it through domain knowledge, or discard the current sink and restart the replication from scratch
- This means async replication is good for cases where some slack is allowed in data consistency and availability, e.g.,
- Cross-region failover instances, where we accept the loss/corruption of a couple of rows in exchange for being able to recover from a main-region data loss
- Read slaves for OLAP workloads/pipelines, so we have a mostly up-to-date data source with little additional load on the master
- Similarly, async replication is not a good fit for mission-critical systems; those have to read from the master or a sync-replicated replica
How uid-generator generates snowflake IDs
Default uid generator
private long getCurrentSecond() {
long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
}
return currentSecond;
}
protected synchronized long nextId() {
long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond); //this one just self-spins
}
// At the different second, sequence restart from zero
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
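A minimal Python sketch of what an allocator like bitsAllocator.allocate typically does: pack the delta seconds, worker ID, and sequence into one 64-bit ID. The bit widths below mirror uid-generator's documented defaults (28/22/13 plus a sign bit), but treat them as assumptions here, not as values read from the code above.

TIMESTAMP_BITS, WORKER_BITS, SEQ_BITS = 28, 22, 13

def allocate(delta_seconds: int, worker_id: int, sequence: int) -> int:
    # Highest bits: seconds since the epoch; middle: worker id; lowest: per-second sequence.
    return (delta_seconds << (WORKER_BITS + SEQ_BITS)) | (worker_id << SEQ_BITS) | sequence

print(allocate(12345, 7, 1))  # deterministic given the three inputs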
CachedUidGenerator
Data structure
public class BufferPaddingExecutor {
/** Whether buffer padding is running */
private final AtomicBoolean running;
/** We can borrow UIDs from the future, here store the last second we have consumed */
private final PaddedAtomicLong lastSecond; // initialized with the system timestamp; after that it becomes a logical second
/** RingBuffer & BufferUidProvider */
private final RingBuffer ringBuffer;
private final BufferedUidProvider uidProvider;
/** Padding immediately by the thread pool */
private final ExecutorService bufferPadExecutors;
/** Padding schedule thread */
private final ScheduledExecutorService bufferPadSchedule;
}
Init CachedUidGenerator
private void initRingBuffer() {
// initialize RingBufferPaddingExecutor
boolean usingSchedule = (scheduleInterval != null);
this.bufferPaddingExecutor = new BufferPaddingExecutor(ringBuffer, this::nextIdsForOneSecond, usingSchedule);
if (usingSchedule) {
bufferPaddingExecutor.setScheduleInterval(scheduleInterval);
}
}
this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
Actual population logic
public void paddingBuffer() {
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); //the only place lastSecond is mutated other than init
for (Long uid : uidList) {
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
}
Query timeout for MySQL JDBC
Query timeout implementation in MySQL JDBC
mysql-connector-j version 5.1.48
Data structures
/**
* A Statement object is used for executing a static SQL statement and obtaining
* the results produced by it.
*
* Only one ResultSet per Statement can be open at any point in time. Therefore, if the reading of one ResultSet is interleaved with the reading of another,
* each must have been generated by different Statements. All statement execute methods implicitly close a statement's current ResultSet if an open one exists.
*/
public class StatementImpl implements Statement {
/**
* Thread used to implement query timeouts...Eventually we could be more
* efficient and have one thread with timers, but this is a straightforward
* and simple way to implement a feature that isn't used all that often. (?!!!)
*/
/** The physical connection used to effectively execute the statement */
protected Reference<MySQLConnection> physicalConnection = null;
class CancelTask extends TimerTask {
SQLException caughtWhileCancelling = null;
StatementImpl toCancel;
Properties origConnProps = null;
String origConnURL = "";
}
}
private BooleanConnectionProperty queryTimeoutKillsConnection = new BooleanConnectionProperty("queryTimeoutKillsConnection", false,
Messages.getString("ConnectionProperties.queryTimeoutKillsConnection"), "5.1.9", MISC_CATEGORY, Integer.MIN_VALUE);
What does CancelTask do
Inside TimerTask.run(), it starts a new thread. Inside that thread's run():
Only the happy path with default values is shown here; comments are inline
Connection cancelConn = null;
java.sql.Statement cancelStmt = null;
try {
MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
synchronized (StatementImpl.this.cancelTimeoutMutex) {
if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
// All's fine
cancelConn = physicalConn.duplicate(); //this one will create a new connection
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + physicalConn.getId());
} else {
try {
cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
cancelStmt = cancelConn.createStatement();
cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
} catch (NullPointerException npe) {
// Log this? "Failed to connect to " + origConnURL + " and KILL query"
}
}
CancelTask.this.toCancel.wasCancelled = true;
CancelTask.this.toCancel.wasCancelledByTimeout = true;
}
} catch (SQLException sqlEx) {
CancelTask.this.caughtWhileCancelling = sqlEx;
} catch (NullPointerException npe) {
// Case when connection closed while starting to cancel.
// We can't easily synchronize this, because then one thread can't cancel() a running query.
// Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
} finally {
if (cancelStmt != null) {
try {
cancelStmt.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
if (cancelConn != null) {
try {
cancelConn.close();
} catch (SQLException sqlEx) {
throw new RuntimeException(sqlEx.toString());
}
}
CancelTask.this.toCancel = null;
CancelTask.this.origConnProps = null;
CancelTask.this.origConnURL = null;
}
How is CancelTask used
public java.sql.ResultSet executeQuery(String sql) throws SQLException {
CancelTask timeoutTask = null;
if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
timeoutTask = new CancelTask(this);
locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
}
this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
createStreamingResultSet(), this.currentCatalog, cachedFields);
if (timeoutTask != null) {
if (timeoutTask.caughtWhileCancelling != null) {
throw timeoutTask.caughtWhileCancelling;
}
timeoutTask.cancel();
locallyScopedConn.getCancelTimer().purge();
timeoutTask = null;
}
synchronized (this.cancelTimeoutMutex) {
if (this.wasCancelled) {
SQLException cause = null;
if (this.wasCancelledByTimeout) {
cause = new MySQLTimeoutException();
} else {
cause = new MySQLStatementCancelledException();
}
resetCancelledState();
throw cause;
}
}
//inside finally
if (timeoutTask != null) {
timeoutTask.cancel();
locallyScopedConn.getCancelTimer().purge();
}
}
Implications of this implementation
- By default, if the query times out, the connection will not be closed
- Sending the KILL QUERY statement is a best-effort attempt. A more reliable way is MySQL's max_execution_time hint (e.g., SELECT /*+ MAX_EXECUTION_TIME(2000) */ ...). A socket timeout can only mitigate, not solve, the slow-query problem
Reading Notes: Effective Executive
Chapter 1 Effectiveness Can Be Learned
- Manual work is about doing things right. Knowledge work is about doing the right thing
- A knowledge worker (KWer) cannot be supervised in detail. He has to direct himself
- A KWer must make decisions and live with their results. He should be the best person to make the call
- KW is defined by its results, NOT by cost or quantity
- A KWer can't let events direct his actions for too long, or he ends up merely operating
- Inside the org, it is all about effort and cost, even for a profit center
- One tends to be drawn into the challenges inside the org instead of the events on the outside
- By the time relevant outside events become measurable, it is too late, because being able to measure an event means the concept had to be built first
- A change in trends is more important than the trend itself, but it can only be perceived, not counted
- You don't need to know the details of your collaborator's work, but you need to know why it is there and what is happening
- Ask about the expected result instead of the work to be done
- Effective decisions are based on dissenting opinions rather than consensus on the facts
Chapter 2 Know Thy Time
- Plan with time rather than work. Consolidate time into the largest possible chunks
- You need to spend a lot of time with people to get an idea across
- Fast personnel decisions are likely to be wrong ones. Make them several times before committing; this needs hours of continuous, uninterrupted thought
- Cutting something important by mistake is easy to correct
- 90 minutes of continuous work + 30 minutes of interruptible work
- Schedule a morning work period at home. This is better than after dinner, when people are too tired to do a good job
Chapter 3 What Can I Contribute?
- Contribution is important in all 3 areas: direct results, building of values, and developing people
- Justify the payroll based on contribution rather than effort
- Doing a job well means one is specialized, which also means retooling when moving to a new position
Chapter 4 Making Strength Productive
- Fill the position by strength rather than weakness. If a weakness blocks the strength, fix it through work or career opportunities
- Build the job around tasks rather than personality, i.e., don't create a job for a person
- An indispensable man means one of the following:
- he is incompetent, so he shields himself from demands
- his superior is weak
- his strength is misused to delay tackling a serious problem, if not to hide it
- A rapidly promoted superior is more likely to carry you to success. An under-performing superior only means a new one will be brought in from outside
- Help your boss overcome limitations and use his strengths. Present the areas related to his strengths first, to help him understand the situation
- Listening or reading: people have different preferences for how they receive information
- It is easier to raise the performance of one leader than of the masses. He should be put into a performance-making, standard-setting position
- Claiming lack of authority/access is often a cover-up for inertia
Chapter 5 First Things First
- Double-buffering work can succeed, on the condition that each task gets a minimum amount of continuous time
- Yesterday's successes always linger longer than their productive life
- Pressure-driven prioritization means focusing on yesterday instead of tomorrow, on the inside rather than the outside, i.e., urgent tasks push back important ones
- Reprioritize at the end of each task
Chapter 6 The Elements of Decision-making
- Focus on what is "right", i.e., what satisfies the specs, before worrying about compromises and concessions. Stakeholders know how to make compromises better than you, but they need your help figuring out what is right
- An incomplete explanation is often more dangerous than a totally wrong one
- An executive who makes many decisions is both lazy and ineffectual, because he fails to see the truly generic problems
Chapter 7 Effective Decisions
- Most likely it is a choice between two courses of action, neither of which is provably more nearly right than the other
- Facts require criteria for relevance
- Effective decisions grow out of the clash of divergent opinions and competing alternatives. In fact, one does not make a decision unless there is disagreement
- Whoever voices an opinion should be responsible for the factual findings
- Finding the measurement is largely a risk-taking judgment; since it is a judgment, alternative measurements must be considered too
- A measurement that sounds good on paper is still meaningless if you can't convert it into a step-by-step guide for a computer
- Act if, on balance, the benefits greatly outweigh cost and risk; act or do not act, but do not "hedge" or compromise
What triggers the txnLockFast failure?
Source version 2.1
What is txnLockFast error
This is triggered when a read txn's read_start_ts falls between a write txn's write_start_ts and write_commit_ts, i.e., the read runs into the write's uncommitted lock. Upon seeing this, the read txn backs off and retries later with the same read_start_ts, so that snapshot isolation is preserved. The effect of txnLockFast is similar to a pessimistic lock, but with higher latency
tidb_tikvclient_backoff_seconds_count
Triggered by the expression increase(tidb_tikvclient_backoff_seconds_count[10m]) > 10
tidb_tikvclient_backoff_seconds_count[10m] is a range vector of counter samples; the expression means the backoff counter increased by more than 10 over the last 10 minutes
Data structures
Retriever is the interface that wraps the basic Get and Seek methods. Retriever is implemented by BufferStore -> unionStore -> tikvSnapshot -> tikvTxn
Transaction defines the interface for operations inside a transaction. Transaction is implemented by tikvTxn -> TxnState
// Scanner support tikv scan
type Scanner struct {
snapshot *tikvSnapshot
batchSize int
valid bool
cache []*pb.KvPair
idx int
nextStartKey []byte
endKey []byte
eof bool
// Use for reverse scan.
reverse bool
nextEndKey []byte
}
What triggers txnLockFast?
- Commit check
- TxnState also implements BatchGet, but it seems unused. tikvTxn also implements Get, but it is only used in tests
//part of Retriever interface
func (us *unionStore) Get(k Key) ([]byte, error) {
us.markLazyConditionPair(k, nil, e.(error))
}
//part of Transaction interface
func (txn *tikvTxn) Commit(ctx context.Context) error {
txn.us.CheckLazyConditionPairs()
}
func (us *unionStore) CheckLazyConditionPairs() error {
us.snapshot.BatchGet(keys)//this may trigger txnLockFast
}
- Scan keys in tikv
func (s *Scanner) Next() error {
s.resolveCurrentLock(bo, current)
}
func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
s.snapshot.get(bo, kv.Key(current.Key))
}
- Coprocessor
// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
logutil.Logger(context.Background()).Debug("coprocessor encounters",
zap.Stringer("lock", lockErr))
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
return nil, errors.Trace(err1)
}
if !ok {
if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
}
How TiDB implements 2PC
Source version 2.1
Data structures
// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
store *tikvStore
version kv.Version
priority pb.CommandPri
notFillCache bool
syncLog bool
keyOnly bool
vars *kv.Variables
}
//store/tikv/txn.go, seems to represent txn actions on the server (tikv) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct {
snapshot *tikvSnapshot
us kv.UnionStore
store *tikvStore // for connection to region.
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
valid bool
lockKeys [][]byte
mu sync.Mutex // For thread-safe LockKeys function.
dirty bool
setCnt int64
vars *kv.Variables
}
//session/txn.go, seems to represent txn on the client(tidb) side
// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement related modification in the buffer before flush to the txn,
// so if execute statement meets error, the txn won't be made dirty.
// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need.
type TxnState struct {
// States of a TxnState should be one of the followings:
// Invalid: kv.Transaction == nil && txnFuture == nil
// Pending: kv.Transaction == nil && txnFuture != nil
// Valid: kv.Transaction != nil && txnFuture == nil
kv.Transaction //embedded type, meaning TxnState now implements the whole Transaction interface
txnFuture *txnFuture
buf kv.MemBuffer
mutations map[int64]*binlog.TableMutation
dirtyTableOP []dirtyTableOperation
// If doNotCommit is not nil, Commit() will not commit the transaction.
// doNotCommit flag may be set when StmtCommit fail.
doNotCommit error
}
type session struct {
txn TxnState
mu struct {
sync.RWMutex
values map[fmt.Stringer]interface{}
}
store kv.Storage
sessionVars *variable.SessionVars
sessionManager util.SessionManager
}
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
store *tikvStore //it is the same store in txn.store
txn *tikvTxn
startTS uint64
keys [][]byte
mutations map[string]*pb.Mutation
lockTTL uint64
commitTS uint64
mu struct {
sync.RWMutex
committed bool
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
}
priority pb.CommandPri
syncLog bool
connID uint64 // connID is used for log.
cleanWg sync.WaitGroup
detail *execdetails.CommitDetails
// The max time a Txn may use (in ms) from its startTS to commitTS.
// We use it to guarantee GC worker will not influence any active txn. The value
// should be less than GC life time.
maxTxnTimeUse uint64
}
When a session commits a single-table, DML-only transaction
- Session.doCommit
- TxnState.Commit
- TxnState.Transaction.Commit(ctx), which actually calls
- tikvTxn.Commit (shown below)
func (txn *tikvTxn) Commit(ctx context.Context) error {
defer txn.close()
var connID uint64
val := ctx.Value(sessionctx.ConnID)
if val != nil {
connID = val.(uint64)
}
committer, err := newTwoPhaseCommitter(txn, connID)
// latches disabled
err = committer.executeAndWriteFinishBinlog(ctx)
}
- newTwoPhaseCommitter
- iterate through tikvTxn.UnionStore to collect keys to Put and Del into the mutations collection
- add txn.lockKeys to mutations with the Op_Lock type
- executeAndWriteFinishBinlog
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.txn.commitTS = c.commitTS
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
//Only the happy path, and no binlog
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
err := c.prewriteKeys(prewriteBo, c.keys)
commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
c.commitTS = commitTS
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
err = c.commitKeys(commitBo, c.keys)
return nil
}
- Prewrite, commit, and cleanup (in the failure case) are just different actions passed to doActionOnKeys
twoPhaseCommitter.doActionOnKeys
//happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
}
req := &tikvrpc.Request{
Type: tikvrpc.CmdPrewrite,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
TxnSize: uint64(len(batch.keys)),
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
}
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
req := &tikvrpc.Request{
Type: tikvrpc.CmdCommit,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
req.Context.Priority = c.priority
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
}
func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
sizeFunc = c.keyValueSize
}
// Make sure the group that contains primary lock key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
delete(groups, firstRegion)
for id, g := range groups { // for the other regions we don't care about the order
batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
// primary should be committed/cleanup first
err = c.doActionOnBatches(bo, action, batches[:1])
batches = batches[1:]
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potencial data race in unit test since `CommitMaxBackoff` will be updated
// by test suites.
secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return errors.Trace(err)
}
How TiDB retries requests to TiKV
Source code version v2.1.18
Data structures
type Backoffer struct {
ctx context.Context
fn map[backoffType]func(context.Context) int
maxSleep int
totalSleep int
errors []error
types []backoffType
vars *kv.Variables
}
- Sleep settings for each retry type (times in ms)
- EqualJitter means half of the duration is randomized (see the sketch after the list of cases below)
NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int
case boTiKVRPC:
return NewBackoffFn(100, 2000, EqualJitter)
case BoTxnLock:
return NewBackoffFn(200, 3000, EqualJitter)
case boTxnLockFast:
return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter)
case boPDRPC:
return NewBackoffFn(500, 3000, EqualJitter)
case BoRegionMiss:
return NewBackoffFn(100, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case boServerBusy:
return NewBackoffFn(2000, 10000, EqualJitter)
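A Python sketch of the backoff behaviour these cases describe: exponential growth from base up to cap, with "equal jitter" keeping half the duration fixed and randomizing the other half. It is modeled on the Go code above, not a line-by-line port, and the parameters in the usage line are rough analogues.

import random
import time

def make_backoff(base_ms: int, cap_ms: int, equal_jitter: bool = True):
    attempt = 0
    def backoff() -> int:
        nonlocal attempt
        sleep = min(cap_ms, base_ms * (2 ** attempt))  # exponential growth, capped
        if equal_jitter:
            sleep = sleep // 2 + random.randint(0, sleep // 2)  # half fixed, half random
        attempt += 1
        time.sleep(sleep / 1000.0)
        return sleep  # the caller adds this to totalSleep and compares with maxSleep
    return backoff

bo_txn_lock_fast = make_backoff(100, 3000)  # rough analogue of boTxnLockFast
print(bo_txn_lock_fast(), bo_txn_lock_fast())  # sleeps grow, capped at 3000 ms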
Default max total sleep time in each case
// Maximum total sleep time(in ms) for kv/cop commands.
const (
copBuildTaskMaxBackoff = 5000
tsoMaxBackoff = 15000
scannerNextMaxBackoff = 20000
batchGetMaxBackoff = 20000
copNextMaxBackoff = 20000
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
)
When do we stop trying
If the backoff attempts have slept for more than the configured duration, we return a MySQL error that signals not to retry anymore
realSleep := f(b.ctx)
backoffDuration.Observe(float64(realSleep) / 1000)
b.totalSleep += realSleep
b.types = append(b.types, typ)
var startTs interface{} = ""
if ts := b.ctx.Value(txnStartKey); ts != nil {
startTs = ts
}
logutil.Logger(context.Background()).Debug("retry later",
zap.Error(err),
zap.Int("totalSleep", b.totalSleep),
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", typ),
zap.Reflect("txnStartTS", startTs))
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
for i, err := range b.errors {
// Print only last 3 errors for non-DEBUG log levels.
if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
errMsg += "\n" + err.Error()
}
}
logutil.Logger(context.Background()).Warn(errMsg)
// Use the first backoff type to generate a MySQL error.
return b.types[0].TError()
}
Max sleep for each case
func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error)
maxSleep := 10000 // ms
bo := tikv.NewBackoffer(context.Background(), maxSleep)
}
func getPhysicalTableRegions(physicalTableID int64, tableInfo *model.TableInfo, tikvStore tikv.Storage, s kv.SplitableStore, uniqueRegionMap map[uint64]struct{}) ([]regionMeta, error) {
recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackoffer(context.Background(), 20000), startKey, endKey)
}
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)//defaults 20 seconds
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)//defaults 41 seconds
}
Health check and availability
- The ELB default health check timeout is 10 sec, which is overly generous for the same-region case. Normally a 2-second timeout with 3 attempts before declaring failure is enough
- Otherwise, by default, the instance will only be taken offline at least 30 seconds after the failure happens, with traffic still directed to the faulty instance in the meantime
- Problem with time-based availability metrics, i.e., MTTF / (MTTF + MTTR):
- In distributed systems it is common to have part of the system failing somewhere; how do you define "down"?
- How do you differentiate the impact of downtime during off-hours vs. peak hours?
- Problem with count-based availability metrics, i.e., % of successful requests:
- High-volume users have a larger impact on this metric
- Less traffic comes in while users perceive the system as down, which makes the metric look better than it actually is
- It does not show how long the system was down
- Neither metric captures the downtime pattern or the differing durations of outages
- The longer the time window, the "better" the availability metric appears (a toy comparison follows)
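A toy Python comparison of the two metrics; all numbers are made up for illustration.

# One 1-hour outage in a 30-day window, which happened to hit peak traffic.
mttf_hours, mttr_hours = 719, 1
time_based = mttf_hours / (mttf_hours + mttr_hours)

total_reqs, failed_reqs = 10_000_000, 50_000
count_based = (total_reqs - failed_reqs) / total_reqs

print(f"time-based availability:  {time_based:.4%}")   # ~99.86%, blind to when it was down
print(f"count-based availability: {count_based:.4%}")  # 99.50%, blind to how long it was down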
How Micrometer keeps percentiles and publishes to Datadog
Data structures (bottom up)
- StepDouble: thread-safe
- By default, each step represents 1 min
- Value operations are done by DoubleWriter
- Within the same step, the value returned by poll() stays the same; the value is reset at the start of each step
- Histogram: either
- an HDR-based histogram (TimeWindowPercentileHistogram), or
- a fixed-boundary histogram (TimeWindowFixedBoundaryHistogram)
- Timer: has an instance of Histogram
- Meter: a named and dimensioned producer of one or more measurements. All the metric constructs we create in application code, e.g., Counter and Timer, implement Meter
- DatadogMeterRegistry: implements MeterRegistry -> StepMeterRegistry
- Maintains the list of meters
- Associates an instance of Timer with a HistogramGauges, which translates the histogram into the percentile gauges (with the phi tag) we observe on DD
DatadogMeterRegistry.publish()
- Gets the API URI: by default it is the public API; normally we send to the dogstatsd URL on 127.0.0.1
- Partitions the Meters in the registry into a nested list of Meters. By default, the partition size is 10k
- All data in one 10k-meter partition is published in a single POST call
- By default, this method is triggered once per minute. See PushMeterRegistry.start()
On working with distributed teams
- Need to run with more formal processes, as if you were a bigger team/org
- Massive, if not excessive, communication is needed between offices if teams are not autonomous enough
- Decompose tasks more finely than you would for a co-located team
- Massive communication load during OKR setting for each task
- Arrive 5 minutes early to ensure the tech is ready, so the real meeting starts on time
- On the host side, have one person responsible for identifying remote people who want to speak
- Make the overlapping business hours explicit
- Make sure results are visible to everyone
- Daily and weekly reviews become a must
- Hire ONLY self-driven people
- Project owner needs to have one go-to doc for
- resources and their owners
- links to detailed docs
- current progress and risks
- One wiki parent page for each team
- Each team sends out weekly report, summarized and distilled at each level
- Centralized test logbook, including PoC runs, so that ideas/data can be reused in the future
- Team should meet face to face at least once per OKR period
- Need a doc that holds context for all stakeholders
- Major tasks with owners, progress, impact on the overall progress
- Fine grained enough to ensure daily update
Traps in Chaos Engineering
Notes on this post
- Goal is resilience instead of finding risks
- Don't measure KRs by the number of errors discovered; errors show only ignorance, not risk
- Humans under pressure ARE part of the system under test. Optimize how they react
- Focus on things done right rather than pointing out risks. Don't chase the fixes
- Provide context and let the service owner decide what to do with the vulnerability
- Automate the discovery of what is wrong, but not of what should be
- Manual gamedays and chaos experiments in non-prod environments are the prerequisites for prod testing
- Creating experiments often has more value than running them
- Define the "steady state" first
When will TiDB binlog replication lose data
How TiDB writes the binlog
- The pump we deploy is actually the pump server (PS) process. The TiDB process has a pump client (PC) that writes to the PS during a txn
- PC uses PS's interface WriteBinlog(context.Context, *WriteBinlogReq) (*WriteBinlogResp, error)
- PC implementation: func (c *PumpsClient) WriteBinlog(binlog *pb.Binlog) error
- PC maintains a list of available PSs and a list of unavailable PSs
- It subscribes to PD and listens for updates on PS status
- It regularly heartbeats to the PSs on the unavailable list and tries to move them back to the available list
- It keeps an ErrNum for each PS, incremented on failed binlog writes and reset on success. If a PS's ErrNum is too high (default 10), the PS is added to the unavailable list
- If PC failed to write the prewrite binlog:
- If the error is a ResourceExhausted error from gRPC, stop retrying
- If still no luck, try every single unavailable pump once
- If still no luck, declare that we failed to write to binlog
- If err is
- If PC fails to write the commit binlog, PC will not retry, because PS can recover it from TiKV after 10 minutes (the default max txn timeout)
- Note this is a common source of replication lag
- When a pump shuts down gracefully, e.g., via pdctl, it enters read-only mode first, waits until all drainers have read its binlog, and then shuts down
When do we lose binlog
Assuming we turn on ignore-error
- binlog stored on PS is lost forever
- All PSs are down
- PC encounters gRPC's ResourceExhausted error. Note that in this case we very likely don't lose any binlog, because the main txn commit will most likely fail too
Default settings
// RetryInterval is the interval of retrying to write binlog.
RetryInterval = 100 * time.Millisecond
// DefaultBinlogWriteTimeout is the default max time binlog can use to write to pump.
DefaultBinlogWriteTimeout = 15 * time.Second
How the Druid connection pool recycles connections
Selected data structures
public final class DruidConnectionHolder {
protected final DruidAbstractDataSource dataSource;
protected final long connectionId;
protected final Connection conn;
protected final List<ConnectionEventListener> connectionEventListeners = new CopyOnWriteArrayList<ConnectionEventListener>();
protected final List<StatementEventListener> statementEventListeners = new CopyOnWriteArrayList<StatementEventListener>();
protected final long connectTimeMillis;
protected volatile long lastActiveTimeMillis;
protected volatile long lastExecTimeMillis;
protected volatile long lastKeepTimeMillis;
protected volatile long lastValidTimeMillis;
protected long useCount = 0;
private long keepAliveCheckCount = 0;
private long lastNotEmptyWaitNanos;
private final long createNanoSpan;
protected PreparedStatementPool statementPool;
protected final List<Statement> statementTrace = new ArrayList<Statement>(2);
protected final boolean defaultReadOnly;
protected final int defaultHoldability;
protected final int defaultTransactionIsolation;
protected final boolean defaultAutoCommit;
protected boolean underlyingReadOnly;
protected int underlyingHoldability;
protected int underlyingTransactionIsolation;
protected boolean underlyingAutoCommit;
protected volatile boolean discard = false;
protected volatile boolean active = false;
protected final Map<String, Object> variables;
protected final Map<String, Object> globleVariables;
final ReentrantLock lock = new ReentrantLock();
}
How shrink() works
Assume time check and keep-alive are turned on
DruidConnectionHolder connection = connections[i];
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis) {
break; // stop shrinking altogether
}
if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) {
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection;
}
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
- After this check, it validates all connections that require keep-alive
- Finally, refill up to minIdle:
int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
for (int i = 0; i < fillCount; ++i) {
    emptySignal();
}
- Validation is done via MySqlValidConnectionChecker; if no custom validation query is set, it falls back to the default:
String query = validateQuery;
if (validateQuery == null || validateQuery.isEmpty()) {
query = DEFAULT_VALIDATION_QUERY;
}
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rs = stmt.executeQuery(query);
return true;
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
AtCoder notes
B - Voting Judges
Simple, but good as a coding exercise
D - Semi Common Multiple
- By subtraction we know the gap between consecutive valid values is the LCM of A
- By factoring we know the smallest valid value must be LCM/2; we just need to verify that every number satisfies the condition
- What got me was how the LCM is calculated: just apply the lcm formula rolling through A. Other approaches couldn't handle the edge cases
- The count formula can be simplified to M // (lcm // 2) - M // lcm (see the sketch below)
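A Python sketch of this solution, assuming (as in the problem statement) that all a_i are even; the variable names are mine.

from math import gcd

def count_semi_common_multiples(a, m):
    # Rolling LCM over A, as described above.
    lcm_all = 1
    for x in a:
        lcm_all = lcm_all * x // gcd(lcm_all, x)
    half = lcm_all // 2
    # half must be an odd multiple of every a_i / 2; otherwise no X works.
    for x in a:
        if (half // (x // 2)) % 2 == 0:
            return 0
    # Valid X are half, 3*half, 5*half, ... <= m, which is M//(lcm//2) - M//lcm.
    return m // half - m // lcm_all

print(count_semi_common_multiples([6, 10], 20))  # 1 (only X = 15)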