From 5ac816d757f9c96feedd05da6c27ca629af39ac3 Mon Sep 17 00:00:00 2001 From: Graham Mainwaring Date: Thu, 14 May 2020 20:13:34 -0400 Subject: [PATCH] Add Unix control socket and remove ephemeral nodes --- docs/controller.rst | 116 ++++---- docs/install.rst | 2 +- .../templates/receptor_node.conf.j2 | 4 +- .../roles/receptor_install/vars/main.yml | 4 +- packaging/docker/Dockerfile | 4 +- receptor/__main__.py | 11 +- receptor/config.py | 253 ++++++++---------- receptor/connection/manager.py | 2 +- receptor/control_socket.py | 165 ++++++++++++ receptor/controller.py | 74 ++--- receptor/entrypoints.py | 231 ++++------------ receptor/receptor.py | 42 +-- receptor/router.py | 10 +- receptor/work.py | 2 - test.ini | 4 +- test/perf/flat-mesh.yaml | 2 +- test/perf/random-mesh.yaml | 2 +- test/perf/test_ports.py | 4 +- test/perf/tree-mesh.yaml | 2 +- 19 files changed, 444 insertions(+), 490 deletions(-) create mode 100644 receptor/control_socket.py diff --git a/docs/controller.rst b/docs/controller.rst index e96ccee2..8d8fe470 100644 --- a/docs/controller.rst +++ b/docs/controller.rst @@ -35,7 +35,7 @@ receptor nodes :lineno-start: 3 controller = receptor.Controller(config) - controller.enable_server(["rnp://0.0.0.0:8888"]) + controller.enable_server(["rnp://0.0.0.0:7323"]) controller.run() The server won't start listening until you call the ``run()`` method. @@ -43,7 +43,7 @@ The server won't start listening until you call the ``run()`` method. Starting the service is useful for letting other Receptor nodes connect to the Controller but it's also possible to have the controller reach out to peers directly:: - controller.add_peer("rnp://10.0.0.1:8888") + controller.add_peer("rnp://10.0.0.1:7323") Once the event loop is started with ``run()`` the connection will be established. @@ -59,33 +59,25 @@ Sending and Receiving Work -------------------------- Now that we have the basics of initializing the Controller and starting a service lets look at -sending and receiving work +sending and receiving work:: .. code-block:: python msgid = await controller.send(payload={"url": "https://github.com/status", "method": "GET"}, recipient="othernode", - directive="receptor_http:execute) + directive="receptor_http:execute, + response_handler=receive_response) -When a message is constructed for sending via the Receptor mesh, an identifier is generated and -returned. If you have sent several messages to the mesh you can use this identifier to distinguish -responses from one request to another. You should have another task elsewhere that can receive -responses, which we'll get to later. In the meantime +When a message is constructed for sending via the Receptor mesh, a message identifier is generated and +returned. Reply traffic referring to this message identifier will be passed to the ``response_handler`` +callback. The final reply message will have an "eof" header. This callback can be implemented as follows: .. code-block:: python - message = await controller.recv() - print(f"{message.header.in_response_to} : {message.payload.readall()}) - -Plugins on Receptor nodes can send multiple messages in response to a single request and it's -useful to know when a plugin is done performing work - -.. code-block:: python - - message = await controller.recv() - print(f"{message.header.in_response_to} : {message.payload.readall()}) - if message.header.get("eof", False): - print("Work finished!") + async def receive_response(message)): + print(f"{message.header} : {message.payload.readall()}") + if message.header.get("eof", False): + print("Work finished!") Using asyncio tasks for Sending and Receiving --------------------------------------------- @@ -99,7 +91,7 @@ Tasks that send data One approach that you can take is to create an async task that looks for work and pass that to the Controller's *run()* method. This way, when you are finished checking for work all you need to do -is return and the Controller shuts down +is return and the Controller shuts down:: .. code-block:: python :linenos: @@ -117,7 +109,6 @@ is return and the Controller shuts down else: if am_i_done: break - asyncio.sleep(0.1) print("All done, Controller shutting down!") config = receptor.ReceptorConfig() @@ -125,69 +116,57 @@ is return and the Controller shuts down controller.run(relay_work) Passing this task to *run()* is optional and it's entirely possible to just create this task and -just have the runloop be persistent +have the run loop be persistent:: .. code-block:: python :linenos: - def my_awesome_controller(): - async def relay_work(): - while True: - work_thing = get_some_work() - if work_thing: - controller.send( - payload=work_thing, - recipient="my_other_receptor_node", - directive="receptor_plugin:execute" - ) - asyncio.sleep(0.1) - print("All done, Controller shutting down!") - config = receptor.ReceptorConfig() controller = receptor.Controller(config) controller.loop.create_task(relay_work) + controller.loop.create_task(some_other_task) controller.run() +Unlike the first example, this will not exit when :meth:`relay_work` and :meth:`some_other_task` +are complete. + Tasks that receive data ^^^^^^^^^^^^^^^^^^^^^^^ -Receiving data is very similar to sending data in that it allows you to take a few different -approaches that match your use case. The Controller internally relies on an -`AsyncIO Queue `_ if you have your own -way of fetching events from this queue you can pass it to the Controller when you instantiate it - -.. code-block:: python - - controller = receptor.Controller(config, queue=my_asyncio_queue) - -Any responses will be received in the queue as they arrive to the Controller node. If you don't -have an existing queue, one is automatically created for you and is available at *controller.queue* - -There is a helpful method on the controller that you can use to call and receive an event once they -come in: :meth:`receptor.controller.Controller.recv` lets take a look at how we can create a task -to consume an event one at a time from that queue +As described above, data is received through a callback provided to the +:meth:`receptor.controller.Controller.send` method. We can extend the sample controller +to receive reply traffic as follows:: .. code-block:: python :linenos: def my_awesome_controller(): - async def read_responses(): + + async def handle_response(message): + if message.payload: + print(f"I got a response and it said: {message.payload.readall().decode()}") + print(f"It was in response to {message.header.get("in_response_to", None)}") + if message.header.get("eof", False): + print("The plugin was finished sending messages") + + async def relay_work(): while True: - message = await controller.recv() - sent_message_id = message.header.get("in_response_to", None) - if message.payload and sent_message_id: - print(f"I got a response and it said: {message.payload.readall().decode()}") - print(f"It was in response to {sent_message_id}") - if message.header.get("eof", False): - print("The plugin was finished sending messages") + work_thing = get_some_work() + if work_thing: + controller.send( + payload=work_thing, + recipient="my_other_receptor_node", + directive="receptor_plugin:execute", + response_handler=handle_response + ) + else: + if am_i_done: + break + print("All done, Controller shutting down!") config = receptor.ReceptorConfig() controller = receptor.Controller(config) - controller.loop.create_task(read_responses()) - controller.run() - -Combine this response handling task with the message sending tasks from the section above and you -have a complete Receptor controller system ready to be integrated. + controller.run(relay_work) Getting information about the mesh ---------------------------------- @@ -227,12 +206,13 @@ is currently being executed. Sending a ping works a lot like sending a normal message as in the examples above, except there is a special controller method for it: :meth:`receptor.controller.Controller.ping`:: - msg_id = controller.ping("some_other_node") + def handle_ping_response(message): + print(f"Ping response with content {message.payload.readall().decode()}") + + controller.ping("some_other_node", response_handler=handle_ping_response) -The responses appear in the response queue if/when it's received. The *msg_id* will match -the *in_response_to* key on the received message:: +The content of a ping response looks as follows:: - message = await controller.recv() pprint(message.payload.readall().decode()) { "initial_time":{ diff --git a/docs/install.rst b/docs/install.rst index f7fe7faa..a6cdfecf 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -13,7 +13,7 @@ Receptor nodes once you provide an ``inventory`` file that looks like this hostA server_port=32888 hostB peers=["hostA"] - hostC peers=["hostB", "hostA:32888"] server_port=8889 + hostC peers=["hostB", "hostA:32888"] server_port=7323 This will deploy a 3 node cluster. There are some other ways of tuning the deployment on each node which you can see by reading the install playbook, run ansible to start the install:: diff --git a/installer/roles/receptor_install/templates/receptor_node.conf.j2 b/installer/roles/receptor_install/templates/receptor_node.conf.j2 index 2397b5b3..a7e2df80 100644 --- a/installer/roles/receptor_install/templates/receptor_node.conf.j2 +++ b/installer/roles/receptor_install/templates/receptor_node.conf.j2 @@ -2,5 +2,5 @@ debug={{ server_debug|bool|default("False") }} [node] -listen={{ server_address | default("0.0.0.0") }}:{{ service_port | default("8888") }} -server_disable={{ server_disable|bool|default("False") }} +listen={{ server_address | default("0.0.0.0") }}:{{ service_port | default("7323") }} +no_listen={{ no_listen|bool|default("False") }} diff --git a/installer/roles/receptor_install/vars/main.yml b/installer/roles/receptor_install/vars/main.yml index e5ec8dfd..460785d1 100644 --- a/installer/roles/receptor_install/vars/main.yml +++ b/installer/roles/receptor_install/vars/main.yml @@ -1,6 +1,6 @@ --- receptor_user: receptor -service_port: 8888 +service_port: 7323 server_address: 0.0.0.0 -server_disable: false +no_listen: false server_debug: false diff --git a/packaging/docker/Dockerfile b/packaging/docker/Dockerfile index dfbacc30..8f01f723 100644 --- a/packaging/docker/Dockerfile +++ b/packaging/docker/Dockerfile @@ -28,7 +28,9 @@ ENV LANG=en_US.UTF-8 ENV LANGUAGE=en_US:en ENV LC_ALL=en_US.UTF-8 ENV HOME=/var/lib/receptor -EXPOSE 8888/tcp +EXPOSE 7323/tcp +EXPOSE 7324/tcp +EXPOSE 7325/tcp WORKDIR /var/lib/receptor ENTRYPOINT ["entrypoint"] CMD ["receptor", "-c", "/var/lib/receptor/receptor.conf", "node"] diff --git a/receptor/__main__.py b/receptor/__main__.py index af7d745a..4a439b2c 100644 --- a/receptor/__main__.py +++ b/receptor/__main__.py @@ -6,6 +6,8 @@ from .config import ReceptorConfig from .diagnostics import log_buffer from .logstash_formatter.logstash import LogstashFormatter +from . import entrypoints +from .exceptions import ReceptorRuntimeError logger = logging.getLogger(__name__) @@ -47,7 +49,7 @@ def main(args=None): ) def _f(record): - record.node_id = config.default_node_id + record.node_id = config.node_node_id if record.levelno == logging.ERROR: log_buffer.appendleft(record) return True @@ -56,7 +58,12 @@ def _f(record): h.addFilter(_f) try: - config.go() + entrypoint_name = config.get_entrypoint_name() + entrypoint_func = getattr(entrypoints, entrypoint_name, None) + if entrypoint_func: + entrypoint_func(config) + else: + raise ReceptorRuntimeError(f"Unknown entrypoint {entrypoint_name}") except asyncio.CancelledError: pass except Exception: diff --git a/receptor/config.py b/receptor/config.py index f608aa85..908b90da 100644 --- a/receptor/config.py +++ b/receptor/config.py @@ -1,24 +1,19 @@ +import os import argparse import configparser import logging -import os import ssl -from .entrypoints import run_as_node, run_as_ping, run_as_send, run_as_status from .exceptions import ReceptorRuntimeError, ReceptorConfigError logger = logging.getLogger(__name__) SINGLETONS = {} SUBCOMMAND_EXTRAS = { - "node": {"hint": "Run a Receptor node", "entrypoint": run_as_node, "is_ephemeral": False}, - "ping": {"hint": "Ping a Receptor node", "entrypoint": run_as_ping, "is_ephemeral": True}, - "send": {"hint": "Send a directive to a node", "entrypoint": run_as_send, "is_ephemeral": True}, - "status": { - "hint": "Display status of the Receptor network", - "entrypoint": run_as_status, - "is_ephemeral": True, - }, + "node": {"hint": "Run a Receptor node", "entrypoint": "run_as_node"}, + "ping": {"hint": "Ping a Receptor node", "entrypoint": "run_as_ping"}, + "send": {"hint": "Send a directive to a node", "entrypoint": "run_as_send"}, + "status": {"hint": "Display status of the Receptor network", "entrypoint": "run_as_status"}, } @@ -39,7 +34,7 @@ class ReceptorConfig: the caller by passing them in a dictionary to args:: config = receptor.ReceptorConfig(args=dict(default_config="/opt/receptor.conf")) - config.default_data_dir = "/var/run/" + config.node_data_dir = "/var/run/" controller = receptor.Controller(config) Some options are only relevant when running as a node from the command line. When invoking @@ -47,25 +42,22 @@ class ReceptorConfig: and listen addresses will be set up using Controller methods. """ - def __init__(self, args=None): + def __init__( + self, args=None, parser_class=argparse.ArgumentParser, parser_opts=None, context=None + ): self._config_options = {} - self._cli_args = argparse.ArgumentParser("receptor") + if not parser_opts: + parser_opts = {} + self._cli_args = parser_class("receptor", **parser_opts) self._cli_sub_args = self._cli_args.add_subparsers() self._parsed_args = None self._config_file = configparser.ConfigParser(allow_no_value=True, delimiters=("=",)) - self._is_ephemeral = False + self._context = context # Default options, which apply to all sub-commands. self.add_config_option( section="default", - key="node_id", - default_value="", - value_type="str", - hint="""Set/override node identifier. If unspecified here or in a config file, - one will be automatically generated.""", - ) - self.add_config_option( - section="default", + contexts=["cli"], key="config", short_option="-c", default_value="/etc/receptor/receptor.conf", @@ -74,14 +66,26 @@ def __init__(self, args=None): ) self.add_config_option( section="default", - key="data_dir", - short_option="-d", - default_value=None, + contexts=["cli"], + key="socket_path", + short_option="-s", + default_value="/var/run/receptor.sock", value_type="path", - hint="Path to the directory where Receptor stores its database and metadata.", + hint="Path to the Receptor control socket.", ) self.add_config_option( section="default", + contexts=["cli"], + key="no_socket", + long_option="--no-socket", + default_value=False, + set_value=True, + value_type="bool", + hint="Disable the control socket", + ) + self.add_config_option( + section="default", + contexts=["cli"], key="debug", default_value=None, set_value=True, @@ -91,6 +95,7 @@ def __init__(self, args=None): default_max_workers = min(32, os.cpu_count() + 4) self.add_config_option( section="default", + contexts=["cli"], key="max_workers", default_value=default_max_workers, value_type="int", @@ -99,6 +104,7 @@ def __init__(self, args=None): ) self.add_config_option( section="default", + contexts=["cli"], key="logging_format", default_value="simple", value_type="str", @@ -109,6 +115,7 @@ def __init__(self, args=None): # so all of these options use `subparse=False`. self.add_config_option( section="auth", + contexts=["cli"], key="server_cert", default_value="", value_type="str", @@ -117,6 +124,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="server_key", default_value="", value_type="str", @@ -125,6 +133,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="server_ca_bundle", default_value=None, value_type="str", @@ -133,6 +142,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="client_cert", default_value="", value_type="str", @@ -141,6 +151,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="client_key", default_value="", value_type="str", @@ -149,6 +160,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="client_verification_ca", default_value=None, value_type="str", @@ -157,6 +169,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="server_cipher_list", default_value=None, value_type="str", @@ -165,6 +178,7 @@ def __init__(self, args=None): ) self.add_config_option( section="auth", + contexts=["cli"], key="client_cipher_list", default_value=None, value_type="str", @@ -174,14 +188,44 @@ def __init__(self, args=None): # Receptor node options self.add_config_option( section="node", + contexts=["cli"], + key="node_id", + default_value="", + value_type="str", + hint="""Set/override node identifier. If unspecified here or in a config file, + one will be automatically generated.""", + ) + self.add_config_option( + section="node", + contexts=["cli"], + key="data_dir", + short_option="-d", + default_value="/var/lib/receptor", + value_type="path", + hint="Path to the directory where Receptor stores its database and metadata.", + ) + self.add_config_option( + section="node", + contexts=["cli"], key="listen", - default_value=["rnp://0.0.0.0:8888"], + default_value=["rnp://0.0.0.0:7323"], value_type="list", hint="""Set/override IP address and port to listen on. If not set here - or in a config file, the default is rnp://0.0.0.0:8888.""", + or in a config file, the default is rnp://0.0.0.0:7323.""", + ) + self.add_config_option( + section="node", + contexts=["cli"], + key="no_listen", + long_option="--no-listen", + default_value=False, + set_value=True, + value_type="bool", + hint="Disable the server function and only connect to configured peers", ) self.add_config_option( section="node", + contexts=["cli"], key="peers", short_option="-p", long_option="--peer", @@ -192,15 +236,7 @@ def __init__(self, args=None): ) self.add_config_option( section="node", - key="server_disable", - long_option="--server-disable", - default_value=False, - set_value=True, - value_type="bool", - hint="Disable the server function and only connect to configured peers", - ) - self.add_config_option( - section="node", + contexts=["cli"], key="stats_enable", default_value=None, set_value=True, @@ -209,13 +245,15 @@ def __init__(self, args=None): ) self.add_config_option( section="node", + contexts=["cli"], key="stats_port", - default_value=8889, + default_value=7325, value_type="int", hint="Port to listen for requests to show stats", ) self.add_config_option( section="node", + contexts=["cli"], key="keepalive_interval", default_value=-1, value_type="int", @@ -224,6 +262,7 @@ def __init__(self, args=None): ) self.add_config_option( section="node", + contexts=["cli"], key="groups", short_option="-g", long_option="--group", @@ -234,6 +273,7 @@ def __init__(self, args=None): ) self.add_config_option( section="node", + contexts=["cli"], key="ws_extra_headers", long_option="--ws_extra_header", default_value=[], @@ -242,6 +282,7 @@ def __init__(self, args=None): ) self.add_config_option( section="node", + contexts=["cli"], key="ws_heartbeat", long_option="--ws_heartbeat", default_value=None, @@ -251,22 +292,16 @@ def __init__(self, args=None): # ping options self.add_config_option( section="ping", - key="peer", - default_value="localhost:8888", - value_type="str", - hint="""The peer to relay the ping directive through. If unspecified here or - in a config file, localhost:8888 will be used.""", - ) - self.add_config_option( - section="ping", + contexts=["cli", "socket"], key="count", - default_value=4, + default_value=1, value_type="int", hint="""Number of pings to send. If set to zero, pings will be continuously sent until interrupted.""", ) self.add_config_option( section="ping", + contexts=["cli", "socket"], key="delay", default_value=1, value_type="float", @@ -275,54 +310,35 @@ def __init__(self, args=None): ) self.add_config_option( section="ping", + contexts=["cli", "socket"], key="recipient", long_option="ping_recipient", default_value="", value_type="str", hint="Node ID of the Receptor node to ping.", ) - self.add_config_option( - section="ping", - key="ws_extra_headers", - long_option="--ws_extra_header", - default_value=[], - value_type="key-value-list", - hint="Set additional headers to provide when connecting to websocket peers.", - ) - self.add_config_option( - section="ping", - key="ws_heartbeat", - long_option="--ws_heartbeat", - default_value=None, - value_type="int", - hint="Set heartbeat interval for websocket connections.", - ) # send options self.add_config_option( section="send", - key="peer", - default_value="localhost:8888", + contexts=["cli", "socket"], + key="recipient", + long_option="send_recipient", + default_value="", value_type="str", - hint="""The peer to relay the directive through. If unspecified here or in a config - file, localhost:8888 will be used.""", + hint="Node ID of the Receptor node to ping.", ) self.add_config_option( section="send", + contexts=["cli", "socket"], key="directive", + long_option="send_directive", default_value="", value_type="str", hint="Directive to send.", ) self.add_config_option( section="send", - key="recipient", - long_option="send_recipient", - default_value="", - value_type="str", - hint="Node ID of the Receptor node to ping.", - ) - self.add_config_option( - section="send", + contexts=["cli", "socket"], key="payload", long_option="send_payload", default_value="", @@ -330,61 +346,23 @@ def __init__(self, args=None): hint="""Payload of the directive to send. Use - for stdin or give the path to a file to transmit the file contents.""", ) - self.add_config_option( - section="send", - key="ws_extra_headers", - long_option="--ws_extra_header", - default_value=[], - value_type="list", - hint="Set additional headers to provide when connecting to websocket peers.", - ) - self.add_config_option( - section="send", - key="ws_heartbeat", - long_option="--ws_heartbeat", - default_value=None, - value_type="int", - hint="Set heartbeat interval for websocket connections.", - ) # status options self.add_config_option( section="status", - key="peer", - default_value="localhost:8888", - value_type="str", - hint="""The peer to access the mesh through. If unspecified here or in a config file, - localhost:8888 will be used.""", - ) - self.add_config_option( - section="status", - key="ws_extra_headers", - long_option="--ws_extra_header", - default_value=[], - value_type="key-value-list", - listof="str", - hint="Set additional headers to provide when connecting to websocket peers.", - ) - self.add_config_option( - section="status", - key="show_ephemeral", - default_value=None, + contexts=["cli", "socket"], + key="verbose", + long_option="verbose", + default_value=False, set_value=True, value_type="bool", - hint="Show ephemeral nodes in output", - ) - self.add_config_option( - section="status", - key="ws_heartbeat", - long_option="--ws_heartbeat", - default_value=None, - value_type="int", - hint="Set heartbeat interval for websocket connections.", + hint="Print additional status information.", ) self.parse_options(args) def add_config_option( self, section, + contexts, key, cli=True, short_option="", @@ -396,6 +374,8 @@ def add_config_option( subparse=True, hint=None, ): + if self._context and self._context not in contexts: + return config_entry = "%s_%s" % (section, key) if cli: # for lists, we switch the action from 'store' to 'append' @@ -437,8 +417,7 @@ def add_config_option( sub_extra = SUBCOMMAND_EXTRAS.get(section, None) if sub_extra: subparser = self._cli_sub_args.add_parser(section, help=sub_extra["hint"]) - subparser.set_defaults(func=sub_extra["entrypoint"]) - subparser.set_defaults(ephemeral=sub_extra["is_ephemeral"]) + subparser.set_defaults(entrypoint=sub_extra["entrypoint"]) subparser.add_argument(*args, **kwargs) # finally, we add the ConfigOption to the internal dict for tracking @@ -471,15 +450,16 @@ def _get_config_value(self, key, ignore_config_file=False): def parse_options(self, args): # first we parse the cli args self._parsed_args = self._cli_args.parse_args(args) - # we manually force the config entry to be parsed first, since - # we need it before we do anything else - config_entry = self._config_options["default_config"] - config_path = self._get_config_value("default_config", ignore_config_file=True) - if config_path is not None: - config_entry.value = config_path - self._enforce_entry_type(config_entry) - # next we read the config file - self._config_file.read([config_entry.value]) + if "default_config" in self._config_options: + # we manually force the config entry to be parsed first, since + # we need it before we do anything else + config_entry = self._config_options["default_config"] + config_path = self._get_config_value("default_config", ignore_config_file=True) + if config_path is not None: + config_entry.value = config_path + self._enforce_entry_type(config_entry) + # next we read the config file + self._config_file.read([config_entry.value]) # then we loop through our config options, based on the option # precedence of CLI > environment > config file for key in self._config_options: @@ -500,12 +480,6 @@ def parse_options(self, args): self._config_options["plugins"][section.replace("plugin_", "")] = dict( self._config_file[section] ) - # If we did not get a data_dir from anywhere else, use a default - if self._config_options["default_data_dir"].value is None: - if self._is_ephemeral: - self._config_options["default_data_dir"].value = "/tmp/receptor" - else: - self._config_options["default_data_dir"].value = "/var/lib/receptor" def _enforce_entry_type(self, entry): if entry.value is not None: @@ -559,15 +533,14 @@ def _enforce_value_type(self, value, value_type): return None raise ReceptorConfigError(e) - def go(self): + def get_entrypoint_name(self): if not self._parsed_args: raise ReceptorRuntimeError("there are no parsed args yet") - elif not hasattr(self._parsed_args, "func"): + elif not hasattr(self._parsed_args, "entrypoint"): raise ReceptorRuntimeError( "you must specify a subcommand (%s)." % (", ".join(SUBCOMMAND_EXTRAS.keys()),) ) - self._is_ephemeral = self._parsed_args.ephemeral - self._parsed_args.func(self) + return self._parsed_args.entrypoint def get_ssl_context(self, context_type): if context_type == "server": diff --git a/receptor/connection/manager.py b/receptor/connection/manager.py index fab33143..5f320e15 100644 --- a/receptor/connection/manager.py +++ b/receptor/connection/manager.py @@ -4,7 +4,7 @@ from . import sock, ws -default_scheme_ports = {"rnp": 8888, "rnps": 8899, "ws": 80, "wss": 443} +default_scheme_ports = {"rnp": 7323, "rnps": 7324, "ws": 80, "wss": 443} def parse_peer(peer, role): diff --git a/receptor/control_socket.py b/receptor/control_socket.py new file mode 100644 index 00000000..892d42f5 --- /dev/null +++ b/receptor/control_socket.py @@ -0,0 +1,165 @@ +import logging +import argparse +import shlex +import asyncio +from .config import ReceptorConfig + +logger = logging.getLogger(__name__) + + +class ArgumentParsingError(Exception): + pass + + +class QuietArgParser(argparse.ArgumentParser): + def __init__(self, *args, **kwargs): + self.writer = kwargs.pop("writer", None) + super().__init__(*args, **kwargs) + + def print_usage(self, file=None): + if file is None: + file = self.writer + self._print_message(self.format_usage(), file) + + def print_help(self, file=None): + if file is None: + file = self.writer + self._print_message(self.format_help(), file) + + def _print_message(self, message, file=None): + if message: + if file is None: + file = self.writer + if type(file) is asyncio.StreamWriter: + file.write(message.encode("utf-8")) + else: + file.write(message) + + def exit(self, status=0, message=None): + raise ArgumentParsingError(f"Error {status}: {message}") + + def error(self, message): + raise ArgumentParsingError(message) + + +class ControlSocketSession: + def __init__(self, controller, reader, writer): + self.controller = controller + self.receptor = controller.receptor + self.reader = reader + self.writer = writer + self.events = dict() + + async def writestr(self, *args, **kwargs): + sep = kwargs.get("sep", " ") + end = kwargs.get("end", "\n") + self.writer.write((sep.join(args) + end).encode("utf-8")) + await self.writer.drain() + + async def writebytes(self, bytes, crlf=False): + self.writer.write(bytes) + if crlf: + self.writer.write(b"\n") + await self.writer.drain() + + async def handle_ping_response(self, message): + if message.payload and not self.writer.is_closing(): + await self.writebytes(message.payload.readall(), crlf=True) + e = self.events.get(message.header["in_response_to"], None) + if e: + e.set() + + async def run_as_ping(self, config): + try: + eventlist = list() + for i in range(config.ping_count): + msg_id = await self.controller.ping( + config.ping_recipient, self.handle_ping_response + ) + e = asyncio.Event() + eventlist.append(e) + self.events[msg_id] = e + if i < config.ping_count - 1: + await asyncio.sleep(config.ping_delay) + try: + await asyncio.wait([e.wait() for e in eventlist], timeout=5) + except asyncio.TimeoutError: + pass + except Exception as e: + await self.writestr(f"Error: {str(e)}") + + async def handle_send_response(self, message): + if message.payload and not self.writer.is_closing(): + await self.writebytes(message.payload.readall(), crlf=True) + if message.header.get("eof", False): + e = self.events.get(message.header["in_response_to"], None) + if e: + e.set() + + async def run_as_send(self, config): + try: + msg_id = await self.controller.send( + config.send_payload, + config.send_recipient, + config.send_directive, + self.handle_send_response, + ) + e = asyncio.Event() + self.events[msg_id] = e + await e.wait() + except Exception as e: + await self.writestr(f"Error: {str(e)}") + + async def run_as_status(self, config): + await self.writestr("Nodes:") + await self.writestr(" Myself:", self.receptor.router.node_id) + await self.writestr(" Others:") + for node in self.receptor.router.get_nodes(): + await self.writestr(" -", node) + await self.writestr() + await self.writestr("Route Map:") + for edge in self.receptor.router.get_edges(): + await self.writestr("-", str(tuple(edge))) + await self.writestr() + await self.writestr("Known Node Capabilities:") + for node, node_data in self.receptor.known_nodes.items(): + await self.writestr(" ", node, ":", sep="") + for cap, cap_value in node_data["capabilities"].items(): + await self.writestr(" ", cap, ": ", str(cap_value), sep="") + + async def session(self): + logger.debug("Received socket connection") + while not self.reader.at_eof(): + command = await self.reader.readline() + if command: + command = command.decode("utf-8").strip() + logger.debug(f"Received command {command}") + try: + config = ReceptorConfig( + shlex.split(command), + parser_class=QuietArgParser, + parser_opts=dict(writer=self.writer, add_help=False), + context="socket", + ) + entrypoint_name = config.get_entrypoint_name() + entrypoint_func = getattr(self, entrypoint_name, None) + if entrypoint_func: + await entrypoint_func(config) + else: + await self.writestr(f"Not implemented: {entrypoint_name}") + except ArgumentParsingError as e: + await self.writestr(f"Error: {str(e)}") + await self.writer.drain() + logger.debug("Socket connection closed") + if self.writer.can_write_eof(): + self.writer.write_eof() + await self.writer.drain() + self.writer.close() + + +class ControlSocketServer: + def __init__(self, controller): + self.controller = controller + + async def serve_from_socket(self, reader, writer): + await ControlSocketSession(self.controller, reader, writer).session() diff --git a/receptor/controller.py b/receptor/controller.py index 9ab1168f..2eecf98d 100644 --- a/receptor/controller.py +++ b/receptor/controller.py @@ -3,7 +3,6 @@ import io import logging import os -import shutil from contextlib import suppress from .connection.base import Worker @@ -11,6 +10,7 @@ from .diagnostics import status from .messages.framed import FileBackedBuffer, FramedMessage from .receptor import Receptor +from .control_socket import ControlSocketServer logger = logging.getLogger(__name__) @@ -30,16 +30,12 @@ class Controller: :type queue: asyncio.Queue """ - def __init__(self, config, loop=asyncio.get_event_loop(), queue=None): + def __init__(self, config, loop=asyncio.get_event_loop()): self.receptor = Receptor(config) self.loop = loop self.connection_manager = Manager( lambda: Worker(self.receptor, loop), self.receptor.config.get_ssl_context, loop ) - self.queue = queue - if self.queue is None: - self.queue = asyncio.Queue(loop=loop) - self.receptor.response_queue = self.queue self.status_task = loop.create_task(status(self.receptor)) async def shutdown_loop(self): @@ -69,9 +65,9 @@ def enable_server(self, listen_urls): Examples of supported formats: - * rnps://0.0.0.0:8888 - Secure receptor protocol bound on all interfaces port 8888 - * rnp://1.2.3.4:8888 - Insecure receptor protocol bound to the interface of 1.2.3.4 - port 8888 + * rnps://0.0.0.0:7323 - Secure receptor protocol bound on all interfaces port 7323 + * rnp://1.2.3.4:7323 - Insecure receptor protocol bound to the interface of 1.2.3.4 + port 7323 * wss://0.0.0.0:443 - Secure websocket protocol bound on all interfaces port 443 The services are started as asyncio tasks and will start listening once @@ -81,40 +77,43 @@ def enable_server(self, listen_urls): """ tasks = list() for url in listen_urls: - listener = self.connection_manager.get_listener(url) logger.info("Serving on %s", url) + listener = self.connection_manager.get_listener(url) tasks.append(self.loop.create_task(listener)) return tasks + def enable_control_socket(self, socket_path): + """ + Enables a Unix domain socket over which the running node can receive commands such as send, + ping and status. + + The socket listener is started as an asyncio task and will start listening once + :meth:`receptor.controller.Controller.run` is called. + + :param socket_path: A path to the socket file, such as /var/run/receptor.sock. + """ + logger.info("Listening on Unix socket on %s", socket_path) + socket_server = asyncio.start_unix_server( + ControlSocketServer(self).serve_from_socket, path=socket_path, loop=self.loop + ) + return self.loop.create_task(socket_server) + def add_peer(self, peer, ws_extra_headers=None, ws_heartbeat=None): """ Adds a Receptor Node *Peer*. A connection will be established to this node once :meth:`receptor.controller.Controller.run` is called. Example format: - rnps://10.0.1.1:8888 + rnps://10.0.1.1:7323 :param peer: remote peer url """ logger.info("Connecting to peer {}".format(peer)) return self.connection_manager.get_peer( - peer, - reconnect=not self.receptor.config._is_ephemeral, - ws_extra_headers=ws_extra_headers, - ws_heartbeat=ws_heartbeat, + peer, ws_extra_headers=ws_extra_headers, ws_heartbeat=ws_heartbeat, ) - async def recv(self): - """ - Fetch a single response message from the response queue, this method blocks - and should be *await* ed or assigned to a Future - - :return: A single response message - :rtype: :class:`receptor.messages.framed.FramedMessage` - """ - return await self.receptor.response_queue.get() - - async def send(self, payload, recipient, directive, expect_response=True): + async def send(self, payload, recipient, directive, response_handler=None): """ Sends a payload to a recipient *Node* to execute under a given *directive*. @@ -163,22 +162,20 @@ async def send(self, payload, recipient, directive, expect_response=True): ), payload=buffer, ) - await self.receptor.router.send(message, expected_response=expect_response) + await self.receptor.router.send(message, response_handler=response_handler) return message.msg_id - async def ping(self, destination, expected_response=True): + async def ping(self, destination, response_handler=None): """ Sends a ping message to a remote Receptor node with the expectation that it will return information about when it received the ping, what its capabilities are and what work it is currently doing. - A good example of a standalone Controller that just implements ping can be found at - :meth:`receptor.entrypoints.run_as_ping` - :param destination: The node id of the target node + :param response_handler: Callback function to receive the ping response :returns: a message-id that can be used to reference responses """ - return await self.receptor.router.ping_node(destination, expected_response) + return await self.receptor.router.ping_node(destination, response_handler) def run(self, app=None): """ @@ -196,16 +193,3 @@ def run(self, app=None): pass finally: self.loop.stop() - - def cleanup_tmpdir(self): - try: - is_ephemeral = self.receptor.config._is_ephemeral - base_path = self.receptor.base_path - except AttributeError: - return - if is_ephemeral: - try: - logger.debug(f"Removing temporary directory {base_path}") - shutil.rmtree(base_path) - except Exception: - logger.error(f"Error while removing temporary directory {base_path}", exc_info=True) diff --git a/receptor/entrypoints.py b/receptor/entrypoints.py index 2e1b35ab..99ed4fa3 100644 --- a/receptor/entrypoints.py +++ b/receptor/entrypoints.py @@ -1,7 +1,6 @@ import asyncio import logging -import sys -import time +import shlex from prometheus_client import start_http_server @@ -22,192 +21,62 @@ async def node_keepalive(): ) * config.node_keepalive_interval controller.loop.call_at(absolute_call_time, controller.loop.create_task, node_keepalive()) - try: - controller = Controller(config) - logger.info(f"Running as Receptor node with ID: {controller.receptor.node_id}") - if config.node_stats_enable: - logger.info(f"Starting stats on port {config.node_stats_port}") - start_http_server(config.node_stats_port) - if not config.node_server_disable: - listen_tasks = controller.enable_server(config.node_listen) - controller.loop.create_task(controller.exit_on_exceptions_in(listen_tasks)) - for peer in config.node_peers: - controller.add_peer( - peer, - ws_extra_headers=config.node_ws_extra_headers, - ws_heartbeat=config.node_ws_heartbeat, - ) - if config.node_keepalive_interval > 1: - controller.loop.create_task(node_keepalive()) - controller.loop.create_task( - controller.receptor.connection_manifest.watch_expire(controller.receptor.buffer_mgr) + controller = Controller(config) + logger.info(f"Running as Receptor node with ID: {controller.receptor.node_id}") + if config.node_stats_enable: + logger.info(f"Starting stats on port {config.node_stats_port}") + start_http_server(config.node_stats_port) + if not config.node_no_listen: + listen_tasks = controller.enable_server(config.node_listen) + controller.loop.create_task(controller.exit_on_exceptions_in(listen_tasks)) + if not config.default_no_socket: + socket_task = controller.enable_control_socket(config.default_socket_path) + controller.loop.create_task(controller.exit_on_exceptions_in([socket_task])) + + for peer in config.node_peers: + controller.add_peer( + peer, + ws_extra_headers=config.node_ws_extra_headers, + ws_heartbeat=config.node_ws_heartbeat, ) - controller.run() - finally: - controller.cleanup_tmpdir() - - -async def run_oneshot_command( - controller, peer, recipient, ws_extra_headers, ws_heartbeat, send_func, read_func -): - if (not recipient) or (recipient != controller.receptor.node_id): - add_peer_task = controller.add_peer( - peer, ws_extra_headers=ws_extra_headers, ws_heartbeat=ws_heartbeat - ) - start_wait = time.time() - while True: - if add_peer_task and add_peer_task.done() and not add_peer_task.result(): - print("Connection failed. Exiting.") - return False - if ( - (not recipient or controller.receptor.router.node_is_known(recipient)) - and controller.receptor.route_send_time is not None - and time.time() - controller.receptor.route_send_time > 2.0 - ): - break - if time.time() - start_wait > 10: - print("Connection timed out. Exiting.") - if not add_peer_task.done(): - add_peer_task.cancel() - return False - await asyncio.sleep(0.5) - read_task = controller.loop.create_task(read_func()) - await send_func() - await read_task - return True + if config.node_keepalive_interval > 1: + controller.loop.create_task(node_keepalive()) + controller.loop.create_task( + controller.receptor.connection_manifest.watch_expire(controller.receptor.buffer_mgr) + ) + controller.run() + + +def run_command_via_socket(config, command): + async def command_client(): + reader, writer = await asyncio.open_unix_connection(config.default_socket_path) + writer.write(f"{' '.join(shlex.quote(str(x)) for x in command)}\n".encode("utf-8")) + if writer.can_write_eof(): + writer.write_eof() + while not reader.at_eof(): + line = await reader.readline() + print(line.decode("utf-8"), end="") + writer.close() + + loop = asyncio.get_event_loop() + loop.run_until_complete(command_client()) + loop.close() def run_as_ping(config): - def ping_iter(): - if config.ping_count: - for x in range(config.ping_count): - yield x - else: - while True: - yield 0 - - async def ping_entrypoint(): - return await run_oneshot_command( - controller, - config.ping_peer, - config.ping_recipient, - config.ping_ws_extra_headers, - config.ping_ws_heartbeat, - send_pings, - read_responses, - ) - - async def read_responses(): - for _ in ping_iter(): - message = await controller.recv() - print(message.payload.readall().decode()) - - async def send_pings(): - for x in ping_iter(): - await controller.ping(config.ping_recipient) - if x + 1 < config.ping_count: - await asyncio.sleep(config.ping_delay) - - try: - logger.info(f"Sending ping to {config.ping_recipient} via {config.ping_peer}.") - controller = Controller(config) - controller.run(ping_entrypoint) - finally: - controller.cleanup_tmpdir() + command = ["ping", config.ping_recipient] + if config.ping_count is not None: + command.extend(["--count", config.ping_count]) + if config.ping_delay is not None: + command.extend(["--delay", config.ping_delay]) + run_command_via_socket(config, command) def run_as_send(config): - async def send_entrypoint(): - return await run_oneshot_command( - controller, - config.send_peer, - config.send_recipient, - config.send_ws_extra_headers, - config.send_ws_heartbeat, - send_message, - read_responses, - ) - - async def send_message(): - if config.send_payload == "-": - data = sys.stdin.buffer.read() - else: - data = config.send_payload - await controller.send( - payload=data, recipient=config.send_recipient, directive=config.send_directive - ) - - async def read_responses(): - while True: - message = await controller.recv() - logger.debug(f"{message}") - if message.header.get("in_response_to", None): - logger.debug("Received response message") - if message.header.get("eof", False): - logger.info("Received EOF") - if message.header.get("code", 0) != 0: - logger.error(f"EOF was an error result") - if message.payload: - print(f"ERROR: {message.payload.readall().decode()}") - else: - print(f"No EOF Error Payload") - break - elif message.payload: - print(message.payload.readall().decode()) - else: - print("---") - else: - logger.warning(f"Received unknown message {message}") - - try: - logger.info( - f"""Sending directive {config.send_directive} to {config.send_recipient} - via {config.send_peer}""" - ) - controller = Controller(config) - controller.run(send_entrypoint) - finally: - controller.cleanup_tmpdir() + run_command_via_socket( + config, ["send", config.send_recipient, config.send_directive, config.send_payload] + ) def run_as_status(config): - async def status_entrypoint(): - return await run_oneshot_command( - controller, - config.status_peer, - None, - config.status_ws_extra_headers, - config.status_ws_heartbeat, - print_status, - noop, - ) - - async def print_status(): - - # This output should be formatted so as to be parseable as YAML - - r = controller.receptor - print("Nodes:") - print(" Myself:", r.router.node_id) - print(" Others:") - for node in r.router.get_nodes(): - print(" -", node) - print() - print("Route Map:") - for edge in r.router.get_edges(): - print("-", str(tuple(edge))) - print() - print("Known Node Capabilities:") - for node, node_data in r.known_nodes.items(): - print(" ", node, ":", sep="") - for cap, cap_value in node_data["capabilities"].items(): - print(" ", cap, ": ", str(cap_value), sep="") - - async def noop(): - return - - try: - controller = Controller(config) - controller.run(status_entrypoint) - finally: - controller.cleanup_tmpdir() + run_command_via_socket(config, ["status"]) diff --git a/receptor/receptor.py b/receptor/receptor.py index dda9cb61..a8312c6e 100644 --- a/receptor/receptor.py +++ b/receptor/receptor.py @@ -76,11 +76,9 @@ async def remove(self, connection): class Receptor: """ Owns all connections and maintains adding and removing them. """ - def __init__( - self, config, node_id=None, router_cls=None, work_manager_cls=None, response_queue=None - ): + def __init__(self, config, node_id=None, router_cls=None, work_manager_cls=None): self.config = config - self.node_id = node_id or self.config.default_node_id or self._find_node_id() + self.node_id = node_id or self.config.node_node_id or self._find_node_id() self.router = (router_cls or MeshRouter)(self) self.route_sender_task = None self.route_send_time = time.time() @@ -88,10 +86,9 @@ def __init__( self.route_adv_seen = dict() self.work_manager = (work_manager_cls or WorkManager)(self) self.connections = dict() - self.response_queue = response_queue - self.base_path = os.path.join(self.config.default_data_dir, self.node_id) + self.base_path = os.path.join(self.config.node_data_dir, self.node_id) if not os.path.exists(self.base_path): - os.makedirs(os.path.join(self.config.default_data_dir, self.node_id)) + os.makedirs(os.path.join(self.config.node_data_dir, self.node_id)) self.connection_manifest = Manifest(os.path.join(self.base_path, "connection_manifest")) path = os.path.join(os.path.expanduser(self.base_path)) self.buffer_mgr = FileBufferManager(path) @@ -154,41 +151,18 @@ async def update_connections(self, protocol_obj, id_=None): stats.connected_peers_gauge.inc() - async def remove_ephemeral(self, node): - logger.debug(f"Removing ephemeral node {node}") - changed = False - if node in self.connections: - await self.connection_manifest.remove(node) - changed = True - if node in self.known_nodes: - del self.known_nodes[node] - changed = True - if changed: - await self.recalculate_and_send_routes_soon() - async def remove_connection(self, protocol_obj, id_=None): routing_changed = False for connection_node in self.connections: if protocol_obj in self.connections[connection_node]: routing_changed = True logger.info(f"Removing connection for node {connection_node}") - if self.is_ephemeral(connection_node): - self.connections[connection_node].remove(protocol_obj) - await self.remove_ephemeral(connection_node) - else: - self.connections[connection_node].remove(protocol_obj) - await self.connection_manifest.update(connection_node) + self.connections[connection_node].remove(protocol_obj) + await self.connection_manifest.update(connection_node) if routing_changed: await self.recalculate_and_send_routes_soon() stats.connected_peers_gauge.dec() - def is_ephemeral(self, id_): - return ( - id_ in self.known_nodes - and "ephemeral" in self.known_nodes[id_]["capabilities"] - and self.known_nodes[id_]["capabilities"]["ephemeral"] - ) - async def remove_connection_by_id(self, id_, loop=None): if id_ in self.connections: for protocol_obj in self.connections[id_]: @@ -443,7 +417,9 @@ async def handle_response(self, msg): in_response_to = msg.header["in_response_to"] if in_response_to in self.router.response_registry: logger.info(f"Handling response to {in_response_to} with callback.") - await self.response_queue.put(msg) + resp = self.router.response_registry[in_response_to].get("response_handler", None) + if resp: + asyncio.ensure_future(resp(msg)) else: logger.warning(f"Received response to {in_response_to} but no record of sent message.") diff --git a/receptor/router.py b/receptor/router.py index 32dd1400..50dda9dc 100644 --- a/receptor/router.py +++ b/receptor/router.py @@ -180,7 +180,7 @@ def next_hop(self, recipient): else: return None - async def ping_node(self, node_id, expected_response=True): + async def ping_node(self, node_id, response_handler=None): now = datetime.datetime.utcnow() logger.info(f"Sending ping to node {node_id}, timestamp={now}") message = FramedMessage( @@ -188,7 +188,7 @@ async def ping_node(self, node_id, expected_response=True): sender=self.node_id, recipient=node_id, timestamp=now, directive="receptor:ping" ) ) - return await self.send(message, expected_response) + return await self.send(message, response_handler) async def forward(self, msg, next_hop): """ @@ -209,7 +209,7 @@ async def forward(self, msg, next_hop): except Exception as e: logger.exception("Error trying to forward message to {}: {}".format(next_hop, e)) - async def send(self, message, expected_response=False): + async def send(self, message, response_handler=None): """ Send a new message with the given outer envelope. """ @@ -223,9 +223,9 @@ async def send(self, message, expected_response=False): message.header.update({"sender": self.node_id, "route_list": [self.node_id]}) logger.debug(f"Sending {message.msg_id} to {recipient} via {next_node_id}") - if expected_response and "directive" in message.header: + if response_handler: self.response_registry[message.msg_id] = dict( - message_sent_time=message.header["timestamp"] + response_handler=response_handler, message_sent_time=message.header["timestamp"] ) if next_node_id == self.node_id: asyncio.ensure_future(self.receptor.handle_message(message)) diff --git a/receptor/work.py b/receptor/work.py index 072096c7..7eaffa94 100644 --- a/receptor/work.py +++ b/receptor/work.py @@ -42,8 +42,6 @@ def get_capabilities(self): }, "max_work_threads": self.receptor.config.default_max_workers, } - if self.receptor.config._is_ephemeral: - caps["ephemeral"] = True return caps def get_work(self): diff --git a/test.ini b/test.ini index 4735223c..0a0a2b53 100644 --- a/test.ini +++ b/test.ini @@ -1,5 +1,5 @@ [server] -port=8889 +port=7324 [peers] -127.0.0.1:8888 +127.0.0.1:7323 diff --git a/test/perf/flat-mesh.yaml b/test/perf/flat-mesh.yaml index 61c672f2..26659c9b 100644 --- a/test/perf/flat-mesh.yaml +++ b/test/perf/flat-mesh.yaml @@ -2,7 +2,7 @@ nodes: controller: connections: [] - listen: receptor://127.0.0.1:8889 + listen: receptor://127.0.0.1:7323 name: controller stats_enable: true stats_port: null diff --git a/test/perf/random-mesh.yaml b/test/perf/random-mesh.yaml index 9f4dcf45..79c10f85 100644 --- a/test/perf/random-mesh.yaml +++ b/test/perf/random-mesh.yaml @@ -2,7 +2,7 @@ nodes: controller: connections: [] - listen: receptor://127.0.0.1:8889 + listen: receptor://127.0.0.1:7323 name: controller stats_enable: true stats_port: null diff --git a/test/perf/test_ports.py b/test/perf/test_ports.py index e04f443e..8f3e1aef 100644 --- a/test/perf/test_ports.py +++ b/test/perf/test_ports.py @@ -62,7 +62,7 @@ def test_listen_query(): def test_no_port_given(): """Start a node, and don't specify a port on which to listen. - Assert that it listens on port 8888. Older versions of receptor would listen on a random port. + Assert that it listens on port 7323. Older versions of receptor would listen on a random port. See: `receptor #138`_. .. _receptor #138: https://github.com/project-receptor/receptor/issues/138 @@ -72,6 +72,6 @@ def test_no_port_given(): try: conns = psutil.Process(node.pid).connections() assert len(conns) == 1 - assert conns[0].laddr.port == 8888 + assert conns[0].laddr.port == 7323 finally: node.stop() diff --git a/test/perf/tree-mesh.yaml b/test/perf/tree-mesh.yaml index d0c892a7..8a95bfe3 100644 --- a/test/perf/tree-mesh.yaml +++ b/test/perf/tree-mesh.yaml @@ -2,7 +2,7 @@ nodes: controller: connections: [] - listen: receptor://127.0.0.1:8889 + listen: receptor://127.0.0.1:7323 name: controller stats_enable: true stats_port: null