Skip to content

Commit

Permalink
Merge pull request #126 from timkpaine/queue
Browse files Browse the repository at this point in the history
add streaming queue
  • Loading branch information
timkpaine authored Feb 25, 2021
2 parents fcaefb3 + d359fb4 commit db71a2c
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 9 deletions.
5 changes: 3 additions & 2 deletions tributary/lazy/node.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import six
import inspect
from boltons.funcutils import wraps

# from boltons.funcutils import wraps
from ..utils import _either_type
from ..base import TributaryException

Expand Down Expand Up @@ -473,7 +474,6 @@ def my_method(self):
"Missing args (call or preprocessing error has occurred)"
)

@wraps(meth)
def meth_wrapper(self, *args, **kwargs):
if is_method:
# val = meth(self, *(arg.value() if isinstance(arg, Node) else getattr(self, arg).value() for arg in args if arg not in default_attrs), **
Expand Down Expand Up @@ -522,4 +522,5 @@ def meth_wrapper(self, *args, **kwargs):
)

ret._node_wrapper = new_node
# ret = wraps(meth)(ret)
return ret
5 changes: 3 additions & 2 deletions tributary/streaming/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .utils import *


def run(node, blocking=True):
def run(node, blocking=True, **kwargs):
graph = node.constructGraph()
return graph.run(blocking)
kwargs["blocking"] = blocking
return graph.run(**kwargs)
13 changes: 8 additions & 5 deletions tributary/streaming/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def _run(self):
# return last val
return last

def run(self, blocking=True, newloop=False):
def run(self, blocking=True, newloop=False, start=True):
if sys.platform == "win32":
# Set to proactor event loop on window
# (default in python 3.8+)
Expand All @@ -103,10 +103,13 @@ def run(self, blocking=True, newloop=False):
except KeyboardInterrupt:
return

t = Thread(target=loop.run_until_complete, args=(task,))
t.daemon = True
t.start()
return loop
if start:
t = Thread(target=loop.run_until_complete, args=(task,))
t.daemon = True
t.start()
return loop

return loop, task

def graph(self):
return self._starting_node.graph()
Expand Down
18 changes: 18 additions & 0 deletions tributary/streaming/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,21 @@ async def _input(message=message, json=json):

super().__init__(foo=_input, message=message, json=json, **kwargs)
self._name = "Console"


class Queue(Foo):
"""Streaming wrapper to emit as values are received from an asynchronous queue
Arguments:
queue (Queue): asyncio Queue
"""

def __init__(self, queue, **kwargs):
async def foo(queue=queue):
return await queue.get()

super().__init__(foo=foo, count=0, **kwargs)
self._name = "Queue"


QueueSource = Queue
1 change: 1 addition & 0 deletions tributary/streaming/output/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .kafka import Kafka as KafkaSink
from .output import Collect, Dagre
from .output import Foo as FooOutput
from .output import Queue as QueueSink
from .output import Graph, GraphViz, Logging, Perspective, PPrint, Print
from .postgres import Postgres as PostgresSink
from .socketio import SocketIO as SocketIOSink
Expand Down
18 changes: 18 additions & 0 deletions tributary/streaming/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,23 @@ def foo(val):
return ret


def Queue(node, queue):
async def foo(val):
await queue.put(val)
return val

node = _gen_node(node)
ret = Node(
foo=foo,
foo_kwargs=None,
name="Queue",
inputs=1,
graphvizshape=_OUTPUT_GRAPHVIZSHAPE,
)
node >> ret
return ret


Node.foo = Foo
Node.collect = Collect
Node.graph = Graph
Expand All @@ -211,3 +228,4 @@ def foo(val):
Node.print = Print
Node.logging = Logging
Node.perspective = Perspective
Node.queue = Queue
3 changes: 3 additions & 0 deletions tributary/tests/streaming/test_api_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,16 @@ def test_api_outputs(self):
assert hasattr(ts, "EmailSink")
assert hasattr(ts.StreamingNode, "textMessage")
assert hasattr(ts, "TextMessageSink")
assert hasattr(ts.StreamingNode, "queue")
assert hasattr(ts, "QueueSink")

def test_api_other_inputs(self):
# Other inputs
assert hasattr(ts, "Timer")
assert hasattr(ts, "Const")
assert hasattr(ts, "Curve")
assert hasattr(ts, "Foo")
assert hasattr(ts, "Queue")
assert hasattr(ts, "Random")

def test_api_streaming_specific(self):
Expand Down

0 comments on commit db71a2c

Please sign in to comment.