Code reading notes: sync-diff-inspector
Major data structures
// TableConfig is the config of a table to be checked.
type TableConfig struct {
    // the table's origin information
    TableInstance
    // columns to be ignored; these columns' data will not be checked
    IgnoreColumns []string `toml:"ignore-columns"`
    // fields should be the primary key, a unique key, or a field with an index
    Fields string `toml:"index-fields"`
    // select range, for example: "age > 10 AND age < 20"
    Range string `toml:"range"`
    // set to true when comparing sharded tables with the target table; there should be more than one source table
    IsSharding bool `toml:"is-sharding"`
    // saves the source tables' info.
    // there may be more than one source table for sharding tables,
    // or when you want to compare tables with different schema or table names.
    // SourceTables can be nil when source and target are in one-to-one correspondence.
    SourceTables    []TableInstance `toml:"source-tables"`
    TargetTableInfo *model.TableInfo
    // collation config in MySQL/TiDB
    Collation string `toml:"collation"`
}
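To make the fields concrete, here is a hedged sketch of a sharding comparison expressed as a TableConfig value. The concrete values are illustrative placeholders, and the TableInstance entries are left empty because its fields are not shown in these notes:

// Hypothetical example only: one target table fed by several sharded source tables.
cfg := &TableConfig{
    IgnoreColumns: []string{"update_time"},  // don't compare this column's data
    Fields:        "id",                     // indexed field used for chunk splitting
    Range:         "id > 0 AND id < 100000", // only rows in this range are checked
    IsSharding:    true,                     // several source tables map onto one target table
    SourceTables:  []TableInstance{{}, {}},  // placeholders for the shard instances
    Collation:     "latin1_bin",
}
_ = cfg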
// Diff holds the configuration and state of one comparison run.
type Diff struct {
    sourceDBs         map[string]DBConfig // source (upstream) DB configs, keyed by instance ID
    targetDB          DBConfig            // target (downstream) DB config
    chunkSize         int                 // number of rows per chunk
    sample            int                 // sampling percent used when checking data
    checkThreadCount  int                 // number of concurrent check workers
    useChecksum       bool                // compare chunks by checksum first
    useCheckpoint     bool                // resume from checkpoint data if present
    onlyUseChecksum   bool                // never fall back to row-by-row comparison
    ignoreDataCheck   bool                // skip data comparison
    ignoreStructCheck bool                // skip table structure comparison
    tables            map[string]map[string]*TableConfig // schema -> table -> config
    fixSQLFile        *os.File            // file the generated fix SQLs are written to
    report            *Report             // summary report of the check
    tidbInstanceID    string
    tableRouter       *router.Table       // routes source tables to target tables
    cpDB              *sql.DB             // connection to the checkpoint schema
    ctx               context.Context
}
// Bound represents a bound for a column
type Bound struct {
    Column   string `json:"column"`
    Lower    string `json:"lower"`
    Upper    string `json:"upper"`
    HasLower bool   `json:"has-lower"`
    HasUpper bool   `json:"has-upper"`
}
// ChunkRange represents a chunk's range
type ChunkRange struct {
    ID     int      `json:"id"`
    Bounds []*Bound `json:"bounds"`
    Where  string   `json:"where"`
    Args   []string `json:"args"`
    State  string   `json:"state"`
    columnOffset map[string]int // column name -> index of its Bound in Bounds
}
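As a rough, hedged illustration of how these two structs relate, the Bounds can be rendered into the Where/Args pair along these lines; the comparison operators and the helper name toWhere are assumptions, and the real code also handles collation and compound keys:

// Sketch: build a WHERE fragment plus placeholder args from a chunk's bounds.
// Uses "fmt" and "strings".
func toWhere(c *ChunkRange) (string, []string) {
    conditions := make([]string, 0, len(c.Bounds)*2)
    args := make([]string, 0, len(c.Bounds)*2)
    for _, b := range c.Bounds {
        if b.HasLower {
            conditions = append(conditions, fmt.Sprintf("`%s` > ?", b.Column))
            args = append(args, b.Lower)
        }
        if b.HasUpper {
            conditions = append(conditions, fmt.Sprintf("`%s` <= ?", b.Column))
            args = append(args, b.Upper)
        }
    }
    if len(conditions) == 0 {
        return "TRUE", nil // unbounded chunk
    }
    return strings.Join(conditions, " AND "), args
}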
How is diff done
CheckTableData
- Loads the checkpointed chunk data; on the first run it calls SplitChunks (see below).
- Spawns multiple checker workers over channels to check the chunks with checkChunkDataEqual.
- UseChecksum decides whether we go for checksum mode or row-by-row mode (a sketch of the comparison follows the example below).
- The checksum is computed by dbutil.GetCRC32Checksum, bounded by chunk.Where, which is a SQL statement formatted by
fmt.Sprintf(
"SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
- An example checksum SQL would be
SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) AS checksum
FROM test.test WHERE id > 0 AND id < 10;
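A minimal hedged sketch of this checksum path, using plain database/sql; chunkChecksum and compareChunk are made-up helper names (the real tool wraps this in dbutil.GetCRC32Checksum and checkChunkDataEqual), and the row-by-row fallback is only indicated by a comment:

// Sketch: compare one chunk by checksum; fall back to row-by-row on mismatch.
// Uses "context", "database/sql" and "fmt".
func chunkChecksum(ctx context.Context, db *sql.DB, schema, table, columns, isNull, where string, args []interface{}) (uint64, error) {
    query := fmt.Sprintf(
        "SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s))) AS UNSIGNED)) AS checksum FROM `%s`.`%s` WHERE %s;",
        columns, isNull, schema, table, where)
    var checksum uint64
    err := db.QueryRowContext(ctx, query, args...).Scan(&checksum)
    return checksum, err
}

func compareChunk(ctx context.Context, source, target *sql.DB, schema, table, columns, isNull, where string, args []interface{}) (bool, error) {
    srcSum, err := chunkChecksum(ctx, source, schema, table, columns, isNull, where, args)
    if err != nil {
        return false, err
    }
    dstSum, err := chunkChecksum(ctx, target, schema, table, columns, isNull, where, args)
    if err != nil {
        return false, err
    }
    if srcSum == dstSum {
        return true, nil // checksums agree, the chunk is considered equal
    }
    // Checksums differ: unless onlyUseChecksum is set, select the chunk's rows
    // from both sides and diff them row by row to produce fix SQLs.
    return false, nil
}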
How are chunks split
- The Range config in the toml is passed to getChunksForTable.
- There are two ways of splitting: bucket splitting and random splitting. The default config goes with random splitting.
Random splitting
// count the rows in range, then split into ceil(cnt / chunkSize) chunks
cnt, err := dbutil.GetRowCount(context.Background(), table.Conn, table.Schema, table.Table, limits, nil)
chunkCnt := (int(cnt) + chunkSize - 1) / chunkSize
chunks, err := splitRangeByRandom(table.Conn, NewChunkRange(), chunkCnt, table.Schema, table.Table, columns, s.limits, s.collation)
- Randomly pick chunkCnt ids to use as splitting points:
SELECT `id` FROM (
SELECT `id`, rand() rand_value FROM `test`.`test`
WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100
ORDER BY rand_value LIMIT 5) rand_tmp
ORDER BY `id` COLLATE "latin1_bin";
- and create new chunks bounded by these id values (see the sketch below).
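A hedged sketch of that splitting step; splitByRandom is a made-up name, and the real splitRangeByRandom also carries over the parent chunk's bounds, supports multi-column indexes, and applies the configured collation:

// Sketch: build sub-chunks from randomly sampled, ascending id values.
func splitByRandom(column string, randomValues []string) []*ChunkRange {
    chunks := make([]*ChunkRange, 0, len(randomValues)+1)
    var lower string
    hasLower := false
    for _, v := range randomValues {
        // sub-chunk bounded above by v; the first one has no lower bound
        chunks = append(chunks, &ChunkRange{
            Bounds: []*Bound{{Column: column, Lower: lower, HasLower: hasLower, Upper: v, HasUpper: true}},
        })
        lower, hasLower = v, true
    }
    // last sub-chunk covers everything above the largest sampled value
    chunks = append(chunks, &ChunkRange{
        Bounds: []*Bound{{Column: column, Lower: lower, HasLower: hasLower}},
    })
    return chunks
}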
Summary Info
CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`summary`(
    `schema` varchar(64), `table` varchar(64),
    `chunk_num` int not null default 0,
    `check_success_num` int not null default 0,
    `check_failed_num` int not null default 0,
    `check_ignore_num` int not null default 0,
    `state` enum('not_checked', 'checking', 'success', 'failed') DEFAULT 'not_checked',
    `config_hash` varchar(50),
    `update_time` datetime ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY(`schema`, `table`));
- The summary is updated every 10 seconds (a sketch of the loop follows the SQL below):
SELECT `state`, COUNT(*) FROM @checkpointSchemaName.@chunkTableName
WHERE `instance_id` = ? AND `schema` = ? AND `table` = ?
GROUP BY `state`;
UPDATE `@checkpointSchemaName`.`@summaryTableName`
SET `chunk_num` = ?, `check_success_num` = ?, `check_failed_num` = ?, `check_ignore_num` = ?, `state` = ?
WHERE `schema` = ? AND `table` = ?
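A hedged sketch of that 10-second refresh loop; the function name and the use of a time.Ticker are assumptions, while the two statements mirror the SQL above:

// Sketch: refresh one table's summary row every 10 seconds from the chunk states.
// Uses "context", "database/sql" and "time"; error handling is trimmed.
func updateSummaryLoop(ctx context.Context, cpDB *sql.DB, instanceID, schema, table string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // count chunks per state for this table
            rows, err := cpDB.QueryContext(ctx,
                "SELECT `state`, COUNT(*) FROM `sync_diff_inspector`.`chunk` WHERE `instance_id` = ? AND `schema` = ? AND `table` = ? GROUP BY `state`",
                instanceID, schema, table)
            if err != nil {
                continue
            }
            counts := map[string]int{}
            total := 0
            for rows.Next() {
                var state string
                var cnt int
                if err := rows.Scan(&state, &cnt); err == nil {
                    counts[state] += cnt
                    total += cnt
                }
            }
            rows.Close()
            // write the aggregated numbers back; the real tool also decides the
            // final state (success/failed) once every chunk has been checked
            state := "checking"
            _, _ = cpDB.ExecContext(ctx,
                "UPDATE `sync_diff_inspector`.`summary` SET `chunk_num` = ?, `check_success_num` = ?, `check_failed_num` = ?, `check_ignore_num` = ?, `state` = ? WHERE `schema` = ? AND `table` = ?",
                total, counts["success"], counts["failed"], counts["ignore"], state, schema, table)
        }
    }
}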
Chunk Info
CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`chunk`(
    `chunk_id` int,
    `instance_id` varchar(64),
    `schema` varchar(64),
    `table` varchar(64),
    `range` text,
    `checksum` varchar(20),
    `chunk_str` text,
    `state` enum('not_checked', 'checking', 'success', 'failed', 'ignore', 'error') DEFAULT 'not_checked',
    `update_time` datetime ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY(`schema`, `table`, `instance_id`, `chunk_id`));