Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend balances table for better Rosetta #9859

Merged
merged 6 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 71 additions & 32 deletions src/app/archive/archive_lib/processor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,18 @@ module Coinbase = struct
end

module Balance = struct
type t = {id: int; public_key_id: int; balance: int64} [@@deriving hlist]
type t = { id: int
; public_key_id: int
; balance: int64
; block_id: int
; block_height: int64
; block_sequence_no: int
; block_secondary_sequence_no: int
} [@@deriving hlist]

let typ =
let open Caqti_type_spec in
let spec = Caqti_type.[int; int; int64] in
let spec = Caqti_type.[int; int; int64; int; int64;int;int] in
let encode t = Ok (hlist_to_tuple spec (to_hlist t)) in
let decode t = Ok (of_hlist (tuple_to_hlist spec t)) in
Caqti_type.custom ~encode ~decode (to_rep spec)
Expand All @@ -711,43 +718,55 @@ module Balance = struct
|> Unsigned.UInt64.to_int64

let find (module Conn : CONNECTION) ~(public_key_id : int)
~(balance : Currency.Balance.t) =
~(balance : Currency.Balance.t) ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no =
Conn.find_opt
(Caqti_request.find_opt
Caqti_type.(tup2 int int64)
Caqti_type.(tup2 (tup2 int int64) (tup4 int int64 int int))
Caqti_type.int
{sql| SELECT id FROM balances
WHERE public_key_id = $1
AND balance = $2
AND block_id = $3
AND block_height = $4
AND block_sequence_no = $5
AND block_secondary_sequence_no = $6
|sql})
(public_key_id, balance_to_int64 balance)
((public_key_id, balance_to_int64 balance),
(block_id, block_height, block_sequence_no, block_secondary_sequence_no))

let load (module Conn : CONNECTION) ~(id : int) =
Conn.find
(Caqti_request.find Caqti_type.int
Caqti_type.(tup2 int int64)
{sql| SELECT public_key_id, balance FROM balances
typ
{sql| SELECT id, public_key_id, balance,
block_id, block_height,
block_sequence_no, block_secondary_sequence_no
FROM balances
WHERE id = $1
|sql})
id

let add (module Conn : CONNECTION) ~(public_key_id : int)
~(balance : Currency.Balance.t) =
~(balance : Currency.Balance.t) ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no =
Conn.find
(Caqti_request.find
Caqti_type.(tup2 int int64)
Caqti_type.(tup2 (tup2 int int64) (tup4 int int64 int int))
Caqti_type.int
{sql| INSERT INTO balances (public_key_id, balance) VALUES (?, ?) RETURNING id |sql})
(public_key_id, balance_to_int64 balance)
{sql| INSERT INTO balances (public_key_id, balance,
block_id, block_height, block_sequence_no, block_secondary_sequence_no)
VALUES (?, ?, ?, ?, ?, ?)
RETURNING id |sql})
((public_key_id, balance_to_int64 balance),
(block_id, block_height, block_sequence_no, block_secondary_sequence_no))

let add_if_doesn't_exist (module Conn : CONNECTION) ~(public_key_id : int)
~(balance : Currency.Balance.t) =
~(balance : Currency.Balance.t) ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no =
let open Deferred.Result.Let_syntax in
match%bind find (module Conn) ~public_key_id ~balance with
match%bind find (module Conn) ~public_key_id ~balance ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no with
| Some balance_id ->
return balance_id
| None ->
add (module Conn) ~public_key_id ~balance
add (module Conn) ~public_key_id ~balance ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no
end

module Block_and_internal_command = struct
Expand Down Expand Up @@ -899,7 +918,7 @@ module Block_and_signed_command = struct
; source_balance_id
; receiver_balance_id }

let add_with_status (module Conn : CONNECTION) ~block_id ~user_command_id
let add_with_status (module Conn : CONNECTION) ~block_id ~block_height ~user_command_id
~sequence_no ~(status : Transaction_status.t) ~fee_payer_id ~source_id
~receiver_id =
let open Deferred.Result.Let_syntax in
Expand All @@ -926,7 +945,7 @@ module Block_and_signed_command = struct
| Failed (failure, balances) ->
("failed", Some failure, None, None, None, balances)
in
let add_optional_balance id balance =
let add_optional_balance id balance ~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no =
match balance with
| None ->
Deferred.Result.return None
Expand All @@ -935,6 +954,7 @@ module Block_and_signed_command = struct
Balance.add_if_doesn't_exist
(module Conn)
~public_key_id:id ~balance
~block_id ~block_height ~block_sequence_no ~block_secondary_sequence_no
in
Some balance_id
in
Expand All @@ -945,12 +965,15 @@ module Block_and_signed_command = struct
Balance.add_if_doesn't_exist
(module Conn)
~public_key_id:fee_payer_id ~balance:fee_payer_balance
~block_id ~block_height ~block_sequence_no:sequence_no ~block_secondary_sequence_no:0
in
let%bind source_balance_id =
add_optional_balance source_id source_balance
~block_id ~block_height ~block_sequence_no:sequence_no ~block_secondary_sequence_no:0
in
let%bind receiver_balance_id =
add_optional_balance receiver_id receiver_balance
~block_id ~block_height ~block_sequence_no:sequence_no ~block_secondary_sequence_no:0
in
add
(module Conn)
Expand Down Expand Up @@ -1113,6 +1136,11 @@ module Block = struct
(module Conn)
(Consensus.Data.Consensus_state.next_epoch_data consensus_state)
in
let height =
consensus_state
|> Consensus.Data.Consensus_state.blockchain_length
|> Unsigned.UInt32.to_int64
in
let%bind block_id =
Conn.find
(Caqti_request.find typ Caqti_type.int
Expand All @@ -1138,10 +1166,7 @@ module Block = struct
Protocol_state.blockchain_state protocol_state
|> Blockchain_state.staged_ledger_hash
|> Staged_ledger_hash.ledger_hash |> Ledger_hash.to_string
; height=
consensus_state
|> Consensus.Data.Consensus_state.blockchain_length
|> Unsigned.UInt32.to_int64
; height
; global_slot_since_hard_fork=
Consensus.Data.Consensus_state.curr_global_slot consensus_state
|> Unsigned.UInt32.to_int64
Expand Down Expand Up @@ -1224,7 +1249,7 @@ module Block = struct
let%map () =
Block_and_signed_command.add_with_status
(module Conn)
~block_id ~user_command_id:id ~sequence_no
~block_id ~block_height:height ~user_command_id:id ~sequence_no
~status:user_command.status ~fee_payer_id ~source_id
~receiver_id
>>| ignore
Expand Down Expand Up @@ -1283,6 +1308,8 @@ module Block = struct
Balance.add_if_doesn't_exist
(module Conn)
~public_key_id:receiver_id ~balance
~block_id ~block_height:height
~block_sequence_no:sequence_no ~block_secondary_sequence_no:secondary_sequence_no
in
let receiver_account_creation_fee_paid =
account_creation_fee_of_fees_and_balance fee balance
Expand Down Expand Up @@ -1328,6 +1355,7 @@ module Block = struct
(module Conn)
~public_key_id:fee_transfer_receiver_id
~balance
~block_id ~block_height:height ~block_sequence_no:sequence_no ~block_secondary_sequence_no:0
in
let receiver_account_creation_fee_paid =
account_creation_fee_of_fees_and_balance fee balance
Expand All @@ -1354,6 +1382,9 @@ module Block = struct
(module Conn)
~public_key_id:coinbase_receiver_id
~balance:balances.coinbase_receiver_balance
~block_id ~block_height:height
~block_sequence_no:sequence_no
~block_secondary_sequence_no:0
in
let receiver_account_creation_fee_paid =
account_creation_fee_of_fees_and_balance ?additional_fee
Expand Down Expand Up @@ -1470,33 +1501,40 @@ module Block = struct
in
List.zip_exn block.user_cmds (List.rev user_cmd_ids_rev)
in
let balance_id_of_pk_and_balance pk balance =
let balance_id_of_info pk balance ~block_sequence_no ~block_secondary_sequence_no =
let%bind public_key_id =
Public_key.add_if_doesn't_exist (module Conn) pk
in
Balance.add_if_doesn't_exist (module Conn) ~public_key_id ~balance
~block_id ~block_height:(block.height |> Unsigned.UInt32.to_int64) ~block_sequence_no
~block_secondary_sequence_no
in
let balance_id_of_pk_and_balance_opt pk balance_opt =
let balance_id_of_info_balance_opt pk balance_opt ~block_sequence_no ~block_secondary_sequence_no =
Option.value_map balance_opt ~default:(Deferred.Result.return None)
~f:(fun balance ->
let%map id = balance_id_of_pk_and_balance pk balance in
Some id )
let%map id = balance_id_of_info pk balance
~block_sequence_no ~block_secondary_sequence_no
in
Some id )
in
(* add user commands to join table *)
let%bind () =
deferred_result_list_fold user_cmds_with_ids ~init:()
~f:(fun () (user_command, user_command_id) ->
let%bind source_balance_id =
balance_id_of_pk_and_balance_opt user_command.source
user_command.source_balance
in
let%bind fee_payer_balance_id =
balance_id_of_pk_and_balance user_command.fee_payer
balance_id_of_info user_command.fee_payer
user_command.fee_payer_balance
~block_sequence_no:user_command.sequence_no ~block_secondary_sequence_no:0
in
let%bind source_balance_id =
balance_id_of_info_balance_opt user_command.source
user_command.source_balance
~block_sequence_no:user_command.sequence_no ~block_secondary_sequence_no:0
in
let%bind receiver_balance_id =
balance_id_of_pk_and_balance_opt user_command.receiver
balance_id_of_info_balance_opt user_command.receiver
user_command.receiver_balance
~block_sequence_no:user_command.sequence_no ~block_secondary_sequence_no:0
in
Block_and_signed_command.add_if_doesn't_exist
(module Conn)
Expand Down Expand Up @@ -1536,8 +1574,9 @@ module Block = struct
, (sequence_no, secondary_sequence_no) )
->
let%bind receiver_balance_id =
balance_id_of_pk_and_balance internal_command.receiver
balance_id_of_info internal_command.receiver
internal_command.receiver_balance
~block_sequence_no:internal_command.sequence_no ~block_secondary_sequence_no:internal_command.secondary_sequence_no
in
Block_and_internal_command.add_if_doesn't_exist
(module Conn)
Expand Down
20 changes: 17 additions & 3 deletions src/app/archive/create_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,28 @@ CREATE INDEX idx_blocks_creator_id ON blocks(creator_id);
CREATE INDEX idx_blocks_height ON blocks(height);
CREATE INDEX idx_chain_status ON blocks(chain_status);

/* the block_* columns refer to the block containing a user command or internal command that
results in a balance
for a balance resulting from a user command, the secondary sequence no is always 0
these columns duplicate information available in the
blocks_user_commands and blocks_internal_commands tables
they are included here to allow Rosetta account queries to consume
fewer Postgresql resources
*/
CREATE TABLE balances
( id serial PRIMARY KEY
, public_key_id int NOT NULL REFERENCES public_keys(id)
, balance bigint NOT NULL
( id serial PRIMARY KEY
, public_key_id int NOT NULL REFERENCES public_keys(id)
, balance bigint NOT NULL
, block_id int NOT NULL REFERENCES blocks(id) ON DELETE CASCADE
, block_height int NOT NULL
, block_sequence_no int NOT NULL
, block_secondary_sequence_no int NOT NULL
, UNIQUE (public_key_id,balance,block_id,block_height,block_sequence_no,block_secondary_sequence_no)
);

CREATE INDEX idx_balances_id ON balances(id);
CREATE INDEX idx_balances_public_key_id ON balances(public_key_id);
CREATE INDEX idx_balances_height_seq_nos ON balances(block_height,block_sequence_no,block_secondary_sequence_no);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉


CREATE TABLE blocks_user_commands
( block_id int NOT NULL REFERENCES blocks(id) ON DELETE CASCADE
Expand Down
4 changes: 2 additions & 2 deletions src/app/extract_blocks/extract_blocks.ml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ let fill_in_user_commands pool block_state_hash =
Public_key.Compressed.of_base58_check_exn pk_str
in
let balance_of_id id ~item =
let%map _pk_id, balance =
let%map { balance; _ } =
query_db ~f:(fun db -> Processor.Balance.load db ~id) ~item
in
balance |> Unsigned.UInt64.of_int64 |> Currency.Balance.of_uint64
Expand Down Expand Up @@ -254,7 +254,7 @@ let fill_in_internal_commands pool block_state_hash =
; receiver_balance_id
}
->
let%bind _pubkey, receiver_balance_int64 =
let%bind { balance = receiver_balance_int64; _ } =
query_db ~item:"receiver balance" ~f:(fun db ->
Processor.Balance.load db ~id:receiver_balance_id)
in
Expand Down
15 changes: 15 additions & 0 deletions src/app/migrate-balances-table/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
(executable
(package migrate_balances_table)
(name migrate_balances_table)
(public_name migrate_balances_table)
(libraries
async
core_kernel
caqti
caqti-async
caqti-driver-postgresql
logger
)
(preprocessor_deps ../../config.mlh)
(instrumentation (backend bisect_ppx))
(preprocess (pps ppx_coda ppx_version ppx_let)))
Loading