Skip to content

Commit

Permalink
Merge pull request #6248 from MinaProtocol/fix/include-memo-user-cmds
Browse files Browse the repository at this point in the history
Include memos in user commands in replayer
  • Loading branch information
mergify[bot] authored Oct 3, 2020
2 parents 1a40fc4 + ebf7b19 commit f4e2bd5
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 16 deletions.
70 changes: 58 additions & 12 deletions src/app/replayer/replayer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ let create_ledger accounts =
Ledger.create_new_account_exn ledger acct_id acct ) ;
ledger

let json_ledger_hash_of_ledger ledger =
Ledger_hash.to_yojson @@ Ledger.merkle_root ledger

let create_output target_state_hash target_proof ledger =
let target_ledger = Ledger.to_list ledger in
{target_state_hash; target_proof; target_ledger}
Expand Down Expand Up @@ -245,13 +248,14 @@ let run_user_command ~logger ~pool ~ledger (cmd : Sql.User_command.t) =
cmd.nonce cmd.global_slot cmd.sequence_no ;
let%bind body = body_of_sql_user_cmd pool cmd in
let%map fee_payer_pk = pk_of_pk_id pool cmd.fee_payer_id in
let memo = Signed_command_memo.of_string cmd.memo in
let payload =
Signed_command_payload.create
~fee:(Currency.Fee.of_uint64 @@ Unsigned.UInt64.of_int64 cmd.fee)
~fee_token:(Token_id.of_uint64 @@ Unsigned.UInt64.of_int64 cmd.fee_token)
~fee_payer_pk
~nonce:(Unsigned.UInt32.of_int64 cmd.nonce)
~valid_until:None ~memo:Signed_command_memo.dummy ~body
~valid_until:None ~memo ~body
in
(* when applying the transaction, there's a check that the fee payer and
signer keys are the same; since this transaction was accepted, we know
Expand Down Expand Up @@ -314,6 +318,20 @@ let main ~input_file ~output_file ~archive_uri () =
State_hash.to_yojson input.target_state_hash
|> unquoted_string_of_yojson
in
[%log info] "Loading global slots" ;
let%bind global_slots =
match%bind
Caqti_async.Pool.use
(fun db -> Sql.Global_slots.run db state_hash)
pool
with
| Ok slots ->
return (Int64.Set.of_list slots)
| Error msg ->
[%log error] "Error getting global slots"
~metadata:[("error", `String (Caqti_error.show msg))] ;
exit 1
in
[%log info] "Loading user command ids" ;
let%bind user_cmd_ids =
match%bind
Expand Down Expand Up @@ -364,8 +382,13 @@ let main ~input_file ~output_file ~archive_uri () =
id (Caqti_error.show msg) () )
in
let unsorted_internal_cmds = List.concat unsorted_internal_cmds_list in
(* filter out internal commands in blocks not along chain from target state hash *)
let filtered_internal_cmds =
List.filter unsorted_internal_cmds ~f:(fun cmd ->
Int64.Set.mem global_slots cmd.global_slot )
in
let sorted_internal_cmds =
List.sort unsorted_internal_cmds ~compare:(fun ic1 ic2 ->
List.sort filtered_internal_cmds ~compare:(fun ic1 ic2 ->
let tuple (ic : Sql.Internal_command.t) =
(ic.global_slot, ic.sequence_no, ic.secondary_sequence_no)
in
Expand All @@ -392,33 +415,47 @@ let main ~input_file ~output_file ~archive_uri () =
(Caqti_error.show msg) () )
in
let unsorted_user_cmds = List.concat unsorted_user_cmds_list in
(* filter out user commands in blocks not along chain from target state hash *)
let filtered_user_cmds =
List.filter unsorted_user_cmds ~f:(fun cmd ->
Int64.Set.mem global_slots cmd.global_slot )
in
let sorted_user_cmds =
List.sort unsorted_user_cmds ~compare:(fun uc1 uc2 ->
List.sort filtered_user_cmds ~compare:(fun uc1 uc2 ->
let tuple (uc : Sql.User_command.t) =
(uc.global_slot, uc.sequence_no)
in
[%compare: int64 * int] (tuple uc1) (tuple uc2) )
in
(* apply commands in global slot, sequence order *)
let rec apply_commands (internal_cmds : Sql.Internal_command.t list)
(user_cmds : Sql.User_command.t list) =
(user_cmds : Sql.User_command.t list) ~last_global_slot =
let log_on_slot_change curr_global_slot =
if Int64.( > ) curr_global_slot last_global_slot then
[%log info] "Applied all commands at global slot %Ld, ledger hash"
~metadata:[("ledger_hash", json_ledger_hash_of_ledger ledger)]
last_global_slot
in
let combine_or_run_internal_cmds (ic : Sql.Internal_command.t)
(ics : Sql.Internal_command.t list) =
match ics with
| ic2 :: ics2
when Int.equal ic.sequence_no ic2.sequence_no
when Int64.equal ic.global_slot ic2.global_slot
&& Int.equal ic.sequence_no ic2.sequence_no
&& String.equal ic.type_ "fee_transfer"
&& String.equal ic.type_ ic2.type_ ->
(* combining situation 2
two fee transfer commands with same sequence number
*)
two fee transfer commands with same global slot, sequence number
*)
log_on_slot_change ic.global_slot ;
let%bind () =
apply_combined_fee_transfer ~logger ~pool ~ledger ic ic2
in
apply_commands ics2 user_cmds
apply_commands ics2 user_cmds ~last_global_slot:ic.global_slot
| _ ->
log_on_slot_change ic.global_slot ;
let%bind () = run_internal_command ~logger ~pool ~ledger ic in
apply_commands ics user_cmds
apply_commands ics user_cmds ~last_global_slot:ic.global_slot
in
(* choose command with least global slot, sequence number
TODO: check for gaps?
Expand All @@ -432,11 +469,13 @@ let main ~input_file ~output_file ~archive_uri () =
| [], [] ->
Deferred.unit
| [], uc :: ucs ->
log_on_slot_change uc.global_slot ;
let%bind () = run_user_command ~logger ~pool ~ledger uc in
apply_commands [] ucs
apply_commands [] ucs ~last_global_slot:uc.global_slot
| ic :: _, uc :: ucs when cmp_ic_uc ic uc > 0 ->
log_on_slot_change uc.global_slot ;
let%bind () = run_user_command ~logger ~pool ~ledger uc in
apply_commands internal_cmds ucs
apply_commands internal_cmds ucs ~last_global_slot:uc.global_slot
| ic :: ics, [] ->
combine_or_run_internal_cmds ic ics
| ic :: ics, uc :: _ when cmp_ic_uc ic uc < 0 ->
Expand All @@ -447,7 +486,14 @@ let main ~input_file ~output_file ~archive_uri () =
slot %Ld and sequence number %d"
ic.global_slot ic.sequence_no ()
in
let%bind () = apply_commands sorted_internal_cmds sorted_user_cmds in
[%log info] "At genesis, ledger hash"
~metadata:[("ledger_hash", json_ledger_hash_of_ledger ledger)] ;
let%bind () =
apply_commands sorted_internal_cmds sorted_user_cmds
~last_global_slot:0L
in
[%log info] "After applying all commands, ledger hash"
~metadata:[("ledger_hash", json_ledger_hash_of_ledger ledger)] ;
[%log info] "Writing output to $output_file"
~metadata:[("output_file", `String output_file)] ;
let output =
Expand Down
35 changes: 31 additions & 4 deletions src/app/replayer/sql.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,31 @@

open Core_kernel

module Global_slots = struct
(* find all global slots in blocks, working back from block with given state hash *)
let query =
Caqti_request.collect Caqti_type.string Caqti_type.int64
{|
WITH RECURSIVE chain AS (

SELECT id,parent_id,global_slot FROM blocks b WHERE b.state_hash = ?

UNION ALL

SELECT b.id,b.parent_id,b.global_slot FROM blocks b

INNER JOIN chain

ON b.id = chain.parent_id
)

SELECT global_slot FROM chain c
|}

let run (module Conn : Caqti_async.CONNECTION) state_hash =
Conn.collect_list query state_hash
end

(* build query to find all blocks back to genesis block, starting with the block containing the
specified state hash; for each such block, find ids of all (user or internal) commands in that block
*)
Expand Down Expand Up @@ -52,6 +77,7 @@ module User_command = struct
; fee_token: int64
; token: int64
; amount: int64 option
; memo: string
; nonce: int64
; global_slot: int64
; sequence_no: int }
Expand All @@ -62,12 +88,12 @@ module User_command = struct
Ok
( (t.type_, t.fee_payer_id, t.source_id, t.receiver_id)
, (t.fee, t.fee_token, t.token, t.amount)
, (t.nonce, t.global_slot, t.sequence_no) )
, (t.memo, t.nonce, t.global_slot, t.sequence_no) )
in
let decode
( (type_, fee_payer_id, source_id, receiver_id)
, (fee, fee_token, token, amount)
, (nonce, global_slot, sequence_no) ) =
, (memo, nonce, global_slot, sequence_no) ) =
Ok
{ type_
; fee_payer_id
Expand All @@ -77,6 +103,7 @@ module User_command = struct
; fee_token
; token
; amount
; memo
; nonce
; global_slot
; sequence_no }
Expand All @@ -85,14 +112,14 @@ module User_command = struct
Caqti_type.(
tup3 (tup4 string int int int)
(tup4 int64 int64 int64 (option int64))
(tup3 int64 int64 int))
(tup4 string int64 int64 int))
in
Caqti_type.custom ~encode ~decode rep

let query =
Caqti_request.collect Caqti_type.int typ
{|
SELECT type,fee_payer_id, source_id,receiver_id,fee,fee_token,token,amount,nonce,global_slot,sequence_no,status FROM
SELECT type,fee_payer_id, source_id,receiver_id,fee,fee_token,token,amount,memo,nonce,global_slot,sequence_no,status FROM

(SELECT * FROM user_commands WHERE id = ?) AS uc

Expand Down

0 comments on commit f4e2bd5

Please sign in to comment.