How TiDB implements 2PC
Source version 2.1
Data structures
//store/tikv/snapshot.go
// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
    store        *tikvStore
    version      kv.Version
    priority     pb.CommandPri
    notFillCache bool
    syncLog      bool
    keyOnly      bool
    vars         *kv.Variables
}
//store/tikv/txn.go, seems to represent txn actions on the server (TiKV) side
// tikvTxn implements kv.Transaction.
type tikvTxn struct {
    snapshot  *tikvSnapshot
    us        kv.UnionStore
    store     *tikvStore // for connection to region.
    startTS   uint64
    startTime time.Time // Monotonic timestamp for recording txn time consuming.
    commitTS  uint64
    valid     bool
    lockKeys  [][]byte
    mu        sync.Mutex // For thread-safe LockKeys function.
    dirty     bool
    setCnt    int64
    vars      *kv.Variables
}
//session/txn.go, seems to represent the txn on the client (TiDB) side
// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement-related modifications in a buffer before they are
//    flushed to the txn, so if executing a statement meets an error, the txn
//    won't be made dirty.
// 2. It's a lazy transaction: it stays a txnFuture until StartTS() is really needed.
type TxnState struct {
    // States of a TxnState should be one of the following:
    // Invalid: kv.Transaction == nil && txnFuture == nil
    // Pending: kv.Transaction == nil && txnFuture != nil
    // Valid:   kv.Transaction != nil && txnFuture == nil
    kv.Transaction // embedded type, so TxnState implements the whole kv.Transaction interface
    txnFuture *txnFuture

    buf          kv.MemBuffer
    mutations    map[int64]*binlog.TableMutation
    dirtyTableOP []dirtyTableOperation

    // If doNotCommit is not nil, Commit() will not commit the transaction.
    // The doNotCommit flag may be set when StmtCommit fails.
    doNotCommit error
}
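
How Pending becomes Valid: the first call that really needs a startTS waits on the txnFuture and swaps the resolved transaction in. A condensed sketch of that transition, from memory of the 2.1 source (session/txn.go); the exact signature and error handling may differ:

func (st *TxnState) changePendingToValid(txnCap int) error {
    future := st.txnFuture
    st.txnFuture = nil
    // wait() blocks on the async TSO request and returns a started kv.Transaction
    txn, err := future.wait()
    if err != nil {
        st.Transaction = nil
        return errors.Trace(err)
    }
    txn.SetCap(txnCap) // pre-allocate the txn's MemBuffer
    st.Transaction = txn
    return nil
}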
//session/session.go, the session owns the TxnState
type session struct {
    txn TxnState

    mu struct {
        sync.RWMutex
        values map[fmt.Stringer]interface{}
    }

    store kv.Storage

    sessionVars    *variable.SessionVars
    sessionManager util.SessionManager
}
//store/tikv/2pc.go
// twoPhaseCommitter executes a two-phase commit protocol.
type twoPhaseCommitter struct {
    store     *tikvStore // the same store as txn.store
    txn       *tikvTxn
    startTS   uint64
    keys      [][]byte
    mutations map[string]*pb.Mutation
    lockTTL   uint64
    commitTS  uint64
    mu        struct {
        sync.RWMutex
        committed       bool
        undeterminedErr error // undeterminedErr saves the rpc error we encounter when committing the primary key.
    }
    priority pb.CommandPri
    syncLog  bool
    connID   uint64 // connID is used for log.
    cleanWg  sync.WaitGroup
    detail   *execdetails.CommitDetails

    // The max time a Txn may use (in ms) from its startTS to commitTS.
    // We use it to guarantee the GC worker will not influence any active txn.
    // The value should be less than the GC life time.
    maxTxnTimeUse uint64
}
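
Where maxTxnTimeUse bites: after execute() fetches the commitTS, the committer asks the oracle whether too much time has passed since startTS, and aborts rather than risk racing the GC worker. A sketch of that check (condensed; the exact error message wording is from memory):

    // in execute(), after commitTS is fetched (sketch):
    if c.store.oracle.IsExpired(c.startTS, c.maxTxnTimeUse) {
        return errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d",
            c.connID, c.startTS, c.commitTS)
    }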
When a session commits a single-table, DML-only transaction:
- Session.doCommit
- TxnState.Commit
- TxnState.Transaction.Commit(ctx), which actually calls
- tikvTxn.Commit
//happy path only, abridged
func (txn *tikvTxn) Commit(ctx context.Context) error {
    defer txn.close()

    var connID uint64
    val := ctx.Value(sessionctx.ConnID)
    if val != nil {
        connID = val.(uint64)
    }
    committer, err := newTwoPhaseCommitter(txn, connID)
    // latches disabled
    err = committer.executeAndWriteFinishBinlog(ctx)
}
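
For contrast, when the store's txn latches are enabled (the txn-local-latches config), the commit runs under an in-memory latch so conflicting local txns fail fast. A condensed sketch of that branch, from memory of the 2.1 source; identifiers like txnRetryableMark are recalled, not verified:

    // latches enabled: serialize conflicting txns on this TiDB instance
    lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
    defer txn.store.txnLatches.UnLock(lock)
    if lock.IsStale() {
        // a later txn already committed some of these keys; retry with a new startTS
        err = errors.Errorf("startTS %d is stale", txn.startTS)
        return errors.Annotate(err, txnRetryableMark)
    }
    err = committer.executeAndWriteFinishBinlog(ctx)
    if err == nil {
        lock.SetCommitTS(committer.commitTS)
    }
    return errors.Trace(err)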
- newTwoPhaseCommitter (a condensed sketch follows this list)
  - iterates through tikvTxn.UnionStore to collect the keys to Put and Del
    into the mutations collection
  - adds txn.lockKeys to mutations with the Op_Lock type
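
A condensed sketch of that collection loop inside newTwoPhaseCommitter, from memory of the 2.1 source (size and count bookkeeping elided):

    mutations := make(map[string]*pb.Mutation)
    var keys [][]byte
    err := txn.us.WalkBuffer(func(k kv.Key, v []byte) error {
        if len(v) > 0 {
            // a non-empty value in the buffer is a Put
            mutations[string(k)] = &pb.Mutation{Op: pb.Op_Put, Key: k, Value: v}
        } else {
            // an empty value marks a deletion
            mutations[string(k)] = &pb.Mutation{Op: pb.Op_Del, Key: k}
        }
        keys = append(keys, k)
        return nil
    })
    // keys locked by SELECT ... FOR UPDATE get an Op_Lock mutation
    for _, lockKey := range txn.lockKeys {
        if _, ok := mutations[string(lockKey)]; !ok {
            mutations[string(lockKey)] = &pb.Mutation{Op: pb.Op_Lock, Key: lockKey}
            keys = append(keys, lockKey)
        }
    }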
- executeAndWriteFinishBinlog
    err := c.execute(ctx)
    if err != nil {
        c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
    } else {
        c.txn.commitTS = c.commitTS
        c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
    }
//happy path only, and no binlog
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
    prewriteBo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(c.txn.vars)
    err := c.prewriteKeys(prewriteBo, c.keys)

    commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
    c.commitTS = commitTS

    commitBo := NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars)
    err = c.commitKeys(commitBo, c.keys)
    return nil
}
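
What the happy path hides: execute() installs a deferred cleanup so that if the txn did not commit (and is not in the undetermined state), the prewritten locks are rolled back asynchronously. A sketch of that defer, condensed from memory of the 2.1 source; details may differ:

    defer func() {
        // Always clean up all written keys if the txn does not commit.
        c.mu.RLock()
        committed := c.mu.committed
        undetermined := c.mu.undeterminedErr != nil
        c.mu.RUnlock()
        if !committed && !undetermined {
            c.cleanWg.Add(1)
            go func() {
                bo := NewBackoffer(context.Background(), cleanupMaxBackoff)
                err := c.cleanupKeys(bo, c.keys) // sends BatchRollback to each region
                if err != nil {
                    log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
                }
                c.cleanWg.Done()
            }()
        }
    }()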
- Prewrite, commit, and cleanup (in the failure case) are just different actions
  dispatched through twoPhaseCommitter.doActionOnKeys
//happy path only
func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys) error {
    mutations := make([]*pb.Mutation, len(batch.keys))
    for i, k := range batch.keys {
        mutations[i] = c.mutations[string(k)]
    }
    req := &tikvrpc.Request{
        Type: tikvrpc.CmdPrewrite,
        Prewrite: &pb.PrewriteRequest{
            Mutations:    mutations,
            PrimaryLock:  c.primary(),
            StartVersion: c.startTS,
            LockTtl:      c.lockTTL,
            TxnSize:      uint64(len(batch.keys)),
        },
        Context: pb.Context{
            Priority: c.priority,
            SyncLog:  c.syncLog,
        },
    }
    resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
}
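
The elided response handling is where conflicts surface: a region error means the batch must be re-grouped and retried, and KeyErrors are extracted as locks and handed to the lock resolver before retrying. A sketch, condensed from memory of the 2.1 source (the real code wraps the send in a retry loop, shown here):

    for {
        resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
        if err != nil {
            return errors.Trace(err)
        }
        regionErr, err := resp.GetRegionError()
        if regionErr != nil {
            // region split/moved: backoff, then re-group the keys and retry
            err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
            if err != nil {
                return errors.Trace(err)
            }
            return c.prewriteKeys(bo, batch.keys)
        }
        keyErrs := resp.Prewrite.GetErrors()
        if len(keyErrs) == 0 {
            return nil // the whole batch is prewritten
        }
        var locks []*Lock
        for _, keyErr := range keyErrs {
            lock, err1 := extractLockFromKeyErr(keyErr) // a write conflict surfaces here
            if err1 != nil {
                return errors.Trace(err1)
            }
            locks = append(locks, lock)
        }
        // resolve locks left by dead txns; if some are still alive, backoff and retry
        ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
        if err != nil {
            return errors.Trace(err)
        }
        if !ok {
            err = bo.Backoff(BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
            if err != nil {
                return errors.Trace(err)
            }
        }
    }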
func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) error {
    req := &tikvrpc.Request{
        Type: tikvrpc.CmdCommit,
        Commit: &pb.CommitRequest{
            StartVersion:  c.startTS,
            Keys:          batch.keys,
            CommitVersion: c.commitTS,
        },
        Context: pb.Context{
            Priority: c.priority,
            SyncLog:  c.syncLog,
        },
    }
    sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
    resp, err := sender.SendReq(bo, req, batch.region, readTimeoutShort)
}
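
This function is also where the undeterminedErr field earns its keep: if the RPC that commits the primary key fails without a response, the txn's outcome is unknown, so the committer can neither report success nor a clean failure. A sketch of that handling, condensed from memory of the 2.1 source:

    // If we fail to receive a response for the request that commits the primary
    // key, it is undetermined whether this transaction has been committed. We
    // can neither declare success (possible data loss) nor return a plain error
    // (a retry might hit a duplicated-key error), so we record the rpc error
    // and let the upper layer drop the client connection.
    isPrimary := bytes.Equal(batch.keys[0], c.primary())
    if isPrimary && sender.rpcError != nil {
        c.setUndeterminedErr(errors.Trace(sender.rpcError))
    }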
func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitAction, keys [][]byte) error {
    groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
    var batches []batchKeys
    var sizeFunc = c.keySize
    if action == actionPrewrite {
        sizeFunc = c.keyValueSize
    }
    // Make sure the group that contains the primary lock key goes first.
    batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
    delete(groups, firstRegion)
    for id, g := range groups { // the order of the other regions doesn't matter
        batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
    }

    firstIsPrimary := bytes.Equal(keys[0], c.primary())
    if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
        // The primary key must be committed/cleaned up first.
        err = c.doActionOnBatches(bo, action, batches[:1])
        batches = batches[1:]
    }
    if action == actionCommit {
        // Commit secondary batches in a background goroutine to reduce latency.
        // The backoffer instance is created outside of the goroutine to avoid
        // a potential data race in unit tests, since `CommitMaxBackoff` will be
        // updated by test suites.
        secondaryBo := NewBackoffer(context.Background(), CommitMaxBackoff)
        go func() {
            // The error is only logged: once the primary key is committed, the
            // txn is logically committed; readers that hit a secondary lock can
            // resolve it by checking the primary.
            e := c.doActionOnBatches(secondaryBo, action, batches)
        }()
    } else {
        err = c.doActionOnBatches(bo, action, batches)
    }
    return errors.Trace(err)
}
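
For completeness, appendBatchBySize slices one region's keys into batches whose accumulated size stays under txnCommitBatchSize. A sketch of its 2.1 shape, from memory:

func appendBatchBySize(b []batchKeys, region RegionVerID, keys [][]byte,
    sizeFn func([]byte) int, limit int) []batchKeys {
    var start, end int
    for start = 0; start < len(keys); start = end {
        var size int
        // grow the batch until adding another key would have exceeded the limit
        for end = start; end < len(keys) && size < limit; end++ {
            size += sizeFn(keys[end])
        }
        b = append(b, batchKeys{region: region, keys: keys[start:end]})
    }
    return b
}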