From 173c443860d9917930d6855337f2ed61d9cbbf4b Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Thu, 28 Jun 2018 19:06:18 -0700 Subject: [PATCH 1/2] Add tests for 1PC COPY on append and hash-distributed tables --- .../expected/failure_1pc_copy_append.out | 282 +++++++++++++ .../expected/failure_1pc_copy_hash.out | 384 ++++++++++++++++++ src/test/regress/failure_schedule | 3 + .../regress/sql/failure_1pc_copy_append.sql | 93 +++++ .../regress/sql/failure_1pc_copy_hash.sql | 142 +++++++ 5 files changed, 904 insertions(+) create mode 100644 src/test/regress/expected/failure_1pc_copy_append.out create mode 100644 src/test/regress/expected/failure_1pc_copy_hash.out create mode 100644 src/test/regress/sql/failure_1pc_copy_append.sql create mode 100644 src/test/regress/sql/failure_1pc_copy_hash.sql diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out new file mode 100644 index 00000000000..b0aad4a71f9 --- /dev/null +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -0,0 +1,282 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- all of the following tests test behavior with 2 shard placements ---- +SHOW citus.shard_replication_factor; + citus.shard_replication_factor +-------------------------------- + 2 +(1 row) + +---- kill the connection when we try to create the shard ---- +SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- kill the connection when we try to start a transaction ---- +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +ERROR: failure on connection marked as essential: localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- kill the connection when we start the COPY ---- +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- kill the connection when we send the data ---- +SELECT citus.mitmproxy('conn.onCopyData().kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard 100404 on localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- cancel the connection when we send the data ---- +SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid())); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: canceling statement due to user request +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +---- kill the connection when we try to get the size of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +ERROR: failure on connection marked as essential: localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 +(2 rows) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on +-- the other node, so the next copy will try to run it on our node +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY p.nodeport, p.placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 +(4 rows) + +SELECT count(1) FROM copy_test; + count +------- + 8 +(1 row) + +---- kill the connection when we try to get the min, max of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +ERROR: failure on connection marked as essential: localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 +(4 rows) + +SELECT count(1) FROM copy_test; + count +------- + 8 +(1 row) + +---- kill the connection when we try to COMMIT ---- +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid +--------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 + copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 57640 | 116 + copy_test | 100409 | t | 0 | 3 | 100409 | 1 | 8192 | localhost | 57637 | 117 +(6 rows) + +SELECT count(1) FROM copy_test; + count +------- + 12 +(1 row) + +-- ==== Clean up, we're done here ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP TABLE copy_test; diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out new file mode 100644 index 00000000000..066366abec5 --- /dev/null +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -0,0 +1,384 @@ +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +-- ==== kill the connection when we try to start a transaction ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +COPY copy_test, line 1: "0, 0" +ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: COPY copy_test, line 1: "0, 0" +-- ==== kill the connection when we try to start the COPY ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +COPY copy_test, line 1: "0, 0" +-- ==== kill the connection when we first start sending data ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard 100400 on localhost:57640 +-- ==== kill the connection when the worker confirms it's received the data ==== +-- the query should abort +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard 100400 on localhost:57640 +-- ==== kill the connection when we try to send COMMIT ==== +-- the query should succeed, and the placement should be marked inactive +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; + count +------- + 0 +(1 row) + +SELECT count(1) FROM copy_test; + count +------- + 4 +(1 row) + +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +-- the shard is marked invalid +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; + count +------- + 1 +(1 row) + +SELECT count(1) FROM copy_test; + count +------- + 8 +(1 row) + +-- ==== clean up a little bit before running the next test ==== +UPDATE pg_dist_shard_placement SET shardstate = 1 +WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +); +TRUNCATE copy_test; +-- ==== try to COPY invalid data ==== +-- here the coordinator actually sends the data, but then unexpectedly closes the +-- connection when it notices the data stream is broken. Crucially, it closes the +-- connection before sending COMMIT, so no data makes it into the worker. +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; +ERROR: missing data for column "value" +CONTEXT: COPY copy_test, line 5: "10" +-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; +ERROR: missing data for column "value" +CONTEXT: COPY copy_test, line 5: "10" +SELECT * FROM copy_test ORDER BY key, value; + key | value +-----+------- +(0 rows) + +-- ==== clean up some more to prepare for tests with only one replica ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +TRUNCATE copy_test; +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port; +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 100400 | 3 | 0 | localhost | 57637 | 100 + 100400 | 1 | 0 | localhost | 57640 | 101 +(2 rows) + +-- ==== okay, run some tests where there's only one active shard ==== +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the worker is unreachable +SELECT citus.mitmproxy('conn.killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +COPY copy_test, line 1: "0, 0" +ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the first message fails +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +COPY copy_test, line 1: "0, 0" +ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COPY message fails +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +CONTEXT: while executing command on localhost:57640 +COPY copy_test, line 1: "0, 0" +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COPY data fails +SELECT citus.mitmproxy('conn.onCopyData().killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +ERROR: failed to COPY to shard 100400 on localhost:57640 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the COMMIT fails +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: could not commit transaction for shard 100400 on any active node +ERROR: could not commit transaction on any active node +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(4 rows) + +-- the placement is not marked invalid +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 100400 | 3 | 0 | localhost | 57637 | 100 + 100400 | 1 | 0 | localhost | 57640 | 101 +(2 rows) + +-- the COMMIT makes it through but the connection dies before we get a response +SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); + mitmproxy +----------- + +(1 row) + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +WARNING: connection not open +CONTEXT: while executing command on localhost:57640 +WARNING: could not commit transaction for shard 100400 on any active node +ERROR: could not commit transaction on any active node +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+-----------+----------+------------- + 100400 | 3 | 0 | localhost | 57637 | 100 + 100400 | 1 | 0 | localhost | 57640 | 101 +(2 rows) + +SELECT * FROM copy_test; + key | value +-----+------- + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 + 0 | 0 + 1 | 1 + 2 | 4 + 3 | 9 +(8 rows) + +-- ==== Clean up, we're done here ==== +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +DROP TABLE copy_test; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 083a07aabb2..b84f3442b27 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -10,3 +10,6 @@ test: failure_truncate test: failure_create_index_concurrently test: failure_add_disable_node test: failure_copy_to_reference + +test: failure_1pc_copy_hash +test: failure_1pc_copy_append diff --git a/src/test/regress/sql/failure_1pc_copy_append.sql b/src/test/regress/sql/failure_1pc_copy_append.sql new file mode 100644 index 00000000000..3fefe7d55d7 --- /dev/null +++ b/src/test/regress/sql/failure_1pc_copy_append.sql @@ -0,0 +1,93 @@ +SELECT citus.mitmproxy('conn.allow()'); + +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; + +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key', 'append'); + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + +---- all of the following tests test behavior with 2 shard placements ---- +SHOW citus.shard_replication_factor; + +---- kill the connection when we try to create the shard ---- +SELECT citus.mitmproxy('conn.onQuery(query="worker_apply_shard_ddl_command").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we try to start a transaction ---- +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we start the COPY ---- +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we send the data ---- +SELECT citus.mitmproxy('conn.onCopyData().kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- cancel the connection when we send the data ---- +SELECT citus.mitmproxy(format('conn.onCopyData().cancel(%s)', pg_backend_pid())); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we try to get the size of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +-- we round-robin when picking which node to run pg_table_size on, this COPY runs it on +-- the other node, so the next copy will try to run it on our node +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY p.nodeport, p.placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we try to get the min, max of the table ---- +SELECT citus.mitmproxy('conn.onQuery(query="SELECT min\(key\), max\(key\)").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +---- kill the connection when we try to COMMIT ---- +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p + WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass + ORDER BY placementid; +SELECT count(1) FROM copy_test; + +-- ==== Clean up, we're done here ==== + +SELECT citus.mitmproxy('conn.allow()'); +DROP TABLE copy_test; diff --git a/src/test/regress/sql/failure_1pc_copy_hash.sql b/src/test/regress/sql/failure_1pc_copy_hash.sql new file mode 100644 index 00000000000..6a9efb84006 --- /dev/null +++ b/src/test/regress/sql/failure_1pc_copy_hash.sql @@ -0,0 +1,142 @@ +SELECT citus.mitmproxy('conn.allow()'); + +SET citus.shard_count = 1; +SET citus.shard_replication_factor = 2; -- one shard per worker +SET citus.multi_shard_commit_protocol TO '1pc'; +SET citus.next_shard_id TO 100400; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 100; + +CREATE TABLE copy_test (key int, value int); +SELECT create_distributed_table('copy_test', 'key'); + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT count(1) FROM copy_test; + +-- ==== kill the connection when we try to start a transaction ==== +-- the query should abort + +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; + +-- ==== kill the connection when we try to start the COPY ==== +-- the query should abort + +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; + +-- ==== kill the connection when we first start sending data ==== +-- the query should abort + +SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the client +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; + +-- ==== kill the connection when the worker confirms it's received the data ==== +-- the query should abort + +SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; + +-- ==== kill the connection when we try to send COMMIT ==== +-- the query should succeed, and the placement should be marked inactive + +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; +SELECT count(1) FROM copy_test; + +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; + +-- the shard is marked invalid +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(1) FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) AND shardstate = 3; +SELECT count(1) FROM copy_test; + +-- ==== clean up a little bit before running the next test ==== + +UPDATE pg_dist_shard_placement SET shardstate = 1 +WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +); +TRUNCATE copy_test; + +-- ==== try to COPY invalid data ==== + +-- here the coordinator actually sends the data, but then unexpectedly closes the +-- connection when it notices the data stream is broken. Crucially, it closes the +-- connection before sending COMMIT, so no data makes it into the worker. +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; + +-- kill the connection if the coordinator sends COMMIT. It doesn't, so nothing changes +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").kill()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9 && echo 10' WITH CSV; + +SELECT * FROM copy_test ORDER BY key, value; + +-- ==== clean up some more to prepare for tests with only one replica ==== + +SELECT citus.mitmproxy('conn.allow()'); + +TRUNCATE copy_test; +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_1_port; +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + +-- ==== okay, run some tests where there's only one active shard ==== + +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT * FROM copy_test; + +-- the worker is unreachable +SELECT citus.mitmproxy('conn.killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM copy_test; + +-- the first message fails +SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM copy_test; + +-- the COPY message fails +SELECT citus.mitmproxy('conn.onQuery(query="FROM STDIN WITH").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM copy_test; + +-- the COPY data fails +SELECT citus.mitmproxy('conn.onCopyData().killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM copy_test; + +-- the COMMIT fails +SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); +SELECT * FROM copy_test; + +-- the placement is not marked invalid +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; + +-- the COMMIT makes it through but the connection dies before we get a response +SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); +COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; +SELECT citus.mitmproxy('conn.allow()'); + +SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( + SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'copy_test'::regclass +) ORDER BY nodeport, placementid; +SELECT * FROM copy_test; + +-- ==== Clean up, we're done here ==== + +SELECT citus.mitmproxy('conn.allow()'); +DROP TABLE copy_test; From ea485ab40e3d72e7e3ddfb849c48d9af212762f1 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Tue, 31 Jul 2018 14:57:22 -0700 Subject: [PATCH 2/2] Update tests for our updated $mitmPort --- .../expected/failure_1pc_copy_append.out | 61 +++++++++---------- .../expected/failure_1pc_copy_hash.out | 52 ++++++++-------- 2 files changed, 55 insertions(+), 58 deletions(-) diff --git a/src/test/regress/expected/failure_1pc_copy_append.out b/src/test/regress/expected/failure_1pc_copy_append.out index b0aad4a71f9..0866e503d0d 100644 --- a/src/test/regress/expected/failure_1pc_copy_append.out +++ b/src/test/regress/expected/failure_1pc_copy_append.out @@ -41,14 +41,14 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; @@ -66,15 +66,15 @@ SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id"). COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: while executing command on localhost:9060 +ERROR: failure on connection marked as essential: localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; @@ -94,14 +94,14 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; @@ -118,17 +118,18 @@ SELECT citus.mitmproxy('conn.onCopyData().kill()'); (1 row) COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard 100404 on localhost:57640 +ERROR: failed to COPY to shard 100404 on localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; +WARNING: could not consume data from worker node count ------- 4 @@ -149,15 +150,11 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; - count -------- - 4 -(1 row) - +ERROR: canceling statement due to user request ---- kill the connection when we try to get the size of the table ---- SELECT citus.mitmproxy('conn.onQuery(query="pg_table_size").kill()'); mitmproxy @@ -169,17 +166,17 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W WARNING: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: while executing command on localhost:9060 +ERROR: failure on connection marked as essential: localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 (2 rows) SELECT count(1) FROM copy_test; @@ -196,10 +193,10 @@ SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p ORDER BY p.nodeport, p.placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112 copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 - copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 (4 rows) SELECT count(1) FROM copy_test; @@ -219,18 +216,18 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W WARNING: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -ERROR: failure on connection marked as essential: localhost:57640 +CONTEXT: while executing command on localhost:9060 +ERROR: failure on connection marked as essential: localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 - copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112 copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 (4 rows) @@ -249,20 +246,20 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").kill()'); COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +CONTEXT: while executing command on localhost:9060 +WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 SELECT * FROM pg_dist_shard s, pg_dist_shard_placement p WHERE (s.shardid = p.shardid) AND s.logicalrelid = 'copy_test'::regclass ORDER BY placementid; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue | shardid | shardstate | shardlength | nodename | nodeport | placementid --------------+---------+--------------+---------------+---------------+---------+------------+-------------+-----------+----------+------------- copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57637 | 100 - copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 57640 | 101 - copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57640 | 112 + copy_test | 100400 | t | 0 | 3 | 100400 | 1 | 8192 | localhost | 9060 | 101 + copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 9060 | 112 copy_test | 100407 | t | 0 | 3 | 100407 | 1 | 8192 | localhost | 57637 | 113 - copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 57640 | 116 + copy_test | 100409 | t | 0 | 3 | 100409 | 3 | 8192 | localhost | 9060 | 116 copy_test | 100409 | t | 0 | 3 | 100409 | 1 | 8192 | localhost | 57637 | 117 (6 rows) diff --git a/src/test/regress/expected/failure_1pc_copy_hash.out b/src/test/regress/expected/failure_1pc_copy_hash.out index 066366abec5..5e54cafc17a 100644 --- a/src/test/regress/expected/failure_1pc_copy_hash.out +++ b/src/test/regress/expected/failure_1pc_copy_hash.out @@ -33,9 +33,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction").kil COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:57640 +ERROR: failure on connection marked as essential: localhost:9060 CONTEXT: COPY copy_test, line 1: "0, 0" -- ==== kill the connection when we try to start the COPY ==== -- the query should abort @@ -49,7 +49,7 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 COPY copy_test, line 1: "0, 0" -- ==== kill the connection when we first start sending data ==== -- the query should abort @@ -60,7 +60,7 @@ SELECT citus.mitmproxy('conn.onCopyData().killall()'); -- raw rows from the clie (1 row) COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard 100400 on localhost:57640 +ERROR: failed to COPY to shard 100400 on localhost:9060 -- ==== kill the connection when the worker confirms it's received the data ==== -- the query should abort SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); @@ -70,7 +70,7 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").killall()'); (1 row) COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard 100400 on localhost:57640 +ERROR: failed to COPY to shard 100400 on localhost:9060 -- ==== kill the connection when we try to send COMMIT ==== -- the query should succeed, and the placement should be marked inactive SELECT citus.mitmproxy('conn.allow()'); @@ -101,10 +101,10 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +CONTEXT: while executing command on localhost:9060 +WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 -- the shard is marked invalid SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -168,8 +168,8 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( ) ORDER BY nodeport, placementid; shardid | shardstate | shardlength | nodename | nodeport | placementid ---------+------------+-------------+-----------+----------+------------- - 100400 | 3 | 0 | localhost | 57637 | 100 - 100400 | 1 | 0 | localhost | 57640 | 101 + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 (2 rows) -- ==== okay, run some tests where there's only one active shard ==== @@ -192,9 +192,9 @@ SELECT citus.mitmproxy('conn.killall()'); COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:57640 +ERROR: failure on connection marked as essential: localhost:9060 CONTEXT: COPY copy_test, line 1: "0, 0" SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -220,9 +220,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="assign_distributed_transaction_id"). COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 COPY copy_test, line 1: "0, 0" -ERROR: failure on connection marked as essential: localhost:57640 +ERROR: failure on connection marked as essential: localhost:9060 CONTEXT: COPY copy_test, line 1: "0, 0" SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -250,7 +250,7 @@ COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' W ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 COPY copy_test, line 1: "0, 0" SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -275,7 +275,7 @@ SELECT citus.mitmproxy('conn.onCopyData().killall()'); (1 row) COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; -ERROR: failed to COPY to shard 100400 on localhost:57640 +ERROR: failed to COPY to shard 100400 on localhost:9060 SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- @@ -300,10 +300,10 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT$").killall()'); COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +CONTEXT: while executing command on localhost:9060 +WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 WARNING: could not commit transaction for shard 100400 on any active node ERROR: could not commit transaction on any active node SELECT citus.mitmproxy('conn.allow()'); @@ -327,8 +327,8 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( ) ORDER BY nodeport, placementid; shardid | shardstate | shardlength | nodename | nodeport | placementid ---------+------------+-------------+-----------+----------+------------- - 100400 | 3 | 0 | localhost | 57637 | 100 - 100400 | 1 | 0 | localhost | 57640 | 101 + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 (2 rows) -- the COMMIT makes it through but the connection dies before we get a response @@ -340,10 +340,10 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COMMIT").killall()'); COPY copy_test FROM PROGRAM 'echo 0, 0 && echo 1, 1 && echo 2, 4 && echo 3, 9' WITH CSV; WARNING: connection not open -CONTEXT: while executing command on localhost:57640 -WARNING: failed to commit critical transaction on localhost:57640, metadata is likely out of sync +CONTEXT: while executing command on localhost:9060 +WARNING: failed to commit critical transaction on localhost:9060, metadata is likely out of sync WARNING: connection not open -CONTEXT: while executing command on localhost:57640 +CONTEXT: while executing command on localhost:9060 WARNING: could not commit transaction for shard 100400 on any active node ERROR: could not commit transaction on any active node SELECT citus.mitmproxy('conn.allow()'); @@ -357,8 +357,8 @@ SELECT * FROM pg_dist_shard_placement WHERE shardid IN ( ) ORDER BY nodeport, placementid; shardid | shardstate | shardlength | nodename | nodeport | placementid ---------+------------+-------------+-----------+----------+------------- - 100400 | 3 | 0 | localhost | 57637 | 100 - 100400 | 1 | 0 | localhost | 57640 | 101 + 100400 | 1 | 0 | localhost | 9060 | 100 + 100400 | 3 | 0 | localhost | 57637 | 101 (2 rows) SELECT * FROM copy_test;