System design: square cash

2020-04-13 00:00:00 +0000

Original discussion

High level components

  • API gateway
  • Auth module
  • Payment service records txns seen by the user
    • txn, status, may have a list of ledger entries
  • Bookkeeping ledger service keeps track of internal states
    • Use double-entry bookkeeping, i.e., differentiate debit and credit entries
    • Need to know which entry maps to which txn
  • Account balance service for users
    • May merge this as part of the ledger if distributed txn is not desirable
  • Proxy to 3rd party payment and settlement
  • MQ for the async notification from the main txn system

Requirement: our internal txn state should be consistent with external txn state

  • Run a recon job to scan pending txns after a grace period (a minimal sketch follows this list)
  • Provide a webhook for the 3rd party to ack success
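
A minimal sketch of that recon loop, with payment_db and provider as assumed, hypothetical interfaces (not real APIs):

# Hypothetical sketch of the recon job: scan txns stuck in PENDING longer than
# a grace period and reconcile them against the 3rd-party provider's state.
from datetime import datetime, timedelta

GRACE = timedelta(minutes=30)

def reconcile_pending(payment_db, provider):
    cutoff = datetime.utcnow() - GRACE
    for txn in payment_db.find_pending_before(cutoff):
        remote = provider.get_status(txn.external_id)   # authoritative external state
        if remote == "SUCCEEDED":
            payment_db.mark_success(txn.id)             # also append the ledger entries
        elif remote == "FAILED":
            payment_db.mark_failed(txn.id)              # trigger the refund/compensation flow
        # still pending remotely: leave it for the next recon run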

Requirement: users may pay in different currencies

  • Introduce a centralized exchange service
  • Ledger keeps the currency and the original amount, with the rate recorded
  • Need extra ledger entry for internal txns

System design: Dapper

2020-04-11 00:00:00 +0000

Design a dapper-like system for tracing and monitoring

Requirements: drill down a certain request and see its sequence of actions

  • Embed in the header
    • Trace id: root id
    • Parent session id: who called this request
    • Current session id
    • Seq number to mark layer and de-dup
  • Do sampling to control no more than 1k writes per sec
  • Can just store the spans in a (trace_id, session_id) -> record table
  • Upon viewing, load all sessions of a trace on the fly and compute the tree (< 100 sessions per trace); a sketch follows this list
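
A small sketch of the tree reconstruction, assuming each stored row carries session_id, parent_session_id, and seq (field names are hypothetical):

# Hypothetical sketch: rebuild the call tree of one trace from its session rows.
# Fewer than ~100 rows per trace, so this is cheap to do on the fly.
from collections import defaultdict

def build_trace_tree(rows):
    children = defaultdict(list)
    root = None
    for row in rows:
        if row["parent_session_id"] is None:
            root = row                      # the root session has no parent
        else:
            children[row["parent_session_id"]].append(row)
    for kids in children.values():
        kids.sort(key=lambda r: r["seq"])   # seq marks ordering within a layer
    return root, children                   # render by walking root -> children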

Requirements: calculate avg latency by node id and time

  • Timeseries db, similar to the design of prometheus
  • For each metric, we maintain (timestamp, machineid, sequence) -> labels, value
  • Suppose 100 metrics are sent per sec: 3 months of data is 100 * 86400 * ~90 ≈ 0.8B data points, so plotting the data over 3 months means scanning at most about 1 billion rows
    • The scan can be parallelized per point shown on the graph (see the sketch below)
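
A rough sketch of the aggregation, assuming raw points shaped like (timestamp, machine_id, latency); each time bucket maps to one plotted point, which is what makes the scan parallelizable:

# Hypothetical sketch: average latency per (machine_id, time bucket).
# Each bucket corresponds to one point on the graph and can be computed
# in parallel with the others.
from collections import defaultdict

def avg_latency(points, bucket_seconds=60):
    sums = defaultdict(lambda: [0.0, 0])            # (machine_id, bucket) -> [sum, count]
    for ts, machine_id, latency in points:
        bucket = ts - ts % bucket_seconds
        acc = sums[(machine_id, bucket)]
        acc[0] += latency
        acc[1] += 1
    return {key: s / n for key, (s, n) in sums.items()}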

System design: OLAP

2020-04-10 00:00:00 +0000

Suppose we run an e-commerce web site

Requirement: top 10 in the last 24 hours for each store

  • Need to keep track of
    • For each store, maintains a top 10 list, probably in cache, since it is way more reads than writes
    • txn history with an index on (store_id, created_at), so we need a separate txn history service sharded by store_id
  • Each store has at most 10k txns per day, so we can afford to load them into memory and recompute at every refresh interval, e.g., 10 mins (see the sketch after this list)
    • In fact, even 7 days is about 1 mil txns, which is still fine
  • The calculation for a store can be triggered on demand, i.e., during a visit, so we don't have to poll the store catalog service, i.e., update once every 1-5 minutes
    • Either synchronously via an API call or asynchronously via MQ
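
A minimal sketch of the per-store refresh, assuming a hypothetical txn_history.list_txns(store_id, since_ts) API:

# Hypothetical sketch: recompute a store's top-10 items from its last-24h txns.
# At most ~10k txns per store per day, so loading them into memory is fine.
from collections import Counter
import heapq

def top10_for_store(txn_history, store_id, since_ts):
    counts = Counter()
    for txn in txn_history.list_txns(store_id, since_ts):   # assumed API on the txn history service
        counts[txn.item_id] += txn.quantity
    return heapq.nlargest(10, counts.items(), key=lambda kv: kv[1])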

Requirement: complete item sales rank in the last week by category

  • Category is much more coarse-grained than store, especially the basic ones. Expect < 1 mil categories in total.
  • This also means the whole data can fit into the cache
  • Category is a nested, tree-like structure; we assume no more than 5 levels (20^5 ≈ 3.2 mil). Note that on Amazon it is more like a DAG with ~2 paths per item. So we assume each item updates at most 10 categories
  • Because it is windowed, we have to keep recalculating the whole batch to handle sales dropping out of the window, even for just the top K
    • Mini-batching can speed up the calculation at the cost of additional complexity, but it is needed if we care about the update frequency
    • The brute-force calculation is OK if we don't have a strong SLA on the freshness of the data (< 1 hour). It will scan and populate 10 mil per day * 7 days * 10 categories per item ≈ 0.7B intermediate rows

Suppose we want to see updates < 1 min, for popular categories, e.g., top 100 hot categories

Populating the raw data

  • Keep track of
    • Last pulled checkpoint
    • (category_id, time, item_id) -> count
  • We periodically pull the txn services to get the txns since the last pulled checkpoint to populate (category_id, time, item_id) -> count
    • Can also be triggered by notifications from the main txn service, but we need to watch out for the frequent-update problem, so we still need to check the last pulled checkpoint
    • Can purge pulled data after 1 week
    • During the pull, clear all data after the checkpoint first, then redo the calculation from the checkpoint

Calculating the final result

  • Keep track of
    • (category_id, item_id) -> count, last_calced_at
    • (category_id) -> (item id list, last updated_at)
  • Can store the result into cache since we have < 1 mil categories
  • At each refresh interval, scan all items for
    • (a) the count added after last_calced_at
    • (b) the count that now falls before now - 7 days, i.e., has dropped out of the window
    • (c) the last calculated window count
    • compute c + a - b to get the current window count (see the sketch after this list)
    • update the current window count with a new updated_at
  • Scan all items in the categories; this can be computed in memory
    • We assume < 10 mil items per category
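
A sketch of that incremental update, where raw_counts.sum(...) is an assumed helper over the (category_id, time, item_id) -> count table and state is the hypothetical per-(category, item) bookkeeping record:

# Hypothetical sketch of the incremental window update described above:
# new_window = last_window + (sales added since last_calced_at)
#            - (sales that have now fallen out of the 7-day window).
def refresh_window_count(raw_counts, state, category_id, item_id, now, window=7 * 86400):
    added = raw_counts.sum(category_id, item_id,
                           start=state.last_calced_at, end=now)
    dropped = raw_counts.sum(category_id, item_id,
                             start=state.last_calced_at - window, end=now - window)
    state.window_count = state.window_count + added - dropped
    state.last_calced_at = now
    return state.window_count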

Requirement: show an item's sales numbers in the last day/week/month/lifetime

Common problems for huge data processing

2020-04-07 00:00:00 +0000

Find common URLs

Given 2 files, each with 5 billion URLs, each URL 64 bytes. 4GB of memory.

  • Memory can store roughly 4 * 10^9 / 64 ≈ 60M records in total
  • Each big file is 5B * 64B = 320GB, i.e., it would take about 80 memory-sized loads to read it
  • So compute the hash of each URL mod 200 and write each URL into one of 200 bucket files
  • Load the two bucket files with the same hash value into memory together: build a hashset from one and look up each URL from the other (a sketch follows this list)
    • This step can be parallelized
  • Pipe the results into the final file
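
A sketch of the two passes, with hypothetical file paths and a 200-way hash partition:

# Hypothetical sketch: partition both huge files into 200 bucket files by hash,
# then intersect matching buckets in memory.
import hashlib

BUCKETS = 200

def bucket_of(url: str) -> int:
    return int(hashlib.md5(url.encode()).hexdigest(), 16) % BUCKETS

def partition(path, prefix):
    outs = [open(f"{prefix}.{i}", "w") for i in range(BUCKETS)]
    with open(path) as f:
        for url in f:
            outs[bucket_of(url.strip())].write(url)
    for o in outs:
        o.close()

def common_in_bucket(path_a, path_b, out_path):
    with open(path_a) as fa:
        seen = {line.strip() for line in fa}      # one bucket of file A fits in memory
    with open(path_b) as fb, open(out_path, "a") as fo:
        for line in fb:
            if line.strip() in seen:
                fo.write(line)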

1G file with words, each word < 16B. Find the most popular 100 words with 1MB memory

  • 1MB of memory fits at least ~50k words; the 1GB file contains on the order of 100M words
  • Read the file sequentially and hash the words into ~2k bucket files, about 0.5MB per file on average. Each bucket must be small enough to fit in memory even in the worst case of all-distinct words
  • Read each bucket file to generate the word counts for that bucket
  • After each bucket is counted, merge it into the running top-100 result kept in memory
  • If the distinct words fit into memory, we can also use a trie or suffix array to save space

Find all distinct numbers in a file

3 billion numbers in the file; they do not fit into memory

  • Idea 1: hash into bucket files and dedup within each file. Again, each bucket must be smaller than memory to handle the all-distinct case
  • Idea 2: a bitmap will consume about 3GB of RAM. Note a bloom filter may still be too big because each entry costs about 10 bits
  • Similar idea can be used to find if a number exists in the file

Top K in a list of N sorted arrays of the same size

  • Maintain a size-N min-heap and an index for each array
  • Populate the first element of each array into the heap, with a record of which array it came from
  • Pop the min of the heap, then push the next item from the same array
  • Stop when we have done this K times (see the sketch below)
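
A sketch of the heap-based merge using Python's heapq (a min-heap):

# Hypothetical sketch: smallest K elements across N ascending-sorted arrays
# using a heap of size N plus one index per array.
import heapq

def top_k(sorted_arrays, k):
    heap = [(arr[0], i, 0) for i, arr in enumerate(sorted_arrays) if arr]
    heapq.heapify(heap)
    out = []
    while heap and len(out) < k:
        value, arr_idx, pos = heapq.heappop(heap)
        out.append(value)
        if pos + 1 < len(sorted_arrays[arr_idx]):      # push the next item from the same array
            heapq.heappush(heap, (sorted_arrays[arr_idx][pos + 1], arr_idx, pos + 1))
    return out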

Sort queries by frequency

10 files with 1G size

  • If duplication is high, just use a hashmap in memory to store the counts, and then sort the entries by count
  • If duplication is low, hash into bucket files, count within each bucket, and then merge-sort the results

Find median of 5B numbers

  • Idea 1: if the numbers fit into memory, use two heaps: a min-heap for numbers greater than the current median and a max-heap for numbers less than it (a sketch follows this list)
  • Invariant: the size difference of the two heaps is at most 1
  • Idea 2: partition the numbers into files by prefix. From the size of each file we know the counts, hence precisely which file the median resides in and its rank within that file, and then we can locate it exactly
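
A sketch of idea 1, the two-heap running median (low is a max-heap via negation, high a min-heap):

# Hypothetical sketch of the two-heap running median.
import heapq

def running_median(numbers):
    low, high = [], []                      # low: max-heap (negated), high: min-heap
    for x in numbers:
        if not low or x <= -low[0]:
            heapq.heappush(low, -x)
        else:
            heapq.heappush(high, x)
        # rebalance so the size difference stays <= 1
        if len(low) > len(high) + 1:
            heapq.heappush(high, -heapq.heappop(low))
        elif len(high) > len(low) + 1:
            heapq.heappush(low, -heapq.heappop(high))
    if len(low) == len(high):
        return (-low[0] + high[0]) / 2
    return -low[0] if len(low) > len(high) else high[0]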

System design: Locate the delivery kiosk for user's order

2020-04-06 00:00:00 +0000

Note that capacity estimation, storage design, and computation design will keep triggering updates to each other

Requirements

  • Each kiosk has multiple delivery boxes, of different dimensions
  • When user places an order, the system should return up to 3 possible kiosks that can accept user’s delivery
  • Handle the NA traffic

Capacity estimation

  • 500 mil NA users; assume each places 1 order on Black Friday, so peak traffic is roughly 500 mil / ~90k sec per day * 4 ≈ 25K TPS max
  • Each kiosk has 100 boxes, so it can support a population of about 1k, assuming on a normal day 10% of people buy. So 500K kiosks are more than enough
  • Each kiosk covers a 0.5 mile radius with little overlap

Storage layer

  • Kiosk needs to keep track of:
    • geohash - needs an index for range search
    • number of boxes
  • Box needs to keep track of:
    • dimensions (H, L, W)
    • parent kiosk
  • Status needs to keep track of:
    • box to package ids, one to many
    • parent kiosk, if statuses live in a separate service
  • Since kiosk info is rarely updated, kiosk and box info can be in read slaves or a cache to scale out reads.

Computation sequence

  • Fetch the nearest ~10 kiosks from one of the DB's read slaves by range-searching on geohash
  • Fetch the empty boxes (< 1k total) in those kiosks, and determine in memory which ones can fit the package, i.e., box W/L/H ≥ the package's W/L/H (a sketch follows this list)
  • Return to the user up to 3 kiosks that have a fitting empty box
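
A sketch of the lookup, with kiosk_store and box_store as assumed, hypothetical interfaces:

# Hypothetical sketch: range-search kiosks near the user's geohash, then keep
# kiosks that have at least one empty box the package fits into.
def find_kiosks(kiosk_store, box_store, user_geohash, package_dims, limit=3):
    result = []
    # assumed API: kiosks ordered by distance, pre-filtered by geohash range
    for kiosk in kiosk_store.nearest(user_geohash, count=10):
        empty_boxes = box_store.empty_boxes(kiosk.id)          # < 1k boxes total, fine in memory
        if any(fits(box.dims, package_dims) for box in empty_boxes):
            result.append(kiosk)
        if len(result) == limit:
            break
    return result

def fits(box_dims, package_dims):
    # compare dimension by dimension (H, L, W), ignoring rotations
    return all(b >= p for b, p in zip(box_dims, package_dims))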

Productivity tips and tricks

2020-04-01 00:00:00 +0000

  • Estimate how much time you can allocate, and set aside continuous time blocks for it. Each block should be at least one hour
  • Structured procrastination: work on things that are important but not urgent
  • Checking phone every few minutes is a sign of fatigue, take a break
  • Stop when you feel good and know where the direction is going and come back later
  • Going through email or discussing on a forum is ineffective by nature. Pre-plan time slots for such activities, ideally at fixed times, and don't worry about them later
  • Don’t think about money or dispute if possible. They are naturally attention sinks
  • Don’t think about damage other people have done to you. They don’t deserve space in your mind

How InnoDB handles deletion

2020-03-31 00:00:00 +0000

  • All user data in InnoDB tables is stored in pages comprising a B-tree index (the clustered index). In some other database systems, this type of index is called an “index-organized table”. Each row in the index node contains the values of the (user-specified or system-generated) primary key and all the other columns of the table.
    • Updates to rows usually rewrite the data within the same page
  • By default, innodb_file_per_table is on. This means each table will have its own ibd file
    • Each ibd file has multiple segments, each of which is associated with an index
    • Each segment consists of multiple 1MB extents
    • Each extent has multiple 16KB pages
  • On delete, the space of the deleted record is marked reusable; if the page's fill factor drops below MERGE_THRESHOLD (default 50%), InnoDB will try merging it with a neighboring page and leave the original page blank
    • Check index_page_merge_successful in INFORMATION_SCHEMA.INNODB_METRICS

Optimize table

  • If you do sequential deletes instead of random deletes, most likely you don't need to run OPTIMIZE TABLE, because its purpose is to:
    • reduce the data_free value in information_schema.tables
    • defrag index pages
  • data_free marks the reusable space, and it is not accurate if the table has variable-length columns > 768 bytes, e.g., varchar, text
  • InnoDB implements OPTIMIZE TABLE as ALTER TABLE ... FORCE, which rebuilds the table via a temp table. This also means additional space is needed during the operation

On forward and backward compatibility

2020-03-26 00:00:00 +0000

Suppose A depends on B to function, i.e., A -> B

  • Forward compatibility means that if a new version of B is deployed, current A should still be able to handle it
    • This means we can upgrade B without breaking A
    • If not, then we have to first upgrade A so that it can process both the current and the next version of B
  • Note this is what people normally mean when they talk about backward compatibility, which is not exactly the textbook definition. Strictly, backward compatibility means that a newer version of A still accepts the current B

How pt-archiver works

2020-03-25 00:00:00 +0000

Most likely we need the bulk insertion and deletion mode. Otherwise, single-row insertion mode can barely break 2k rows per sec, i.e., at most about 200M rows per day

   $bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
         or die "Cannot open temp file: $OS_ERROR\n";

   while (                                 # Quit if:
      $row                                 # There is no data
      && $retries >= 0                     # or retries are exceeded
      && (!$o->get('run-time') || $now < $end) # or time is exceeded
      && !-f $sentinel                     # or the sentinel is set
      && $oktorun                          # or instructed to quit
      )
   {
      my $lastrow = $row;
      my $escaped_row;
      $escaped_row = escape([@{$row}[@sel_slice]], $fields_separated_by, $optionally_enclosed_by);
      print $bulkins_file $escaped_row, "\n"
         or die "Cannot write to bulk file: $OS_ERROR\n";
      # Possibly flush the file and commit the insert and delete.
      commit($o) unless $commit_each;

  # Get the next row in this chunk.
      # First time through this loop $get_sth is set to $get_first.
      # For non-bulk operations this means that rows ($row) are archived
      # one-by-one in in the code block above ("row is archivable").  For
      # bulk operations, the 2nd to 2nd-to-last rows are ignored and
      # only the first row ($first_row) and the last row ($last_row) of
      # this chunk are used to do bulk INSERT or DELETE on the range of
      # rows between first and last.  After the bulk ops, $first_row and
      # $last_row are reset to the next chunk.
      if ( $get_sth->{Active} ) { # Fetch until exhausted
         $row = $get_sth->fetchrow_arrayref();
      }
      if ( !$row ) {
         $bulkins_file->close()
            or die "Cannot close bulk insert file: $OS_ERROR\n";

            my $ins_sth; # Let plugin change which sth is used for the INSERT.
            $ins_sth ||= $ins_row; # Default to the sth decided before.
            my $success = do_with_retries($o, 'bulk_inserting', sub {
               $ins_sth->execute($bulkins_file->filename());
               $src->{dbh}->do("SELECT 'pt-archiver keepalive'") if $src;
               PTDEBUG && _d('Bulk inserted', $del_row->rows, 'rows');
               $statistics{INSERT} += $ins_sth->rows;
            });

# Notice no checksum is performed; correctness is ensured by deleting the same range only after the insertion
   my $success = do_with_retries($o, 'bulk_deleting', sub {
                  $del_row->execute(
                     @{$first_row}[@bulkdel_slice],
                     @{$lastrow}[@bulkdel_slice],
                  );
                  PTDEBUG && _d('Bulk deleted', $del_row->rows, 'rows');
                  $statistics{DELETE} += $del_row->rows;
               });

         commit($o, 1) if $commit_each;
         $get_sth = $get_next;
         PTDEBUG && _d('Fetching rows in next chunk');
         trace('select', sub {
            my $select_start = time;
            $get_sth->execute(@{$lastrow}[@asc_slice]);
            $last_select_time = time - $select_start;
            PTDEBUG && _d('Fetched', $get_sth->rows, 'rows');
            $statistics{SELECT} += $get_sth->rows;
         });
         @beginning_of_txn = @{$lastrow}[@asc_slice] unless $txn_cnt;
         $row              = $get_sth->fetchrow_arrayref();
         $first_row        = $row ? [ @$row ] : undef;
         $bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
            or die "Cannot open temp file: $OS_ERROR\n";
}
}

How characters are escaped

# Formats a row the same way SELECT INTO OUTFILE does by default.  This is
# described in the LOAD DATA INFILE section of the MySQL manual,
# http://dev.mysql.com/doc/refman/5.0/en/load-data.html
sub escape {
   my ($row, $fields_separated_by, $optionally_enclosed_by) = @_;
   $fields_separated_by ||= "\t";
   $optionally_enclosed_by ||= '';

# Note that we don't escape the separator here, so fields that contain the separator itself may be mishandled
   return join($fields_separated_by, map {
      s/([\t\n\\])/\\$1/g if defined $_;  # Escape tabs etc
      my $s = defined $_ ? $_ : '\N';             # NULL = \N
      # var & ~var will return 0 only for numbers
      if ($s !~ /^[0-9,.E]+$/  && $optionally_enclosed_by eq '"') {
          $s =~ s/([^\\])"/$1\\"/g;
          $s = $optionally_enclosed_by."$s".$optionally_enclosed_by;
      }
      # $_ =~ s/([^\\])"/$1\\"/g if ($_ !~ /^[0-9,.E]+$/  && $optionally_enclosed_by eq '"');
      # $_ = $optionally_enclosed_by && ($_ & ~$_) ? $optionally_enclosed_by."$_".$optionally_enclosed_by : $_;
      chomp $s;
      $s;
   } @$row);
}

On Mysql's async replication for production

2020-03-24 00:00:00 +0000

How async replication works in mysql

  • Async replication is the default behavior. Semi-sync replication is introduced in 5.7
  • Master writes changes to a local binlog file. Note SELECT or SHOW will not be written to the binlog since they don’t affect slave state
  • Note that this binlog is different from InnoDB's undo and redo logs. Its main purpose is replication and recovery after restoring from a backup
  • On AWS RDS, binlog is not turned on by default, and it can be configured to be retained for at most 7 days
  • This additional binlog write introduces a performance penalty. On Aurora, we observed at least a 1/3 reduction in throughput after turning on binlog
  • The binlog file has a buffer controlled by binlog_cache_size. A temp file will be opened if the buffer is not enough
  • Since it is just a file, it is subject to the flush/fsync failures experienced by normal files. The default behavior is that the server will shut down upon detecting such issues
  • The slave pulls from the master. The master does NOT try pushing to the slave
  • By default, the slave does not write its own binlog
  • Any detected corruption/data loss in the binlog means that replication should be restarted from scratch; the current slave's data should be discarded

Complications from production setup

In production, MySQL is normally run as an active-standby pair with semi-sync or sync replication between them. This introduces additional complications for async replication. Call the current active X, the standby Y, the X-Y cluster the source, and the async replica the sink

From the source pov:

  • To ensure async replication keeps working after Y is promoted to master when X fails for some reason, at minimum GTID mode has to be turned on, but this causes write stalls on the master DB, especially on Aurora
  • Note that even with GTID mode on, we cannot guarantee no loss of binlog data that was persisted only on X, although the chance is greatly reduced

From the sink pov:

  • Suppose a failover between X and Y happens: the pulling connection to X may still be active and alive while the new active is Y. This means that when the sink pulls binlog through a connection pool (most likely in networked applications), it may be pulling from both X and Y, i.e., a split-brain problem
  • The root cause is that during failover, by CAP, the async replication should be killed and unavailable to avoid split brain, but the implementation does not obey this
  • In practice, the split-brain problem is unlikely to cause damage if log_slave_updates is turned on, but split brain is a critical situation that should be avoided conceptually
  • When such split-brain replication happens, either fix it through domain knowledge, or discard the current sink and restart the replication from scratch
  • This means async replication is a good fit for cases where some slack is allowed in data consistency and availability, e.g.,
    • cross-region failover instances, where we accept the loss/corruption of a couple of rows so that we can recover in the case of main-region data loss
    • read slaves for OLAP workloads/pipelines, so we have a mostly up-to-date data source with little additional load on the master
  • Similarly, async replication is not a good fit for mission-critical systems - they have to read from the master or a sync-replicated replica

How uid generator generates snowflake id

2020-03-16 00:00:00 +0000

Default uid generator

    private long getCurrentSecond() {
        long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
            throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
        }

        return currentSecond;
    }

   protected synchronized long nextId() {
        long currentSecond = getCurrentSecond();

        // Clock moved backwards, refuse to generate uid
        if (currentSecond < lastSecond) {
            long refusedSeconds = lastSecond - currentSecond;
            throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
        }

        // At the same second, increase sequence
        if (currentSecond == lastSecond) {
            sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
            // Exceed the max sequence, we wait the next second to generate uid
            if (sequence == 0) {
                currentSecond = getNextSecond(lastSecond); //this one just self-spins
            }

        // At the different second, sequence restart from zero
        } else {
            sequence = 0L;
        }

        lastSecond = currentSecond;

        // Allocate bits for UID
        return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
    }
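
For reference, a rough sketch of what bitsAllocator.allocate() does, assuming the commonly cited default split of 1 sign bit, 28 delta-second bits, 22 worker-id bits, and 13 sequence bits (the real widths come from the generator's configuration):

# Hypothetical sketch of BitsAllocator.allocate(): pack delta-seconds, worker id
# and sequence into one 64-bit id. Bit widths are the commonly cited defaults
# (1/28/22/13) and are configurable in the real generator.
TIME_BITS, WORKER_BITS, SEQ_BITS = 28, 22, 13

def allocate(delta_seconds: int, worker_id: int, sequence: int) -> int:
    assert delta_seconds < (1 << TIME_BITS)
    assert worker_id < (1 << WORKER_BITS)
    assert sequence < (1 << SEQ_BITS)
    return (delta_seconds << (WORKER_BITS + SEQ_BITS)) | (worker_id << SEQ_BITS) | sequence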

CachedUidGenerator

Data structure


public class BufferPaddingExecutor {

  /** Whether buffer padding is running */
    private final AtomicBoolean running;

    /** We can borrow UIDs from the future, here store the last second we have consumed */
    private final PaddedAtomicLong lastSecond;  // this is inited with system timestamp, after that it becomes the logical second

    /** RingBuffer & BufferUidProvider */
    private final RingBuffer ringBuffer;
    private final BufferedUidProvider uidProvider;

    /** Padding immediately by the thread pool */
    private final ExecutorService bufferPadExecutors;
    /** Padding schedule thread */
    private final ScheduledExecutorService bufferPadSchedule;
}

Init CachedUidGenerator


    private void initRingBuffer() {
        // initialize RingBufferPaddingExecutor
        boolean usingSchedule = (scheduleInterval != null);
        this.bufferPaddingExecutor = new BufferPaddingExecutor(ringBuffer, this::nextIdsForOneSecond, usingSchedule);
        if (usingSchedule) {
            bufferPaddingExecutor.setScheduleInterval(scheduleInterval);
        }
}

 this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));

Actual population logic


 public void paddingBuffer() {
     boolean isFullRingBuffer = false;
        while (!isFullRingBuffer) {
            List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet()); //the only place lastSecond is mutated other than init
            for (Long uid : uidList) {
                isFullRingBuffer = !ringBuffer.put(uid);
                if (isFullRingBuffer) {
                    break;
                }
            }
        }
}

Query timeout for Mysql jdbc

2020-03-11 00:00:00 +0000

Query timeout implementation in mysql jdbc

mysql-connector-j version 5.1.48

Data structures


/**
 * A Statement object is used for executing a static SQL statement and obtaining
 * the results produced by it.
 * 
 * Only one ResultSet per Statement can be open at any point in time. Therefore, if the reading of one ResultSet is interleaved with the reading of another,
 * each must have been generated by different Statements. All statement execute methods implicitly close a statement's current ResultSet if an open one exists.
 */
public class StatementImpl implements Statement {

  /**
     * Thread used to implement query timeouts...Eventually we could be more
     * efficient and have one thread with timers, but this is a straightforward
     * and simple way to implement a feature that isn't used all that often. (?!!!)
     */
    /** The physical connection used to effectively execute the statement */
    protected Reference<MySQLConnection> physicalConnection = null;

    class CancelTask extends TimerTask {
        SQLException caughtWhileCancelling = null;
        StatementImpl toCancel;
        Properties origConnProps = null;
        String origConnURL = "";
}

}

    private BooleanConnectionProperty queryTimeoutKillsConnection = new BooleanConnectionProperty("queryTimeoutKillsConnection", false,
            Messages.getString("ConnectionProperties.queryTimeoutKillsConnection"), "5.1.9", MISC_CATEGORY, Integer.MIN_VALUE);

What does CancelTask do

Inside TimerTask.run(), it starts a new thread. Inside Thread.run()

Only the happy path with default values is shown here; comments inline

                    Connection cancelConn = null;
                    java.sql.Statement cancelStmt = null;

                    try {
                        MySQLConnection physicalConn = StatementImpl.this.physicalConnection.get();
                                synchronized (StatementImpl.this.cancelTimeoutMutex) {
                                    if (CancelTask.this.origConnURL.equals(physicalConn.getURL())) {
                                        // All's fine
                                        cancelConn = physicalConn.duplicate(); //this one will create a new connection
                                        cancelStmt = cancelConn.createStatement();
                                        cancelStmt.execute("KILL QUERY " + physicalConn.getId());
                                    } else {
                                        try {
                                            cancelConn = (Connection) DriverManager.getConnection(CancelTask.this.origConnURL, CancelTask.this.origConnProps);
                                            cancelStmt = cancelConn.createStatement();
                                            cancelStmt.execute("KILL QUERY " + CancelTask.this.origConnId);
                                        } catch (NullPointerException npe) {
                                            // Log this? "Failed to connect to " + origConnURL + " and KILL query"
                                        }
                                    }
                                    CancelTask.this.toCancel.wasCancelled = true;
                                    CancelTask.this.toCancel.wasCancelledByTimeout = true;
                                }
                    } catch (SQLException sqlEx) {
                        CancelTask.this.caughtWhileCancelling = sqlEx;
                    } catch (NullPointerException npe) {
                        // Case when connection closed while starting to cancel.
                        // We can't easily synchronize this, because then one thread can't cancel() a running query.
                        // Ignore, we shouldn't re-throw this, because the connection's already closed, so the statement has been timed out.
                    } finally {
                        if (cancelStmt != null) {
                            try {
                                cancelStmt.close();
                            } catch (SQLException sqlEx) {
                                throw new RuntimeException(sqlEx.toString());
                            }
                        }

                        if (cancelConn != null) {
                            try {
                                cancelConn.close();
                            } catch (SQLException sqlEx) {
                                throw new RuntimeException(sqlEx.toString());
                            }
                        }

                        CancelTask.this.toCancel = null;
                        CancelTask.this.origConnProps = null;
                        CancelTask.this.origConnURL = null;
                    }


How is CancelTask used


 public java.sql.ResultSet executeQuery(String sql) throws SQLException {
 CancelTask timeoutTask = null;
        if (locallyScopedConn.getEnableQueryTimeouts() && this.timeoutInMillis != 0 && locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {
                    timeoutTask = new CancelTask(this);
                    locallyScopedConn.getCancelTimer().schedule(timeoutTask, this.timeoutInMillis);
                }

   this.results = locallyScopedConn.execSQL(this, sql, this.maxRows, null, this.resultSetType, this.resultSetConcurrency,
                        createStreamingResultSet(), this.currentCatalog, cachedFields);

                if (timeoutTask != null) {
                    if (timeoutTask.caughtWhileCancelling != null) {
                        throw timeoutTask.caughtWhileCancelling;
                    }

                    timeoutTask.cancel();

                    locallyScopedConn.getCancelTimer().purge();

                    timeoutTask = null;
                }

 synchronized (this.cancelTimeoutMutex) {
                    if (this.wasCancelled) {
                        SQLException cause = null;

                        if (this.wasCancelledByTimeout) {
                            cause = new MySQLTimeoutException();
                        } else {
                            cause = new MySQLStatementCancelledException();
                        }

                        resetCancelledState();

                        throw cause;
                    }
                }
//inside finally
                if (timeoutTask != null) {
                    timeoutTask.cancel();

                    locallyScopedConn.getCancelTimer().purge();
                }
}

Implication of this implementation

  • By default, if the query times out, the connection will not be closed
  • Sending the KILL QUERY is a best-effort attempt. The more reliable way is to use MySQL's max_execution_time hint. A socket timeout can mitigate, but not fully solve, the slow-query problem (a generic sketch of the timer-based cancel pattern follows)
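
The same timer-based pattern, sketched against a hypothetical PyMySQL connection purely to illustrate the mechanism; the authoritative logic is the Java driver code above:

# Illustrative sketch only: run a query with a timeout by scheduling a timer
# that issues KILL QUERY <id> on a second connection. Assumes PyMySQL; the
# real JDBC behaviour is implemented by CancelTask above.
import threading
import pymysql

def query_with_timeout(conn_params, sql, timeout_seconds):
    conn = pymysql.connect(**conn_params)
    with conn.cursor() as cur:
        cur.execute("SELECT CONNECTION_ID()")
        conn_id = cur.fetchone()[0]

    def kill():
        killer = pymysql.connect(**conn_params)      # like CancelTask, use a separate connection
        with killer.cursor() as kcur:
            kcur.execute("KILL QUERY %d" % conn_id)  # best effort, same as the JDBC driver
        killer.close()

    timer = threading.Timer(timeout_seconds, kill)
    timer.start()
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetchall()
    finally:
        timer.cancel()
        conn.close()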

Reading Notes: Effective Executive

2020-03-10 00:00:00 +0000

Chapter 1 Effectiveness Can Be Learned

  • Manual work is about doing things right. Knowledge work is about doing the right thing
  • A knowledge worker (KWer) cannot be supervised in detail. He has to direct himself
  • A KWer must make decisions and live by their results. He should be the best person to make the call
  • KW is defined by its results, NOT costs or quantity
  • A KWer can't let events direct his actions for too long - otherwise he is merely operating
  • Inside the org it is all about effort and cost - even for a profit center
  • One tends to be drawn into the challenges inside the org, instead of events on the outside
  • By the time relevant outside events become measurable it is too late, because being able to measure an event means the concept has already been built
  • A change in trend is more important than the trend itself, but it can only be perceived rather than counted
  • Don't need to know the detail of your collaborator's work, but need to know why it is there and what is happening
  • Ask for the expected result instead of the work to be done
  • Effective decisions are based on dissenting opinions rather than consensus on the facts

Chapter 2 Know Thy Time

  • Plan with time rather than work. Consolidate time into the largest possible chunks
  • Need to spend much time with people to get an idea across
  • Fast personnel decisions are likely to be wrong decisions. Revisit them several times before committing - this needs hours of continuous and uninterrupted thought
  • Cutting important things by mistake is easy to correct
  • 90 mins of continuous work + 30 mins of interruptible work
  • Schedule morning work period at home. This is better than after dinner because people will be too tired to do a good job

Chapter 3 What Can I Contribute?

  • Contribution is important in all 3 areas: direct results, building of people, and building of values
  • Justify the payroll based on contribution rather than effort
  • Doing a job well means he is specialized, which also means retooling when moving to a new position

Chapter 4 Making Strength Productive

  • Fill the position by strength rather than weakness. If a weakness is blocking the strength, fix it through work or career opportunities
  • Build the job by task rather than personality, i.e., don't create a job for a person
  • An indispensable man means at least one of:
    • he is incompetent, so he shields himself from demands
    • the superior is weak
    • his strength is misused to delay tackling a serious problem, if not hiding it
  • A rapidly promoted superior is more likely to lead you to success. An under-performing superior only means a new one will be brought in from outside
  • Help the boss overcome limitations and use his strengths. Present areas related to his strength first, to help him understand the situation
  • Listening or reading - people have different preferences in receiving info
  • It is easier to raise the performance of one leader than of the mass. He should be put into a performance-making, standard-setting position
  • Claiming the lack of authority/access is often a cover-up for inertia

Chapter 5 First Things First

  • Double-buffering of work can work, on the condition that each task is given a minimum amount of continuous time
  • Yesterday's success always lingers longer than its productive life
  • Pressure-driven prioritization means focusing on yesterday instead of tomorrow, inside rather than outside, i.e., urgent tasks push back important ones
  • Reprioritization at the end of each task

Chapter 6 The Elements of Decision-making

  • Focus on what is “right”, i.e., satisfies specs, before worrying about compromises and concessions. Stakeholders know how to make compromise better than you, but they need your help figuring out what is right
  • Incomplete explanation is often more dangerous than the totally wrong explanation
  • An executive who makes many decisions is both lazy and ineffectual, because he fails to see the truly generic problems

Chapter 7 Effective Decisions

  • Most likely it is a choice between two courses of action, neither of which is provably more nearly right than the other
  • Facts require criteria for relevance
  • Effective decisions grow out of the clash of divergent opinions and competing alternatives. In fact, one does not make a decision unless there is disagreement
  • Whoever voices an opinion should be responsible for the factual findings
  • Finding the measurement is more about risk-taking judgment; since it is a judgment, alternative measurements must be considered too
  • A measurement that sounds good on paper is still meaningless if you can't convert it into a step-by-step guide for a computer
  • Act if on balance the benefits greatly outweigh cost and risk; and Act or do not act; but do not “hedge” or compromise.

What triggers the txnLockFast failure?

2020-03-09 00:00:00 +0000

Source version 2.1

What is txnLockFast error

This is triggered when a read txn with read_start_ts happens between a write txn's write_start_ts and write_commit_ts. Upon seeing this, the read txn will back off and retry later with the same read_start_ts, so that we preserve snapshot isolation. The effect of txnLockFast is similar to a pessimistic lock, but with higher latency

tidb_tikvclient_backoff_seconds_count

Triggered by expression increase( tidb_tikvclient_backoff_seconds_count[10m] ) > 10

tidb_tikvclient_backoff_seconds_count is a counter, queried here as a range vector. The expression fires when the counter increased by more than 10 over the last 10 minutes, i.e., more than 10 backoff events happened in that window

Data structures

Retriever is the interface that wraps the basic Get and Seek methods. Retriever is implemented by BufferStore -> unionStore -> tikvSnapshot -> tikvTxn

Transaction defines the interface for operations inside a Transaction. Transaction is implemented by tikvTxn -> TxnState

// Scanner support tikv scan
type Scanner struct {
	snapshot     *tikvSnapshot
	batchSize    int
	valid        bool
	cache        []*pb.KvPair
	idx          int
	nextStartKey []byte
	endKey       []byte
	eof          bool

	// Use for reverse scan.
	reverse    bool
	nextEndKey []byte
}

What triggers txnLockFast?

  • Commit check
    • TxnState also implements BatchGet but seems not used.
    • tikvTxn also implements Get, but it is not used except in testing

//part of Retriever interface
func (us *unionStore) Get(k Key) ([]byte, error) {
  us.markLazyConditionPair(k, nil, e.(error))
}

//part of Transaction interface
func (txn *tikvTxn) Commit(ctx context.Context) error {
 txn.us.CheckLazyConditionPairs()
}

func (us *unionStore) CheckLazyConditionPairs() error {
 us.snapshot.BatchGet(keys)//this may trigger txnLockFast
}

  • Scan keys in tikv

func (s *Scanner) Next() error {
   s.resolveCurrentLock(bo, current)
}

func (s *Scanner) resolveCurrentLock(bo *Backoffer, current *pb.KvPair) error {
  s.snapshot.get(bo, kv.Key(current.Key))
}

  • Coprocesser

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {

		logutil.Logger(context.Background()).Debug("coprocessor encounters",
			zap.Stringer("lock", lockErr))
		ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
		if err1 != nil {
			return nil, errors.Trace(err1)
		}
		if !ok {
			if err := bo.Backoff(boTxnLockFast, errors.New(lockErr.String())); err != nil {
				return nil, errors.Trace(err)
			}
		}
}

How tidb implements 2PC

2020-03-05 00:00:00 +0000

Source version 2.1

Data structures


// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
	store        *tikvStore
	version      kv.Version
	priority     pb.CommandPri
	notFillCache bool
	syncLog      bool
	keyOnly      bool
	vars         *kv.Variables
}

//store/tikv/txn.go, seems to represent txn actions on the server (tikv) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct { 
	snapshot  *tikvSnapshot
	us        kv.UnionStore
	store     *tikvStore // for connection to region.
	startTS   uint64
	startTime time.Time // Monotonic timestamp for recording txn time consuming.
	commitTS  uint64
	valid     bool
	lockKeys  [][]byte
	mu        sync.Mutex // For thread-safe LockKeys function.
	dirty     bool
	setCnt    int64
	vars      *kv.Variables
}

//session/txn.go, seems to represent txn on the client(tidb) side

// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement related modification in the buffer before flush to the txn,
// so if execute statement meets error, the txn won't be made dirty.
// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need.
type TxnState struct { 
	// States of a TxnState should be one of the followings:
	// Invalid: kv.Transaction == nil && txnFuture == nil
	// Pending: kv.Transaction == nil && txnFuture != nil
	// Valid:	kv.Transaction != nil && txnFuture == nil
	kv.Transaction //embeded type, means TxnState now implements all interface of Transaction
	txnFuture *txnFuture

	buf          kv.MemBuffer
	mutations    map[int64]*binlog.TableMutation
	dirtyTableOP []dirtyTableOperation

	// If doNotCommit is not nil, Commit() will not commit the transaction.
	// doNotCommit flag may be set when StmtCommit fail.
	doNotCommit error
}

type session struct {
	txn         TxnState

	mu struct {
		sync.RWMutex
		values map[fmt.Stringer]interface{}
	}

	store kv.Storage

	sessionVars    *variable.SessionVars
	sessionManager util.SessionManager
}

// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
	store     *tikvStore //it is the same store in txn.store
	txn       *tikvTxn 
	startTS   uint64
	keys      [][]byte
	mutations map[string]*pb.Mutation
	lockTTL   uint64
	commitTS  uint64
	mu        struct {
		sync.RWMutex
		committed       bool
		undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
	}
	priority pb.CommandPri
	syncLog  bool
	connID   uint64 // connID is used for log.
	cleanWg  sync.WaitGroup
	detail   *execdetails.CommitDetails

	// The max time a Txn may use (in ms) from its startTS to commitTS.
	// We use it to guarantee GC worker will not influence any active txn. The value
	// should be less than GC life time.
	maxTxnTimeUse uint64
}

When a session commits a single table, DML-only transaction

  • Session.doCommit
    • TxnState.Commit
      • TxnState.Transaction.Commit(ctx), which actually calls
func (txn *tikvTxn) Commit(ctx context.Context) error {
	defer txn.close()
	var connID uint64
	val := ctx.Value(sessionctx.ConnID)
	if val != nil {
		connID = val.(uint64)
	}
	committer, err := newTwoPhaseCommitter(txn, connID)
	// latches disabled
	err = committer.executeAndWriteFinishBinlog(ctx)
}
  • newTwoPhaseCommitter
    • iterate through tikvTxn.UnionStore to collect keys to Put and Del into the mutations collection
    • add txn.lockKeys to mutations with Op_lock type
  • executeAndWriteFinishBinlog
	err := c.execute(ctx)
	if err != nil {
		c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
	} else {
		c.txn.commitTS = c.commitTS
		c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
	}

//Only happy path, and no binlog
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
	prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
	err := c.prewriteKeys(prewriteBo, c.keys)

	commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
	c.commitTS = commitTS
	commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
	err = c.commitKeys(commitBo, c.keys)
	return nil
}

  • Prewrite, commit, and clean up (in the failure case) are just different cases of doActionOnKeys

twoPhaseCommitter.doActionOnKeys

//happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
	mutations := make([]*pb.Mutation, len(batch.keys))
	for i, k := range batch.keys {
		mutations[i] = c.mutations[string(k)]
	}

	req := &tikvrpc.Request{
		Type: tikvrpc.CmdPrewrite,
		Prewrite: &pb.PrewriteRequest{
			Mutations:    mutations,
			PrimaryLock:  c.primary(),
			StartVersion: c.startTS,
			LockTtl:      c.lockTTL,
			TxnSize:      uint64(len(batch.keys)),
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}
	resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
}

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
	req := &tikvrpc.Request{
		Type: tikvrpc.CmdCommit,
		Commit: &pb.CommitRequest{
			StartVersion:  c.startTS,
			Keys:          batch.keys,
			CommitVersion: c.commitTS,
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}
	req.Context.Priority = c.priority

	sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
	resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
}

func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
	groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)

	var batches []batchKeys
	var sizeFunc = c.keySize
	if action == actionPrewrite {
		sizeFunc = c.keyValueSize
	}
	// Make sure the group that contains primary lock key goes first.
	batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
	delete(groups, firstRegion)
	for id, g := range groups { //other regions we don't care the order
		batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
	}

	firstIsPrimary := bytes.Equal(keys[0], c.primary())
	if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
		// primary should be committed/cleanup first
		err = c.doActionOnBatches(bo, action, batches[:1])
		batches = batches[1:]
	}
	if action == actionCommit {
		// Commit secondary batches in background goroutine to reduce latency.
		// The backoffer instance is created outside of the goroutine to avoid
		// potencial data race in unit test since `CommitMaxBackoff` will be updated
		// by test suites.
		secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
		go func() {
			e := c.doActionOnBatches(secondaryBo, action, batches)
		}()
	} else {
		err = c.doActionOnBatches(bo, action, batches)
	}
	return errors.Trace(err)
}
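
Condensed into a rough, non-authoritative outline (hypothetical client methods, not the real tidb code), the ordering enforced by execute() and doActionOnKeys() is:

# Rough outline of the 2PC ordering above, not the real tidb client:
# 1) prewrite every key, 2) fetch commitTS from the timestamp oracle,
# 3) commit the batch holding the primary key first, 4) commit the rest async.
def two_phase_commit(client, secondary_batches, primary_batch, start_ts):
    for batch in [primary_batch] + secondary_batches:
        client.prewrite(batch, start_ts)               # all prewrites must succeed

    commit_ts = client.get_timestamp()                 # from PD

    client.commit(primary_batch, start_ts, commit_ts)  # txn is decided once the primary commits

    for batch in secondary_batches:                    # secondaries can lag; readers that hit
        client.commit_async(batch, start_ts, commit_ts)   # their locks resolve via the primary
    return commit_ts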

How tidb retries request to tikv

2020-03-03 00:00:00 +0000

Source code version v2.1.18

Data structures

type Backoffer struct {
	ctx context.Context

	fn         map[backoffType]func(context.Context) int
	maxSleep   int
	totalSleep int
	errors     []error
	types      []backoffType
	vars       *kv.Variables
}
  • Sleep settings for each retry type (a jitter sketch follows the case list below)
    • Times are in ms
    • EqualJitter means half of the duration is randomized
NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int


	case boTiKVRPC:
		return NewBackoffFn(100, 2000, EqualJitter)
	case BoTxnLock:
		return NewBackoffFn(200, 3000, EqualJitter)
	case boTxnLockFast:
		return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter)
	case boPDRPC:
		return NewBackoffFn(500, 3000, EqualJitter)
	case BoRegionMiss:
		return NewBackoffFn(100, 500, NoJitter)
	case BoUpdateLeader:
		return NewBackoffFn(1, 10, NoJitter)
	case boServerBusy:
		return NewBackoffFn(2000, 10000, EqualJitter)
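
A rough sketch of the equal-jitter policy described above (the nominal sleep doubles from base up to cap, and half of it is randomized); this is an illustration, not the exact tikv client code:

# Rough sketch of backoff with EqualJitter: the nominal sleep doubles from
# `base_ms` up to `cap_ms`, and half of the result is randomized.
import random

def backoff_fn(base_ms, cap_ms, equal_jitter=True):
    attempt = 0
    def next_sleep():
        nonlocal attempt
        sleep = min(cap_ms, base_ms * (2 ** attempt))
        attempt += 1
        if equal_jitter:
            sleep = sleep / 2 + random.uniform(0, sleep / 2)
        return sleep
    return next_sleep

# usage: a BoTxnLock-like policy (base 200ms, cap 3000ms)
next_sleep = backoff_fn(200, 3000)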

Default max total sleep time in each case


// Maximum total sleep time(in ms) for kv/cop commands.
const (
	copBuildTaskMaxBackoff         = 5000
	tsoMaxBackoff                  = 15000
	scannerNextMaxBackoff          = 20000
	batchGetMaxBackoff             = 20000
	copNextMaxBackoff              = 20000
	getMaxBackoff                  = 20000
	prewriteMaxBackoff             = 20000
	cleanupMaxBackoff              = 20000
	GcOneRegionMaxBackoff          = 20000
	GcResolveLockMaxBackoff        = 100000
	deleteRangeOneRegionMaxBackoff = 100000
	rawkvMaxBackoff                = 20000
	splitRegionBackoff             = 20000
	scatterRegionBackoff           = 20000
	waitScatterRegionFinishBackoff = 120000
)

When do we stop trying

If the backoff attempts have slept for more than the configured max duration, we return a MySQL error that signals the caller not to retry anymore

	realSleep := f(b.ctx)
	backoffDuration.Observe(float64(realSleep) / 1000)
	b.totalSleep += realSleep
	b.types = append(b.types, typ)

	var startTs interface{} = ""
	if ts := b.ctx.Value(txnStartKey); ts != nil {
		startTs = ts
	}
	logutil.Logger(context.Background()).Debug("retry later",
		zap.Error(err),
		zap.Int("totalSleep", b.totalSleep),
		zap.Int("maxSleep", b.maxSleep),
		zap.Stringer("type", typ),
		zap.Reflect("txnStartTS", startTs))

	b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
	if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
		errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
		for i, err := range b.errors {
			// Print only last 3 errors for non-DEBUG log levels.
			if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 {
				errMsg += "\n" + err.Error()
			}
		}
		logutil.Logger(context.Background()).Warn(errMsg)
		// Use the first backoff type to generate a MySQL error.
		return b.types[0].TError()
	}

Max sleep for each case

func splitTableRanges(t table.PhysicalTable, store kv.Storage, startHandle, endHandle int64) ([]kv.KeyRange, error) {
	maxSleep := 10000 // ms
	bo := tikv.NewBackoffer(context.Background(), maxSleep)
}
func getPhysicalTableRegions(physicalTableID int64, tableInfo *model.TableInfo, tikvStore tikv.Storage, s kv.SplitableStore, uniqueRegionMap map[uint64]struct{}) ([]regionMeta, error) {
 recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackoffer(context.Background(), 20000), startKey, endKey)
}
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)//defaults 20 seconds
commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)//defaults 41 seconds
}

Health check and availability

2020-02-26 00:00:00 +0000

the morning paper

  • The ELB default health check timeout is 10 sec, which is overly generous for the same-region case. Normally a 2-second timeout with 3 attempts to declare failure is enough
  • Otherwise, by default, the instance will be taken offline at least 30 seconds after the failure happens, with traffic still directed to the faulty instance.
  • Problem with time-based availability metrics: MTTF / (MTTF + MTTR)
    • In distributed systems, common to have part of it failed somewhere, how do you define “down”?
    • How do you differentiate the impact of downtime during off-hours vs. peak hours
  • Problem with count-based availability metrics: % of successful requests
    • High volume user has higher impact on this metric
    • Less traffic comes when users perceive the system as down, which makes the metric look better than it actually is
    • Not showing how long a system is down
  • Both metrics above do not capture the down time pattern and different durations of outages
  • The longer the time window, the “better” the availability metric appears

How micrometer keeps percentiles and publish to datadog

2020-02-25 00:00:00 +0000

Data structures (bottom up)

  • StepDouble: thread-safe
    • By default, each step represents 1 min
    • Value operations are done by DoubleWriter
    • Within the same step, the value returned by poll() remains the same; the value is reset at each step (see the sketch after this list)
  • Histogram: either
    • HDR-based histogram TimeWindowPercentileHistogram
    • fixed boundary histograms TimeWindowFixedBoundaryHistogram
  • Timer: has an instance of Histogram
  • Meter: a named and dimensioned producer of one or more measurements. All the metric constructs we create in application code, e.g., Counter, Timer, implement Meter
  • DatadogMeterRegistry: implements MeterRegistry -> StepMeterRegistry.
    • Maintains the list of meters
    • Associates an instance of Timer with HistogramGauges, which translate the histogram into the percentile gauges (with the phi tag) we observe on DD
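
A tiny illustration of the step semantics described for StepDouble (values accumulate within a step, poll() stays constant within a step and rolls over at the boundary); this is not Micrometer's actual implementation:

# Illustration of step semantics: values accumulate into `current`; poll()
# returns the value of the last completed step and stays constant within a step.
import time

class StepValue:
    def __init__(self, step_seconds=60):
        self.step = step_seconds
        self.current = 0.0
        self.previous = 0.0
        self.rollover_at = time.time() + step_seconds

    def _maybe_roll(self):
        now = time.time()
        if now >= self.rollover_at:
            self.previous = self.current       # freeze the finished step
            self.current = 0.0                 # reset for the new step
            self.rollover_at = now + self.step

    def add(self, amount):
        self._maybe_roll()
        self.current += amount

    def poll(self):
        self._maybe_roll()
        return self.previous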

DatadogMeterRegistry.publish()

  • Gets the API URI: by default it is the public API. Normally we send to the dogstatsd URL on 127.0.0.1
  • Partitions the Meters in the registry into a nested list of Meters. By default, the partition size is 10k
  • All data in these 10k meters will be published in a single POST call
  • By default, this method is triggered once per minute. See PushMeterRegistry.start()

On working with distributed teams

2020-02-07 00:00:00 +0000

  • Need to run with more process, as if you were a bigger team/org
  • Massive, if not too much, communication is needed between offices if teams are not autonomous enough
    • Decompose tasks more finely than you would in the same location
    • Massive communication load during OKR setting for each task
  • Arrive 5 mins early to ensure the tech is ready and thus the real meeting starts on time
  • On the host side, have one person responsible for identifying remote people who want to speak
  • Make explicit overlapping business hours
  • Make sure results are visible to everyone
    • daily and weekly review become a must
  • Hire ONLY self-driven people
  • Project owner needs to have one go-to doc for
    • resources and their owners
    • links to detailed docs
    • current progress and risks
  • One wiki parent page for each team
  • Each team sends out weekly report, summarized and distilled at each level
  • Centralized test logbook, including PoC runs, so that ideas/data can be reused in the future
  • Team should meet face to face at least once per OKR period
  • Need a doc that holds context for all stakeholders
    • Major tasks with owners, progress, and impact on the overall progress
    • Fine-grained enough to ensure daily updates

Traps in Chaos Engineering

2020-02-03 00:00:00 +0000

Notes on this post

  • Goal is resilience instead of finding risks
  • Don't measure KRs by errors discovered - they only show ignorance instead of risk
  • Human under pressure IS part of the system to test. Need to optimize how they react
  • Focus on things done right rather than pointing out risks. Don’t chase the fixes
  • Provide context and let the service owner decide what to do with the vulnerability
  • Automate the discovery of what is wrong, but not what should be
  • Manual gamedays and chaos experiments in non-prod environments are the prerequisite for prod testing
  • Creating experiments often has more value than running them
  • Define the “steady state” first