This repository has been archived by the owner on Dec 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathrun_consensus.py
240 lines (192 loc) · 8.49 KB
/
run_consensus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import subprocess
import time
import os
import signal
import argparse
import tempfile
import socket
from contextlib import closing
import fcntl
# The SECRET_KEY is used for building the BOOT_NODE_PEER_ID, so they are coupled and must be used together.
# SECRET_KEY is handed to the bootstrap node (--network.secret_key); every other node reaches the
# bootstrap node through a multiaddr that ends in BOOT_NODE_PEER_ID.
SECRET_KEY = "0xabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcd"
BOOT_NODE_PEER_ID = "12D3KooWDFYi71juk6dYWo3UDvqs5gAzGDc124LSvcR5d187Tdvi"
# Seconds slept between two consecutive monitoring passes over the running nodes.
MONITORING_PERIOD = 10
class Node:
    """One validator process plus the bookkeeping used to monitor its consensus height."""

    def __init__(self, validator_id, monitoring_gateway_server_port, cmd):
        self.validator_id = validator_id
        self.monitoring_gateway_server_port = monitoring_gateway_server_port
        # Shell command line that launches the node.
        self.cmd = cmd
        # Popen handle of the launching shell; None until start() is called.
        self.process = None
        # (height, timestamp): last height observed and the time it was first observed.
        self.height_and_timestamp = (None, None)

    def start(self):
        """Run the node command through the shell, in a session of its own."""
        # os.setsid runs in the child before exec, so the shell leads a fresh process group
        # that stop() can signal as a whole.
        self.process = subprocess.Popen(self.cmd, preexec_fn=os.setsid, shell=True)

    def stop(self):
        """Send SIGINT to the node's process group and wait for the shell to exit.

        Does nothing when the node was never started.
        """
        if not self.process:
            return
        group_id = os.getpgid(self.process.pid)
        os.killpg(group_id, signal.SIGINT)
        self.process.wait()

    def get_height(self):
        """Query the monitoring gateway for the most recently decided height.

        Returns:
            The height as an int, or None when the query printed nothing (the node is not
            ready or consensus has not yet reached any height).
        """
        query = (
            f"curl -s -X GET http://localhost:{self.monitoring_gateway_server_port}/monitoring/metrics"
            " | grep -oP 'papyrus_consensus_height \\K\\d+'"
        )
        completed = subprocess.run(query, shell=True, capture_output=True, text=True)
        raw_height = completed.stdout
        if not raw_height:
            return None
        return int(raw_height)

    def check_height(self):
        """Poll the node and return the stored (height, timestamp) pair.

        The pair is replaced, with the current time, only when the polled height differs from
        the stored one. Two consecutive known heights must be strictly increasing.
        """
        current = self.get_height()
        previous = self.height_and_timestamp[0]
        if previous == current:
            return self.height_and_timestamp
        if previous is not None and current is not None:
            assert current > previous, "Height should be increasing."
        self.height_and_timestamp = (current, time.time())
        return self.height_and_timestamp
class LockDir:
    """Context manager that holds an exclusive, non-blocking flock on ``<db_dir>/lockfile``.

    The lock is used to make sure a data directory is driven by a single simulation at a time.
    """

    def __init__(self, db_dir):
        self.db_dir = db_dir
        self.file_path = os.path.join(db_dir, "lockfile")
        # Open lock file while the lock is held; None otherwise.
        self.file = None

    def __enter__(self):
        """Open the lock file and lock it without blocking.

        Returns:
            The open lock file object.

        Raises:
            SystemExit: with exit status 1 (after printing a message) when the lock is already
                held elsewhere.
        """
        self.file = open(self.file_path, "w")
        try:
            fcntl.flock(self.file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # The lock was not acquired: release the handle instead of leaking it.
            self.file.close()
            self.file = None
            print(
                f"Could not acquire lock for {self.file_path}, {self.db_dir} is in use by another simulation."
            )
            # The `exit` builtin is only injected by the `site` module; raising SystemExit has
            # the same effect and is always available.
            raise SystemExit(1)
        return self.file

    def __exit__(self, exc_type, exc_value, traceback):
        """Unlock and close the lock file, if one is held. Exceptions are not suppressed."""
        if self.file:
            try:
                fcntl.flock(self.file, fcntl.LOCK_UN)
            finally:
                # Close even if the explicit unlock fails.
                self.file.close()
                self.file = None
def find_free_port():
    """Ask the OS for an unused TCP port and return its number.

    The probing socket is closed before returning, so the port is only known to have been
    free at the moment of the call.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Binding to port 0 lets the kernel choose the port.
        probe.bind(("", 0))
        for option in (socket.SO_REUSEADDR, socket.SO_REUSEPORT):
            probe.setsockopt(socket.SOL_SOCKET, option, 1)
        _, port = probe.getsockname()
        return port
    finally:
        probe.close()
# Chosen once, when the module is loaded: the bootstrap node uses it as its network TCP port and
# every other node puts it in the bootstrap multiaddr it dials.
BOOTNODE_TCP_PORT = find_free_port()
def monitor_simulation(nodes, start_time, duration, stagnation_timeout):
    """Return whether the simulation should exit.

    The answer is True when `duration` is given and more than `duration` seconds have passed
    since `start_time`, or when at least one node with a known height has kept that height for
    more than `stagnation_timeout` seconds. Otherwise it is False.

    Each node is polled through `check_height()` and its height is printed. Nodes are not
    polled at all when the duration has already expired.
    """
    now = time.time()
    out_of_time = duration is not None and (now - start_time) > duration
    if out_of_time:
        return True

    stalled = []
    for node in nodes:
        height, last_update = node.check_height()
        print(f"Node: {node.validator_id}, height: {height}")
        if height is None:
            # No height reported, so there is nothing to measure stagnation against.
            continue
        if (now - last_update) > stagnation_timeout:
            stalled.append(node.validator_id)

    if not stalled:
        return False
    print(f"Nodes {stalled} have stagnated. Exiting simulation.")
    return True
def run_simulation(nodes, duration, stagnation_timeout):
    """Start all nodes, monitor them periodically, and always stop them before returning.

    Args:
        nodes: objects exposing `start()` and `stop()`, started in list order.
        duration: maximum running time in seconds, or None for no limit.
        stagnation_timeout: seconds a node may keep the same height before the run is ended.

    A KeyboardInterrupt ends the run quietly. Any other exception propagates after the nodes
    have been stopped.
    """
    try:
        # Starting happens inside the try block so that a failure or Ctrl-C part-way through
        # startup still reaches the `finally` below and stops the nodes already launched.
        for node in nodes:
            node.start()
        start_time = time.time()
        while True:
            time.sleep(MONITORING_PERIOD)
            print(f"\nTime elapsed: {time.time() - start_time}s")
            if monitor_simulation(nodes, start_time, duration, stagnation_timeout):
                break
    except KeyboardInterrupt:
        print("\nTerminating subprocesses...")
    finally:
        for node in nodes:
            node.stop()
def build_node(base_layer_node_url, data_dir, logs_dir, num_validators, i):
    """Assemble the shell command for validator `i` and wrap it in a Node.

    Args:
        base_layer_node_url: value passed to --base_layer.node_url.
        data_dir: parent directory; the node's storage prefix is `<data_dir>/data<i>`.
        logs_dir: directory that receives the node's output as `validator<i>.txt`.
        num_validators: value passed to --consensus.num_validators.
        i: validator index. Validator 1 is the bootstrap node.

    Returns:
        A Node carrying the command and the node's monitoring gateway port.

    NOTE: the arguments are interpolated into the command line unquoted, so they must not
    contain characters that are special to the shell.
    """
    is_bootstrap = i == 1
    tcp_port = BOOTNODE_TCP_PORT if is_bootstrap else find_free_port()
    monitoring_gateway_server_port = find_free_port()
    data_dir = os.path.join(data_dir, f"data{i}")

    cmd = (
        f"RUST_LOG=papyrus_consensus=debug,papyrus=info "
        f"target/release/papyrus_node --network.#is_none false "
        f"--base_layer.node_url {base_layer_node_url} "
        f"--storage.db_config.path_prefix {data_dir} "
        # The id carries a 0x prefix, so the index is rendered in hexadecimal; with plain
        # decimal digits validator 10 would be written as 0x10 (16 when read as hex).
        f"--consensus.#is_none false --consensus.validator_id 0x{i:x} "
        f"--consensus.num_validators {num_validators} "
        f"--network.tcp_port {tcp_port} "
        f"--rpc.server_address 127.0.0.1:{find_free_port()} "
        f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} "
        f"--collect_metrics true "
    )

    if is_bootstrap:
        # The bootstrap node gets the fixed secret key that BOOT_NODE_PEER_ID is coupled with.
        cmd += f"--network.secret_key {SECRET_KEY} "
    else:
        # Every other node dials the bootstrap node on the local host.
        cmd += (
            f"--network.bootstrap_peer_multiaddr.#is_none false "
            f"--network.bootstrap_peer_multiaddr /ip4/127.0.0.1/tcp/{BOOTNODE_TCP_PORT}/p2p/{BOOT_NODE_PEER_ID} "
        )

    # Pipe the node's stdout through sed to drop ANSI escape sequences, then into the log file.
    cmd += f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {logs_dir}/validator{i}.txt"

    return Node(
        validator_id=i,
        monitoring_gateway_server_port=monitoring_gateway_server_port,
        cmd=cmd,
    )
def build_all_nodes(base_layer_node_url, data_dir, logs_dir, num_validators):
    """Build one Node per validator and return them in the order they must be started.

    The order ensures proper network formation:
      1. Validator 1, the bootnode, comes first so the others can peer through it.
      2. Validators 2 and up follow and join the network through the bootnode.
      3. Validator 0, the proposer, comes last so the validators don't miss the proposals.
    """
    start_order = [1, *range(2, num_validators), 0]
    return [
        build_node(base_layer_node_url, data_dir, logs_dir, num_validators, validator)
        for validator in start_order
    ]
def main(base_layer_node_url, num_validators, db_dir, stagnation_threshold, duration):
    """Build the node binary, then run a consensus simulation with `num_validators` nodes.

    Args:
        base_layer_node_url: forwarded to every node.
        num_validators: number of nodes to run; must be at least 2.
        db_dir: directory holding data0 .. data{num_validators - 1}, or None to create fresh
            data directories inside the temporary logs directory.
        stagnation_threshold: seconds a node may keep the same height before the run ends.
        duration: maximum running time in seconds, or None for no limit.
    """
    assert num_validators >= 2, "At least 2 validators are required for the simulation."

    logs_dir = tempfile.mkdtemp()
    if db_dir is None:
        # No existing DBs were given: keep the per-validator data next to the logs.
        db_dir = logs_dir
        for index in range(num_validators):
            os.makedirs(os.path.join(db_dir, f"data{index}"))
    else:
        # Ensure the directories are named data0, data1, ..., data{num_validators - 1}
        expected_dirs = {f"data{index}" for index in range(num_validators)}
        actual_dirs = set()
        for entry in os.listdir(db_dir):
            if os.path.isdir(os.path.join(db_dir, entry)):
                actual_dirs.add(entry)
        assert expected_dirs <= actual_dirs, f"{db_dir} must contain: {', '.join(expected_dirs)}."

    # Hold the lock on db_dir for the whole build-and-run phase.
    with LockDir(db_dir):
        print("Running cargo build...")
        subprocess.run("cargo build --release --package papyrus_node", shell=True, check=True)

        print(
            f"Output files will be stored in: {logs_dir} and data files will be stored in: {db_dir}"
        )
        validators = build_all_nodes(base_layer_node_url, db_dir, logs_dir, num_validators)

        print("Running validators...")
        run_simulation(validators, duration, stagnation_threshold)

    print(f"Output files were stored in: {logs_dir} and data files were stored in: {db_dir}")
    print("Simulation complete.")
if __name__ == "__main__":
    # Command-line entry point: declare the options, parse them and hand them to main().
    option_specs = [
        ("--base_layer_node_url", {"required": True}),
        ("--num_validators", {"type": int, "required": True}),
        (
            "--db_dir",
            {
                "required": False,
                "default": None,
                "help": "Directory with existing DBs that this simulation can reuse.",
            },
        ),
        (
            "--stagnation_threshold",
            {
                "type": int,
                "required": False,
                "default": 60,
                "help": "Time in seconds to check for height stagnation.",
            },
        ),
        ("--duration", {"type": int, "required": False, "default": None}),
    ]
    cli = argparse.ArgumentParser(description="Run Papyrus Node simulation.")
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    options = cli.parse_args()
    main(
        options.base_layer_node_url,
        options.num_validators,
        options.db_dir,
        options.stagnation_threshold,
        options.duration,
    )