-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Conversation
Twisted core doesn't have a general purpose one, so we need to write one ourselves. Features: - All writing happens in background thread - Supports both push and pull producers - Push producers get paused if the consumer falls behind
synapse/util/file_consumer.py
Outdated
@@ -0,0 +1,158 @@ | |||
# -*- coding: utf-8 -*- | |||
# Copyright 2018 New Vecotr Ltd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
vecotr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's almost as if you're typing this by hand instead of having your editor generate it...
synapse/util/file_consumer.py
Outdated
self._notify_empty_deferred = None | ||
|
||
def registerProducer(self, producer, streaming): | ||
"""Part of IProducer interface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IConsumer?
synapse/util/file_consumer.py
Outdated
if not streaming: | ||
self.producer.resumeProducing() | ||
|
||
self.paused_producer = False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we declare this in the constructor?
synapse/util/file_consumer.py
Outdated
_RESUME_ON_QUEUE_SIZE = 2 | ||
|
||
def __init__(self, file_obj): | ||
self.file_obj = file_obj |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason that some fields get underscores and some do not? I'd say they all should have one
synapse/util/file_consumer.py
Outdated
|
||
self.bytes_queue.put_nowait(bytes) | ||
|
||
# If this is a pushed based consumer and the queue is getting behind |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/pushed based consumer/PushProducer/ ?
synapse/util/file_consumer.py
Outdated
streaming (bool): True if push based producer, False if pull | ||
based. | ||
""" | ||
self.producer = producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we have some sanity checks to prevent registering twice?
synapse/util/file_consumer.py
Outdated
if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: | ||
reactor.callFromThread(self._resume_paused_producer) | ||
|
||
if self._notify_empty and self.bytes_queue.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._notify_empty is always truthy
synapse/util/file_consumer.py
Outdated
# producer. | ||
if self.producer and self.paused_producer: | ||
if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: | ||
reactor.callFromThread(self._resume_paused_producer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this block until the callee finishes, or not? if not we could end up resuming multiple times - is that a problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fairly sure it doesn't, and its not a problem as _resume_paused_producer
checks self.paused_producer
again anyway.
synapse/util/file_consumer.py
Outdated
def wait(self): | ||
"""Returns a deferred that resolves when finished writing to file | ||
""" | ||
return make_deferred_yieldable(self.finished_deferred) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this make_deferred_yieldable when nothing else does?
tests/util/test_file_consumer.py
Outdated
consumer.registerProducer(producer, True) | ||
|
||
consumer.write("Foo") | ||
yield consumer.wait_for_writes() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kinda feel like this might be better done by having the test file object let you wait for writes, rather than having the extra functionality in the consumer solely for testing, but ymmv
a048fa8
to
be0dfcd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Twisted core doesn't have a general purpose one, so we need to write one
ourselves.
Features: