Major data structures


// TableConfig is the configuration for a single table to be compared.
// Fields carrying a toml tag correspond to the key of the same name in the
// TOML config.
type TableConfig struct {
        // TableInstance is the table's origin information (embedded).
        TableInstance
        // IgnoreColumns lists the columns to ignore; their data will not be checked.
        IgnoreColumns []string `toml:"ignore-columns"`
        // Fields should name the primary key, a unique key, or a field with an index.
        Fields string `toml:"index-fields"`
        // Range is the select range, for example: "age > 10 AND age < 20".
        Range string `toml:"range"`
        // IsSharding should be set to true when comparing sharding tables with the
        // target table; in that case there should be more than one source table.
        IsSharding bool `toml:"is-sharding"`
        // SourceTables saves the source tables' info.
        // There may be more than one source for sharding tables, or when you want
        // to compare tables with a different schema and table name.
        // SourceTables can be nil when source and target are in one-to-one correspondence.
        SourceTables    []TableInstance `toml:"source-tables"`
        // TargetTableInfo carries no toml tag.
        // NOTE(review): presumably filled in at runtime with the target table's
        // schema information rather than read from the config — confirm.
        TargetTableInfo *model.TableInfo

        // Collation is the collation config in mysql/tidb.
        Collation string `toml:"collation"`
}

// Diff holds the state for a comparison run: the source and target database
// configs, chunking and checking options, the tables to compare, and handles
// used for output and checkpointing.
//
// NOTE(review): the per-field notes below are inferred from field names and
// types only — confirm against the code that reads and writes them.
type Diff struct {
        // sourceDBs holds the source database configs, keyed by a string
        // (presumably an instance identifier — confirm).
        sourceDBs         map[string]DBConfig
        // targetDB is the config of the target database.
        targetDB          DBConfig
        // chunkSize is presumably the number of rows per chunk — confirm.
        chunkSize         int
        sample            int
        // checkThreadCount is presumably the number of concurrent checkers — confirm.
        checkThreadCount  int
        // Flags selecting how (and whether) data and structure are checked.
        useChecksum       bool
        useCheckpoint     bool
        onlyUseChecksum   bool
        ignoreDataCheck   bool
        ignoreStructCheck bool
        // tables is a two-level map of table configs
        // (presumably schema name -> table name — confirm).
        tables            map[string]map[string]*TableConfig
        // fixSQLFile is an open file handle; presumably the SQL statements that
        // would fix detected differences are written to it — confirm.
        fixSQLFile        *os.File

        report         *Report
        tidbInstanceID string
        tableRouter    *router.Table
        // cpDB is a database handle; presumably the connection used to store
        // checkpoint data — confirm.
        cpDB           *sql.DB

        // ctx is stored on the struct rather than passed per call.
        ctx context.Context
}

// Bound represents a bound for a column: the column name together with its
// lower and upper limits, all carried as strings and serialized to JSON under
// the tagged keys.
type Bound struct {
        // Column is the name of the column this bound applies to.
        Column string `json:"column"`
        // Lower and Upper are the limit values in string form.
        Lower  string `json:"lower"`
        Upper  string `json:"upper"`

        // HasLower and HasUpper presumably report whether Lower and Upper are
        // actually set, so an open-ended bound can be told apart from an
        // empty-string limit — TODO confirm.
        HasLower bool `json:"has-lower"`
        HasUpper bool `json:"has-upper"`
}

// ChunkRange represents the range of one chunk: its per-column bounds and the
// SQL condition derived from them. The exported fields are serialized to JSON
// under the tagged keys.
type ChunkRange struct {
        // ID identifies the chunk.
        ID     int      `json:"id"`
        // Bounds holds the per-column bounds that delimit the chunk.
        Bounds []*Bound `json:"bounds"`

        // Where is the SQL condition that bounds the chunk; the checksum query
        // is restricted by it.
        Where string   `json:"where"`
        // Args presumably holds the values for placeholders in Where — TODO confirm.
        Args  []string `json:"args"`

        // State is the chunk's check state as a string.
        // NOTE(review): presumably one of the values of the checkpoint table's
        // `state` enum — confirm.
        State string `json:"state"`

        // columnOffset is unexported and therefore not part of the JSON form.
        // NOTE(review): presumably maps a column name to its index in Bounds — confirm.
        columnOffset map[string]int
}

How is diff done

Source

  • CheckTableData loads the checkpointed chunk data from the context. On the first run, it calls SplitChunks instead (see below)
  • Spawns multiple channels to check chunks - checkChunkDataEqual
  • UseChecksum determines whether we use checksum mode or row-by-row mode
  • Checksum is computed by dbutil.GetCRC32Checksum bounded by chunk.Where, using a SQL statement formatted by:

fmt.Sprintf(
"SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
		strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)

  • An example checksum sql would be

 SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) AS checksum 
 FROM test.test WHERE id > 0 AND id < 10;

How are chunks split

Source

  • The Range config in the toml is passed to getChunksForTable
  • Two ways of splitting - bucket splitting and random splitting. The default config uses random splitting

Random splitting


cnt, err := dbutil.GetRowCount(context.Background(), table.Conn, table.Schema, table.Table, limits, nil)
chunkCnt := (int(cnt) + chunkSize - 1) / chunkSize
chunks, err := splitRangeByRandom(table.Conn, NewChunkRange(), chunkCnt, table.Schema, table.Table, columns, s.limits, s.collation)

  • Randomly pick chunkCnt ids to use as the splitting points

SELECT `id` FROM (
	SELECT `id`, rand() rand_value FROM `test`.`test`  
	WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 
	ORDER BY rand_value LIMIT 5) rand_tmp 
ORDER BY `id` COLLATE "latin1_bin";

  • and create new chunks based on these separate id values

Summary Info

Source


CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`summary`(" +
			"`schema` varchar(64), `table` varchar(64)," +
			"`chunk_num` int not null default 0," +
			"`check_success_num` int not null default 0," +
			"`check_failed_num` int not null default 0," +
			"`check_ignore_num` int not null default 0," +
			"`state` enum('not_checked', 'checking', 'success', 'failed') DEFAULT 'not_checked'," +
			"`config_hash` varchar(50)," +
			"`update_time` datetime ON UPDATE CURRENT_TIMESTAMP," +
			"PRIMARY KEY(`schema`, `table`));
  • Updates every 10 secs

SELECT `state`, COUNT(*) FROM @checkpointSchemaName.@chunkTableName  
WHERE `instance_id` = ? AND `schema` = ? AND `table` = ? 
GROUP BY `state`;

UPDATE `@checkpointSchemaName`.`@summaryTableName` 
SET `chunk_num` = ?, `check_success_num` = ?, `check_failed_num` = ?, `check_ignore_num` = ?, `state` = ? 
WHERE `schema` = ? AND `table` = ?

Chunk Info


CREATE TABLE IF NOT EXISTS `sync_diff_inspector`.`chunk`(" +
			"`chunk_id` int," +
			"`instance_id` varchar(64)," +
			"`schema` varchar(64)," +
			"`table` varchar(64)," +
			"`range` text," +
			"`checksum` varchar(20)," +
			"`chunk_str` text," +
			"`state` enum('not_checked', 'checking', 'success', 'failed', 'ignore', 'error') DEFAULT 'not_checked'," +
			"`update_time` datetime ON UPDATE CURRENT_TIMESTAMP," +
			"PRIMARY KEY(`schema`, `table`, `instance_id`, `chunk_id`));