From 8be2526d5f57effd5e5b68bd647ee5f8e4b7b97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Ho=C5=82ubicki?= Date: Thu, 7 Dec 2023 14:39:23 +0100 Subject: [PATCH] [compression] PS-7806: Column compression breaks async replication on PS (#5162) https://jira.percona.com/browse/PS-7806 Work based on the original patch for 8.0 by Nitendra Bhosle. Problem: When the statement related to the partitioned table containing compressed BLOB columns is replicated, replica stops with error. e.g. DELETE FROM t1 WHERE d2 = 0.00000 ; Cause: Queries like mentioned delete are implemented in the following way: 1. Index scan is performed 2. For every matching row, the row is deleted 3. Query is rewritten, using all columns in WHERE clause. For partitioned tables, during the ordered scan, we read (and keep) the next record from every partition, and then do ordering of cached records, returning the first in order (logic in Partition_helper::handle_ordered_index_scan()). However, we use common prebuilt->compression_heap which is cleaned up before every row read. This causes that rows cached for particular partitions are freed and overwritten by next partition's row during rows read loop in Partition_helper::handle_ordered_index_scan(). Then the query is being binlogged, but blob pointer may be invalid, pointing to overwritten memory, so rewritten query contains wrong value for BLOB column. When received by replica, such row does not exists and replica stops. Solution: Implemented dedicated compression_heap for every partition, similarly to already existing blob_heap. (cherry picked from commit 67976975e62b08f3a43962bccca278607afa14b2) ---------------------------------------------------------------------- PS-9218: Merge MySQL 8.4.0 (fix terminology in replication tests) https://perconadev.atlassian.net/browse/PS-9218 https://github.com/mysql/mysql-server/commit/44a77b5ba29994f8d3634b62c2825481574fc057 --- .../r/rpl_column_compression_async_rpl.result | 17 +++++++++ .../t/rpl_column_compression_async_rpl.test | 27 ++++++++++++++ storage/innobase/handler/ha_innopart.cc | 36 +++++++++++++++++++ storage/innobase/handler/ha_innopart.h | 6 ++++ 4 files changed, 86 insertions(+) create mode 100644 mysql-test/suite/rpl/r/rpl_column_compression_async_rpl.result create mode 100644 mysql-test/suite/rpl/t/rpl_column_compression_async_rpl.test diff --git a/mysql-test/suite/rpl/r/rpl_column_compression_async_rpl.result b/mysql-test/suite/rpl/r/rpl_column_compression_async_rpl.result new file mode 100644 index 000000000000..005ae4069060 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_column_compression_async_rpl.result @@ -0,0 +1,17 @@ +include/rpl/init_source_replica.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +[connection master] +CREATE DATABASE test7806; +USE test7806; +CREATE TABLE t (ip_col INT, c0 CHAR(26), lb1 LONGBLOB, d2 DOUBLE, lb3 LONGBLOB COLUMN_FORMAT COMPRESSED, f4 FLOAT, v5 VARCHAR(20), INDEX tt_1_pi0(v5 DESC, f4 DESC, lb1(7) DESC, ip_col) , INDEX tt_1_pi1(f4 DESC, lb1(19) DESC, v5, c0 DESC) , INDEX tt_1_pi2(ip_col DESC, c0, lb1(4) DESC, d2 DESC, f4, v5 DESC) , INDEX tt_1_pi3(d2 DESC) , INDEX tt_1_pi4(d2, lb1(1) ASC) , INDEX tt_1_pi5(lb1(4) DESC, d2 ASC, v5, f4 DESC) , INDEX tt_1_pi6(d2, v5 DESC, f4, lb1(11)) ) ROW_FORMAT=REDUNDANT ENGINE=INNODB PARTITION BY KEY (ip_col) PARTITIONS 24 ; +INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 7731, 'cRCs1irmg7F1WeZqJtxCleKm', 'zGylBmoT3wjZicqkerK3BVKmAzCOZGKmKffbFom3oNcStfiorYpuMi0ZBbgO5zeDnE04TZI0CoycKtHinA9WcnNOtqJBciSqiaduaxpf6ptlzWdwq7d9aVA2EPjCHIdCL4eOSNdx1bUebos4qmEkMEuznY630v7aM7En6eaJfpXo1Gg5CYNwTNsj1dHyo7C8pe5x0HmXQRa2KvPuTrasrNJeCbcntrNwo4jlNFOywkQbGxBd41TPzRK8rGNJWjUMZdq99T27tUirTcl9TaQNUWxFQHc3iNvHFvk6HsLMJV1iYtn7iTbGPjAqiSCVnxcvDvqCfqmB1oMVhrXWXLUSFbmu7EIMcXRzqiVLh3VwLsnRnAQOqELoCPx05VsoV9MwPsQoZbc53CEhmpmUirwrETaenFS8ExoF2P2OYgbPtyzjNQbt0bVRH4u6dRob7R5KzbNpRzev64ggXtb0s5ADOiSMKXbvtcrpjugcTU3vzdv9RsnLebJEQ5YFdfjQWOmqVFKjAjRNXoQYjjNeMNkvvAkgvOo7', 0.00000, 'L7sJjPWAkQObv0LCRUoifwYLmTA9Qjxj7HD6gCkE7xG4Fx00tfsrQw7AXTH6gGZmOvOyy9m9LMOIVhOHCTMqfvuGXDhz5Eqri8BLD3SQhbOZmXWxvq1nu32NWDUOyxAGRbO9xJHNaJ8ZX1ajW2pS84YZjj2yb6x9SupVASP2jNl1vLpgcTJ9eodpGFmy9yLlLJi0RyRS', 0.00, 'lFve5HiZaVPY' ) ; +INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 4314, 'C', '9v2pBuu9PACawxmCC5eoYEsDHNmOaJLKqoW994gvKqEO35F8goMG9o', 0.00000, 'vio1lMJ3e9sGa5tKRz1iJwsHXjKw8uQelcv9o5tBzmgP2YDjG4bB5OK9IaBpWaf3I1Muwd06FFgT8pZKc2GyHCQ3bcqab83yKXe8BWpWFpw3sOb1rdM7uOCvGmSvGKzfBgaPfGCgqbx29kTcQxOzYJd5zGuEd32gQfocLNdttxPiBjyIIPg3kTI8US73sNN4UoEzCcC4XZJ8lWKAqVbDQmNDwV7kZZGb7s8JMjsJ0XTcAKf0sl9lw5L6albce2GqwsSDv', 0.00, '3VK82eQXa' ) ; +DELETE FROM t WHERE d2 = 0.00000 ; +include/rpl/sync_to_replica.inc +include/rpl/diff.inc +[connection master] +DROP TABLE t; +DROP DATABASE test7806; +include/rpl/deinit.inc diff --git a/mysql-test/suite/rpl/t/rpl_column_compression_async_rpl.test b/mysql-test/suite/rpl/t/rpl_column_compression_async_rpl.test new file mode 100644 index 000000000000..f10fc613f34c --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_column_compression_async_rpl.test @@ -0,0 +1,27 @@ +# +# PS-7806 : Column compression breaks async replication on PS +# +--source include/have_binlog_format_row.inc +--source include/rpl/init_source_replica.inc + +CREATE DATABASE test7806; +USE test7806; + +CREATE TABLE t (ip_col INT, c0 CHAR(26), lb1 LONGBLOB, d2 DOUBLE, lb3 LONGBLOB COLUMN_FORMAT COMPRESSED, f4 FLOAT, v5 VARCHAR(20), INDEX tt_1_pi0(v5 DESC, f4 DESC, lb1(7) DESC, ip_col) , INDEX tt_1_pi1(f4 DESC, lb1(19) DESC, v5, c0 DESC) , INDEX tt_1_pi2(ip_col DESC, c0, lb1(4) DESC, d2 DESC, f4, v5 DESC) , INDEX tt_1_pi3(d2 DESC) , INDEX tt_1_pi4(d2, lb1(1) ASC) , INDEX tt_1_pi5(lb1(4) DESC, d2 ASC, v5, f4 DESC) , INDEX tt_1_pi6(d2, v5 DESC, f4, lb1(11)) ) ROW_FORMAT=REDUNDANT ENGINE=INNODB PARTITION BY KEY (ip_col) PARTITIONS 24 ; + +INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 7731, 'cRCs1irmg7F1WeZqJtxCleKm', 'zGylBmoT3wjZicqkerK3BVKmAzCOZGKmKffbFom3oNcStfiorYpuMi0ZBbgO5zeDnE04TZI0CoycKtHinA9WcnNOtqJBciSqiaduaxpf6ptlzWdwq7d9aVA2EPjCHIdCL4eOSNdx1bUebos4qmEkMEuznY630v7aM7En6eaJfpXo1Gg5CYNwTNsj1dHyo7C8pe5x0HmXQRa2KvPuTrasrNJeCbcntrNwo4jlNFOywkQbGxBd41TPzRK8rGNJWjUMZdq99T27tUirTcl9TaQNUWxFQHc3iNvHFvk6HsLMJV1iYtn7iTbGPjAqiSCVnxcvDvqCfqmB1oMVhrXWXLUSFbmu7EIMcXRzqiVLh3VwLsnRnAQOqELoCPx05VsoV9MwPsQoZbc53CEhmpmUirwrETaenFS8ExoF2P2OYgbPtyzjNQbt0bVRH4u6dRob7R5KzbNpRzev64ggXtb0s5ADOiSMKXbvtcrpjugcTU3vzdv9RsnLebJEQ5YFdfjQWOmqVFKjAjRNXoQYjjNeMNkvvAkgvOo7', 0.00000, 'L7sJjPWAkQObv0LCRUoifwYLmTA9Qjxj7HD6gCkE7xG4Fx00tfsrQw7AXTH6gGZmOvOyy9m9LMOIVhOHCTMqfvuGXDhz5Eqri8BLD3SQhbOZmXWxvq1nu32NWDUOyxAGRbO9xJHNaJ8ZX1ajW2pS84YZjj2yb6x9SupVASP2jNl1vLpgcTJ9eodpGFmy9yLlLJi0RyRS', 0.00, 'lFve5HiZaVPY' ) ; + +INSERT INTO t ( ip_col ,c0 ,lb1 ,d2 ,lb3 ,f4 ,v5 ) VALUES( 4314, 'C', '9v2pBuu9PACawxmCC5eoYEsDHNmOaJLKqoW994gvKqEO35F8goMG9o', 0.00000, 'vio1lMJ3e9sGa5tKRz1iJwsHXjKw8uQelcv9o5tBzmgP2YDjG4bB5OK9IaBpWaf3I1Muwd06FFgT8pZKc2GyHCQ3bcqab83yKXe8BWpWFpw3sOb1rdM7uOCvGmSvGKzfBgaPfGCgqbx29kTcQxOzYJd5zGuEd32gQfocLNdttxPiBjyIIPg3kTI8US73sNN4UoEzCcC4XZJ8lWKAqVbDQmNDwV7kZZGb7s8JMjsJ0XTcAKf0sl9lw5L6albce2GqwsSDv', 0.00, '3VK82eQXa' ) ; + +DELETE FROM t WHERE d2 = 0.00000 ; + +--source include/rpl/sync_to_replica.inc +--let $rpl_diff_statement = SELECT * FROM test7806.t; +--source include/rpl/diff.inc + +--source include/rpl/connection_source.inc + +DROP TABLE t; +DROP DATABASE test7806; + +--source include/rpl/deinit.inc diff --git a/storage/innobase/handler/ha_innopart.cc b/storage/innobase/handler/ha_innopart.cc index b17762b6d03e..49c313c7cb9c 100644 --- a/storage/innobase/handler/ha_innopart.cc +++ b/storage/innobase/handler/ha_innopart.cc @@ -1127,6 +1127,7 @@ int ha_innopart::open(const char *name, int mode [[maybe_unused]], ut_a(part.m_row_read_type == 0); ut_a(part.m_trx_id == 0); ut_a(part.m_blob_heap == nullptr); + ut_a(part.m_compress_heap == nullptr); ut_a(0 == part.m_new_rec_lock.count()); } #endif @@ -1235,6 +1236,7 @@ int ha_innopart::close() { } clear_ins_upd_nodes(); clear_blob_heaps(); + clear_compress_heaps(); /* Prevent double close of m_prebuilt->table. The real one was done done in m_part_share->close_table_parts(). */ @@ -1295,6 +1297,7 @@ void ha_innopart::set_partition(uint part_id) { /* For unordered scan and table scan, use blob_heap from first partition as we need exactly one blob. */ m_prebuilt->blob_heap = m_parts[m_ordered ? part_id : 0].m_blob_heap; + m_prebuilt->compress_heap = m_parts[m_ordered ? part_id : 0].m_compress_heap; #ifdef UNIV_DEBUG if (m_prebuilt->blob_heap != nullptr) { @@ -1302,6 +1305,11 @@ void ha_innopart::set_partition(uint part_id) { ("validating blob_heap: %p", m_prebuilt->blob_heap)); mem_heap_validate(m_prebuilt->blob_heap); } + if (m_prebuilt->compress_heap != nullptr) { + DBUG_PRINT("ha_innopart", + ("validating compress_heap: %p", m_prebuilt->compress_heap)); + mem_heap_validate(m_prebuilt->compress_heap); + } #endif m_prebuilt->trx_id = part.m_trx_id; @@ -1340,11 +1348,17 @@ void ha_innopart::update_partition(uint part_id) { ("validating blob_heap: %p", m_prebuilt->blob_heap)); mem_heap_validate(m_prebuilt->blob_heap); } + if (m_prebuilt->compress_heap != nullptr) { + DBUG_PRINT("ha_innopart", + ("validating compress_heap: %p", m_prebuilt->compress_heap)); + mem_heap_validate(m_prebuilt->compress_heap); + } #endif /* For unordered scan and table scan, use blob_heap from first partition as we need exactly one blob anytime. */ m_parts[m_ordered ? part_id : 0].m_blob_heap = m_prebuilt->blob_heap; + m_parts[m_ordered ? part_id : 0].m_compress_heap = m_prebuilt->compress_heap; part.m_trx_id = m_prebuilt->trx_id; part.m_row_read_type = m_prebuilt->row_read_type; @@ -4181,12 +4195,34 @@ void ha_innopart::clear_blob_heaps() { m_prebuilt->blob_heap = nullptr; } +void ha_innopart::clear_compress_heaps() { + DBUG_TRACE; + if (m_parts == nullptr) { + return; + } + + for (uint i = 0; i < m_tot_parts; i++) { + auto &part{m_parts[i]}; + if (part.m_compress_heap != nullptr) { + DBUG_PRINT("ha_innopart", + ("freeing compress_heap: %p", part.m_compress_heap)); + mem_heap_free(part.m_compress_heap); + part.m_compress_heap = nullptr; + } + } + + /* Reset compress_heap in m_prebuilt after freeing all heaps. It is set in + ha_innopart::set_partition to the compress heap of current partition. */ + m_prebuilt->compress_heap = nullptr; +} + /** Reset state of file to after 'open'. This function is called after every statement for all tables used by that statement. */ int ha_innopart::reset() { DBUG_TRACE; clear_blob_heaps(); + clear_compress_heaps(); return ha_innobase::reset(); } diff --git a/storage/innobase/handler/ha_innopart.h b/storage/innobase/handler/ha_innopart.h index 629536717dff..12f7a97d1332 100644 --- a/storage/innobase/handler/ha_innopart.h +++ b/storage/innobase/handler/ha_innopart.h @@ -639,6 +639,9 @@ class ha_innopart : public ha_innobase, /** saved m_prebuilt->blob_heap */ mem_heap_t *m_blob_heap; + /** saved prebuilt->compress_heap */ + mem_heap_t *m_compress_heap; + /** saved m_prebuilt->trx_id (which in turn reflects table->def_trx_id) */ trx_id_t m_trx_id; @@ -691,6 +694,9 @@ class ha_innopart : public ha_innobase, /** Clear the blob heaps for all partitions */ void clear_blob_heaps(); + /** Clear the compress heaps for all partitions */ + void clear_compress_heaps(); + /** Reset state of file to after 'open'. This function is called after every statement for all tables used by that statement. */ int reset() override;