Skip to content

Commit

Permalink
test: Various stress.py changes
Browse files Browse the repository at this point in the history
1. Removing the limit on 50 transactions per batch. It was needed when we had a bug that hangs if the tx doesn't exist, and is no longer needed;
2. Adding a new mode that drops a percentage of packets (fixes #3105);
3. Disabling the check for not deleting the same object within a transaction, until #3169 is fixed. After (1) above it crashes stress.py 3 out of 4 times, preventing it from getting to the (potential) real issues;
4. Increasing the epoch to 25 blocks, so that in the time it takes to send all the transactions and wait for the balances in the `local_network` mode ((15+20) * 2 = 70 seconds, which is approx 100 blocks) five epochs do not pass, and the transactions results are not garbage collected
5. Enabling `local_network` in default nayduck runs. Also enabling a mode without shutting down nodes and interfering with the network, in which more invariants are checked (e.g. the transactions loss tolerance is lower)

Test plan:
---------
With (3) above the test becomes relatively stable (but still flaky). local_network and node_restart modes:
http://nayduck.eastus.cloudapp.azure.com:3000/#/run/122

Tests without any interference, and with packages_drop:
http://nayduck.eastus.cloudapp.azure.com:3000/#/run/128
  • Loading branch information
SkidanovAlex committed Aug 17, 2020
1 parent d3faebd commit 91131ad
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 115 deletions.
2 changes: 1 addition & 1 deletion chain/client/tests/catching_up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ mod tests {

let _connectors1 = connectors.clone();

let block_prod_time: u64 = 3000;
let block_prod_time: u64 = 3500;
let (_, conn, _) = setup_mock_all_validators(
validators.clone(),
key_pairs.clone(),
Expand Down
5 changes: 3 additions & 2 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ impl StoreUpdate {
}

pub fn commit(self) -> Result<(), io::Error> {
debug_assert!(
/* TODO: enable after #3169 is fixed
debug_assert!(
self.transaction.ops.len()
== self
.transaction
Expand All @@ -222,7 +223,7 @@ impl StoreUpdate {
.len(),
"Transaction overwrites itself: {:?}",
self
);
);*/
if let Some(tries) = self.tries {
assert_eq!(
tries.get_store().storage.deref() as *const _,
Expand Down
4 changes: 3 additions & 1 deletion nightly/tests_for_nayduck.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ pytest contracts/deploy_call_smart_contract.py
pytest --timeout=240 contracts/gibberish.py

# python stress tests
# pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network
pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions
pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions local_network packets_drop
pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart
pytest --timeout=2000 stress/stress.py 3 3 3 0 staking transactions node_restart packets_drop
# pytest --timeout=2000 stress/stress.py 3 2 4 0 staking transactions node_set

# pytest stress/network_stress.py
Expand Down
14 changes: 11 additions & 3 deletions pytest/lib/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ async def bridge(reader, writer, handler_fn, global_stopped, local_stopped, erro
break

assert len(header) == 4, header
raw_message = await reader.read(struct.unpack('I', header)[0])
raw_message_len = struct.unpack('I', header)[0]
raw_message = await reader.read(raw_message_len)
while len(raw_message) < raw_message_len:
raw_message += await reader.read(raw_message_len - len(raw_message))

debug(
f"Message size={len(raw_message)} port={_MY_PORT} bridge_id={bridge_id}", level=2)
Expand All @@ -229,7 +232,7 @@ async def bridge(reader, writer, handler_fn, global_stopped, local_stopped, erro

debug(
f"Gracefully close bridge. port={_MY_PORT} bridge_id={bridge_id}", level=2)
except ConnectionResetError:
except (ConnectionResetError, BrokenPipeError):
debug(
f"Endpoint closed (Writer). port={_MY_PORT} bridge_id={bridge_id}", level=2)

Expand Down Expand Up @@ -262,6 +265,11 @@ async def handle_connection(outer_reader, outer_writer, inner_port, outer_port,
f"Cancelled Error (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value}")
if local_stopped.value == 0:
global_stopped.value = 1
except ConnectionRefusedError:
debug(
f"ConnectionRefusedError (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value}")
if local_stopped.value == 0:
global_stopped.value = 1
except:
debug(
f"Other Error (handle_connection). port={_MY_PORT} connection_id={connection_id} global_stopped={global_stopped.value} local_stopped={local_stopped.value} error={error.value}")
Expand All @@ -277,7 +285,7 @@ async def listener(inner_port, outer_port, handler_ctr, global_stopped, local_st
async def start_connection(reader, writer):
await handle_connection(reader, writer, inner_port, outer_port, handler, global_stopped, local_stopped, error)

attempts = 2
attempts = 3

# Possibly need to wait 1 second to start listener if node was killed and previous listener is on yet.
while attempts > 0:
Expand Down
18 changes: 13 additions & 5 deletions pytest/lib/proxy_instances.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import logging, multiprocessing
import logging, multiprocessing, random
from proxy import ProxyHandler, NodesProxy


class RejectListHandler(ProxyHandler):

def __init__(self, reject_list, ordinal):
def __init__(self, reject_list, drop_probability, ordinal):
super().__init__(ordinal)
self.reject_list = reject_list
self.drop_probability = drop_probability

async def handle(self, msg, fr, to):
msg_type = msg.enum if msg.enum != 'Routed' else msg.Routed.body.enum

if self.drop_probability > 0 and 'Handshake' not in msg_type and random.uniform(0, 1) < self.drop_probability:
logging.info(
f'NODE {self.ordinal} dropping message {msg_type} from {fr} to {to}')
return False

if fr in self.reject_list or to in self.reject_list:
msg_type = msg.enum if msg.enum != 'Routed' else msg.Routed.body.enum
logging.info(
f'NODE {self.ordinal} blocking message {msg_type} from {fr} to {to}')
return False
Expand All @@ -20,9 +27,10 @@ async def handle(self, msg, fr, to):

class RejectListProxy(NodesProxy):

def __init__(self, reject_list):
def __init__(self, reject_list, drop_probability):
self.reject_list = reject_list
handler = lambda ordinal: RejectListHandler(reject_list, ordinal)
self.drop_probability = drop_probability
handler = lambda ordinal: RejectListHandler(reject_list, drop_probability, ordinal)
super().__init__(handler)

@staticmethod
Expand Down
168 changes: 65 additions & 103 deletions pytest/tests/stress/stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# [v] `node_set`: ocasionally spins up new nodes or kills existing ones, as long as the number of nodes doesn't exceed `N` and doesn't go below `n`. Also makes sure that for each shard there's at least one node that has been live sufficiently long
# [v] `node_restart`: ocasionally restarts nodes
# [v] `local_network`: ocasionally briefly shuts down the network connection for a specific node
# [v] `packets_drop`: drop 10% of all the network packets
# [ ] `global_network`: ocasionally shots down the network globally for several seconds
# [v] `transactions`: sends random transactions keeping track of expected balances
# [v] `staking`: runs staking transactions for validators. Presently the test doesn't track staking invariants, relying on asserts in the nearcore.
Expand All @@ -39,7 +40,7 @@
TIMEOUT = 1500 # after how much time to shut down the test
TIMEOUT_SHUTDOWN = 120 # time to wait after the shutdown was initiated before failing the test due to process stalling
MAX_STAKE = int(1e32)
EPOCH_LENGTH = 20
EPOCH_LENGTH = 25

# How many times to try to send transactions to each validator.
# Is only applicable in the scenarios where we expect failures in tx sends.
Expand Down Expand Up @@ -168,6 +169,12 @@ def monkey_node_restart(stopped, error, nodes, nonces):
time.sleep(5)


@stress_process
def monkey_packets_drop(stopped, error, nodes, nonces):
# no implementation needed, packet drop is configured on start
pass


@stress_process
def monkey_local_network(stopped, error, nodes, nonces):
last_height = 0
Expand Down Expand Up @@ -234,16 +241,6 @@ def get_balances():
"BALANCES NEVER CAUGHT UP, CHECKING UNFINISHED TRANSACTIONS"
)

def trace_reverted_txs(last_tx_set, tx_ords):
logging.info("\n\nREVERTING THE FOLLOWING TXS WOULD BE ENOUGH:\n")
for tx_ord in tx_ords:
tx = last_tx_set[tx_ord]
logging.info("\nTRANSACTION %s" % tx_ord)
logging.info("TX tuple: %s" % (tx[1:],))
response = nodes[-1].json_rpc(
'tx', [tx[3], "test%s" % tx[1]], timeout=5)
logging.info("Status: %s", response)

def revert_txs():
nonlocal expected_balances
good = 0
Expand Down Expand Up @@ -296,107 +293,67 @@ def revert_txs():
"REVERTING DIDN'T HELP, TX EXECUTED: %s, TX LOST: %s" %
(good, bad))

for i in range(0, len(last_tx_set)):
tx = last_tx_set[i]
expected_balances[tx[1]] += tx[4]
expected_balances[tx[2]] -= tx[4]

if get_balances() == expected_balances:
trace_reverted_txs(last_tx_set, [i])
assert False, "Balances didn't update in time. Expected: %s, received: %s" % (
expected_balances, get_balances())
last_iter_switch = time.time()

for j in range(i + 1, len(last_tx_set)):
tx = last_tx_set[j]
expected_balances[tx[1]] += tx[4]
expected_balances[tx[2]] -= tx[4]
if mode == 0:
from_ = random.randint(0, len(nodes) - 1)
while min_balances[from_] < 0:
from_ = random.randint(0, len(nodes) - 1)
to = random.randint(0, len(nodes) - 1)
while from_ == to:
to = random.randint(0, len(nodes) - 1)
amt = random.randint(0, min_balances[from_])
nonce_val, nonce_lock = nonces[from_]

if get_balances() == expected_balances:
trace_reverted_txs(last_tx_set, [i, j])
hash_, _ = get_recent_hash(nodes[-1], 5)

for k in range(j + 1, len(last_tx_set)):
tx = last_tx_set[k]
expected_balances[tx[1]] += tx[4]
expected_balances[tx[2]] -= tx[4]
with nonce_lock:
tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to,
amt, nonce_val.value,
base58.b58decode(hash_.encode('utf8')))

if get_balances() == expected_balances:
trace_reverted_txs(last_tx_set, [i, j, k])
# Loop trying to send the tx to all the validators, until at least one receives it
tx_hash = None
for send_attempt in range(SEND_TX_ATTEMPTS):
shuffled_validator_ids = [x for x in validator_ids]
random.shuffle(shuffled_validator_ids)
for validator_id in shuffled_validator_ids:
try:
info = nodes[validator_id].send_tx(tx)
if 'error' in info and info['error']['data'] == 'IsSyncing':
pass

expected_balances[tx[1]] -= tx[4]
expected_balances[tx[2]] += tx[4]
elif 'result' in info:
tx_hash = info['result']
break

tx = last_tx_set[j]
expected_balances[tx[1]] -= tx[4]
expected_balances[tx[2]] += tx[4]
else:
assert False, info

tx = last_tx_set[i]
expected_balances[tx[1]] -= tx[4]
expected_balances[tx[2]] += tx[4]
except (requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError):
if not network_issues_expected and not nodes[
validator_id].mess_with:
raise

logging.info(
"The latest and greatest stats on successful/failed: %s/%s"
% (good, bad))
assert False, "Balances didn't update in time. Expected: %s, received: %s" % (
expected_balances, get_balances())
last_iter_switch = time.time()
if tx_hash is not None:
break

if mode == 0:
# do not send more than 50 txs, so that at the end of the test we have time to query all of them. When #2195 is fixed, this condition can probably be safely removed
if tx_count < 50:
from_ = random.randint(0, len(nodes) - 1)
while min_balances[from_] < 0:
from_ = random.randint(0, len(nodes) - 1)
to = random.randint(0, len(nodes) - 1)
while from_ == to:
to = random.randint(0, len(nodes) - 1)
amt = random.randint(0, min_balances[from_])
nonce_val, nonce_lock = nonces[from_]

hash_, _ = get_recent_hash(nodes[-1], 5)

with nonce_lock:
tx = sign_payment_tx(nodes[from_].signer_key, 'test%s' % to,
amt, nonce_val.value,
base58.b58decode(hash_.encode('utf8')))

# Loop trying to send the tx to all the validators, until at least one receives it
tx_hash = None
for send_attempt in range(SEND_TX_ATTEMPTS):
shuffled_validator_ids = [x for x in validator_ids]
random.shuffle(shuffled_validator_ids)
for validator_id in shuffled_validator_ids:
try:
info = nodes[validator_id].send_tx(tx)
if 'error' in info and info['error']['data'] == 'IsSyncing':
pass

elif 'result' in info:
tx_hash = info['result']
break

else:
assert False, info

except (requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError):
if not network_issues_expected and not nodes[
validator_id].mess_with:
raise

if tx_hash is not None:
break

time.sleep(1)
time.sleep(1)

else:
assert False, "Failed to send the transation after %s attempts" % SEND_TX_ATTEMPTS
else:
assert False, "Failed to send the transation after %s attempts" % SEND_TX_ATTEMPTS

last_tx_set.append((tx, from_, to, tx_hash, amt))
nonce_val.value = nonce_val.value + 1
last_tx_set.append((tx, from_, to, tx_hash, amt))
nonce_val.value = nonce_val.value + 1

expected_balances[from_] -= amt
expected_balances[to] += amt
min_balances[from_] -= amt
expected_balances[from_] -= amt
expected_balances[to] += amt
min_balances[from_] -= amt

tx_count += 1
tx_count += 1

else:
if get_balances() == expected_balances:
Expand Down Expand Up @@ -574,19 +531,24 @@ def doit(s, n, N, k, monkeys, timeout):
monkey_names = [x.__name__ for x in monkeys]
proxy = None
logging.info(monkey_names)
if 'monkey_local_network' in monkey_names or 'monkey_global_network' in monkey_names:
timeouts_increased = False
if 'monkey_local_network' in monkey_names or 'monkey_global_network' in monkey_names or 'monkey_packets_drop' in monkey_names:
assert config['local'], 'Network stress operations only work on local nodes'
drop_probability = 0.1 if 'monkey_packets_drop' in monkey_names else 0

reject_list = RejectListProxy.create_reject_list(1)
proxy = RejectListProxy(reject_list)
proxy = RejectListProxy(reject_list, drop_probability)
expect_network_issues()
block_timeout += 40
balances_timeout += 20
tx_tolerance += 0.3
timeouts_increased = True
if 'monkey_node_restart' in monkey_names:
expect_network_issues()
if 'monkey_node_restart' in monkey_names or 'monkey_node_set' in monkey_names:
block_timeout += 40
balances_timeout += 10
if not timeouts_increased:
block_timeout += 40
balances_timeout += 10
tx_tolerance += 0.5

started = time.time()
Expand Down

0 comments on commit 91131ad

Please sign in to comment.