Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt Transaction_pool.apply when the ledger is detached #6971

Merged
merged 6 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/lib/interruptible/interruptible.ml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ module Result = struct
end)
end

module Or_error = struct
type nonrec ('a, 's) t = ('a Or_error.t, 's) t

include (
Result :
module type of Result with type ('a, 'b, 's) t := ('a, 'b, 's) Result.t )
end

module Deferred_let_syntax = struct
module Let_syntax = struct
let return = return

let bind x ~f = bind (uninterruptible x) ~f

let map x ~f = map (uninterruptible x) ~f

let both x y =
Let_syntax.Let_syntax.both (uninterruptible x) (uninterruptible y)

module Open_on_rhs = Deferred.Let_syntax
end
end

let%test_unit "monad gets interrupted" =
Async.Thread_safe.block_on_async_exn (fun () ->
let r = ref 0 in
Expand Down
20 changes: 20 additions & 0 deletions src/lib/interruptible/interruptible.mli
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,23 @@ module Result : sig

include Monad.S3 with type ('a, 'b, 's) t := ('a, 'b, 's) t
end

module Or_error : sig
type nonrec ('a, 's) t = ('a Or_error.t, 's) t

include Monad.S2 with type ('a, 's) t := ('a, 's) t
end

module Deferred_let_syntax : sig
module Let_syntax : sig
val return : 'a -> ('a, _) t

val bind : 'a Deferred.t -> f:('a -> ('b, 's) t) -> ('b, 's) t

val map : 'a Deferred.t -> f:('a -> 'b) -> ('b, _) t

val both : 'a Deferred.t -> 'b Deferred.t -> ('a * 'b, _) t

module Open_on_rhs = Deferred.Let_syntax
end
end
2 changes: 2 additions & 0 deletions src/lib/merkle_ledger/any_ledger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,7 @@ module Make_base (Inputs : Inputs_intf) :
(* This better be the same depth inside Base or you're going to have a bad
* time *)
let depth (T ((module Base), t)) = Base.depth t

let detached_signal (T ((module Base), t)) = Base.detached_signal t
end
end
5 changes: 5 additions & 0 deletions src/lib/merkle_ledger/base_ledger_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,9 @@ module type S = sig
val merkle_path_at_index_exn : t -> int -> Path.t

val remove_accounts_exn : t -> account_id list -> unit

(** Triggers when the ledger has been detached and should no longer be
accessed.
*)
val detached_signal : t -> unit Async.Deferred.t
end
28 changes: 23 additions & 5 deletions src/lib/merkle_ledger/database.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,20 @@ module Make (Inputs : Inputs_intf) :

type path = Path.t

module Detached_parent_signal = struct
type t = unit Async.Ivar.t

let sexp_of_t (_ : t) = Sexp.List []

let t_of_sexp (_ : Sexp.t) : t = Async.Ivar.create ()
end

type t =
{ uuid: Uuid.Stable.V1.t
; kvdb: Kvdb.t sexp_opaque
; depth: int
; directory: string }
; directory: string
; detached_parent_signal: Detached_parent_signal.t }
[@@deriving sexp]

let get_uuid t = t.uuid
Expand All @@ -73,14 +82,23 @@ module Make (Inputs : Inputs_intf) :
in
Unix.mkdir_p directory ;
let kvdb = Kvdb.create directory in
{uuid; kvdb; depth; directory}
{uuid; kvdb; depth; directory; detached_parent_signal= Async.Ivar.create ()}

let create_checkpoint t ~directory_name () =
let uuid = Uuid_unix.create () in
let kvdb = Kvdb.create_checkpoint t.kvdb directory_name in
{uuid; kvdb; depth= t.depth; directory= directory_name}

let close {kvdb; uuid= _; depth= _; directory= _} = Kvdb.close kvdb
{ uuid
; kvdb
; depth= t.depth
; directory= directory_name
; detached_parent_signal= Async.Ivar.create () }

let close {kvdb; uuid= _; depth= _; directory= _; detached_parent_signal} =
Kvdb.close kvdb ;
Async.Ivar.fill_if_empty detached_parent_signal ()

let detached_signal {detached_parent_signal; _} =
Async.Ivar.read detached_parent_signal

let with_ledger ~depth ~f =
let t = create ~depth () in
Expand Down
2 changes: 2 additions & 0 deletions src/lib/merkle_ledger/null_ledger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,6 @@ end = struct
let num_accounts _t = 0

let depth t = t.depth

let detached_signal _ = Async.Deferred.never ()
end
10 changes: 9 additions & 1 deletion src/lib/merkle_mask/maskable_merkle_tree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ module Make (Inputs : Inputs_intf) = struct
(Uuid.to_string_hum parent_uuid)
suffix
in
let trigger_detach_signal =
match grandchildren with
| `Check | `Recursive ->
true
| `I_promise_I_am_reparenting_this_mask ->
false
in
( match grandchildren with
| `Check -> (
match Hashtbl.find registered_masks (Mask.Attached.get_uuid mask) with
Expand Down Expand Up @@ -203,7 +210,8 @@ module Make (Inputs : Inputs_intf) = struct
| other_masks ->
Uuid.Table.set registered_masks ~key:parent_uuid
~data:other_masks ) ) ;
Mask.Attached.unset_parent ~loc mask
Mask.Attached.unset_parent ~trigger_signal:trigger_detach_signal ~loc
mask

(** a set calls the Base implementation set, notifies registered mask childen *)
let set t location account =
Expand Down
23 changes: 21 additions & 2 deletions src/lib/merkle_mask/masking_merkle_tree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,21 @@ module Make (Inputs : Inputs_intf.S) = struct
[@@deriving sexp]
end

module Detached_parent_signal = struct
type t = unit Async.Ivar.t

let sexp_of_t (_ : t) = Sexp.List []

let t_of_sexp (_ : Sexp.t) : t = Async.Ivar.create ()
end

type t =
{ uuid: Uuid.Stable.V1.t
; account_tbl: Account.t Location_binable.Table.t
; token_owners: Key.Stable.Latest.t Token_id.Table.t
; mutable next_available_token: Token_id.t option
; mutable parent: Parent.t
; detached_parent_signal: Detached_parent_signal.t
; hash_tbl: Hash.t Addr.Table.t
; location_tbl: Location.t Account_id.Table.t
; mutable current_location: Location.t option
Expand All @@ -46,6 +55,7 @@ module Make (Inputs : Inputs_intf.S) = struct
let create ~depth () =
{ uuid= Uuid_unix.create ()
; parent= Error __LOC__
; detached_parent_signal= Async.Ivar.create ()
; account_tbl= Location_binable.Table.create ()
; token_owners= Token_id.Table.create ()
; next_available_token= None
Expand Down Expand Up @@ -93,9 +103,11 @@ module Make (Inputs : Inputs_intf.S) = struct
"Mask.Attached.with_ledger: cannot create an attached mask; use \
Mask.create and Mask.set_parent"

let unset_parent ~loc t =
let unset_parent ?(trigger_signal = true) ~loc t =
assert (Result.is_ok t.parent) ;
t.parent <- Error loc ;
if trigger_signal then
Async.Ivar.fill_if_empty t.detached_parent_signal () ;
t

let assert_is_attached t =
Expand All @@ -105,6 +117,10 @@ module Make (Inputs : Inputs_intf.S) = struct
| Ok _ ->
()

let detached_signal t =
assert_is_attached t ;
Async.Ivar.read t.detached_parent_signal

let get_parent ({parent= opt; _} as t) =
assert_is_attached t ; Result.ok_or_failwith opt

Expand Down Expand Up @@ -386,6 +402,7 @@ module Make (Inputs : Inputs_intf.S) = struct
let copy t =
{ uuid= Uuid_unix.create ()
; parent= Ok (get_parent t)
; detached_parent_signal= Async.Ivar.create ()
; account_tbl= Location_binable.Table.copy t.account_tbl
; token_owners= Token_id.Table.copy t.token_owners
; next_available_token= t.next_available_token
Expand Down Expand Up @@ -570,7 +587,8 @@ module Make (Inputs : Inputs_intf.S) = struct
Location_binable.Table.clear t.account_tbl ;
t.next_available_token <- None ;
Addr.Table.clear t.hash_tbl ;
Account_id.Table.clear t.location_tbl
Account_id.Table.clear t.location_tbl ;
Async.Ivar.fill_if_empty t.detached_parent_signal ()

let index_of_account_exn t key =
assert_is_attached t ;
Expand Down Expand Up @@ -748,6 +766,7 @@ module Make (Inputs : Inputs_intf.S) = struct

let set_parent t parent =
assert (Result.is_error t.parent) ;
assert (Option.is_none (Async.Ivar.peek t.detached_parent_signal)) ;
assert (Int.equal t.depth (Base.depth parent)) ;
t.parent <- Ok parent ;
t.current_location <- Attached.last_filled t ;
Expand Down
12 changes: 10 additions & 2 deletions src/lib/merkle_mask/masking_merkle_tree_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,16 @@ module type S = sig
(** commit all state to the parent, flush state locally *)
val commit : t -> unit

(** remove parent *)
val unset_parent : loc:string -> t -> unattached
(** [unset_parent ?trigger_signal ~loc:__LOC__ t] detaches the parent from
[t]. The [loc] argument is shown in the [Dangling_parent_reference]
exception, which will be raised if [t] is used while no parent is
registered.

If the [trigger_signal] optional argument is [true] or omitted,
[detached_signal] for [t] will be resolved. This should only be set to
[false] when the mask will be reparented.
*)
val unset_parent : ?trigger_signal:bool -> loc:string -> t -> unattached

(** get mask parent *)
val get_parent : t -> parent
Expand Down
26 changes: 23 additions & 3 deletions src/lib/network_pool/transaction_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ module Make0 (Base_ledger : sig
val location_of_account : t -> Account_id.t -> Location.t option

val get : t -> Location.t -> Account.t option

val detached_signal : t -> unit Deferred.t
end) (Staged_ledger : sig
type t

Expand Down Expand Up @@ -872,16 +874,17 @@ struct
Deferred.Or_error.error_string
"Got transaction pool diff when transition frontier is \
unavailable, ignoring."
| Some ledger ->
| Some ledger -> (
let trust_record =
Trust_system.record_envelope_sender t.config.trust_system
t.logger sender
in
let rec go txs' pool (accepted, rejected) =
let open Interruptible.Deferred_let_syntax in
match txs' with
| [] ->
t.pool <- pool ;
Deferred.Or_error.return
Interruptible.Or_error.return
@@ (List.rev accepted, List.rev rejected)
| tx' :: txs'' -> (
let tx = User_command.forget_check tx' in
Expand Down Expand Up @@ -1137,7 +1140,22 @@ struct
, (tx, Diff_versioned.Diff_error.Insufficient_fee)
:: rejected ) )
in
go txs t.pool ([], [])
match%map
Interruptible.force
@@
let open Interruptible.Let_syntax in
let signal =
Deferred.map (Base_ledger.detached_signal ledger) ~f:(fun () ->
Error.createf "Ledger was detatched"
|> Error.tag ~tag:"Transaction_pool.apply" )
in
let%bind () = Interruptible.lift Deferred.unit signal in
go txs t.pool ([], [])
with
| Ok res ->
res
| Error err ->
Error err )

let unsafe_apply t diff =
match%map apply t diff with
Expand Down Expand Up @@ -1233,6 +1251,8 @@ let%test_module _ =
let location_of_account _t k = Some k

let get t l = Map.find t l

let detached_signal _ = Deferred.never ()
end

module Mock_staged_ledger = struct
Expand Down