Skip to content

Commit

Permalink
Merge pull request #156 from nats-io/fix/timeouts-in-specs
Browse files Browse the repository at this point in the history
Fix specs hanging on CI
  • Loading branch information
palkan authored Jan 8, 2025
2 parents 10b2f1c + d1e8adc commit ac2112c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 74 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/test-jruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ jobs:
# Mark failures as warnings for now due to high level of flakiness
# and occasional OOM-s on CI
continue-on-error: true
env:
DEBUG_NATS_TEST: ${{ runner.debug }}
run: |
bundle exec rspec || DEBUG_NATS_TEST=true bundle exec rspec --only-failures
bundle exec rspec --force-color || bundle exec rspec --only-failures --force-color
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ jobs:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true
- name: Run RSpec
env:
DEBUG_NATS_TEST: ${{ runner.debug }}
run: |
bundle exec rspec
bundle exec rspec --force-color
rspec-rails:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -62,4 +64,4 @@ jobs:
bundler-cache: true
- name: Run RSpec
run: |
bundle exec rspec --tag rails
bundle exec rspec --tag rails --force-color
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
--require spec_helper
--color
144 changes: 85 additions & 59 deletions spec/client_drain_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
end

it 'should gracefully drain a connection' do
nc = NATS.connect
nc2 = NATS.connect(drain_timeout: 15)
nc = NATS.connect(drain_timeout: 5)
nc2 = NATS.connect

errors = []
nc.on_error do |e|
errors << e
raise RSpec::Expectations::ExpectationNotMetError.new("Unexpected connection error: #{e}")
end

future = Future.new
Expand All @@ -28,102 +27,129 @@
end

wait_subs = Future.new
wait_pubs = Future.new
reqs_started = Queue.new
wait_reqs = Future.new

t = Thread.new do
sleep 1
stop_sending = false
wait_subs.wait_for(1)
50.times do |i|
break if nc2.closed?

('a'..'e').each do |subject|
10.times do |n|
begin
payload = "REQ:#{subject}:#{i}"
nc2.publish(subject, payload * 128)
rescue => e
end
end
wait_subs.wait_for(2)
40.times do |i|
('a'..'b').each do
payload = "REQ:#{_1}:#{i}"
nc2.publish(_1, payload * 128)
sleep 0.01
end
sleep 0.01
end

50.times do |i|
break if nc2.closed?

('a'..'e').each do |subject|
10.times do |n|
begin
payload = "REQ:#{subject}:#{i}"
msg = nc2.request(subject, payload)
rescue => e
end
end
wait_pubs.set_result(:ok)

('a'..'b').map do |sub|
Thread.new do
reqs_started << sub
payload = "REQ:#{sub}"
msg = nc2.request(sub, payload)
end
sleep 0.01
end
end.each(&:join)

wait_reqs.set_result(:ok)
end

# A queue to control the speed of processing messages
sub_queue = Queue.new
subs = []
('a'..'e').each do |subject|
('a'..'b').each do |subject|
sub = nc.subscribe(subject) do |msg|
begin
msg.respond("OK:#{msg.data}") if msg.reply
sleep 0.01
rescue => e
p e
end
ft = sub_queue.pop
msg.respond("OK:#{msg.data}") if msg.reply
sleep 0.01
ensure
ft.set_result(:ok)
end
subs << sub
end
nc.flush
wait_subs.set_result(:OK)

# Let the threads start accumulating some messages.
sleep 3
# process a few messages
f1, f2 = Future.new, Future.new
sub_queue.push(f1)
sub_queue.push(f2)

expect(f1.wait_for(1)).to eql(:ok)
expect(f2.wait_for(1)).to eql(:ok)

wait_pubs.wait_for(2)

reqs_started.pop; reqs_started.pop

# Start draining process asynchronously.
nc.drain
result = future.wait_for(30)

# Release the queue (we have 38 messages left)
80.times { sub_queue.push(Future.new) }
result = future.wait_for(2)
expect(result).to eql(:closed)
nc2.drain
sleep 2
t.exit
expect(wait_reqs.wait_for(2)).to eql(:ok)
end

it 'should report drain timeout error' do
nc = NATS.connect(drain_timeout: 0.1)
nc = NATS.connect(drain_timeout: 0.5)
nc2 = NATS.connect

future = Future.new

errors = []
nc.on_error do |e|
errors << e
end

nc.on_close do |err|
future.set_result(:closed)
future.set_result(:error)
end

wait_subs = Future.new
wait_pubs = Future.new

subs = []
('a'..'e').each do |subject|
sub = nc.subscribe(subject) do |msg|
begin
msg.respond("OK:#{msg.data}") if msg.reply
t = Thread.new do
wait_subs.wait_for(2)
10.times do |i|
('a'..'b').each do
payload = "REQ:#{_1}:#{i}"
nc2.publish(_1, payload * 128)
sleep 0.01
rescue => e
p e
end
end

wait_pubs.set_result(:ok)
end
nc.flush

# A queue to control the speed of processing messages
sub_queue = Queue.new
subs = []
('a'..'b').each do |subject|
sub = nc.subscribe(subject) do |msg|
ft = sub_queue.pop
sleep 0.01
ensure
ft&.set_result(:ok)
end
subs << sub
end
nc.flush

wait_subs.set_result(:OK)

# process a few messages
f1, f2 = Future.new, Future.new
sub_queue.push(f1)
sub_queue.push(f2)

expect(f1.wait_for(1)).to eql(:ok)
expect(f2.wait_for(1)).to eql(:ok)

wait_pubs.wait_for(2)

nc.drain
result = future.wait_for(10)
expect(result).to eql(:closed)
result = future.wait_for(2)
expect(result).to eql(:error)
expect(errors.first).to be_a(NATS::IO::DrainTimeoutError)
end
end
11 changes: 10 additions & 1 deletion spec/support/dns.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

Resolv::DefaultResolver.replace_resolvers([hosts_resolver, dns_resolver])

# resolv-replace doesn't support the modern Ruby TCPSocket signature:
# https://github.com/ruby/resolv-replace/issues/2
class TCPSocket
  # Resolves host names through IPSocket.getaddress before handing off to
  # original_resolv_initialize, while passing keyword arguments (such as
  # connect_timeout:) through untouched.
  #
  # The first extra positional argument, when given, is resolved as well
  # (in TCPSocket's positional signature that slot is the local host).
  #
  # NOTE(review): original_resolv_initialize is not defined in this view —
  # presumably it is the alias resolv-replace installs for the stock
  # initializer; confirm it is loaded before this patch.
  def initialize(host, serv, *rest, **kwargs)
    local_host = rest.first
    rest[0] = IPSocket.getaddress(local_host) if local_host

    remote_address = IPSocket.getaddress(host)
    original_resolv_initialize(remote_address, serv, *rest, **kwargs)
  end
end

# Patch Socket to rely on Resolv when getting address info
# Inspired by the original resolv-replace.rb: https://github.com/ruby/resolv-replace/blob/master/lib/resolv-replace.rb
require "socket"
Expand All @@ -32,7 +41,7 @@ def getaddrinfo(host, *args)
begin
return original_getaddrinfo(Resolv.getaddress(host).to_s, *args)
rescue Resolv::ResolvError
raise SocketError, "Hostname not known: #{host}"
original_getaddrinfo(host, *args)
end
end
end
35 changes: 24 additions & 11 deletions spec/support/nats_server_helper.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "socket"

class NatsServerControl
BIN_PATH = File.expand_path(File.join(__dir__, "../../scripts/nats-server"))

Expand All @@ -17,7 +19,7 @@ def init_with_config(config_file)
end

def init_with_config_from_string(config_string, config={})
puts config_string if ENV["DEBUG_NATS_TEST"] == "true"
puts config_string if debug?
config_file = Tempfile.new(['nats-cluster-tests', '.conf'])
File.open(config_file.path, 'w') do |f|
f.puts(config_string)
Expand All @@ -32,6 +34,9 @@ def init_with_config_from_string(config_string, config={})
NatsServerControl.new(uri, config['pid_file'], "-c #{config_file.path}", config_file)
end

# Tells whether debug output was requested for the test run.
#
# Returns true only when the DEBUG_NATS_TEST environment variable is
# exactly "1", "true" or "t" (case-sensitive); false otherwise, including
# when the variable is unset.
def debug?
  flag = ENV["DEBUG_NATS_TEST"]
  ["1", "true", "t"].include?(flag)
end
end

attr_reader :uri
Expand All @@ -43,6 +48,10 @@ def initialize(uri='nats://127.0.0.1:4222', pid_file='/tmp/test-nats.pid', flags
@config_file = config_file
end

# Instance-level shortcut: answers with whatever the receiver's class
# reports from its own debug? method.
def debug?
  owner = self.class
  owner.debug?
end

# Returns the server process id as an Integer.
#
# On first use the contents of @pid_file are read, stripped of the trailing
# newline and converted with to_i; the result is cached in @pid, so later
# calls do not re-read the file.
def server_pid
  return @pid if @pid

  raw = File.read(@pid_file)
  @pid = raw.chomp.to_i
end
Expand Down Expand Up @@ -70,7 +79,7 @@ def start_server(wait_for_server=true)
end
args += " #{@flags}" if @flags

if ENV["DEBUG_NATS_TEST"] == "true"
if debug?
system("#{BIN_PATH} #{args} -DV &")
else
system("#{BIN_PATH} #{args} 2> /dev/null &")
Expand All @@ -90,19 +99,23 @@ def kill_server
end

def wait_for_server(uri, max_wait = 5) # :nodoc:
start = Time.now
while (Time.now - start < max_wait) # Wait max_wait seconds max
break if server_running?(uri)
wait = max_wait.to_f
loop do
return if server_running?(uri)
sleep(0.1)
wait -= 0.1

raise "NATS Server did not start in #{max_wait} seconds" if wait <= 0
end
end

def server_running?(uri) # :nodoc:
require 'socket'
s = TCPSocket.new(uri.host, uri.port)
s.close
return true
rescue
return false
s = TCPSocket.new(uri.host, uri.port, nil, nil, connect_timeout: 0.5)
true
rescue => e
puts "Server is not available: #{e}" if debug?
false
ensure
s&.close
end
end

0 comments on commit ac2112c

Please sign in to comment.