Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an overcommit option in replace to postpone merges #253

Merged
merged 4 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
- Added `Index.Checks.cli`, which provides offline integrity checking of Index
stores. (#236)

- `Index.replace` now takes a `~overcommit` argument to postpone a merge. (#253)

- `Index.merge` is now part of the public API. (#253)

## Changed

- `sync` has to be called by the read-only instance to synchronise with the
Expand Down
18 changes: 10 additions & 8 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ struct
incr n;
!n

let merge ?(blocking = false) ?(filter = fun _ -> true) ?(hook = fun _ -> ())
let merge' ?(blocking = false) ?(filter = fun _ -> true) ?(hook = fun _ -> ())
~witness t =
let yield () = check_pending_cancel t in
Semaphore.acquire t.merge_lock;
Expand Down Expand Up @@ -715,7 +715,9 @@ struct
| None ->
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root));
Thread.return `Completed
| Some witness -> merge ?hook ~witness t
| Some witness -> merge' ?hook ~witness t

let merge t = ignore (force_merge ?hook:None t : _ async)

(** [t.merge_lock] is used to detect an ongoing merge. Other operations can
take this lock, but as they are not async, we consider this to be a good
Expand All @@ -727,7 +729,7 @@ struct
if t.config.readonly then raise RO_not_allowed;
instance_is_merging t

let replace' ?hook t key value =
let replace' ?hook ?(overcommit = false) t key value =
let t = check_open t in
Stats.incr_nb_replace ();
Log.debug (fun l ->
Expand All @@ -745,19 +747,19 @@ struct
Tbl.replace log.mem key value;
Int64.compare (IO.offset log.io) (Int64.of_int t.config.log_size) > 0)
in
if log_limit_reached then
if log_limit_reached && not overcommit then
let is_merging = instance_is_merging t in
match (t.config.throttle, is_merging) with
| `Overcommit_memory, true ->
(* Merging now would block on completion of the ongoing merge *)
None
| `Overcommit_memory, false | `Block_writes, _ ->
let hook = hook |> Option.map (fun f stage -> f (`Merge stage)) in
Some (merge ?hook ~witness:(Entry.v key value) t)
Some (merge' ?hook ~witness:(Entry.v key value) t)
else None

let replace t key value =
ignore (replace' ?hook:None t key value : _ async option)
let replace ?overcommit t key value =
ignore (replace' ?hook:None ?overcommit t key value : _ async option)

let replace_with_timer ?sampling_interval t key value =
if sampling_interval <> None then Stats.start_replace ();
Expand All @@ -777,7 +779,7 @@ struct
| None ->
Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root))
| Some witness -> (
match Thread.await (merge ~blocking:true ~filter:f ~witness t) with
match Thread.await (merge' ~blocking:true ~filter:f ~witness t) with
| Ok (`Aborted | `Completed) -> ()
| Error (`Async_exn exn) ->
Fmt.failwith "filter: asynchronous exception during merge (%s)"
Expand Down
18 changes: 16 additions & 2 deletions src/index_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,12 @@ module type S = sig
val mem : t -> key -> bool
(** [mem t k] is [true] iff [k] is bound in [t]. *)

val replace : t -> key -> value -> unit
val replace : ?overcommit:bool -> t -> key -> value -> unit
(** [replace t k v] binds [k] to [v] in [t], replacing any existing binding of
[k]. *)
[k].

If [overcommit] is true, the operation does not triger a merge, even if
the caches are full. By default [overcommit] is false. *)

val filter : t -> (key * value -> bool) -> unit
(** [filter t p] removes all the bindings (k, v) that do not satisfy [p]. This
Expand Down Expand Up @@ -169,6 +172,16 @@ module type S = sig
(** [is_merging t] returns true if [t] is running a merge. Raises
{!RO_not_allowed} if called by a read-only index. *)

val merge : t -> unit
(** [merge t] forces a merge for [t].

If there is no merge running, this operation is non-blocking, i.e. it
returns immediately, with the merge running concurrently.

If a merge is running already, this operation blocks until the previous
merge is complete. It then launches a merge (which runs concurrently) and
returns. *)

(** Offline [fsck]-like utility for checking the integrity of Index stores
built using this module. *)
module Checks : sig
Expand Down Expand Up @@ -205,6 +218,7 @@ module type Private = sig

val replace' :
?hook:[ `Merge of merge_stages ] hook ->
?overcommit:bool ->
t ->
key ->
value ->
Expand Down