How pt-archiver works
Most likely we need bulk insertion and deletion mode. Otherwise, single-row insertion mode can barely exceed 2k rows/sec, i.e. at most roughly 170M rows per day (2,000 rows/s × 86,400 s/day ≈ 173M).
# Bulk archive loop: every row of a chunk is appended to a LOAD DATA file.
# When the chunk is exhausted the file is bulk-inserted into the destination,
# then the source range [first row, last row] is bulk-deleted, and the next
# chunk is fetched starting after the last row.
$bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
   or die "Cannot open temp file: $OS_ERROR\n";
while (                                        # Quit if:
   $row                                        # there is no data
   && $retries >= 0                            # or retries are exceeded
   && (!$o->get('run-time') || $now < $end)    # or time is exceeded
   && !-f $sentinel                            # or the sentinel is set
   && $oktorun                                 # or instructed to quit
   )
{
   # Copy the row: DBI reuses the array returned by fetchrow_arrayref(), and
   # $lastrow is still needed after the next fetch and the next execute().
   my $lastrow = [ @$row ];

   # Append this row to the bulk-insert file in LOAD DATA INFILE format.
   my $escaped_row = escape(
      [ @{$row}[@sel_slice] ], $fields_separated_by, $optionally_enclosed_by);
   print $bulkins_file $escaped_row, "\n"
      or die "Cannot write to bulk file: $OS_ERROR\n";

   # Possibly flush the file and commit the insert and delete.
   commit($o) unless $commit_each;

   # Get the next row in this chunk.
   # First time through this loop $get_sth is set to $get_first.
   # For non-bulk operations this means that rows ($row) are archived
   # one-by-one in the code block above ("row is archivable"). For
   # bulk operations, the 2nd to 2nd-to-last rows are ignored and
   # only the first row ($first_row) and the last row ($last_row) of
   # this chunk are used to do bulk INSERT or DELETE on the range of
   # rows between first and last. After the bulk ops, $first_row and
   # $last_row are reset to the next chunk.
   if ( $get_sth->{Active} ) { # Fetch until exhausted
      $row = $get_sth->fetchrow_arrayref();
   }

   if ( !$row ) {
      # Chunk exhausted: load the whole file, then delete the source range.
      $bulkins_file->close()
         or die "Cannot close bulk insert file: $OS_ERROR\n";

      my $ins_sth = $ins_row;  # The INSERT sth decided before the loop.

      # $inserted becomes true only if the closure ran to completion, i.e.
      # execute() did not die on the final attempt.
      my $inserted = 0;
      do_with_retries($o, 'bulk_inserting', sub {
         $ins_sth->execute($bulkins_file->filename());
         $src->{dbh}->do("SELECT 'pt-archiver keepalive'") if $src;
         PTDEBUG && _d('Bulk inserted', $ins_sth->rows, 'rows');
         $statistics{INSERT} += $ins_sth->rows;
         $inserted = 1;
      });
      if ( !$inserted ) {
         # Never delete source rows whose insert did not complete: stop here
         # and report retries as exceeded.
         $retries = -1;
         last;
      }

      # No checksum is performed; correctness relies on deleting only after
      # the insert above has completed.
      my $deleted = 0;
      do_with_retries($o, 'bulk_deleting', sub {
         $del_row->execute(
            @{$first_row}[@bulkdel_slice],
            @{$lastrow}[@bulkdel_slice],
         );
         PTDEBUG && _d('Bulk deleted', $del_row->rows, 'rows');
         $statistics{DELETE} += $del_row->rows;
         $deleted = 1;
      });
      if ( !$deleted ) {
         $retries = -1;
         last;
      }

      commit($o, 1) if $commit_each;

      # Fetch the next chunk, starting after the last row of this one.
      $get_sth = $get_next;
      PTDEBUG && _d('Fetching rows in next chunk');
      trace('select', sub {
         my $select_start = time;
         $get_sth->execute(@{$lastrow}[@asc_slice]);
         $last_select_time = time - $select_start;
         PTDEBUG && _d('Fetched', $get_sth->rows, 'rows');
         $statistics{SELECT} += $get_sth->rows;
      });
      @beginning_of_txn = @{$lastrow}[@asc_slice] unless $txn_cnt;

      $row       = $get_sth->fetchrow_arrayref();
      $first_row = $row ? [ @$row ] : undef;

      # A fresh file for the next chunk replaces the one just loaded.
      $bulkins_file = File::Temp->new( SUFFIX => 'pt-archiver' )
         or die "Cannot open temp file: $OS_ERROR\n";
   }

   # Refresh the clock so the run-time limit in the loop condition can fire.
   $now = time;
}
How characters are escaped
# Formats a row the same way SELECT INTO OUTFILE does by default, so that it
# can be read back with LOAD DATA INFILE. This is described in the LOAD DATA
# INFILE section of the MySQL manual,
# http://dev.mysql.com/doc/refman/5.0/en/load-data.html
#
# Parameters:
#   $row                    - arrayref of column values; undef means NULL.
#                             The caller's array is not modified.
#   $fields_separated_by    - field terminator (default "\t").
#   $optionally_enclosed_by - enclosure; only '"' turns enclosure on, any
#                             other value leaves fields unenclosed (default '').
#
# Returns the formatted fields joined by $fields_separated_by, without a
# line terminator.
#
# NOTE(review): assumes the LOAD DATA statement uses the default
# ESCAPED BY '\\'. Confirm this against the statement that loads this file.
sub escape {
   my ($row, $fields_separated_by, $optionally_enclosed_by) = @_;
   $fields_separated_by    ||= "\t";
   $optionally_enclosed_by ||= '';

   my $enclose = $optionally_enclosed_by eq '"';

   # Characters that get a backslash in front of them. Escaping all of them
   # in ONE pass means an escaped backslash can never be confused with the
   # escape of the following character (e.g. a quote).
   my @specials = ( "\\", "\t", "\n" );
   # A single non-word separator (',', ';', '|', tab, ...) is escaped so it
   # cannot split a field. Alphanumeric or multi-character separators are
   # left alone, because backslash + letter/digit can form a special
   # sequence such as \n, \t or \0.
   push @specials, $fields_separated_by
      if length($fields_separated_by) == 1 && $fields_separated_by =~ m/\W/;
   push @specials, '"' if $enclose;
   my $class      = join '', map { quotemeta } @specials;
   my $special_re = qr/([$class])/;

   my @fields;
   for my $value ( @$row ) {
      if ( !defined $value ) {
         push @fields, '\N';  # NULL = \N, never enclosed
         next;
      }

      # Work on a copy so the caller's data is left untouched.
      (my $field = $value) =~ s/$special_re/\\$1/g;

      # Values made only of digits, ',', '.' and 'E' are treated as numbers
      # and left unenclosed. Anything else (including negative numbers) is
      # enclosed when enclosure is enabled.
      if ( $enclose && $field !~ m/\A[0-9,.E]+\z/ ) {
         $field = $optionally_enclosed_by . $field . $optionally_enclosed_by;
      }

      # No chomp: a trailing newline is already escaped, and stripping it
      # would leave a dangling backslash that escapes the next separator.
      push @fields, $field;
   }

   return join($fields_separated_by, @fields);
}