diff --git a/app/models/ckb_sync/new_node_data_processor.rb b/app/models/ckb_sync/new_node_data_processor.rb index b1440ec305..2b1418899b 100644 --- a/app/models/ckb_sync/new_node_data_processor.rb +++ b/app/models/ckb_sync/new_node_data_processor.rb @@ -1149,7 +1149,7 @@ def cell_output_attributes(output, address, ckb_transaction, local_block, cell_i attrs = { ckb_transaction_id: ckb_transaction["id"], capacity: output.capacity, - occupied_capacity: CkbUtils.cal_cell_min_capacity(lock_script, type_script, output.capacity, binary_data), + occupied_capacity: CkbUtils.cal_cell_min_capacity(output.lock, output.type, binary_data), address_id: address.id, block_id: local_block.id, tx_hash: ckb_transaction["tx_hash"], diff --git a/app/models/ckb_sync/transaction_parser.rb b/app/models/ckb_sync/transaction_parser.rb new file mode 100644 index 0000000000..8e04dfa3d4 --- /dev/null +++ b/app/models/ckb_sync/transaction_parser.rb @@ -0,0 +1,151 @@ +module CkbSync + class TransactionParser + attr_reader :transaction, :extra_data + attr_accessor :tx_attr, :cell_outputs_attrs, :cell_data_attrs, :cell_inputs_attrs, :cell_deps_attrs, :witnesses_attrs, :header_deps_attrs, :lock_script_attrs, :addresses_attrs, + :account_books_attrs, :type_script_attrs + + # {"transaction"=>{"version"=>"0x0", "cell_deps"=>[{"out_point"=>{"tx_hash"=>"0xcd52d714ddea04d2917892f16d47cbd0bbbb7d9ba281233ec4021f79fc34bccc", "index"=>"0x0"}, "dep_type"=>"code"}, {"out_point"=>{"tx_hash"=>"0x9154df4f7336402114d04495175b37390ce86a4906d2d4001cf02c3e6d97f39c", "index"=>"0x0"}, "dep_type"=>"code"}, {"out_point"=>{"tx_hash"=>"0xbcd73881ba53f1cd95d0c855395c4ffe6f54e041765d9ab7602d48a7cb71612e", "index"=>"0x0"}, "dep_type"=>"code"}, {"out_point"=>{"tx_hash"=>"0xf8de3bb47d055cdf460d93a2a6e1b05f7432f9777c8c474abf4eec1d4aee5d37", "index"=>"0x0"}, "dep_type"=>"dep_group"}, {"out_point"=>{"tx_hash"=>"0x053fdb4ed3181eab3a3a5f05693b53a8cdec0a24569e16369f444bac48be7de9", "index"=>"0x0"}, "dep_type"=>"code"}], "header_deps"=>[], "inputs"=>[{"since"=>"0x40000000669f4e30", "previous_output"=>{"tx_hash"=>"0x3d9a919a18d2cc2b64d2063626c75a6c97e87d2e8c30ec7bd33ef3ce14039934", "index"=>"0x0"}}, {"since"=>"0x0", "previous_output"=>{"tx_hash"=>"0x3d9a919a18d2cc2b64d2063626c75a6c97e87d2e8c30ec7bd33ef3ce14039934", "index"=>"0x1"}}, {"since"=>"0x0", "previous_output"=>{"tx_hash"=>"0x3d9a919a18d2cc2b64d2063626c75a6c97e87d2e8c30ec7bd33ef3ce14039934", "index"=>"0x2"}}], "outputs"=>[{"capacity"=>"0x7676d7e00", "lock"=>{"code_hash"=>"0x79f90bb5e892d80dd213439eeab551120eb417678824f282b4ffb5f21bad2e1e", "hash_type"=>"type", "args"=>"0x00c267a8b93cdae15fb06325f11a72b1047bd4d33c00"}, "type"=>{"code_hash"=>"0x1e44736436b406f8e48a30dfbddcf044feb0c9eebfe63b0f81cb5bb727d84854", "hash_type"=>"type", "args"=>"0x86c7429247beba7ddd6e4361bcdfc0510b0b644131e2afb7e486375249a01802"}}, {"capacity"=>"0x3691d6afc000", "lock"=>{"code_hash"=>"0x7f5a09b8bd0e85bcf2ccad96411ccba2f289748a1c16900b0635c2ed9126f288", "hash_type"=>"type", "args"=>"0x702359ea7f073558921eb50d8c1c77e92f760c8f8656bde4995f26b8963e2dd8f245705db4fe72be953e4f9ee3808a1700a578341aa80a8b2349c236c4af64e5e077710000000000"}, "type"=>nil}, {"capacity"=>"0xe529edc1ba", "lock"=>{"code_hash"=>"0x9bd7e06f3ecf4be0f2fcd2188b23f1b9fcc88e5d4b65a8637b17723bbda3cce8", "hash_type"=>"type", "args"=>"0xc267a8b93cdae15fb06325f11a72b1047bd4d33c"}, "type"=>nil}], "outputs_data"=>["0xa0cf6037bfc238b179b74a30a9b12e15a4fbdd8881aebc8e5a66a8b5b5c95f0a6d833f7d5c1f8130fa2688970b421a57fbdffeff55bae5b4676043c543355799abe60200de0991241ff42c0255e2e2f65d114951c0a144e89d35527c582adc6603ff1ea2e17771000000000000000000000000000000000000000000000000000000000000000000000000001f779faa1f6184b10c9d865f62bba000e5d54e00a5d4b98cd768e43e376f68421a6b49de900100007c777100000000000001", "0x", "0x"], "witnesses"=>["0xc1030000100000006900000069000000550000005500000010000000550000005500000041000000e74f7818a6d2d1dda76593b30973967cd15fc6853731ecc0a5cf42cdb81859005b4a22781fa67973ccfbd3c5d0c1ccdd62506e2e99a049c19adc972e3909511a0054030000000000005003000010000000480300004c030000380300001c0000006c01000070010000740100007801000034030000500100002c000000340000005400000074000000940000009c000000c0000000e4000000e80000000c010000e0777100000000001c0000000200000014000000715ab282b873b79a7be8b0e8c13c4e8966a52040f7cfb9cf096dc32d69cac2b6f884bb2b1a8bb01660f3edc613ccfbeb7f3506d6f245705db4fe72be953e4f9ee3808a1700a578341aa80a8b2349c236c4af64e51a6b49de900100006d833f7d5c1f8130fa2688970b421a57fbdffeff55bae5b4676043c543355799abe602006d833f7d5c1f8130fa2688970b421a57fbdffeff55bae5b4676043c543355799abe602000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000024ea893c4fa601a048b1d3a8de265fd8b442ba2a1ac37d85dfe320a7c8c2069a000000000000000004000000b80100004c4f05500d310a045d500ed295c313abef5ae2862c24393fdacc2e2e4c2ba76b43828b9d505cfc4cbb06c1083817aa14c6e06df9c300687a61fa0ec947fe79962fb9c557845058409c8c929c05fe19cc818cd0da6f0bf887cdb7ca4e465e85f7796ddd0e1528507119e4869f5a24e613de92605fb220451c0d1727d65f1c030815155c3cb7acd350a011fc6c115f153c24055880abd2a1253281e4e8a894e2a0546ea8799b057667507cee0dc2c4b8b171a0331b3f9ec02cf8906ef52a873523d2dbe3ff9e5f58699a4f01509d5c06f8f45bfaf59625237cb06c099a9c2a59752d5c1032e503ef39fbded2d7506989c3d61168d80ae0e27a2b2ca904f768cc698f783fd4f5d6c45c1b7dd66bbb5090d5dd9cd0b9f66197a69ea30e933982c98b99a067b37a8d0b7adc687d8c8db44f0150da2c4d3f8cc63b7b827c24c55098ec49cc88a9c81ff07abb0057f7d1dc05c9304f0350287fc2f5b5005ff558ae296f4e9cb354876829fa9562d215fbf7cd9536d5e0e550a03eb9492d55fc2d9d63faf8ea6121d66a86230f0752178ab5643a81abff8380508ad8ce2ac94cf885730b362ca4b81787bf85b48d7f72ef7816130bdc54433f644fe9040000000000000000000000", "0x10000000100000001000000010000000", "0x55000000100000005500000055000000410000001ccd614d2dbafb3384f6da8d50fc6cef21e0280c05397a93072d1560c81fd8bf347d299d7879e7b7682812fbb9fd5841c97c2f92b5c3e64737aa61c9fab7a01401"], "hash"=>"0x74758da9a59938724839e442d6e2b10b5c69e8fa398d4bd4d9b64fd311801ac1"}, "cycles"=>"0x4819ea", "size"=>"0x853", "fee"=>"0x853", "timestamp"=>"0x190de4b95b3"} + def initialize(tx, extra_data = {}) + @transaction = tx + @extra_data = extra_data + @cell_outputs_attrs = [] + @cell_data_attrs = [] + @cell_inputs_attrs = [] + @cell_deps_attrs = [] + @witnesses_attrs = [] + @header_deps_attrs = [] + @lock_script_attrs = Set.new + @addresses_attrs = Set.new + @account_books_attrs = Set.new + @type_script_attrs = Set.new + end + + def parse + prepare_transaction_params + prepare_cell_inputs_params + prepare_cell_outputs_params + prepare_cell_witness_params + prepare_header_deps_params + prepare_cell_deps_params + end + + def prepare_transaction_params + @tx_attr = + { + tx_hash: transaction.hash, + version: transaction.version, + tx_status: "pending", + transaction_fee: extra_data["fee"]&.hex, + bytes: extra_data["size"]&.hex, + capacity_involved: nil, + cycles: extra_data["cycles"]&.hex, + live_cell_changes: transaction.outputs.count - transaction.inputs.count, + confirmation_time: extra_data["timestamp"]&.hex, + } + end + + def prepare_cell_inputs_params + transaction.inputs.each_with_index do |input, index| + @cell_inputs_attrs << + { + since: input.since, + previous_tx_hash: input.previous_output.tx_hash, + previous_index: input.previous_output.index, + index:, + tx_hash: transaction.hash, + from_cell_base: input.previous_output.tx_hash == CellOutput::SYSTEM_TX_HASH, + block_id: nil, + cell_type: nil, + ckb_transaction_id: nil, + previous_cell_output_id: nil, + } + end + end + + def prepare_cell_outputs_params + transaction.outputs.each_with_index do |output, index| + output_data = transaction.outputs_data[index] + binary_data = CKB::Utils.hex_to_bin(output_data) + cell_type = CkbUtils.cell_type(output.type, output_data) + @lock_script_attrs << output.lock.to_h.merge({ script_hash: output.lock.compute_hash }) + @addresses_attrs << { address_hash: CkbUtils.generate_address(output.lock), lock_hash: output.lock.compute_hash } + @account_books_attrs << { lock_script_hash: output.lock.compute_hash, tx_hash: transaction.hash } + @type_script_attrs << output.type.to_h.merge({ script_hash: output.type.compute_hash }) if output.type.present? + @cell_outputs_attrs << + { + capacity: output.capacity, + tx_hash: transaction.hash, + cell_index: index, + status: "pending", + occupied_capacity: CkbUtils.cal_cell_min_capacity(output.lock, output.type, binary_data), + address_id: nil, + cell_type:, + lock_script_hash: output.lock.compute_hash, + type_hash: output.type&.compute_hash, + udt_amount: udt_amount(cell_type, output_data, output.type&.args), + data_size: binary_data.bytesize, + data_hash: CKB::Blake2b.hexdigest(binary_data), + block_id: nil, + block_timestamp: nil, + dao: nil, + } + + if output_data != "0x" + @cell_data_attrs << + { + data: output_data, + cell_index: index, + tx_hash: transaction.hash, + } + end + end + end + + def prepare_cell_witness_params + transaction.witnesses.each_with_index do |witness, index| + @witnesses_attrs << + { + data: witness, + index:, + tx_hash: transaction.hash, + } + end + end + + def prepare_header_deps_params + transaction.header_deps.each_with_index do |header_dep, index| + @header_deps_attrs << + { + header_hash: header_dep, + index:, + tx_hash: transaction.hash, + } + end + end + + def prepare_cell_deps_params + transaction.cell_deps.each do |cell_dep| + @cell_deps_attrs << + { + dep_type: cell_dep.dep_type, + out_point_tx_hash: cell_dep.out_point.tx_hash, + out_point_index: cell_dep.out_point.index, + tx_hash: transaction.hash, + } + end + end + + private + + def udt_amount(cell_type, output_data, type_script_args) + case cell_type + when "udt", "xudt", "xudt_compatible" + CkbUtils.parse_udt_cell_data(output_data) + when "omiga_inscription" + CkbUtils.parse_omiga_inscription_data(output_data)[:mint_limit] + when "m_nft_token" + "0x#{type_script_args[-8..]}".hex + end + end + end +end diff --git a/app/models/ckb_sync/transactions.rb b/app/models/ckb_sync/transactions.rb new file mode 100644 index 0000000000..ef0360b011 --- /dev/null +++ b/app/models/ckb_sync/transactions.rb @@ -0,0 +1,128 @@ +module CkbSync + class Transactions + attr_accessor :parsers + + def initialize(raw_txs) + @parsers = + raw_txs.map do |raw_tx| + transaction, extra_data = + if raw_tx.is_a?(CKB::Types::Transaction) + [raw_tx, {}] + else + [CKB::Types::Transaction.from_h(raw_tx["transaction"].with_indifferent_access), raw_tx.except("transaction")] + end + + tx_parser = CkbSync::TransactionParser.new(transaction, extra_data) + tx_parser.parse + tx_parser + end + end + + def import + @parsers.each_slice(100).to_a.each do |group_parsers| + tx_attrs = [] + cell_outputs_attrs = [] + cell_data_attrs = [] + cell_inputs_attrs = [] + cell_deps_attrs = [] + witnesses_attrs = [] + header_deps_attrs = [] + lock_script_attrs = Set.new + addresses_attrs = Set.new + type_script_attrs = Set.new + account_books_attrs = Set.new + + group_parsers.each do |parser| + tx_attrs << parser.tx_attr + cell_outputs_attrs.concat(parser.cell_outputs_attrs) + cell_data_attrs.concat(parser.cell_data_attrs) + witnesses_attrs.concat(parser.witnesses_attrs) + cell_deps_attrs.concat(parser.cell_deps_attrs) + header_deps_attrs.concat(parser.header_deps_attrs) + cell_inputs_attrs.concat(parser.cell_inputs_attrs) + lock_script_attrs.merge(parser.lock_script_attrs) + addresses_attrs.merge(parser.addresses_attrs) + type_script_attrs.merge(parser.type_script_attrs) + account_books_attrs.merge(parser.account_books_attrs) + end + ApplicationRecord.transaction do + tx_returnings = CkbTransaction.upsert_all(tx_attrs, unique_by: %i[tx_status tx_hash], returning: %i[id tx_hash]) + tx_mappings = tx_returnings.rows.to_h { |id, tx_hash| [tx_hash.sub(/^\\x/, "0x"), id] } + lock_script_returnings = LockScript.upsert_all(lock_script_attrs.to_a, unique_by: :script_hash, returning: %i[id script_hash]) + lock_script_mappings = lock_script_returnings.rows.to_h { |id, script_hash| [script_hash, id] } + address_returnings = Address.upsert_all(addresses_attrs.to_a, unique_by: :lock_hash, returning: %i[id lock_hash]) + address_mappings = address_returnings.rows.to_h { |id, lock_hash| [[lock_hash.sub(/^\\x/, "")].pack("H*"), id] } + type_script_mappings = {} + if type_script_attrs.present? + type_script_returnings = TypeScript.upsert_all(type_script_attrs.to_a, unique_by: :script_hash, returning: %i[id script_hash]) + type_script_mappings = type_script_returnings.rows.to_h { |id, script_hash| [script_hash, id] } + end + new_account_books_attrs = + account_books_attrs.to_a.map do |attr| + { ckb_transaction_id: tx_mappings[attr[:tx_hash]], address_id: address_mappings[attr[:lock_script_hash]] } + end + AccountBook.upsert_all(new_account_books_attrs, unique_by: %i[address_id ckb_transaction_id]) + new_cell_outputs_attrs = + cell_outputs_attrs.map do |attr| + attr.merge({ ckb_transaction_id: tx_mappings[attr[:tx_hash]], lock_script_id: lock_script_mappings[attr[:lock_script_hash]], + type_script_id: type_script_mappings[attr[:type_hash]], address_id: address_mappings[attr[:lock_script_hash]] }).except(:lock_script_hash) + end + cell_outputs_returnings = CellOutput.upsert_all(new_cell_outputs_attrs, unique_by: %i[tx_hash cell_index status], returning: %i[id tx_hash cell_index]) + cell_output_mappings = cell_outputs_returnings.rows.to_h { |id, tx_hash, cell_index| ["#{tx_hash.sub(/^\\x/, '0x')}-#{cell_index}", id] } + new_cell_data_attrs = + cell_data_attrs.map do |attr| + { cell_output_id: cell_output_mappings["#{attr[:tx_hash]}-#{attr[:cell_index]}"], data: attr[:data] } + end + CellDatum.upsert_all(new_cell_data_attrs, unique_by: :cell_output_id) if new_cell_data_attrs.present? + new_witnesses_attrs = + witnesses_attrs.map do |attr| + { ckb_transaction_id: tx_mappings[attr[:tx_hash]], data: attr[:data], index: attr[:index] } + end + Witness.upsert_all(new_witnesses_attrs, unique_by: %i[ckb_transaction_id index]) + new_header_deps_attrs = + header_deps_attrs.map do |attr| + { ckb_transaction_id: tx_mappings[attr[:tx_hash]], header_hash: attr[:header_hash], index: attr[:index] } + end + HeaderDependency.upsert_all(new_header_deps_attrs, unique_by: %i[ckb_transaction_id index]) if new_header_deps_attrs.present? + cell_dep_conditions = cell_deps_attrs.map { |cell_dep| { tx_hash: cell_dep[:out_point_tx_hash], cell_index: cell_dep[:out_point_index] } } + cell_dep_returnings = batch_query_outputs(cell_dep_conditions, %i[id tx_hash cell_index]) + cell_dep_mappings = cell_dep_returnings.to_h { |id, tx_hash, cell_index| ["#{tx_hash}-#{cell_index}", id] } + new_cell_deps_attrs = + cell_deps_attrs.map do |attr| + { ckb_transaction_id: tx_mappings[attr[:tx_hash]], contract_cell_id: cell_dep_mappings["#{attr[:out_point_tx_hash]}-#{attr[:out_point_index]}"], dep_type: attr[:dep_type] } + end.filter { |attr| !attr[:contract_cell_id].nil? } + CellDependency.upsert_all(new_cell_deps_attrs, unique_by: %i[ckb_transaction_id contract_cell_id]) if new_cell_deps_attrs.present? + + input_conditions = cell_inputs_attrs.filter do |input| + input[:previous_tx_hash] != CellOutput::SYSTEM_TX_HASH + end.map { |input| { tx_hash: input[:previous_tx_hash], cell_index: input[:previous_index] } } + input_returnings = batch_query_outputs(input_conditions, %i[id cell_type tx_hash cell_index capacity]) + input_mappings = input_returnings.to_h { |id, cell_type, tx_hash, cell_index| ["#{tx_hash}-#{cell_index}", "#{cell_type}-#{id}"] } + new_cell_inputs_attrs = + cell_inputs_attrs.map do |attr| + attr[:ckb_transaction_id] = tx_mappings[attr[:tx_hash]] + if attr[:previous_tx_hash] != CellOutput::SYSTEM_TX_HASH && input_mappings["#{attr[:previous_tx_hash]}-#{attr[:previous_index]}"].present? + cell_type, previous_cell_output_id = input_mappings["#{attr[:previous_tx_hash]}-#{attr[:previous_index]}"].split("-") + attr[:previous_cell_output_id] = previous_cell_output_id + attr[:cell_type] = cell_type + end + attr.except(:tx_hash) + end + CellInput.upsert_all(new_cell_inputs_attrs, unique_by: %i[ckb_transaction_id index]) + end + end + end + + private + + def batch_query_outputs(conditions, returnings = %i[id cell_type tx_hash cell_index]) + relation = CellOutput.none + + conditions.each do |condition| + relation = relation.or(CellOutput.where(condition)) + end + + relation.pluck(Arel.sql(returnings.join(", "))) + end + end +end diff --git a/app/utils/ckb_utils.rb b/app/utils/ckb_utils.rb index 96b423d92c..125ea5f1af 100644 --- a/app/utils/ckb_utils.rb +++ b/app/utils/ckb_utils.rb @@ -11,10 +11,7 @@ def self.calculate_cell_min_capacity(output, data) output.calculate_min_capacity(data) end - def self.cal_cell_min_capacity(lock_script, type_script, capacity, binary_data) - lock = CKB::Types::Script.new(**lock_script.to_node) - type = type_script.present? ? CKB::Types::Script.new(**type_script.to_node) : nil - CKB::Types::Output.new(capacity: capacity.to_i, lock:, type:) + def self.cal_cell_min_capacity(lock_script, type_script, binary_data) CKB::Utils.byte_to_shannon([8, binary_data&.bytesize || 0, lock_script.calculate_bytesize, type_script&.calculate_bytesize || 0].sum) end diff --git a/app/workers/import_pending_tx_worker.rb b/app/workers/import_pending_tx_worker.rb new file mode 100644 index 0000000000..21db2e3f8d --- /dev/null +++ b/app/workers/import_pending_tx_worker.rb @@ -0,0 +1,12 @@ +# Check every pending transaction in the pool if rejected +class ImportPendingTxWorker + include Sidekiq::Worker + sidekiq_options retry: 0, queue: "pending_tx" + + def perform(data) + if CkbTransaction.tx_committed.find_by(tx_hash: data["transaction"]["hash"]) + else + CkbSync::Transactions.new([data]).import + end + end +end diff --git a/lib/websocket.rb b/lib/websocket.rb index 3bff70e1f2..65601e8db0 100644 --- a/lib/websocket.rb +++ b/lib/websocket.rb @@ -22,36 +22,36 @@ def subscribe(connection, topic) $message_id += 1 message = Protocol::WebSocket::JSONMessage.generate({ - "id": $message_id, - "jsonrpc": "2.0", - "method": "subscribe", - "params": [topic] - }) + "id": $message_id, + "jsonrpc": "2.0", + "method": "subscribe", + "params": [topic], + }) message.send(connection) connection.flush end -queue = Queue.new +# queue = Queue.new -persister = - Thread.new do - Rails.application.executor.wrap do - loop do - data = queue.pop +# persister = +# Thread.new do +# Rails.application.executor.wrap do +# loop do +# data = queue.pop - begin - ImportTransactionJob.new.perform(data["transaction"], { - cycles: data["cycles"].hex, - fee: data["fee"].hex, - size: data["size"].hex, - timestamp: data["timestamp"].hex - }) - rescue StandardError => e - Rails.logger.error "Error occurred during ImportTransactionJob data: #{data}, error: #{e.message}" - end - end - end - end +# begin +# ImportTransactionJob.new.perform(data["transaction"], { +# cycles: data["cycles"].hex, +# fee: data["fee"].hex, +# size: data["size"].hex, +# timestamp: data["timestamp"].hex, +# }) +# rescue StandardError => e +# Rails.logger.error "Error occurred during ImportTransactionJob data: #{data}, error: #{e.message}" +# end +# end +# end +# end Async do |_task| endpoint = Async::HTTP::Endpoint.parse(URL, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) @@ -64,9 +64,6 @@ def subscribe(connection, topic) res = message.to_h if res[:method] == "subscribe" data = JSON.parse res[:params][:result] - # binding.pry - puts data["transaction"]["hash"] - queue.push(data) end end end