[WIP] Add Unix control socket and remove ephemeral nodes #223

Closed
wants to merge 1 commit
116 changes: 48 additions & 68 deletions docs/controller.rst
@@ -35,15 +35,15 @@ receptor nodes
:lineno-start: 3

controller = receptor.Controller(config)
controller.enable_server(["rnp://0.0.0.0:8888"])
controller.enable_server(["rnp://0.0.0.0:7323"])
controller.run()

The server won't start listening until you call the ``run()`` method.

Starting the service is useful for letting other Receptor nodes connect to the Controller, but
it's also possible to have the controller reach out to peers directly::

controller.add_peer("rnp://10.0.0.1:8888")
controller.add_peer("rnp://10.0.0.1:7323")

Once the event loop is started with ``run()`` the connection will be established.

@@ -59,33 +59,25 @@ Sending and Receiving Work
--------------------------

Now that we have the basics of initializing the Controller and starting a service, let's look at
sending and receiving work
sending and receiving work:

.. code-block:: python

msgid = await controller.send(payload={"url": "https://github.com/status", "method": "GET"},
recipient="othernode",
directive="receptor_http:execute")
directive="receptor_http:execute",
response_handler=receive_response)

When a message is constructed for sending via the Receptor mesh, an identifier is generated and
returned. If you have sent several messages to the mesh you can use this identifier to distinguish
responses from one request to another. You should have another task elsewhere that can receive
responses, which we'll get to later. In the meantime
When a message is constructed for sending via the Receptor mesh, a message identifier is generated and
returned. Reply traffic referring to this message identifier will be passed to the ``response_handler``
callback. The final reply message will have an "eof" header. This callback can be implemented as follows:

.. code-block:: python

message = await controller.recv()
print(f"{message.header.in_response_to} : {message.payload.readall()}")

Plugins on Receptor nodes can send multiple messages in response to a single request and it's
useful to know when a plugin is done performing work

.. code-block:: python

message = await controller.recv()
print(f"{message.header.in_response_to} : {message.payload.readall()}")
if message.header.get("eof", False):
print("Work finished!")
async def receive_response(message):
print(f"{message.header} : {message.payload.readall()}")
if message.header.get("eof", False):
print("Work finished!")
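The eof-driven completion pattern above can be sketched in plain asyncio (a hypothetical stand-in, not the Receptor API; the dict-shaped ``message`` objects and simulated reply traffic here are assumptions for illustration):

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    finished = loop.create_future()
    parts = []

    async def receive_response(message):
        # Collect each reply fragment; the final reply carries an "eof" header.
        parts.append(message["payload"])
        if message["header"].get("eof", False):
            finished.set_result(b"".join(parts))

    # Simulated reply traffic; on a real mesh these arrive via the callback
    # registered with send().
    await receive_response({"header": {}, "payload": b"partial "})
    await receive_response({"header": {"eof": True}, "payload": b"result"})
    return await finished

print(asyncio.run(main()))  # b'partial result'
```

The future lets other tasks await the assembled result instead of polling for it.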

Using asyncio tasks for Sending and Receiving
---------------------------------------------
@@ -99,7 +91,7 @@ Tasks that send data

One approach that you can take is to create an async task that looks for work and pass that to the
Controller's *run()* method. This way, when you are finished checking for work all you need to do
is return and the Controller shuts down
is return and the Controller shuts down:

.. code-block:: python
:linenos:
@@ -117,77 +109,64 @@ is return and the Controller shuts down
else:
if am_i_done:
break
await asyncio.sleep(0.1)
print("All done, Controller shutting down!")

config = receptor.ReceptorConfig()
controller = receptor.Controller(config)
controller.run(relay_work)

Passing this task to *run()* is optional and it's entirely possible to just create this task and
just have the runloop be persistent
have the run loop be persistent:

.. code-block:: python
:linenos:

def my_awesome_controller():
async def relay_work():
while True:
work_thing = get_some_work()
if work_thing:
await controller.send(
payload=work_thing,
recipient="my_other_receptor_node",
directive="receptor_plugin:execute"
)
await asyncio.sleep(0.1)
print("All done, Controller shutting down!")

config = receptor.ReceptorConfig()
controller = receptor.Controller(config)
controller.loop.create_task(relay_work())
controller.loop.create_task(some_other_task())
controller.run()

Unlike the first example, this will not exit when :meth:`relay_work` and :meth:`some_other_task`
are complete.
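The difference can be illustrated with plain asyncio (a rough analogy, not Receptor code): passing a task to ``run()`` behaves like ``run_until_complete``, where the loop exits as soon as the task returns, while tasks scheduled with ``create_task`` on a persistent loop do not force a shutdown:

```python
import asyncio

async def relay_work(results):
    # A finite unit of work; once it returns, run() has nothing left to await.
    for item in range(3):
        results.append(item)
        await asyncio.sleep(0)

results = []
# Comparable to controller.run(relay_work): the loop stops when the task ends.
asyncio.run(relay_work(results))
print(results)  # [0, 1, 2]
```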

Tasks that receive data
^^^^^^^^^^^^^^^^^^^^^^^

Receiving data is very similar to sending data in that it allows you to take a few different
approaches that match your use case. The Controller internally relies on an
`AsyncIO Queue <https://docs.python.org/3.6/library/asyncio-queue.html>`_ if you have your own
way of fetching events from this queue you can pass it to the Controller when you instantiate it

.. code-block:: python

controller = receptor.Controller(config, queue=my_asyncio_queue)

Any responses will be received in the queue as they arrive to the Controller node. If you don't
have an existing queue, one is automatically created for you and is available at *controller.queue*

There is a helpful method on the controller that you can use to call and receive an event once they
come in: :meth:`receptor.controller.Controller.recv` lets take a look at how we can create a task
to consume an event one at a time from that queue
As described above, data is received through a callback provided to the
:meth:`receptor.controller.Controller.send` method. We can extend the sample controller
to receive reply traffic as follows:

.. code-block:: python
:linenos:

def my_awesome_controller():
async def read_responses():

async def handle_response(message):
if message.payload:
print(f"I got a response and it said: {message.payload.readall().decode()}")
print(f"It was in response to {message.header.get('in_response_to', None)}")
if message.header.get("eof", False):
print("The plugin was finished sending messages")

async def relay_work():
while True:
message = await controller.recv()
sent_message_id = message.header.get("in_response_to", None)
if message.payload and sent_message_id:
print(f"I got a response and it said: {message.payload.readall().decode()}")
print(f"It was in response to {sent_message_id}")
if message.header.get("eof", False):
print("The plugin was finished sending messages")
work_thing = get_some_work()
if work_thing:
await controller.send(
payload=work_thing,
recipient="my_other_receptor_node",
directive="receptor_plugin:execute",
response_handler=handle_response
)
else:
if am_i_done:
break
print("All done, Controller shutting down!")

config = receptor.ReceptorConfig()
controller = receptor.Controller(config)
controller.loop.create_task(read_responses())
controller.run()

Combine this response handling task with the message sending tasks from the section above and you
have a complete Receptor controller system ready to be integrated.
controller.run(relay_work)
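When several requests are in flight at once, the ``in_response_to`` header lets a handler route each reply back to the request that produced it. A minimal sketch, assuming dict-shaped messages and that ``send()`` returns the message id as described above (the ``RequestTracker`` class is hypothetical, not part of Receptor):

```python
import asyncio

class RequestTracker:
    """Map outstanding message ids to futures resolved on the final reply."""

    def __init__(self):
        self.pending = {}

    def register(self, msg_id):
        fut = asyncio.get_running_loop().create_future()
        self.pending[msg_id] = fut
        return fut

    def handle(self, message):
        # Route the reply to whichever request it answers.
        msg_id = message["header"].get("in_response_to")
        fut = self.pending.get(msg_id)
        if fut is not None and message["header"].get("eof", False):
            fut.set_result(message["payload"])

async def main():
    tracker = RequestTracker()
    fut = tracker.register("msg-1")
    # Simulated reply; on a real mesh this arrives via the response_handler.
    tracker.handle({"header": {"in_response_to": "msg-1", "eof": True},
                    "payload": b"done"})
    return await fut

print(asyncio.run(main()))  # b'done'
```

Keying futures by message id keeps the response handler stateless with respect to any particular request.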

Getting information about the mesh
----------------------------------
@@ -227,12 +206,13 @@ is currently being executed.
Sending a ping works a lot like sending a normal message as in the examples above, except
there is a special controller method for it: :meth:`receptor.controller.Controller.ping`::

msg_id = controller.ping("some_other_node")
def handle_ping_response(message):
print(f"Ping response with content {message.payload.readall().decode()}")

controller.ping("some_other_node", response_handler=handle_ping_response)

The responses appear in the response queue if/when it's received. The *msg_id* will match
the *in_response_to* key on the received message::
The content of a ping response looks as follows::

message = await controller.recv()
pprint(message.payload.readall().decode())
{
"initial_time":{
2 changes: 1 addition & 1 deletion docs/install.rst
@@ -13,7 +13,7 @@ Receptor nodes once you provide an ``inventory`` file that looks like this

hostA server_port=32888
hostB peers=["hostA"]
hostC peers=["hostB", "hostA:32888"] server_port=8889
hostC peers=["hostB", "hostA:32888"] server_port=7323

This will deploy a 3 node cluster. There are some other ways of tuning the deployment on each node
which you can see by reading the install playbook, run ansible to start the install::
@@ -2,5 +2,5 @@
debug={{ server_debug|bool|default("False") }}

[node]
listen={{ server_address | default("0.0.0.0") }}:{{ service_port | default("8888") }}
server_disable={{ server_disable|bool|default("False") }}
listen={{ server_address | default("0.0.0.0") }}:{{ service_port | default("7323") }}
no_listen={{ no_listen|bool|default("False") }}
4 changes: 2 additions & 2 deletions installer/roles/receptor_install/vars/main.yml
@@ -1,6 +1,6 @@
---
receptor_user: receptor
service_port: 8888
service_port: 7323
server_address: 0.0.0.0
server_disable: false
no_listen: false
server_debug: false
4 changes: 3 additions & 1 deletion packaging/docker/Dockerfile
@@ -28,7 +28,9 @@ ENV LANG=en_US.UTF-8
ENV LANGUAGE=en_US:en
ENV LC_ALL=en_US.UTF-8
ENV HOME=/var/lib/receptor
EXPOSE 8888/tcp
EXPOSE 7323/tcp
EXPOSE 7324/tcp
EXPOSE 7325/tcp
WORKDIR /var/lib/receptor
ENTRYPOINT ["entrypoint"]
CMD ["receptor", "-c", "/var/lib/receptor/receptor.conf", "node"]
11 changes: 9 additions & 2 deletions receptor/__main__.py
@@ -6,6 +6,8 @@
from .config import ReceptorConfig
from .diagnostics import log_buffer
from .logstash_formatter.logstash import LogstashFormatter
from . import entrypoints
from .exceptions import ReceptorRuntimeError

logger = logging.getLogger(__name__)

@@ -47,7 +49,7 @@ def main(args=None):
)

def _f(record):
record.node_id = config.default_node_id
record.node_id = config.node_node_id
if record.levelno == logging.ERROR:
log_buffer.appendleft(record)
return True
@@ -56,7 +58,12 @@ def _f(record):
h.addFilter(_f)

try:
config.go()
entrypoint_name = config.get_entrypoint_name()
entrypoint_func = getattr(entrypoints, entrypoint_name, None)
if entrypoint_func:
entrypoint_func(config)
else:
raise ReceptorRuntimeError(f"Unknown entrypoint {entrypoint_name}")
except asyncio.CancelledError:
pass
except Exception: