From 89e3543aa06e00eeab7eaf5bdd3a33a6112356b1 Mon Sep 17 00:00:00 2001 From: okkez Date: Wed, 13 Jun 2018 07:50:47 +0900 Subject: [PATCH] Support closing stream (#877) * Support closing stream In this version, we can close stream as following: ```ruby client = Twitter::Streaming::Client.new do |config| # Set up configuration end t = Thread.new do client.filter(track: keyword) do |object| # process object end end Signal.trap(:TERM) do # terminate process silently client.close exit true end ``` * Add Twitter::Streaming::Connection#stream spec * Wait for connection * Suppress rubocop warning lib/twitter/streaming/connection.rb:17:7: C: Method has too many lines. [13/10] def stream(request, response) ... ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * Use port number 8443 instead of 11443 Because rubocop reports following warnings: spec/twitter/streaming/connection_spec.rb:88:48: C: Use underscores(_) as decimal mark and separate every 3 digits with them. let(:client) { TCPSocket.new('127.0.0.1', 11443) } ^^^^^ spec/twitter/streaming/connection_spec.rb:94:44: C: Use underscores(_) as decimal mark and separate every 3 digits with them. @server = TCPServer.new('127.0.0.1', 11443) ^^^^^ --- lib/twitter/streaming/client.rb | 4 +++ lib/twitter/streaming/connection.rb | 19 ++++++++++-- spec/twitter/streaming/connection_spec.rb | 37 +++++++++++++++++++++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/lib/twitter/streaming/client.rb b/lib/twitter/streaming/client.rb index 154f3d5ba..569221f2f 100644 --- a/lib/twitter/streaming/client.rb +++ b/lib/twitter/streaming/client.rb @@ -105,6 +105,10 @@ def before_request(&block) end end + def close + @connection.close + end + private def request(method, uri, params) diff --git a/lib/twitter/streaming/connection.rb b/lib/twitter/streaming/connection.rb index 9343c8e32..364f3435e 100644 --- a/lib/twitter/streaming/connection.rb +++ b/lib/twitter/streaming/connection.rb @@ -11,14 +11,23 @@ def initialize(options = {}) @tcp_socket_class = options.fetch(:tcp_socket_class) { TCPSocket } @ssl_socket_class = options.fetch(:ssl_socket_class) { OpenSSL::SSL::SSLSocket } @using_ssl = options.fetch(:using_ssl) { false } + @write_pipe = nil end - def stream(request, response) + def stream(request, response) # rubocop:disable MethodLength client = connect(request) request.stream(client) - while body = client.readpartial(1024) # rubocop:disable AssignmentInCondition - response << body + read_pipe, @write_pipe = IO.pipe + loop do + read_ios, _write_ios, _exception_ios = IO.select([read_pipe, client]) + case read_ios.first + when client + response << client.readpartial(1024) + when read_pipe + break + end end + client.close end def connect(request) @@ -30,6 +39,10 @@ def connect(request) ssl_client.connect end + def close + @write_pipe.write('q') if @write_pipe + end + private def new_tcp_socket(host, port) diff --git a/spec/twitter/streaming/connection_spec.rb b/spec/twitter/streaming/connection_spec.rb index ca6d3e399..04a988d47 100644 --- a/spec/twitter/streaming/connection_spec.rb +++ b/spec/twitter/streaming/connection_spec.rb @@ -77,4 +77,41 @@ def <<(data); end end end end + + describe 'stream' do + subject(:connection) do + Twitter::Streaming::Connection.new(tcp_socket_class: DummyTCPSocket, ssl_socket_class: DummySSLSocket) + end + + let(:method) { :get } + let(:uri) { 'https://stream.twitter.com:443/1.1/statuses/sample.json' } + let(:client) { TCPSocket.new('127.0.0.1', 8443) } + + let(:request) { HTTP::Request.new(verb: method, uri: uri) } + let(:response) { DummyResponse.new {} } + + before do + @server = TCPServer.new('127.0.0.1', 8443) + end + + after do + @server.close + end + + it 'close stream' do + expect(connection).to receive(:connect).with(request).and_return(client) + expect(request).to receive(:stream).with(client) + + stream_closed = false + t = Thread.start do + connection.stream(request, response) + stream_closed = true + end + expect(stream_closed).to be false + sleep 1 + connection.close + t.join + expect(stream_closed).to be true + end + end end