diff --git a/nameko_grpc/streams.py b/nameko_grpc/streams.py index edf81b8..c8b9765 100644 --- a/nameko_grpc/streams.py +++ b/nameko_grpc/streams.py @@ -2,6 +2,8 @@ import struct from queue import Empty, Queue +from eventlet import greenthread + from nameko_grpc.compression import compress, decompress from nameko_grpc.errors import GrpcError from nameko_grpc.headers import HeaderManager @@ -201,6 +203,14 @@ def flush_queue_to_buffer(self): ) self.buffer.write(data) + # This while loop can lockup the main thread for a long time if we have a + # large queue of large messages to process. We need to yield to cooperate + # with eventlet greenthreads and this is the canonical way of doing so. + # Calling :func:`~greenthread.sleep` with *seconds* of 0 is the + # canonical way of expressing a cooperative yield + # TODO: Consider alternative solutions, ie breaking the flush into chunks + greenthread.sleep(0) + def serialize_message(self, message): return message.SerializeToString()