Skip to content

Commit

Permalink
Merge branch 'heartbeat'
Browse files Browse the repository at this point in the history
# Conflicts:
#	lib/skyfall/stream.rb
  • Loading branch information
mackuba committed Mar 21, 2024
2 parents b6762fd + 8283c29 commit e63f03c
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions lib/skyfall/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class Stream
:subscribe_labels => SUBSCRIBE_LABELS
}

EVENTS = %w(message raw_message connecting connect disconnect reconnect error)
EVENTS = %w(message raw_message connecting connect disconnect reconnect error timeout)

MAX_RECONNECT_INTERVAL = 300

attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect
attr_accessor :heartbeat_timeout, :heartbeat_interval, :cursor, :auto_reconnect, :last_update

def initialize(server, endpoint, cursor = nil)
@endpoint = check_endpoint(endpoint)
Expand All @@ -27,6 +27,9 @@ def initialize(server, endpoint, cursor = nil)
@handlers = {}
@auto_reconnect = true
@connection_attempts = 0
@heartbeat_interval = 10
@heartbeat_timeout = 300
@last_update = nil

@handlers[:error] = proc { |e| puts "ERROR: #{e}" }
end
Expand All @@ -51,11 +54,14 @@ def connect

@ws.on(:open) do |e|
@handlers[:connect]&.call
@last_update = Time.now
start_heartbeat_timer
end

@ws.on(:message) do |msg|
@reconnecting = false
@connection_attempts = 0
@last_update = Time.now

data = msg.data.pack('C*')
@handlers[:raw_message]&.call(data)
Expand Down Expand Up @@ -110,6 +116,20 @@ def disconnect

alias close disconnect

def start_heartbeat_timer
return if @timer || @heartbeat_interval.to_f <= 0 || @heartbeat_timeout.to_f <= 0

@timer = EM::PeriodicTimer.new(@heartbeat_interval) do
next if @ws.nil? || @heartbeat_timeout.to_f <= 0
time_passed = Time.now - @last_update

if time_passed > @heartbeat_timeout
@handlers[:timeout]&.call
reconnect
end
end
end

EVENTS.each do |event|
define_method "on_#{event}" do |&block|
@handlers[event.to_sym] = block
Expand Down

0 comments on commit e63f03c

Please sign in to comment.