diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out
new file mode 100644
index 00000000000..0866e503d0d
--- /dev/null
+++ b/src/test/regress/expected/failure_1pc_copy_append.out
@@ -0,0 +1,293 @@
+-- Failure tests for COPY into an append-distributed table under the 1pc commit
+-- protocol. Scenarios install a citus.mitmproxy rule, run the same COPY, and
+-- then inspect the shard metadata and the row count.
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SET citus.shard_count = 1;
+SET citus.shard_replication_factor = 2; -- one shard per worker
+SET citus.multi_shard_commit_protocol TO '1pc';
+SET citus.next_shard_id TO 100400;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
+CREATE TABLE copy_test (key int, value int);
+SELECT create_distributed_table('copy_test', 'key', 'append');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+---- all of the following tests test behavior with 2 shard placements ----
+SHOW citus.shard_replication_factor;
+ citus.shard_replication_factor 
+--------------------------------
+ 2
+(1 row)
+
+---- kill the connection when we try to create the shard ----
+SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+---- kill the connection when we try to start a transaction ----
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+ERROR:  failure on connection marked as essential: localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+---- kill the connection when we start the COPY ----
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+---- kill the connection when we send the data ----
+SELECT citus.mitmproxy('conn.onCopyData().kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  failed to COPY to shard 100404 on localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+SELECT count(1) FROM copy_test;
+WARNING:  could not consume data from worker node
+ count 
+-------
+     4
+(1 row)
+
+---- cancel the connection when we send the data ----
+SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid()));
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  canceling statement due to user request
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+-- the cancel rule above also cancels plain SELECTs; lift it so the check below can run
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+---- kill the connection when we try to get the size of the table ----
+SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+ERROR:  failure on connection marked as essential: localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+(2 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
+-- the other node, so the next copy will try to run it on our node
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY p.nodeport, p.placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |     9060 |         112
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |    57637 |         113
+(4 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     8
+(1 row)
+
+---- kill the connection when we try to get the min, max of the table ----
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+ERROR:  failure on connection marked as essential: localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |     9060 |         112
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |    57637 |         113
+(4 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     8
+(1 row)
+
+---- kill the connection when we try to COMMIT ----
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  failed to commit critical transaction on localhost:9060, metadata is likely out of sync
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+ logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+-------------
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |    57637 |         100
+ copy_test    |  100400 | t            | 0             | 3             |  100400 |          1 |        8192 | localhost |     9060 |         101
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |     9060 |         112
+ copy_test    |  100407 | t            | 0             | 3             |  100407 |          1 |        8192 | localhost |    57637 |         113
+ copy_test    |  100409 | t            | 0             | 3             |  100409 |          3 |        8192 | localhost |     9060 |         116
+ copy_test    |  100409 | t            | 0             | 3             |  100409 |          1 |        8192 | localhost |    57637 |         117
+(6 rows)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+    12
+(1 row)
+
+-- ==== Clean up, we're done here ====
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+DROP TABLE copy_test;
diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out
new file mode 100644
index 00000000000..5e54cafc17a
--- /dev/null
+++ b/src/test/regress/expected/failure_1pc_copy_hash.out
@@ -0,0 +1,391 @@
+-- Failure tests for COPY into a hash-distributed table under the 1pc commit
+-- protocol: first with two placements of the single shard, then with only one
+-- of them active. Failures are injected with citus.mitmproxy rules.
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SET citus.shard_count = 1;
+SET citus.shard_replication_factor = 2; -- one shard per worker
+SET citus.multi_shard_commit_protocol TO '1pc';
+SET citus.next_shard_id TO 100400;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
+CREATE TABLE copy_test (key int, value int);
+SELECT create_distributed_table('copy_test', 'key');
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+-- ==== kill the connection when we try to start a transaction ====
+-- the query should abort
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+COPY copy_test, line 1: "0, 0"
+ERROR:  failure on connection marked as essential: localhost:9060
+CONTEXT:  COPY copy_test, line 1: "0, 0"
+-- ==== kill the connection when we try to start the COPY ====
+-- the query should abort
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+COPY copy_test, line 1: "0, 0"
+-- ==== kill the connection when we first start sending data ====
+-- the query should abort
+SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  failed to COPY to shard 100400 on localhost:9060
+-- ==== kill the connection when the worker confirms it's received the data ====
+-- the query should abort
+SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  failed to COPY to shard 100400 on localhost:9060
+-- ==== kill the connection when we try to send COMMIT ====
+-- the query should succeed, and the placement should be marked inactive
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) AND shardstate = 3;
+ count 
+-------
+     0
+(1 row)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     4
+(1 row)
+
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  failed to commit critical transaction on localhost:9060, metadata is likely out of sync
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+-- the shard is marked invalid
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) AND shardstate = 3;
+ count 
+-------
+     1
+(1 row)
+
+SELECT count(1) FROM copy_test;
+ count 
+-------
+     8
+(1 row)
+
+-- ==== clean up a little bit before running the next test ====
+UPDATE pg_dist_shard_placement SET shardstate = 1
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+);
+TRUNCATE copy_test;
+-- ==== try to COPY invalid data ====
+-- here the coordinator actually sends the data, but then unexpectedly closes the
+-- connection when it notices the data stream is broken. Crucially, it closes the
+-- connection before sending COMMIT, so no data makes it into the worker.
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
+ERROR:  missing data for column "value"
+CONTEXT:  COPY copy_test, line 5: "10"
+-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
+ERROR:  missing data for column "value"
+CONTEXT:  COPY copy_test, line 5: "10"
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+(0 rows)
+
+-- ==== clean up some more to prepare for tests with only one replica ====
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+TRUNCATE copy_test;
+-- scope the change to copy_test so placements of other tables are left alone
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE nodeport = :worker_1_port AND shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+);
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+ shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+---------+------------+-------------+-----------+----------+-------------
+  100400 |          1 |           0 | localhost |     9060 |         100
+  100400 |          3 |           0 | localhost |    57637 |         101
+(2 rows)
+
+-- ==== okay, run some tests where there's only one active shard ====
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the worker is unreachable
+SELECT citus.mitmproxy('conn.killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+COPY copy_test, line 1: "0, 0"
+ERROR:  failure on connection marked as essential: localhost:9060
+CONTEXT:  COPY copy_test, line 1: "0, 0"
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the first message fails
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+COPY copy_test, line 1: "0, 0"
+ERROR:  failure on connection marked as essential: localhost:9060
+CONTEXT:  COPY copy_test, line 1: "0, 0"
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the COPY message fails
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+CONTEXT:  while executing command on localhost:9060
+COPY copy_test, line 1: "0, 0"
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the COPY data fails
+SELECT citus.mitmproxy('conn.onCopyData().killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+ERROR:  failed to COPY to shard 100400 on localhost:9060
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the COMMIT fails
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  failed to commit critical transaction on localhost:9060, metadata is likely out of sync
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  could not commit transaction for shard 100400 on any active node
+ERROR:  could not commit transaction on any active node
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   1 |     1
+   2 |     4
+   3 |     9
+(4 rows)
+
+-- the placement is not marked invalid
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+ shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+---------+------------+-------------+-----------+----------+-------------
+  100400 |          1 |           0 | localhost |     9060 |         100
+  100400 |          3 |           0 | localhost |    57637 |         101
+(2 rows)
+
+-- the COMMIT makes it through but the connection dies before we get a response
+SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  failed to commit critical transaction on localhost:9060, metadata is likely out of sync
+WARNING:  connection not open
+CONTEXT:  while executing command on localhost:9060
+WARNING:  could not commit transaction for shard 100400 on any active node
+ERROR:  could not commit transaction on any active node
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+ shardid | shardstate | shardlength | nodename  | nodeport | placementid 
+---------+------------+-------------+-----------+----------+-------------
+  100400 |          1 |           0 | localhost |     9060 |         100
+  100400 |          3 |           0 | localhost |    57637 |         101
+(2 rows)
+
+SELECT * FROM copy_test ORDER BY key, value;
+ key | value 
+-----+-------
+   0 |     0
+   0 |     0
+   1 |     1
+   1 |     1
+   2 |     4
+   2 |     4
+   3 |     9
+   3 |     9
+(8 rows)
+
+-- ==== Clean up, we're done here ====
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy 
+-----------
+ 
+(1 row)
+
+DROP TABLE copy_test;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index 083a07aabb2..b84f3442b27 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -10,3 +10,6 @@ test: failure_truncate
 test: failure_create_index_concurrently
 test: failure_add_disable_node
 test: failure_copy_to_reference
+
+test: failure_1pc_copy_hash
+test: failure_1pc_copy_append
diff --git a/src/test/regress/sql/failure_1pc_copy_append.sql b/src/test/regress/sql/failure_1pc_copy_append.sql
new file mode 100644
index 00000000000..3fefe7d55d7
--- /dev/null
+++ b/src/test/regress/sql/failure_1pc_copy_append.sql
@@ -0,0 +1,99 @@
+-- Failure tests for COPY into an append-distributed table under the 1pc commit
+-- protocol. Scenarios install a citus.mitmproxy rule, run the same COPY, and
+-- then inspect the shard metadata and the row count.
+
+SELECT citus.mitmproxy('conn.allow()');
+
+SET citus.shard_count = 1;
+SET citus.shard_replication_factor = 2; -- one shard per worker
+SET citus.multi_shard_commit_protocol TO '1pc';
+SET citus.next_shard_id TO 100400;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
+
+CREATE TABLE copy_test (key int, value int);
+SELECT create_distributed_table('copy_test', 'key', 'append');
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT count(1) FROM copy_test;
+
+---- all of the following tests test behavior with 2 shard placements ----
+SHOW citus.shard_replication_factor;
+
+---- kill the connection when we try to create the shard ----
+SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we try to start a transaction ----
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we start the COPY ----
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we send the data ----
+SELECT citus.mitmproxy('conn.onCopyData().kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+---- cancel the connection when we send the data ----
+SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid()));
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+-- the cancel rule above also cancels plain SELECTs; lift it so the check below can run
+SELECT citus.mitmproxy('conn.allow()');
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we try to get the size of the table ----
+SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on
+-- the other node, so the next copy will try to run it on our node
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY p.nodeport, p.placementid;
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we try to get the min, max of the table ----
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+---- kill the connection when we try to COMMIT ----
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p
+  WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass
+  ORDER BY placementid;
+SELECT count(1) FROM copy_test;
+
+-- ==== Clean up, we're done here ====
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP TABLE copy_test;
diff --git a/src/test/regress/sql/failure_1pc_copy_hash.sql b/src/test/regress/sql/failure_1pc_copy_hash.sql
new file mode 100644
index 00000000000..6a9efb84006
--- /dev/null
+++ b/src/test/regress/sql/failure_1pc_copy_hash.sql
@@ -0,0 +1,150 @@
+-- Failure tests for COPY into a hash-distributed table under the 1pc commit
+-- protocol: first with two placements of the single shard, then with only one
+-- of them active. Failures are injected with citus.mitmproxy rules.
+
+SELECT citus.mitmproxy('conn.allow()');
+
+SET citus.shard_count = 1;
+SET citus.shard_replication_factor = 2; -- one shard per worker
+SET citus.multi_shard_commit_protocol TO '1pc';
+SET citus.next_shard_id TO 100400;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100;
+
+CREATE TABLE copy_test (key int, value int);
+SELECT create_distributed_table('copy_test', 'key');
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT count(1) FROM copy_test;
+
+-- ==== kill the connection when we try to start a transaction ====
+-- the query should abort
+
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+
+-- ==== kill the connection when we try to start the COPY ====
+-- the query should abort
+
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+
+-- ==== kill the connection when we first start sending data ====
+-- the query should abort
+
+SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+
+-- ==== kill the connection when the worker confirms it's received the data ====
+-- the query should abort
+
+SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+
+-- ==== kill the connection when we try to send COMMIT ====
+-- the query should succeed, and the placement should be marked inactive
+
+SELECT citus.mitmproxy('conn.allow()');
+SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) AND shardstate = 3;
+SELECT count(1) FROM copy_test;
+
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+
+-- the shard is marked invalid
+SELECT citus.mitmproxy('conn.allow()');
+SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) AND shardstate = 3;
+SELECT count(1) FROM copy_test;
+
+-- ==== clean up a little bit before running the next test ====
+
+UPDATE pg_dist_shard_placement SET shardstate = 1
+WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+);
+TRUNCATE copy_test;
+
+-- ==== try to COPY invalid data ====
+
+-- here the coordinator actually sends the data, but then unexpectedly closes the
+-- connection when it notices the data stream is broken. Crucially, it closes the
+-- connection before sending COMMIT, so no data makes it into the worker.
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
+
+-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV;
+
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- ==== clean up some more to prepare for tests with only one replica ====
+
+SELECT citus.mitmproxy('conn.allow()');
+
+TRUNCATE copy_test;
+-- scope the change to copy_test so placements of other tables are left alone
+UPDATE pg_dist_shard_placement SET shardstate = 3
+WHERE nodeport = :worker_1_port AND shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+);
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+
+-- ==== okay, run some tests where there's only one active shard ====
+
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the worker is unreachable
+SELECT citus.mitmproxy('conn.killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the first message fails
+SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the COPY message fails
+SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the COPY data fails
+SELECT citus.mitmproxy('conn.onCopyData().killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the COMMIT fails
+SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- the placement is not marked invalid
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+
+-- the COMMIT makes it through but the connection dies before we get a response
+SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()');
+COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV;
+SELECT citus.mitmproxy('conn.allow()');
+
+SELECT * FROM pg_dist_shard_placement WHERE shardid IN (
+  SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass
+) ORDER BY nodeport, placementid;
+SELECT * FROM copy_test ORDER BY key, value;
+
+-- ==== Clean up, we're done here ====
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP TABLE copy_test;