Skip to content

Commit

Permalink
PS-7806: Column compression breaks async replication on PS (percona#5162
Browse files Browse the repository at this point in the history
)

https://jira.percona.com/browse/PS-7806

Work based on the original patch for 8.0 by Nitendra Bhosle.

Problem:
When the statement related to the partitioned table containing
compressed BLOB columns is replicated, replica stops with error.
e.g. DELETE FROM t1 WHERE d2 = 0.00000 ;

Cause:
Queries like mentioned delete are implemented in the following way:
1. Index scan is performed
2. For every matching row, the row is deleted
3. Query is rewritten, using all columns in WHERE clause.

For partitioned tables, during the ordered scan, we read (and keep)
the next record from every partition, and then do ordering of cached
records, returning the first in order
(logic in Partition_helper::handle_ordered_index_scan()).
However, we use common prebuilt->compression_heap which is cleaned up
before every row read. This causes that rows cached for particular
partitions are freed and overwritten by next partition's row during
rows read loop in Partition_helper::handle_ordered_index_scan().
Then the query is being binlogged, but blob pointer may be invalid,
pointing to overwritten memory, so rewritten query contains wrong value
for BLOB column.
When received by replica, such row does not exists and replica stops.

Solution:
Implemented dedicated compression_heap for every partition, similarly to
already existing blob_heap.
  • Loading branch information
kamil-holubicki authored Dec 7, 2023
1 parent 88c81fc commit 6797697
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
17 changes: 17 additions & 0 deletions mysql-test/suite/rpl/r/rpl_column_compression_async_rpl.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information.
[connection master]
CREATE DATABASE test7806;
USE test7806;
CREATE TABLE t (ip_col INT, c0 CHAR(26), lb1 LONGBLOB, d2 DOUBLE, lb3 LONGBLOB COLUMN_FORMAT COMPRESSED, f4 FLOAT, v5 VARCHAR(20), INDEX tt_1_pi0(v5 DESC, f4 DESC, lb1(7) DESC, ip_col) , INDEX tt_1_pi1(f4 DESC, lb1(19) DESC, v5, c0 DESC) , INDEX tt_1_pi2(ip_col DESC, c0, lb1(4) DESC, d2 DESC, f4, v5 DESC) , INDEX tt_1_pi3(d2 DESC) , INDEX tt_1_pi4(d2, lb1(1) ASC) , INDEX tt_1_pi5(lb1(4) DESC, d2 ASC, v5, f4 DESC) , INDEX tt_1_pi6(d2, v5 DESC, f4, lb1(11)) ) ROW_FORMAT=REDUNDANT ENGINE=INNODB PARTITION BY KEY (ip_col) PARTITIONS 24 ;
INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 7731, 'cRCs1irmg7F1WeZqJtxCleKm', 'zGylBmoT3wjZicqkerK3BVKmAzCOZGKmKffbFom3oNcStfiorYpuMi0ZBbgO5zeDnE04TZI0CoycKtHinA9WcnNOtqJBciSqiaduaxpf6ptlzWdwq7d9aVA2EPjCHIdCL4eOSNdx1bUebos4qmEkMEuznY630v7aM7En6eaJfpXo1Gg5CYNwTNsj1dHyo7C8pe5x0HmXQRa2KvPuTrasrNJeCbcntrNwo4jlNFOywkQbGxBd41TPzRK8rGNJWjUMZdq99T27tUirTcl9TaQNUWxFQHc3iNvHFvk6HsLMJV1iYtn7iTbGPjAqiSCVnxcvDvqCfqmB1oMVhrXWXLUSFbmu7EIMcXRzqiVLh3VwLsnRnAQOqELoCPx05VsoV9MwPsQoZbc53CEhmpmUirwrETaenFS8ExoF2P2OYgbPtyzjNQbt0bVRH4u6dRob7R5KzbNpRzev64ggXtb0s5ADOiSMKXbvtcrpjugcTU3vzdv9RsnLebJEQ5YFdfjQWOmqVFKjAjRNXoQYjjNeMNkvvAkgvOo7', 0.00000, 'L7sJjPWAkQObv0LCRUoifwYLmTA9Qjxj7HD6gCkE7xG4Fx00tfsrQw7AXTH6gGZmOvOyy9m9LMOIVhOHCTMqfvuGXDhz5Eqri8BLD3SQhbOZmXWxvq1nu32NWDUOyxAGRbO9xJHNaJ8ZX1ajW2pS84YZjj2yb6x9SupVASP2jNl1vLpgcTJ9eodpGFmy9yLlLJi0RyRS', 0.00, 'lFve5HiZaVPY' ) ;
INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 4314, 'C', '9v2pBuu9PACawxmCC5eoYEsDHNmOaJLKqoW994gvKqEO35F8goMG9o', 0.00000, 'vio1lMJ3e9sGa5tKRz1iJwsHXjKw8uQelcv9o5tBzmgP2YDjG4bB5OK9IaBpWaf3I1Muwd06FFgT8pZKc2GyHCQ3bcqab83yKXe8BWpWFpw3sOb1rdM7uOCvGmSvGKzfBgaPfGCgqbx29kTcQxOzYJd5zGuEd32gQfocLNdttxPiBjyIIPg3kTI8US73sNN4UoEzCcC4XZJ8lWKAqVbDQmNDwV7kZZGb7s8JMjsJ0XTcAKf0sl9lw5L6albce2GqwsSDv', 0.00, '3VK82eQXa' ) ;
DELETE FROM t WHERE d2 = 0.00000 ;
include/sync_slave_sql_with_master.inc
include/rpl_diff.inc
[connection master]
DROP TABLE t;
DROP DATABASE test7806;
include/rpl_end.inc
27 changes: 27 additions & 0 deletions mysql-test/suite/rpl/t/rpl_column_compression_async_rpl.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# PS-7806 : Column compression breaks async replication on PS
#
--source include/have_binlog_format_row.inc
--source include/master-slave.inc

CREATE DATABASE test7806;
USE test7806;

CREATE TABLE t (ip_col INT, c0 CHAR(26), lb1 LONGBLOB, d2 DOUBLE, lb3 LONGBLOB COLUMN_FORMAT COMPRESSED, f4 FLOAT, v5 VARCHAR(20), INDEX tt_1_pi0(v5 DESC, f4 DESC, lb1(7) DESC, ip_col) , INDEX tt_1_pi1(f4 DESC, lb1(19) DESC, v5, c0 DESC) , INDEX tt_1_pi2(ip_col DESC, c0, lb1(4) DESC, d2 DESC, f4, v5 DESC) , INDEX tt_1_pi3(d2 DESC) , INDEX tt_1_pi4(d2, lb1(1) ASC) , INDEX tt_1_pi5(lb1(4) DESC, d2 ASC, v5, f4 DESC) , INDEX tt_1_pi6(d2, v5 DESC, f4, lb1(11)) ) ROW_FORMAT=REDUNDANT ENGINE=INNODB PARTITION BY KEY (ip_col) PARTITIONS 24 ;

INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 7731, 'cRCs1irmg7F1WeZqJtxCleKm', 'zGylBmoT3wjZicqkerK3BVKmAzCOZGKmKffbFom3oNcStfiorYpuMi0ZBbgO5zeDnE04TZI0CoycKtHinA9WcnNOtqJBciSqiaduaxpf6ptlzWdwq7d9aVA2EPjCHIdCL4eOSNdx1bUebos4qmEkMEuznY630v7aM7En6eaJfpXo1Gg5CYNwTNsj1dHyo7C8pe5x0HmXQRa2KvPuTrasrNJeCbcntrNwo4jlNFOywkQbGxBd41TPzRK8rGNJWjUMZdq99T27tUirTcl9TaQNUWxFQHc3iNvHFvk6HsLMJV1iYtn7iTbGPjAqiSCVnxcvDvqCfqmB1oMVhrXWXLUSFbmu7EIMcXRzqiVLh3VwLsnRnAQOqELoCPx05VsoV9MwPsQoZbc53CEhmpmUirwrETaenFS8ExoF2P2OYgbPtyzjNQbt0bVRH4u6dRob7R5KzbNpRzev64ggXtb0s5ADOiSMKXbvtcrpjugcTU3vzdv9RsnLebJEQ5YFdfjQWOmqVFKjAjRNXoQYjjNeMNkvvAkgvOo7', 0.00000, 'L7sJjPWAkQObv0LCRUoifwYLmTA9Qjxj7HD6gCkE7xG4Fx00tfsrQw7AXTH6gGZmOvOyy9m9LMOIVhOHCTMqfvuGXDhz5Eqri8BLD3SQhbOZmXWxvq1nu32NWDUOyxAGRbO9xJHNaJ8ZX1ajW2pS84YZjj2yb6x9SupVASP2jNl1vLpgcTJ9eodpGFmy9yLlLJi0RyRS', 0.00, 'lFve5HiZaVPY' ) ;

INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 4314, 'C', '9v2pBuu9PACawxmCC5eoYEsDHNmOaJLKqoW994gvKqEO35F8goMG9o', 0.00000, 'vio1lMJ3e9sGa5tKRz1iJwsHXjKw8uQelcv9o5tBzmgP2YDjG4bB5OK9IaBpWaf3I1Muwd06FFgT8pZKc2GyHCQ3bcqab83yKXe8BWpWFpw3sOb1rdM7uOCvGmSvGKzfBgaPfGCgqbx29kTcQxOzYJd5zGuEd32gQfocLNdttxPiBjyIIPg3kTI8US73sNN4UoEzCcC4XZJ8lWKAqVbDQmNDwV7kZZGb7s8JMjsJ0XTcAKf0sl9lw5L6albce2GqwsSDv', 0.00, '3VK82eQXa' ) ;

DELETE FROM t WHERE d2 = 0.00000 ;

--source include/sync_slave_sql_with_master.inc
--let $rpl_diff_statement = SELECT * FROM test7806.t;
--source include/rpl_diff.inc

--source include/rpl_connection_master.inc

DROP TABLE t;
DROP DATABASE test7806;

--source include/rpl_end.inc
36 changes: 36 additions & 0 deletions storage/innobase/handler/ha_innopart.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1136,6 +1136,7 @@ int ha_innopart::open(const char *name, int, uint, const dd::Table *table_def) {
ut_a(part.m_row_read_type == 0);
ut_a(part.m_trx_id == 0);
ut_a(part.m_blob_heap == nullptr);
ut_a(part.m_compress_heap == nullptr);
ut_a(0 == part.m_new_rec_lock.count());
}
#endif
Expand Down Expand Up @@ -1244,6 +1245,7 @@ int ha_innopart::close() {
}
clear_ins_upd_nodes();
clear_blob_heaps();
clear_compress_heaps();

/* Prevent double close of m_prebuilt->table. The real one was done
done in m_part_share->close_table_parts(). */
Expand Down Expand Up @@ -1304,13 +1306,19 @@ void ha_innopart::set_partition(uint part_id) {
/* For unordered scan and table scan, use blob_heap from first
partition as we need exactly one blob. */
m_prebuilt->blob_heap = m_parts[m_ordered ? part_id : 0].m_blob_heap;
m_prebuilt->compress_heap = m_parts[m_ordered ? part_id : 0].m_compress_heap;

#ifdef UNIV_DEBUG
if (m_prebuilt->blob_heap != nullptr) {
DBUG_PRINT("ha_innopart",
("validating blob_heap: %p", m_prebuilt->blob_heap));
mem_heap_validate(m_prebuilt->blob_heap);
}
if (m_prebuilt->compress_heap != nullptr) {
DBUG_PRINT("ha_innopart",
("validating compress_heap: %p", m_prebuilt->compress_heap));
mem_heap_validate(m_prebuilt->compress_heap);
}
#endif

m_prebuilt->trx_id = part.m_trx_id;
Expand Down Expand Up @@ -1349,11 +1357,17 @@ void ha_innopart::update_partition(uint part_id) {
("validating blob_heap: %p", m_prebuilt->blob_heap));
mem_heap_validate(m_prebuilt->blob_heap);
}
if (m_prebuilt->compress_heap != nullptr) {
DBUG_PRINT("ha_innopart",
("validating compress_heap: %p", m_prebuilt->compress_heap));
mem_heap_validate(m_prebuilt->compress_heap);
}
#endif

/* For unordered scan and table scan, use blob_heap from first
partition as we need exactly one blob anytime. */
m_parts[m_ordered ? part_id : 0].m_blob_heap = m_prebuilt->blob_heap;
m_parts[m_ordered ? part_id : 0].m_compress_heap = m_prebuilt->compress_heap;

part.m_trx_id = m_prebuilt->trx_id;
part.m_row_read_type = m_prebuilt->row_read_type;
Expand Down Expand Up @@ -4189,12 +4203,34 @@ void ha_innopart::clear_blob_heaps() {
m_prebuilt->blob_heap = nullptr;
}

void ha_innopart::clear_compress_heaps() {
DBUG_TRACE;
if (m_parts == nullptr) {
return;
}

for (uint i = 0; i < m_tot_parts; i++) {
auto &part{m_parts[i]};
if (part.m_compress_heap != nullptr) {
DBUG_PRINT("ha_innopart",
("freeing compress_heap: %p", part.m_compress_heap));
mem_heap_free(part.m_compress_heap);
part.m_compress_heap = nullptr;
}
}

/* Reset compress_heap in m_prebuilt after freeing all heaps. It is set in
ha_innopart::set_partition to the compress heap of current partition. */
m_prebuilt->compress_heap = nullptr;
}

/** Reset state of file to after 'open'. This function is called
after every statement for all tables used by that statement. */
int ha_innopart::reset() {
DBUG_TRACE;

clear_blob_heaps();
clear_compress_heaps();

return ha_innobase::reset();
}
Expand Down
6 changes: 6 additions & 0 deletions storage/innobase/handler/ha_innopart.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,9 @@ class ha_innopart : public ha_innobase,
/** saved m_prebuilt->blob_heap */
mem_heap_t *m_blob_heap;

/** saved prebuilt->compress_heap */
mem_heap_t *m_compress_heap;

/** saved m_prebuilt->trx_id (which in turn reflects table->def_trx_id) */
trx_id_t m_trx_id;

Expand Down Expand Up @@ -684,6 +687,9 @@ class ha_innopart : public ha_innobase,
/** Clear the blob heaps for all partitions */
void clear_blob_heaps();

/** Clear the compress heaps for all partitions */
void clear_compress_heaps();

/** Reset state of file to after 'open'. This function is called
after every statement for all tables used by that statement. */
int reset() override;
Expand Down

0 comments on commit 6797697

Please sign in to comment.