Source version 2.1

Data structures


// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
	store        *tikvStore
	version      kv.Version
	priority     pb.CommandPri
	notFillCache bool
	syncLog      bool
	keyOnly      bool
	vars         *kv.Variables
}
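
A tikvSnapshot is a read-only view of the store at one version: reads through it never see data committed after that version. A minimal usage sketch (store, ts, and key are placeholders, not names from the source):

// Read a key as of timestamp ts (snapshot isolation).
snap, err := store.GetSnapshot(kv.Version{Ver: ts})
if err != nil {
	return err
}
val, err := snap.Get(key) // returns kv.ErrNotExist if the key is absent at this version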

// store/tikv/txn.go, seems to represent txn actions on the server (tikv) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct { 
	snapshot  *tikvSnapshot
	us        kv.UnionStore
	store     *tikvStore // for connection to region.
	startTS   uint64
	startTime time.Time // Monotonic timestamp for recording txn time consuming.
	commitTS  uint64
	valid     bool
	lockKeys  [][]byte
	mu        sync.Mutex // For thread-safe LockKeys function.
	dirty     bool
	setCnt    int64
	vars      *kv.Variables
}
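
Writes on a tikvTxn are buffered client-side in the UnionStore's MemBuffer and only reach TiKV at commit time. A rough lifecycle sketch, assuming store is a kv.Storage backed by tikvStore:

txn, err := store.Begin()                 // allocates startTS from PD
err = txn.Set([]byte("k1"), []byte("v1")) // buffered in the MemBuffer, not sent to TiKV
v, err := txn.Get([]byte("k1"))           // reads check the buffer first, then the snapshot at startTS
err = txn.Commit(context.Background())    // runs the two-phase commit walked through below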

// session/txn.go, seems to represent the txn on the client (tidb) side

// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement-related modifications in a buffer before flushing
// them to the txn, so if executing a statement meets an error, the txn won't
// be made dirty.
// 2. It's a lazy transaction: it stays a txnFuture until StartTS() is really
// needed.
type TxnState struct { 
	// The state of a TxnState should be one of the following:
	// Invalid: kv.Transaction == nil && txnFuture == nil
	// Pending: kv.Transaction == nil && txnFuture != nil
	// Valid:   kv.Transaction != nil && txnFuture == nil
	kv.Transaction // embedded field: TxnState now implements the whole kv.Transaction interface
	txnFuture *txnFuture

	buf          kv.MemBuffer
	mutations    map[int64]*binlog.TableMutation
	dirtyTableOP []dirtyTableOperation

	// If doNotCommit is not nil, Commit() will not commit the transaction.
	// The doNotCommit flag may be set when StmtCommit fails.
	doNotCommit error
}
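
The state transitions implement the lazy activation: the future is created early so the TSO request overlaps with parsing and planning, and the real kv.Transaction is materialized only when a startTS is actually needed. A condensed sketch (method names follow the 2.1 source; the state flip at the end is simplified):

// Invalid -> Pending: fire the TSO request, but don't block on it yet.
s.txn.txnFuture = s.getTxnFuture(ctx)

// Pending -> Valid: block on the future the first time a startTS is needed.
kvTxn, err := s.txn.txnFuture.wait()
if err != nil {
	return err
}
s.txn.Transaction = kvTxn // from now on all kv.Transaction calls pass through
s.txn.txnFuture = nil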

type session struct {
	txn         TxnState

	mu struct {
		sync.RWMutex
		values map[fmt.Stringer]interface{}
	}

	store kv.Storage

	sessionVars    *variable.SessionVars
	sessionManager util.SessionManager
}

// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
	store     *tikvStore // the same store as in txn.store
	txn       *tikvTxn 
	startTS   uint64
	keys      [][]byte
	mutations map[string]*pb.Mutation
	lockTTL   uint64
	commitTS  uint64
	mu        struct {
		sync.RWMutex
		committed       bool
		undeterminedErr error // undeterminedErr saves the rpc error we encounter when committing the primary key.
	}
	priority pb.CommandPri
	syncLog  bool
	connID   uint64 // connID is used for log.
	cleanWg  sync.WaitGroup
	detail   *execdetails.CommitDetails

	// The max time a txn may use (in ms) from its startTS to commitTS.
	// We use it to guarantee the GC worker will not influence any active txn.
	// The value should be less than the GC life time.
	maxTxnTimeUse uint64
}
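
The maxTxnTimeUse guard works because TSO timestamps embed physical time, so the distance from startTS bounds how long the txn has been running. A sketch of the check execute() performs before committing (condensed; the real error message differs):

// Refuse to commit if the txn has outlived maxTxnTimeUse: a txn that
// straddles a GC safepoint could otherwise commit on top of GC-ed data.
if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
	return errors.Errorf("txn takes too much time, start: %d, commit: %d", c.startTS, c.commitTS)
}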

When a session commits a single-table, DML-only transaction:

  • Session.doCommit
    • TxnState.Commit
      • TxnState.Transaction.Commit(ctx), which actually calls
func (txn *tikvTxn) Commit(ctx context.Context) error {
	defer txn.close()
	var connID uint64
	val := ctx.Value(sessionctx.ConnID)
	if val != nil {
		connID = val.(uint64)
	}
	committer, err := newTwoPhaseCommitter(txn, connID)
	// With latches disabled, commit directly (trimmed: error handling and
	// the latch-enabled branch are elided).
	err = committer.executeAndWriteFinishBinlog(ctx)
	return errors.Trace(err)
}
  • newTwoPhaseCommitter
    • iterates through tikvTxn.UnionStore to collect the keys to Put and Del into the mutations map (see the sketch after this list)
    • adds txn.lockKeys to mutations with Op_Lock type
  • executeAndWriteFinishBinlog
	err := c.execute(ctx)
	if err != nil {
		c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
	} else {
		c.txn.commitTS = c.commitTS
		c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
	}
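
The key/mutation collection mentioned above is a walk over the write buffer. A condensed sketch of what newTwoPhaseCommitter does (the real code also tracks entry sizes and computes the lock TTL):

mutations := make(map[string]*pb.Mutation)
var keys [][]byte
err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
	if len(v) > 0 {
		mutations[string(k)] = &pb.Mutation{Op: pb.Op_Put, Key: k, Value: v}
	} else {
		// An empty value in the buffer marks a delete.
		mutations[string(k)] = &pb.Mutation{Op: pb.Op_Del, Key: k}
	}
	keys = append(keys, k)
	return nil
})
// SELECT ... FOR UPDATE keys carry no data change, only a lock record.
for _, lockKey := range txn.lockKeys {
	if _, ok := mutations[string(lockKey)]; !ok {
		mutations[string(lockKey)] = &pb.Mutation{Op: pb.Op_Lock, Key: lockKey}
		keys = append(keys, lockKey)
	}
}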

// Happy path only, and binlog-related code omitted.
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
	// Phase 1: prewrite all keys, primary first.
	prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
	err := c.prewriteKeys(prewriteBo, c.keys)

	// Fetch the commit timestamp from PD (TSO), then phase 2: commit.
	commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
	c.commitTS = commitTS
	commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
	err = c.commitKeys(commitBo, c.keys)
	return errors.Trace(err)
}
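
On the failure path (elided above), execute() also arranges cleanup: a deferred function checks whether the txn committed, and if it did not (and the outcome is not undetermined), it rolls back the prewritten locks in a background goroutine tracked by cleanWg. Condensed:

defer func() {
	c.mu.RLock()
	committed := c.mu.committed
	undetermined := c.mu.undeterminedErr != nil
	c.mu.RUnlock()
	if !committed && !undetermined {
		c.cleanWg.Add(1)
		go func() {
			// Roll back prewritten locks so they don't block other txns.
			_ = c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), c.keys)
			c.cleanWg.Done()
		}()
	}
}()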

  • Prewrite, commit, and clean up (in the failure case) are just different cases of doActionOnKeys
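
The three cases are a small enum, and doActionOnBatches picks the per-batch function from it (names as in the 2.1 source, condensed):

type twoPhaseCommitAction int

const (
	actionPrewrite twoPhaseCommitAction = 1 + iota
	actionCommit
	actionCleanup
)

// Inside doActionOnBatches:
var singleBatchActionFunc func(bo *Backoffer, batch batchKeys) error
switch action {
case actionPrewrite:
	singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
	singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
	singleBatchActionFunc = c.cleanupSingleBatch
}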

twoPhaseCommitter.doActionOnKeys

// happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
	mutations := make([]*pb.Mutation, len(batch.keys))
	for i, k := range batch.keys {
		mutations[i] = c.mutations[string(k)]
	}

	req := &tikvrpc.Request{
		Type: tikvrpc.CmdPrewrite,
		Prewrite: &pb.PrewriteRequest{
			Mutations:    mutations,
			PrimaryLock:  c.primary(),
			StartVersion: c.startTS,
			LockTtl:      c.lockTTL,
			TxnSize:      uint64(len(batch.keys)),
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}
	resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
	// Region-error retry and lock-conflict resolution elided; see the sketch below.
}
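
The elided response handling is where write conflicts surface. A region error means the region cache was stale: back off, regroup the keys, retry. Key errors carry locks left by other transactions, which must be resolved before retrying the batch. Condensed from the 2.1 source (the real code loops until the batch succeeds):

regionErr, err := resp.GetRegionError()
if regionErr != nil {
	// Stale region info: back off, then regroup the keys and retry.
	err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
	return c.prewriteKeys(bo, batch.keys)
}
var locks []*Lock
for _, keyErr := range resp.Prewrite.GetErrors() {
	lock, err1 := extractLockFromKeyErr(keyErr)
	if err1 != nil {
		return errors.Trace(err1) // e.g. a write conflict: fail the txn
	}
	locks = append(locks, lock)
}
if len(locks) > 0 {
	// Resolve the blocking locks (commit or roll back their owner txns),
	// then back off and retry this batch.
	ok, resolveErr := c.store.lockResolver.ResolveLocks(bo, locks)
	if resolveErr != nil {
		return errors.Trace(resolveErr)
	}
	if !ok {
		err = bo.Backoff(boTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
	}
}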

func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
	req := &tikvrpc.Request{
		Type: tikvrpc.CmdCommit,
		Commit: &pb.CommitRequest{
			StartVersion:  c.startTS,
			Keys:          batch.keys,
			CommitVersion: c.commitTS,
		},
		Context: pb.Context{
			Priority: c.priority,
			SyncLog:  c.syncLog,
		},
	}

	sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
	resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
	// Response handling elided; see the undeterminedErr note below.
}
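
The elided response handling here is what populates the undeterminedErr field in the struct above: when the RPC for the batch containing the primary key fails, we cannot tell whether TiKV applied the commit, so the error is recorded and the session must drop the connection rather than report a definite failure to the client. Condensed:

// The primary key's commit RPC itself failed: the commit outcome is
// unknown, because TiKV may or may not have applied it.
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary && sender.rpcError != nil {
	c.setUndeterminedErr(errors.Trace(sender.rpcError))
}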

func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
	groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)

	var batches []batchKeys
	var sizeFunc = c.keySize
	if action == actionPrewrite {
		sizeFunc = c.keyValueSize
	}
	// Make sure the group that contains primary lock key goes first.
	batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
	delete(groups, firstRegion)
	for id, g := range groups { // the order of the other regions doesn't matter
		batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
	}

	firstIsPrimary := bytes.Equal(keys[0], c.primary())
	if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
		// The primary batch must be committed/cleaned up first.
		err = c.doActionOnBatches(bo, action, batches[:1])
		batches = batches[1:]
	}
	if action == actionCommit {
		// Commit secondary batches in a background goroutine to reduce latency.
		// The backoffer instance is created outside of the goroutine to avoid
		// a potential data race in unit tests, since `CommitMaxBackoff` will be
		// updated by test suites.
		secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
		go func() {
			e := c.doActionOnBatches(secondaryBo, action, batches)
			if e != nil {
				// A failed secondary commit is not fatal: once the primary key
				// is committed, readers resolve the remaining locks themselves.
				log.Debugf("2PC async doActionOnBatches %s err: %v", action, e)
			}
		}()
	} else {
		err = c.doActionOnBatches(bo, action, batches)
	}
	return errors.Trace(err)
}
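
appendBatchBySize chunks one region's keys so that no single request grows past txnCommitBatchSize (for prewrite the size includes values via keyValueSize, otherwise only keys are counted). It looks roughly like this:

// appendBatchBySize appends keys to b as batches whose accumulated size
// (per sizeFn) stays near limit; all keys belong to the same region.
func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
	var start, end int
	for start = 0; start < len(keys); start = end {
		var size int
		for end = start; end < len(keys) && size < limit; end++ {
			size += sizeFn(keys[end])
		}
		b = append(b, batchKeys{region: region, keys: keys[start:end]})
	}
	return b
}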