From 9987bdd4995ded92d4100f190863c54275c561c2 Mon Sep 17 00:00:00 2001 From: Brett Boston Date: Tue, 18 Jun 2024 15:14:24 -0700 Subject: [PATCH 1/2] Update overlay survey script with lessons learned during testnet run This change makes a few tweaks to the overlay survey script to fix some small things I noticed after running it on testnet: * Changes the scripts end condition to depend only on responses, and not requests. Without this it was possible for the survey script to run for the full duration of the collecting phase (2 hours) if a node with more than 25 peers stopped responding after the surveyor recieved the first set of peers. * Downgrades the severity of "node already in backlog" messages from `error` to `debug`. This is an expected condition that I simply forgot to special-case before. * Modifies the simulator to occasionally return "node already in backlog" messages to test the script against that case. * Adds a `--fast` option to the `simulate` mode that skips any `sleep` calls. This makes the script much nicer to test. * Fixes naming of graphml fields to match JSON result fields. * I did most of this in the V2 script update, but missed a couple spots. * Most of this change is in the simulator to support the new field names. --- scripts/OverlaySurvey.py | 96 +++++++++++++++++----------- scripts/overlay_survey/simulation.py | 57 +++++++++++++---- scripts/overlay_survey/util.py | 11 +++- 3 files changed, 112 insertions(+), 52 deletions(-) diff --git a/scripts/OverlaySurvey.py b/scripts/OverlaySurvey.py index 82abdf6fa2..4d53d072cf 100755 --- a/scripts/OverlaySurvey.py +++ b/scripts/OverlaySurvey.py @@ -84,15 +84,14 @@ # internal limit. MAX_COLLECT_DURATION = 30 -# Maximum number of consecutive rounds in which the surveyor neither sent -# requests to nor received responses from any nodes. A round contains a batch of -# requests sent to select nodes, followed by a wait period of 15 seconds, -# followed by checking for responses and building up the next batch of requests -# to send. Therefore, a setting of `8` is roughly 2 minutes of inactivity -# before the script considers the survey complete. This is necessary because -# it's very likely that not all surveyed nodes will respond to the survey. -# Therefore, we need some cutoff after we which we assume those nodes will never -# respond. +# Maximum number of consecutive rounds in which the surveyor does not receive +# responses from any nodes. A round contains a batch of requests sent to select +# nodes, followed by a wait period of 15 seconds, followed by checking for +# responses and building up the next batch of requests to send. Therefore, a +# setting of `8` is roughly 2 minutes of inactivity before the script considers +# the survey complete. This is necessary because it's very likely that not all +# surveyed nodes will respond to the survey. Therefore, we need some cutoff +# after we which we assume those nodes will never respond. MAX_INACTIVE_ROUNDS = 8 def get_request(url, params=None): @@ -187,8 +186,18 @@ def send_survey_requests(peer_list, url_base): util.SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_START): logger.debug("Send request to %s", nodeid) else: - logger.error("Failed to send survey request to %s: %s", - nodeid, response.text) + try: + exception = response.json()["exception"] + if exception == \ + util.SURVEY_TOPOLOGY_TIME_SLICED_ALREADY_IN_BACKLOG_OR_SELF: + logger.debug("Node %s is already in backlog or is self", + nodeid) + else: + logger.error("Failed to send survey request to %s: %s", + nodeid, exception) + except (requests.exceptions.JSONDecodeError, KeyError): + logger.error("Failed to send survey request to %s: %s", + nodeid, response.text) logger.info("Done sending survey requests") @@ -324,6 +333,7 @@ def run_survey(args): logger.critical("%s", e) sys.exit(1) + skip_sleep = args.simulate and args.fast url = args.node peers = url + "/peers" @@ -339,10 +349,11 @@ def run_survey(args): logger.critical("Failed to start survey: %s", response.text) sys.exit(1) - # Sleep for duration of collecting phase - logger.info("Sleeping for collecting phase (%i minutes)", - args.collect_duration) - time.sleep(args.collect_duration * 60) + if not skip_sleep: + # Sleep for duration of collecting phase + logger.info("Sleeping for collecting phase (%i minutes)", + args.collect_duration) + time.sleep(args.collect_duration * 60) # Stop survey recording logger.info("Stopping survey collecting") @@ -351,12 +362,13 @@ def run_survey(args): logger.critical("Failed to stop survey: %s", response.text) sys.exit(1) - # Allow time for stop message to propagate - sleep_time = 60 - logger.info( - "Waiting %i seconds for 'stop collecting' message to propagate", - sleep_time) - time.sleep(sleep_time) + if not skip_sleep: + # Allow time for stop message to propagate + sleep_time = 60 + logger.info( + "Waiting %i seconds for 'stop collecting' message to propagate", + sleep_time) + time.sleep(sleep_time) peer_list = set() if args.nodeList: @@ -382,21 +394,19 @@ def run_survey(args): self_name = get_request(url + "/scp", scp_params).json()["you"] graph.add_node(self_name, version=get_request(url + "/info").json()["info"]["build"], - numTotalInboundPeers=len(peers["inbound"] or []), - numTotalOutboundPeers=len(peers["outbound"] or [])) + totalInbound=len(peers["inbound"] or []), + totalOutbound=len(peers["outbound"] or [])) sent_requests = set() heard_from = set() + incomplete_responses = set() # Number of consecutive rounds in which surveyor neither sent requests nor # received responses inactive_rounds = 0 while True: - if peer_list: - inactive_rounds = 0 - else: - inactive_rounds += 1 + inactive_rounds += 1 send_survey_requests(peer_list, url) @@ -405,12 +415,13 @@ def run_survey(args): peer_list = set() - # allow time for results. Stellar-core sends out a batch of requests - # every 15 seconds, so there's not much benefit in checking more - # frequently than that - sleep_time = 15 - logger.info("Waiting %i seconds for survey results", sleep_time) - time.sleep(sleep_time) + if not skip_sleep: + # allow time for results. Stellar-core sends out a batch of requests + # every 15 seconds, so there's not much benefit in checking more + # frequently than that + sleep_time = 15 + logger.info("Waiting %i seconds for survey results", sleep_time) + time.sleep(sleep_time) logger.info("Fetching survey result") data = get_request(url=survey_result).json() @@ -418,12 +429,19 @@ def run_survey(args): if "topology" in data: for key in data["topology"]: - if data["topology"][key] is not None: + node_data = data["topology"][key] + if node_data is not None: if key not in heard_from: # Received a new response! logger.debug("Received response from %s", key) inactive_rounds = 0 heard_from.add(key) + elif key in incomplete_responses and len(node_data) > 0: + # Received additional data for a node that previously + # responded + logger.debug("Received additional data for %s", key) + inactive_rounds = 0 + incomplete_responses.remove(key) waiting_to_hear = set() for node in sent_requests: @@ -457,9 +475,9 @@ def run_survey(args): have_outbound = len(node["outboundPeers"]) if (node["totalInbound"] > have_inbound or node["totalOutbound"] > have_outbound): - peer_list.add(util.PendingRequest(key, - have_inbound, - have_outbound)) + incomplete_responses.add(key) + req = util.PendingRequest(key, have_inbound, have_outbound) + peer_list.add(req) logger.info("New nodes: %s Gathering additional peer data: %s", new_peers, len(peer_list)-new_peers) @@ -554,6 +572,10 @@ def main(): "--simRoot", required=True, help="node to start simulation from") + parser_simulate.add_argument("-f", + "--fast", + action="store_true", + help="Skip sleep calls during simulation.") parser_simulate.set_defaults(simulate=True) parser_analyze = subparsers.add_parser('analyze', diff --git a/scripts/overlay_survey/simulation.py b/scripts/overlay_survey/simulation.py index 4551eac9ed..234ec445c4 100644 --- a/scripts/overlay_survey/simulation.py +++ b/scripts/overlay_survey/simulation.py @@ -41,7 +41,7 @@ def _add_v2_survey_data(node_json): Does nothing if the node_json is already a v2 survey result or if the V1 survey data indicates the node didn't respond to the survey. """ - if "numTotalInboundPeers" not in node_json: + if "totalInbound" not in node_json: # Node did not respond to the survey. Nothing to do. return @@ -63,6 +63,13 @@ def _add_v2_survey_data(node_json): for peer in node_json["outboundPeers"]: peer["averageLatencyMs"] = random.randint(0, 2**32-1) +def _rename_dictionary_field(dictionary, old_key, new_key): + """ + Rename a field in a dictionary. If the field is not present, do nothing. + """ + if old_key in dictionary: + dictionary[new_key] = dictionary[old_key] + del dictionary[old_key] class SurveySimulation: """ @@ -139,11 +146,25 @@ def _surveytopologytimesliced(self, params): assert params.keys() == {"node", "inboundpeerindex", "outboundpeerindex"} - if params["node"] != self._root_node: - req = util.PendingRequest(params["node"], - params["inboundpeerindex"], - params["outboundpeerindex"]) - self._pending_requests.append(req) + + fail_response = SimulatedResponse( + {"exception" : + util.SURVEY_TOPOLOGY_TIME_SLICED_ALREADY_IN_BACKLOG_OR_SELF}) + node = params["node"] + inbound_peer_idx = params["inboundpeerindex"] + outbound_peer_idx = params["outboundpeerindex"] + if node == self._root_node: + # Nodes cannot survey themselves (yet) + return fail_response + + if ((inbound_peer_idx > 0 or outbound_peer_idx > 0) and + random.random() < 0.2): + # Randomly indicate that node is already in backlog if it is being + # resurveyed. Script should handle this by trying again later. + return fail_response + + req = util.PendingRequest(node, inbound_peer_idx, outbound_peer_idx) + self._pending_requests.append(req) return SimulatedResponse( text=util.SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_TEXT) @@ -191,8 +212,8 @@ def _getsurveyresult(self, params): ] for (node_id, _, data) in inbound_slice: self._addpeer(node_id, data, node_json["inboundPeers"]) - if ("numTotalInboundPeers" in node_json and - node_json["numTotalInboundPeers"] != len(in_edges)): + if ("totalInbound" in node_json and + node_json["totalInbound"] != len(in_edges)): # The V1 survey contains a race condition in which the number of # peers can change between when a node reports its peer count # and when the surveyor requests the peers themselves. The V2 @@ -209,8 +230,8 @@ def _getsurveyresult(self, params): "count.", node, len(in_edges), - node_json["numTotalInboundPeers"]) - node_json["numTotalInboundPeers"] = len(in_edges) + node_json["totalInbound"]) + node_json["totalInbound"] = len(in_edges) # Generate outboundPeers list node_json["outboundPeers"] = [] @@ -220,8 +241,8 @@ def _getsurveyresult(self, params): ] for (_, node_id, data) in outbound_slice: self._addpeer(node_id, data, node_json["outboundPeers"]) - if ("numTotalOutboundPeers" in node_json and - node_json["numTotalOutboundPeers"] != len(out_edges)): + if ("totalOutbound" in node_json and + node_json["totalOutbound"] != len(out_edges)): # Patch up peer counts in simulated survey results with real # peer counts (see note on similar conditional for inbound peers # above) @@ -231,11 +252,19 @@ def _getsurveyresult(self, params): "count.", node, len(out_edges), - node_json["numTotalOutboundPeers"]) - node_json["numTotalOutboundPeers"] = len(out_edges) + node_json["totalOutbound"]) + node_json["totalOutbound"] = len(out_edges) _add_v2_survey_data(node_json) + # Rename peer count fields to match stellar-core's response + _rename_dictionary_field(node_json, + "totalInbound", + "numTotalInboundPeers") + _rename_dictionary_field(node_json, + "totalOutbound", + "numTotalOutboundPeers") + self._results["topology"][node] = node_json return SimulatedResponse(json=self._results) diff --git a/scripts/overlay_survey/util.py b/scripts/overlay_survey/util.py index 1288b54863..400323696d 100644 --- a/scripts/overlay_survey/util.py +++ b/scripts/overlay_survey/util.py @@ -25,4 +25,13 @@ # "Adding node." SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_START = "Adding node." SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_TEXT = \ - SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_START + "Survey already running!" \ No newline at end of file + SURVEY_TOPOLOGY_TIME_SLICED_SUCCESS_START + "Survey already running!" + +# The error response from the surveytopologytimesliced endpoint when the survey +# backlog already contains the node requested to be surveyed, or the requested +# node is the surveyor. stellar-core returns this error JSON object where the +# error text is contained in the "exception" field. +SURVEY_TOPOLOGY_TIME_SLICED_ALREADY_IN_BACKLOG_OR_SELF = ( + "addPeerToBacklog failed: Peer is already in the backlog, or peer " + "is self." + ) \ No newline at end of file From e26825641717c5ced981fb4a974f59c3fe3fc80a Mon Sep 17 00:00:00 2001 From: Brett Boston Date: Mon, 8 Jul 2024 11:23:58 -0700 Subject: [PATCH 2/2] Use stellar-core output names for everything --- scripts/OverlaySurvey.py | 47 ++++++++++++++-------------- scripts/overlay_survey/simulation.py | 34 ++++++-------------- 2 files changed, 32 insertions(+), 49 deletions(-) diff --git a/scripts/OverlaySurvey.py b/scripts/OverlaySurvey.py index 4d53d072cf..620a403737 100755 --- a/scripts/OverlaySurvey.py +++ b/scripts/OverlaySurvey.py @@ -128,15 +128,14 @@ def get_next_peers(topology): def update_node(graph, node_info, node_key, results, field_names): """ - For each `(info_field, node_field)` pair in `field_names`, if `info_field` - is in `node_info`, modify the node in `graph` with key `node_key` to store - the value of `info_field` in `node_field`. + For each `field_name` in `field_names`, if `field_name` is in `node_info`, + modify `graph` and `results` to contain the field. """ - for (info_field, node_field) in field_names: - if info_field in node_info: - val = node_info[info_field] - results[node_field] = val - graph.add_node(node_key, **{node_field: val}) + for field_name in field_names: + if field_name in node_info: + val = node_info[field_name] + results[field_name] = val + graph.add_node(node_key, **{field_name: val}) def update_results(graph, parent_info, parent_key, results, is_inbound): direction_tag = "inboundPeers" if is_inbound else "outboundPeers" @@ -157,16 +156,16 @@ def update_results(graph, parent_info, parent_key, results, is_inbound): graph.add_edge(parent_key, other_key, **edge_properties) # Add survey results to parent node (if available) - field_names = [("numTotalInboundPeers", "totalInbound"), - ("numTotalOutboundPeers", "totalOutbound"), - ("maxInboundPeerCount", "maxInboundPeerCount"), - ("maxOutboundPeerCount", "maxOutboundPeerCount"), - ("addedAuthenticatedPeers", "addedAuthenticatedPeers"), - ("droppedAuthenticatedPeers", "droppedAuthenticatedPeers"), - ("p75SCPFirstToSelfLatencyMs", "p75SCPFirstToSelfLatencyMs"), - ("p75SCPSelfToOtherLatencyMs", "p75SCPSelfToOtherLatencyMs"), - ("lostSyncCount", "lostSyncCount"), - ("isValidator", "isValidator")] + field_names = ["numTotalInboundPeers", + "numTotalOutboundPeers", + "maxInboundPeerCount", + "maxOutboundPeerCount", + "addedAuthenticatedPeers", + "droppedAuthenticatedPeers", + "p75SCPFirstToSelfLatencyMs", + "p75SCPSelfToOtherLatencyMs", + "lostSyncCount", + "isValidator"] update_node(graph, parent_info, parent_key, results, field_names) @@ -318,8 +317,8 @@ def augment(args): def run_survey(args): graph = nx.DiGraph() merged_results = defaultdict(lambda: { - "totalInbound": 0, - "totalOutbound": 0, + "numTotalInboundPeers": 0, + "numTotalOutboundPeers": 0, "maxInboundPeerCount": 0, "maxOutboundPeerCount": 0, "inboundPeers": {}, @@ -394,8 +393,8 @@ def run_survey(args): self_name = get_request(url + "/scp", scp_params).json()["you"] graph.add_node(self_name, version=get_request(url + "/info").json()["info"]["build"], - totalInbound=len(peers["inbound"] or []), - totalOutbound=len(peers["outbound"] or [])) + numTotalInboundPeers=len(peers["inbound"] or []), + numTotalOutboundPeers=len(peers["outbound"] or [])) sent_requests = set() heard_from = set() @@ -473,8 +472,8 @@ def run_survey(args): node = merged_results[key] have_inbound = len(node["inboundPeers"]) have_outbound = len(node["outboundPeers"]) - if (node["totalInbound"] > have_inbound or - node["totalOutbound"] > have_outbound): + if (node["numTotalInboundPeers"] > have_inbound or + node["numTotalOutboundPeers"] > have_outbound): incomplete_responses.add(key) req = util.PendingRequest(key, have_inbound, have_outbound) peer_list.add(req) diff --git a/scripts/overlay_survey/simulation.py b/scripts/overlay_survey/simulation.py index 234ec445c4..da7c51af30 100644 --- a/scripts/overlay_survey/simulation.py +++ b/scripts/overlay_survey/simulation.py @@ -41,7 +41,7 @@ def _add_v2_survey_data(node_json): Does nothing if the node_json is already a v2 survey result or if the V1 survey data indicates the node didn't respond to the survey. """ - if "totalInbound" not in node_json: + if "numTotalInboundPeers" not in node_json: # Node did not respond to the survey. Nothing to do. return @@ -63,14 +63,6 @@ def _add_v2_survey_data(node_json): for peer in node_json["outboundPeers"]: peer["averageLatencyMs"] = random.randint(0, 2**32-1) -def _rename_dictionary_field(dictionary, old_key, new_key): - """ - Rename a field in a dictionary. If the field is not present, do nothing. - """ - if old_key in dictionary: - dictionary[new_key] = dictionary[old_key] - del dictionary[old_key] - class SurveySimulation: """ Simulates the HTTP endpoints of stellar-core's overlay survey. Raises @@ -212,8 +204,8 @@ def _getsurveyresult(self, params): ] for (node_id, _, data) in inbound_slice: self._addpeer(node_id, data, node_json["inboundPeers"]) - if ("totalInbound" in node_json and - node_json["totalInbound"] != len(in_edges)): + if ("numTotalInboundPeers" in node_json and + node_json["numTotalInboundPeers"] != len(in_edges)): # The V1 survey contains a race condition in which the number of # peers can change between when a node reports its peer count # and when the surveyor requests the peers themselves. The V2 @@ -230,8 +222,8 @@ def _getsurveyresult(self, params): "count.", node, len(in_edges), - node_json["totalInbound"]) - node_json["totalInbound"] = len(in_edges) + node_json["numTotalInboundPeers"]) + node_json["numTotalInboundPeers"] = len(in_edges) # Generate outboundPeers list node_json["outboundPeers"] = [] @@ -241,8 +233,8 @@ def _getsurveyresult(self, params): ] for (_, node_id, data) in outbound_slice: self._addpeer(node_id, data, node_json["outboundPeers"]) - if ("totalOutbound" in node_json and - node_json["totalOutbound"] != len(out_edges)): + if ("numTotalOutboundPeers" in node_json and + node_json["numTotalOutboundPeers"] != len(out_edges)): # Patch up peer counts in simulated survey results with real # peer counts (see note on similar conditional for inbound peers # above) @@ -252,19 +244,11 @@ def _getsurveyresult(self, params): "count.", node, len(out_edges), - node_json["totalOutbound"]) - node_json["totalOutbound"] = len(out_edges) + node_json["numTotalOutboundPeers"]) + node_json["numTotalOutboundPeers"] = len(out_edges) _add_v2_survey_data(node_json) - # Rename peer count fields to match stellar-core's response - _rename_dictionary_field(node_json, - "totalInbound", - "numTotalInboundPeers") - _rename_dictionary_field(node_json, - "totalOutbound", - "numTotalOutboundPeers") - self._results["topology"][node] = node_json return SimulatedResponse(json=self._results)