-
Notifications
You must be signed in to change notification settings - Fork 42
/
websocket.rb
71 lines (60 loc) · 2.25 KB
/
websocket.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
require_relative "../config/environment"
require "new_relic/agent"
NewRelic::Agent.manual_start(sync_startup: true)
Rails.logger = Logger.new(STDERR)
Rails.logger.level = ENV.fetch("LOG_LEVEL") { "info" }
ActiveRecord::Base.logger = Rails.logger
at_exit do
puts "exiting & clearing"
end
check_environments if Rails.env.production?
require "async"
require "async/http"
require "async/websocket"
require "protocol/websocket/json_message"
URL = ENV.fetch("CKB_WS_URL", "http://localhost:28114")
$message_id = 0
def subscribe(connection, topic)
$message_id += 1
message = Protocol::WebSocket::JSONMessage.generate({
"id": $message_id,
"jsonrpc": "2.0",
"method": "subscribe",
"params": [topic],
})
message.send(connection)
connection.flush
end
# queue = Queue.new
# persister =
# Thread.new do
# Rails.application.executor.wrap do
# loop do
# data = queue.pop
# begin
# ImportTransactionJob.new.perform(data["transaction"], {
# cycles: data["cycles"].hex,
# fee: data["fee"].hex,
# size: data["size"].hex,
# timestamp: data["timestamp"].hex,
# })
# rescue StandardError => e
# Rails.logger.error "Error occurred during ImportTransactionJob data: #{data}, error: #{e.message}"
# end
# end
# end
# end
Async do |_task|
endpoint = Async::HTTP::Endpoint.parse(URL, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
Async::WebSocket::Client.connect(endpoint) do |connection|
subscribe connection, "new_transaction"
while message = connection.read
message = Protocol::WebSocket::JSONMessage.wrap(message)
res = message.to_h
if res[:method] == "subscribe"
data = JSON.parse res[:params][:result]
ImportPendingTxWorker.perform_async(data)
end
end
end
end