Skip to content

Commit

Permalink
Prototype proxy-based failure testing
Browse files Browse the repository at this point in the history
- Add an example test
- Implement a simple fluent API for configuring the mitmproxy
- Bump the citus version
- Add make check-failure target
- Tell travis to install and run mitmproxy
  • Loading branch information
lithp committed May 2, 2018
1 parent 7fd4383 commit fedadb2
Show file tree
Hide file tree
Showing 15 changed files with 667 additions and 6 deletions.
12 changes: 11 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
sudo: required
dist: trusty
language: c
python:
- "3.5"
cache:
apt: true
directories:
Expand All @@ -27,10 +29,18 @@ before_install:
- setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash
- nuke_pg
- pyenv versions
- pyenv global 3.6
- sudo apt-get install python3-pip
- sudo pip3 install --upgrade pip
- python --version
- python3 --version
install:
- install_uncrustify
- install_pg
- install_custom_pg
- pip3 install --user mitmproxy==3.0.4
- mitmproxy --version
# download and install HLL manually, as custom builds won't satisfy deps
# only install if performing non-11 build
- |
Expand All @@ -39,7 +49,7 @@ install:
sudo dpkg --force-confold --force-confdef --force-all -i *hll*.deb
fi
before_script: citus_indent --quiet --check
script: CFLAGS=-Werror pg_travis_multi_test check
script: CFLAGS=-Werror pg_travis_multi_test check-failure
after_success:
- sync_to_enterprise
- bash <(curl -s https://codecov.io/bash)
2 changes: 1 addition & 1 deletion citus.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.4-1'
default_version = '7.4-3'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
6 changes: 4 additions & 2 deletions src/backend/distributed/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
7.1-1 7.1-2 7.1-3 7.1-4 \
7.2-1 7.2-2 7.2-3 \
7.2-1 7.2-2 7.2-3 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2 7.4-3
7.4-1 7.4-2 7.4-3 7.4-4

# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
Expand Down Expand Up @@ -199,6 +199,8 @@ $(EXTENSION)--7.4-2.sql: $(EXTENSION)--7.4-1.sql $(EXTENSION)--7.4-1--7.4-2.sql
cat $^ > $@
$(EXTENSION)--7.4-3.sql: $(EXTENSION)--7.4-2.sql $(EXTENSION)--7.4-2--7.4-3.sql
cat $^ > $@
$(EXTENSION)--7.4-4.sql: $(EXTENSION)--7.4-3.sql $(EXTENSION)--7.4-3--7.4-4.sql
cat $^ > $@

NO_PGXS = 1

Expand Down
24 changes: 24 additions & 0 deletions src/backend/distributed/citus--7.4-3--7.4-4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/* citus--7.4-3--7.4-4 */

/*
 * citus.mitmproxy(text) hands one command string to the proxy and returns
 * the text that comes back.
 *
 * The exchange goes through the path stored in the citus.mitmfifo setting:
 * the command is COPY'd out to that path, then the reply is COPY'd back in
 * from the same path. Two temporary tables are used as staging areas for the
 * COPY statements and are dropped again before returning.
 *
 * NOTE(review): presumably citus.mitmfifo names a FIFO that the mitmproxy
 * script reads from and writes to -- confirm against the test harness.
 */
CREATE FUNCTION citus.mitmproxy(text) RETURNS text AS $$
DECLARE
    proxy_command ALIAS FOR $1;
    fifo_path text := current_setting('citus.mitmfifo');
    proxy_reply text;
BEGIN
    -- staging tables for the outgoing command and the incoming reply
    CREATE TEMPORARY TABLE mitmproxy_command (command text);
    CREATE TEMPORARY TABLE mitmproxy_result (res text);

    INSERT INTO mitmproxy_command VALUES (proxy_command);

    -- write the command out first, then read the reply from the same path
    EXECUTE format('COPY mitmproxy_command TO %L', fifo_path);
    EXECUTE format('COPY mitmproxy_result FROM %L', fifo_path);

    SELECT res INTO proxy_reply FROM mitmproxy_result;

    -- drop the staging tables so the next call can create them again
    DROP TABLE mitmproxy_command;
    DROP TABLE mitmproxy_result;

    RETURN proxy_reply;
END;
$$ LANGUAGE plpgsql;

2 changes: 1 addition & 1 deletion src/backend/distributed/citus.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.4-3'
default_version = '7.4-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
4 changes: 4 additions & 0 deletions src/test/regress/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ check-follower-cluster: all
$(pg_regress_multi_check) --load-extension=citus --follower-cluster \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_follower_schedule $(EXTRA_TESTS)

check-failure: all
$(pg_regress_multi_check) --load-extension=citus --mitmproxy \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS)

clean distclean maintainer-clean:
rm -f $(output_files) $(input_files)
rm -rf tmp_check/
91 changes: 91 additions & 0 deletions src/test/regress/expected/failure_testing.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
-- disable the maintenance daemon, prevent it from making any connections
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
ALTER SYSTEM SET citus.recover_2pc_interval TO -1;
ALTER SYSTEM set citus.enable_statistics_collection TO false;
SELECT pg_reload_conf();
pg_reload_conf
----------------
t
(1 row)

SELECT citus.mitmproxy('flow.allow()');
mitmproxy
-----------

(1 row)

-- add the workers
SELECT master_add_node('localhost', :worker_1_port); -- the second worker
master_add_node
---------------------------------------------------
(1,1,localhost,57637,default,f,t,primary,default)
(1 row)

SELECT master_add_node('localhost', :worker_2_port + 2); -- the first worker, behind a mitmproxy
master_add_node
---------------------------------------------------
(2,2,localhost,57640,default,f,t,primary,default)
(1 row)

CREATE TABLE test (a int, b int);
SELECT create_distributed_table('test', 'a');
create_distributed_table
--------------------------

(1 row)

BEGIN;
UPDATE test SET b = 2 WHERE a = 2;
ROLLBACK;
SELECT citus.mitmproxy('flow.contains(b"UPDATE").kill()');
mitmproxy
-----------

(1 row)

BEGIN;
UPDATE test SET b = 2 WHERE a = 2;
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:57640
ROLLBACK;
SELECT citus.mitmproxy('flow.contains(b"BEGIN").kill()');
mitmproxy
-----------

(1 row)

BEGIN;
UPDATE test SET b = 2 WHERE a = 2;
WARNING: connection not open
CONTEXT: while executing command on localhost:57640
ROLLBACK;
SET citus.multi_shard_commit_protocol = '2pc';
SELECT citus.mitmproxy('flow.contains(b"COMMIT").kill()');
mitmproxy
-----------

(1 row)

BEGIN;
UPDATE test SET b = 2 WHERE a = 2;
WARNING: connection not open
CONTEXT: while executing command on localhost:57640
COMMIT;
WARNING: connection not open
CONTEXT: while executing command on localhost:57640
WARNING: connection not open
CONTEXT: while executing command on localhost:57640
WARNING: connection not open
CONTEXT: while executing command on localhost:57640
SELECT citus.mitmproxy('flow.allow()');
mitmproxy
-----------

(1 row)

SET citus.multi_shard_commit_protocol = '2pc';
BEGIN;
UPDATE test SET b = 2 WHERE a = 2;
COMMIT;
1 change: 1 addition & 0 deletions src/test/regress/failure_schedule
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test: failure_testing
1 change: 1 addition & 0 deletions src/test/regress/mitmscripts/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
72 changes: 72 additions & 0 deletions src/test/regress/mitmscripts/fail_first_copy_row.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import re

from mitmproxy.utils import strutils
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer

'''
Use with a command line like this:
mitmdump --rawtcp -p 9702 --mode reverse:localhost:9700 -s fail_first_copy_row.py
'''

killed_a_connection = False # we only want to kill one

def next_layer(layer):
    '''
    mitmproxy hook: replace a TlsLayer with a RawTCPLayer.

    mitmproxy wasn't really meant for intercepting raw tcp streams; it tries to
    wrap the upstream connection (the one to the worker) in a tls stream. When
    the layer it is about to use is a TlsLayer (created in root_context.py), a
    RawTCPLayer built from the same context is sent back as the replacement.
    That's the layer which calls our tcp_message hook.

    Any other kind of layer is left untouched.
    '''
    if not isinstance(layer, TlsLayer):
        return

    layer.reply.send(RawTCPLayer(layer.ctx))

def tcp_message(flow):
    '''
    mitmproxy hook, called with the flow whose newest message is
    flow.messages[-1].

    Steps, in order:
      1. Flows that carry b'dump_local_wait_edges' are marked with
         flow.ignore = True and skipped from then on (the original comment
         identifies these as the maintenance daemon's connection).
      2. A client message starting with b'Q' that mentions test_<digits>
         records that name on the flow as flow.shard; every other message
         reuses whatever flow.shard was recorded earlier (or None).
      3. The message is printed via print_message.
      4. A server message starting with b'H' on the flow tagged
         b'test_102174' kills the flow and blanks the message content.

    The module-level killed_a_connection flag is not consulted or updated
    here: the "only kill one" guard is disabled, so every matching message
    kills its flow.
    '''
    tcp_msg = flow.messages[-1]
    msg = tcp_msg.content

    # 1. Once we know it's the maintenance daemon ignore packets from this connection
    if b'dump_local_wait_edges' in msg:
        flow.ignore = True
        return
    if getattr(flow, 'ignore', False):
        return

    # 2. Work out which shard this flow is talking about; search only once
    shard = None
    if tcp_msg.from_client and msg.startswith(b'Q'):
        found = re.search(b'(test_[0-9]+)', msg)
        if found:
            shard = found.group(1)
            flow.shard = shard
    if shard is None:
        shard = getattr(flow, 'shard', None)

    print_message(shard, tcp_msg)

    # global killed_a_connection
    # if killed_a_connection:
    #     return

    if not tcp_msg.from_client and msg.startswith(b'H') and shard == b'test_102174':
        # this is a CopyOutResponse
        flow.kill()
        tcp_msg.content = b''

def print_message(shard, tcp_msg):
    '''
    Print one intercepted message to stdout.

    The header line shows the shard label (or the word "message" when shard
    is falsy) and the direction of travel; the message content follows on the
    next line, escaped with strutils.bytes_to_escaped_str.
    '''
    if tcp_msg.from_client:
        sender, receiver = "client", "server"
    else:
        sender, receiver = "server", "client"

    label = shard or "message"
    escaped = strutils.bytes_to_escaped_str(tcp_msg.content)

    print("[{}] from {} to {}:\r\n{}".format(label, sender, receiver, escaped))
Loading

0 comments on commit fedadb2

Please sign in to comment.