Skip to content

Commit

Permalink
Better handle Thread#raise and Thread#kill
Browse files Browse the repository at this point in the history
Fix: petergoldstein#956

While it's heavily discouraged, `Timeout.timeout` end up being
relatively frequently used in production, so ideally it's better
to try to handle it gracefully.

This patch is inspired from redis-rb/redis-client@5f82254
before sending a request we flip a flag, and once we fully read the
response(s), we flip it back.

If the flag is not `false` when we start a request, we know the connection
may have unread responses from a previously aborted request, and we
automatically discard it.
  • Loading branch information
byroot committed May 17, 2023
1 parent 0483446 commit 0f2b374
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 23 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ Dalli Changelog
Unreleased
==========

- Better handle memcached requests being interrupted by Thread#raise or Thread#kill (byroot)
- Unexpected errors are no longer treated as `Dalli::NetworkError`, including errors raised by `Timeout.timeout` (byroot)

3.2.4
==========

- Cache PID calls for performance since glibc no longer caches in recent versions (casperisfine)
- Preallocate the read buffer in Socket#readfull (casperisfine)
- Cache PID calls for performance since glibc no longer caches in recent versions (byroot)
- Preallocate the read buffer in Socket#readfull (byroot)

3.2.3
==========
Expand Down Expand Up @@ -51,7 +54,7 @@ Unreleased
3.1.4
==========

- Improve response parsing performance (casperisfine)
- Improve response parsing performance (byroot)
- Reorganize binary protocol parsing a bit (petergoldstein)
- Fix handling of non-ASCII keys in get_multi (petergoldstein)

Expand Down
19 changes: 15 additions & 4 deletions lib/dalli/protocol/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,22 @@ def request(opkey, *args)
verify_state(opkey)

begin
send(opkey, *args)
@connection_manager.start_request!
response = send(opkey, *args)

# pipelined_get emit query but doesn't read the response(s)
@connection_manager.finish_request! unless opkey == :pipelined_get

response
rescue Dalli::MarshalError => e
log_marshal_err(args.first, e)
raise
rescue Dalli::DalliError
raise
rescue StandardError => e
log_unexpected_err(e)
down!
close
raise
end
end

Expand All @@ -65,10 +72,9 @@ def unlock!; end
#
# Returns nothing.
def pipeline_response_setup
verify_state(:getkq)
verify_pipelined_state(:getkq)
write_noop
response_buffer.reset
@connection_manager.start_request!
end

# Attempt to receive and parse as many key/value pairs as possible
Expand Down Expand Up @@ -169,6 +175,11 @@ def verify_state(opkey)
raise_down_error unless ensure_connected!
end

def verify_pipelined_state(_opkey)
@connection_manager.confirm_in_progress!
raise_down_error unless connected?
end

# The socket connection to the underlying server is initialized as a side
# effect of this call. In fact, this is the ONLY place where that
# socket connection is initialized.
Expand Down
29 changes: 15 additions & 14 deletions lib/dalli/protocol/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def establish_connection

@sock = memcached_socket
@pid = PIDCache.pid
@request_in_progress = false
rescue SystemCallError, Timeout::Error, EOFError, SocketError => e
# SocketError = DNS resolution failure
error_on_request!(e)
Expand Down Expand Up @@ -98,7 +99,13 @@ def socket_timeout
end

def confirm_ready!
error_on_request!(RuntimeError.new('Already writing to socket')) if request_in_progress?
close if request_in_progress?
close_on_fork if fork_detected?
end

def confirm_in_progress!
raise '[Dalli] No request in progress. This may be a bug in Dalli.' unless request_in_progress?

close_on_fork if fork_detected?
end

Expand All @@ -124,10 +131,14 @@ def request_in_progress?
end

def start_request!
raise '[Dalli] Request already in progress. This may be a bug in Dalli.' if @request_in_progress

@request_in_progress = true
end

def finish_request!
raise '[Dalli] No request in progress. This may be a bug in Dalli.' unless @request_in_progress

@request_in_progress = false
end

Expand All @@ -136,36 +147,26 @@ def abort_request!
end

def read_line
start_request!
data = @sock.gets("\r\n")
error_on_request!('EOF in read_line') if data.nil?
finish_request!
data
rescue SystemCallError, Timeout::Error, EOFError => e
error_on_request!(e)
end

def read(count)
start_request!
data = @sock.readfull(count)
finish_request!
data
@sock.readfull(count)
rescue SystemCallError, Timeout::Error, EOFError => e
error_on_request!(e)
end

def write(bytes)
start_request!
result = @sock.write(bytes)
finish_request!
result
@sock.write(bytes)
rescue SystemCallError, Timeout::Error => e
error_on_request!(e)
end

# Non-blocking read. Should only be used in the context
# of a caller who has called start_request!, but not yet
# called finish_request!. Here to support the operation
# Non-blocking read. Here to support the operation
# of the get_multi operation
def read_nonblock
@sock.read_available
Expand Down
106 changes: 106 additions & 0 deletions test/integration/test_network.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,112 @@
end
end

it 'handles asynchronous Thread#raise' do
with_nil_logger do
memcached(p, 19_191) do |dc|
10.times do |i|
thread = Thread.new do
loop do
assert_instance_of Integer, dc.set("key:#{i}", i.to_s)
end
rescue RuntimeError
nil # expected
end
thread.join(rand(0.01..0.2))

thread.raise('Test Timeout Error')
joined_thread = thread.join(1)

refute_nil joined_thread
refute_predicate joined_thread, :alive?
assert_equal i.to_s, dc.get("key:#{i}")
end
end
end
end

it 'handles asynchronous Thread#raise during pipelined get' do
with_nil_logger do
memcached(p, 19_191) do |dc|
10.times do |i|
expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] }
expected_response.each do |key, val|
dc.set(key, val)
end

thread = Thread.new do
loop do
assert_equal expected_response, dc.get_multi(expected_response.keys)
end
rescue RuntimeError
nil # expected
end
thread.join(rand(0.01..0.2))

thread.raise('Test Timeout Error')
joined_thread = thread.join(1)

refute_nil joined_thread
refute_predicate joined_thread, :alive?
assert_equal expected_response, dc.get_multi(expected_response.keys)
end
end
end
end

it 'handles asynchronous Thread#kill' do
with_nil_logger do
memcached(p, 19_191) do |dc|
10.times do |i|
thread = Thread.new do
loop do
assert_instance_of Integer, dc.set("key:#{i}", i.to_s)
end
rescue RuntimeError
nil # expected
end
thread.join(rand(0.01..0.2))

thread.kill
joined_thread = thread.join(1)

refute_nil joined_thread
refute_predicate joined_thread, :alive?
assert_equal i.to_s, dc.get("key:#{i}")
end
end
end
end

it 'handles asynchronous Thread#kill during pipelined get' do
with_nil_logger do
memcached(p, 19_191) do |dc|
10.times do |i|
expected_response = 100.times.to_h { |x| ["key:#{i}:#{x}", x.to_s] }
expected_response.each do |key, val|
dc.set(key, val)
end

thread = Thread.new do
loop do
assert_equal expected_response, dc.get_multi(expected_response.keys)
end
rescue RuntimeError
nil # expected
end
thread.join(rand(0.01..0.2))

thread.kill
joined_thread = thread.join(1)

refute_nil joined_thread
refute_predicate joined_thread, :alive?
assert_equal expected_response, dc.get_multi(expected_response.keys)
end
end
end
end

it 'passes a simple smoke test on a TCP socket' do
memcached_persistent(p) do |dc, port|
resp = dc.flush
Expand Down
4 changes: 2 additions & 2 deletions test/integration/test_pipelined_get.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
describe 'pipeline_next_responses' do
it 'raises NetworkError when called before pipeline_response_setup' do
memcached_persistent(p) do |dc|
server = dc.instance_variable_get(:@ring).servers.first
server = dc.send(:ring).servers.first
server.request(:pipelined_get, %w[a b])
assert_raises Dalli::NetworkError do
server.pipeline_next_responses
Expand All @@ -92,7 +92,7 @@

it 'raises NetworkError when called after pipeline_abort' do
memcached_persistent(p) do |dc|
server = dc.instance_variable_get(:@ring).servers.first
server = dc.send(:ring).servers.first
server.request(:pipelined_get, %w[a b])
server.pipeline_response_setup
server.pipeline_abort
Expand Down

0 comments on commit 0f2b374

Please sign in to comment.