diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb index 89c4fd25..899fb8aa 100644 --- a/lib/dalli/protocol/base.rb +++ b/lib/dalli/protocol/base.rb @@ -32,7 +32,10 @@ def request(opkey, *args) verify_state(opkey) begin - send(opkey, *args) + @connection_manager.start_request! + response = send(opkey, *args) + @connection_manager.finish_request! + response rescue Dalli::MarshalError => e log_marshal_err(args.first, e) raise @@ -40,7 +43,8 @@ def request(opkey, *args) raise rescue StandardError => e log_unexpected_err(e) - down! + close + raise end end @@ -66,9 +70,9 @@ def unlock!; end # Returns nothing. def pipeline_response_setup verify_state(:getkq) + @connection_manager.start_request! write_noop response_buffer.reset - @connection_manager.start_request! end # Attempt to receive and parse as many key/value pairs as possible diff --git a/lib/dalli/protocol/connection_manager.rb b/lib/dalli/protocol/connection_manager.rb index 5a0b6ff0..f1bf17f1 100644 --- a/lib/dalli/protocol/connection_manager.rb +++ b/lib/dalli/protocol/connection_manager.rb @@ -54,6 +54,7 @@ def establish_connection @sock = memcached_socket @pid = PIDCache.pid + @request_in_progress = false rescue SystemCallError, Timeout::Error, EOFError, SocketError => e # SocketError = DNS resolution failure error_on_request!(e) @@ -98,6 +99,7 @@ def socket_timeout end def confirm_ready! + close if request_in_progress? error_on_request!(RuntimeError.new('Already writing to socket')) if request_in_progress? close_on_fork if fork_detected? end @@ -136,36 +138,26 @@ def abort_request! end def read_line - start_request! data = @sock.gets("\r\n") error_on_request!('EOF in read_line') if data.nil? - finish_request! data rescue SystemCallError, Timeout::Error, EOFError => e error_on_request!(e) end def read(count) - start_request! - data = @sock.readfull(count) - finish_request! - data + @sock.readfull(count) rescue SystemCallError, Timeout::Error, EOFError => e error_on_request!(e) end def write(bytes) - start_request! - result = @sock.write(bytes) - finish_request! - result + @sock.write(bytes) rescue SystemCallError, Timeout::Error => e error_on_request!(e) end - # Non-blocking read. Should only be used in the context - # of a caller who has called start_request!, but not yet - # called finish_request!. Here to support the operation + # Non-blocking read. Here to support the operation # of the get_multi operation def read_nonblock @sock.read_available diff --git a/test/integration/test_network.rb b/test/integration/test_network.rb index e5461d36..27b6a31a 100644 --- a/test/integration/test_network.rb +++ b/test/integration/test_network.rb @@ -113,6 +113,104 @@ end end + it 'handles asynchronous Thread#raise' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + thread = Thread.new do + loop do + assert_instance_of Integer, dc.set("key:#{i}", i.to_s) + end + rescue RuntimeError + end + thread.join(rand(0.01..0.2)) + + thread.raise("Test Timeout Error") + joined_thread = thread.join(1) + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal i.to_s, dc.get("key:#{i}") + end + end + end + end + + it 'handles asynchronous Thread#raise during pipelined get' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] } + expected_response.each do |key, val| + dc.set(key, val) + end + + thread = Thread.new do + loop do + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + rescue RuntimeError + end + thread.join(rand(0.01..0.2)) + + thread.raise("Test Timeout Error") + joined_thread = thread.join(1) + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + end + end + end + + it 'handles asynchronous Thread#kill' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + thread = Thread.new do + loop do + assert_instance_of Integer, dc.set("key:#{i}", i.to_s) + end + rescue RuntimeError + end + thread.join(rand(0.01..0.2)) + + thread.kill + joined_thread = thread.join(1) + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal i.to_s, dc.get("key:#{i}") + end + end + end + end + + it 'handles asynchronous Thread#kill during pipelined get' do + with_nil_logger do + memcached(p, 19_191) do |dc| + 10.times do |i| + expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] } + expected_response.each do |key, val| + dc.set(key, val) + end + + thread = Thread.new do + loop do + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + rescue RuntimeError + end + thread.join(rand(0.01..0.2)) + + thread.kill + joined_thread = thread.join(1) + refute_nil joined_thread + refute_predicate joined_thread, :alive? + assert_equal expected_response, dc.get_multi(expected_response.keys) + end + end + end + end + it 'passes a simple smoke test on a TCP socket' do memcached_persistent(p) do |dc, port| resp = dc.flush