Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent merges #118

Merged
merged 24 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
381 changes: 242 additions & 139 deletions src/index.ml

Large diffs are not rendered by default.

28 changes: 17 additions & 11 deletions src/index.mli
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ module type Key = sig
(** Formatter for keys *)
end

(** These modules should not be used. They are exposed purely for testing
purposes. *)
module Private : sig
module Search : module type of Search

module Io_array : module type of Io_array

module Fan : module type of Fan

(** Callbacks wrapped in an abstract type. NOTE(review): presumably invoked
by the index at the points named by the hook's type parameter; confirm
against the implementation. *)
module Hook : sig
type 'a t
(** The type of hooks receiving values of type ['a]. *)

val v : ('a -> unit) -> 'a t
(** [v f] builds a hook from the callback [f]. *)
end
end

(** The input of [Make] for values. The same requirements as for [Key] apply. *)
module type Value = sig
type t
Expand Down Expand Up @@ -114,7 +130,7 @@ module type S = sig
recent replacements of existing values (after the last merge), this will
hit both the new and old bindings. *)

val force_merge : t -> unit
val force_merge : ?hook:[ `After | `Before ] Private.Hook.t -> t -> unit
(** [force_merge ?hook t] forces a merge for [t]. The optional [hook] is a
testing aid (see {!Private.Hook}) and should not be used otherwise. *)

val flush : t -> unit
Expand All @@ -126,13 +142,3 @@ end

module Make (K : Key) (V : Value) (IO : IO) :
S with type key = K.t and type value = V.t

(** These modules should not be used. They are exposed purely for testing
purposes. *)
module Private : sig
module Search : module type of Search

module Io_array : module type of Io_array

module Fan : module type of Fan
end
18 changes: 17 additions & 1 deletion src/io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module type S = sig

val read : t -> off:int64 -> len:int -> bytes -> int

val clear : t -> unit
val clear : ?keep_generation:bool -> t -> unit

val sync : t -> unit

Expand All @@ -44,4 +44,20 @@ module type S = sig
val lock : string -> lock

val unlock : lock -> unit

(** Signals a failed read. The Unix backend raises it when reading fails with
[EBADF]. *)
exception Bad_Read

(** Mutual-exclusion locks provided by the IO backend. *)
module Mutex : sig
type t
(** The type of mutexes. *)

val create : unit -> t
(** [create ()] returns a new mutex. *)

val lock : t -> unit
(** [lock t] acquires [t]. *)

val unlock : t -> unit
(** [unlock t] releases [t]. *)

val with_lock : t -> (unit -> 'a) -> 'a
(** [with_lock t f] is expected to run [f ()] while holding [t] and to
release [t] afterwards, including when [f] raises (this is what the Unix
backend does). The result of [f ()] is returned. *)
end

val async : (unit -> 'a) -> unit
(** [async f] starts [f ()] without handing its result back to the caller.
NOTE(review): the Unix backend runs [f] in a freshly created thread; other
backends may differ. *)
end
4 changes: 2 additions & 2 deletions src/io_array.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ module Make (IO : Io.S) (Elt : ELT) :
Log.warn (fun m ->
m "Requested pre-fetch region is empty: [%Ld, %Ld]" low high)
else if range > max_buffer_size then
Log.debug (fun m ->
Log.warn (fun m ->
m "Requested pre-fetch [%Ld, %Ld] is larger than %d" low high
max_buffer_size)
else
Expand All @@ -96,7 +96,7 @@ module Make (IO : Io.S) (Elt : ELT) :
pre-fetch [%Ld, %Ld]"
low_buf high_buf low high)
else (
Log.debug (fun m ->
Log.warn (fun m ->
m
"Current buffer [%Ld, %Ld] insufficient. Prefetching in \
range [%Ld, %Ld]"
Expand Down
27 changes: 13 additions & 14 deletions src/search.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module type S = sig
module Array : ARRAY with type elt = Entry.t

val interpolation_search :
Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t option
Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t
end

module Make
Expand Down Expand Up @@ -83,39 +83,38 @@ module Make
let look_around array key key_metric index =
let rec search (op : int64 -> int64) curr =
let i = op curr in
if i < 0L || i >= Array.length array then None
if i < 0L || i >= Array.length array then raise Not_found
else
let e = array.(i) in
let e_metric = Metric.of_entry e in
if not Metric.(key_metric = e_metric) then None
else if Key.equal (Entry.to_key e) key then Some (Entry.to_value e)
if not Metric.(key_metric = e_metric) then raise Not_found
else if Key.equal (Entry.to_key e) key then Entry.to_value e
else (search [@tailcall]) op i
in
match search Int64.succ index with
| Some e -> Some e
| None -> (search [@tailcall]) Int64.pred index
try search Int64.pred index
with Not_found -> (search [@tailcall]) Int64.succ index

(** Improves over binary search in cases where the values in some array are
uniformly distributed according to some metric (such as a hash). *)
let interpolation_search array key ~low ~high =
let key_metric = Metric.of_key key in
(* The core of the search *)
let rec search low high lowest_entry highest_entry =
if high < low then None
if high < low then raise Not_found
else (
Array.pre_fetch array ~low ~high;
let lowest_entry = Lazy.force lowest_entry in
if high = low then
if Key.(key = Entry.to_key lowest_entry) then
Some (Entry.to_value lowest_entry)
else None
Entry.to_value lowest_entry
else raise Not_found
else
let lowest_metric = Metric.of_entry lowest_entry in
if Metric.(lowest_metric > key_metric) then None
if Metric.(lowest_metric > key_metric) then raise Not_found
else
let highest_entry = Lazy.force highest_entry in
let highest_metric = Metric.of_entry highest_entry in
if Metric.(highest_metric < key_metric) then None
if Metric.(highest_metric < key_metric) then raise Not_found
else
let next_index =
Metric.linear_interpolate ~low:(low, lowest_metric)
Expand All @@ -124,7 +123,7 @@ module Make
let e = array.(next_index) in
let e_metric = Metric.of_entry e in
if Metric.(key_metric = e_metric) then
if Key.(key = Entry.to_key e) then Some (Entry.to_value e)
if Key.(key = Entry.to_key e) then Entry.to_value e
else look_around array key key_metric next_index
else if Metric.(key_metric > e_metric) then
(search [@tailcall])
Expand All @@ -137,6 +136,6 @@ module Make
(Lazy.from_val lowest_entry)
(lazy array.(Int64.(pred next_index))) )
in
if high < 0L then None
if high < 0L then raise Not_found
else (search [@tailcall]) low high (lazy array.(low)) (lazy array.(high))
end
2 changes: 1 addition & 1 deletion src/search.mli
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module type S = sig
module Array : ARRAY with type elt = Entry.t

val interpolation_search :
Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t option
Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t
end

module Make
Expand Down
2 changes: 1 addition & 1 deletion src/unix/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
(public_name index.unix)
(name index_unix)
(c_names pread pwrite)
(libraries logs unix index))
(libraries index logs logs.threaded threads unix))
49 changes: 34 additions & 15 deletions src/unix/index_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ module IO : Index.IO = struct
in
get_uint64 buf 0

exception Bad_Read

module Raw = struct
type t = { fd : Unix.file_descr; mutable cursor : int64 }

Expand Down Expand Up @@ -89,7 +91,10 @@ module IO : Index.IO = struct
stats.nb_writes <- succ stats.nb_writes

let unsafe_read t ~off ~len buf =
let n = really_read t.fd off len buf in
let n =
try really_read t.fd off len buf
with Unix.Unix_error (Unix.EBADF, "read", "") -> raise Bad_Read
in
t.cursor <- off ++ Int64.of_int n;
stats.bytes_read <- stats.bytes_read + n;
stats.nb_reads <- succ stats.nb_reads;
Expand Down Expand Up @@ -175,16 +180,8 @@ module IO : Index.IO = struct
else (
Raw.unsafe_write t.raw ~off:t.flushed buf;
Raw.Offset.set t.raw offset;

(* concurrent append might happen so here t.offset might differ
from offset *)
if
not (t.flushed ++ Int64.of_int (String.length buf) = t.header ++ offset)
then
Fmt.failwith "sync error: %s flushed=%Ld buf=%Ld offset+header=%Ld\n%!"
t.file t.flushed
(Int64.of_int (String.length buf))
(offset ++ t.header);
assert (
t.flushed ++ Int64.of_int (String.length buf) = t.header ++ offset );
t.flushed <- offset ++ t.header )

let name t = t.file
Expand All @@ -193,6 +190,7 @@ module IO : Index.IO = struct
sync src;
Unix.close dst.raw.fd;
Unix.rename src.file dst.file;
Buffer.clear dst.buf;
dst.header <- src.header;
dst.fan_size <- src.fan_size;
dst.offset <- src.offset;
Expand Down Expand Up @@ -222,9 +220,14 @@ module IO : Index.IO = struct

let version t = t.version

let get_generation t = Raw.Generation.get t.raw
(* Read the generation stored in the raw file, tracing the value at debug
   level before returning it. *)
let get_generation t =
  let generation = Raw.Generation.get t.raw in
  let () = Log.debug (fun m -> m "get_generation: %Ld" generation) in
  generation

let set_generation t = Raw.Generation.set t.raw
(* Trace the new generation at debug level, then store it in the raw file. *)
let set_generation t generation =
  let () = Log.debug (fun m -> m "set_generation: %Ld" generation) in
  Raw.Generation.set t.raw generation

let get_fanout t = Raw.Fan.get t.raw

Expand Down Expand Up @@ -257,10 +260,10 @@ module IO : Index.IO = struct
in
(aux [@tailcall]) dirname (fun () -> ())

let clear t =
let clear ?(keep_generation = false) t =
t.offset <- 0L;
t.flushed <- t.header;
Raw.Generation.set t.raw 0L;
if not keep_generation then Raw.Generation.set t.raw 0L;
Raw.Offset.set t.raw t.offset;
Raw.Fan.set t.raw "";
Buffer.clear t.buf
Expand Down Expand Up @@ -366,6 +369,22 @@ module IO : Index.IO = struct
(* Log the lock's path at debug level, then close its file descriptor. *)
let unlock lock =
  let { path; fd } = lock in
  let () = Log.debug (fun l -> l "Unlocking %s" path) in
  Unix.close fd

(* The standard [Mutex], extended with a scoped-locking helper. *)
module Mutex = struct
  include Mutex

  (* [with_lock t f] runs [f ()] while holding [t] and returns its result.
     [t] is released exactly once, whether [f] returns normally or raises.
     An exception raised by [f] is re-raised with its original backtrace.

     The success-path [unlock] is deliberately kept outside any handler:
     if it raised inside a [try], the handler would unlock [t] a second
     time. *)
  let with_lock t f =
    Mutex.lock t;
    match f () with
    | ans ->
        Mutex.unlock t;
        ans
    | exception e ->
        (* Capture the backtrace before [unlock] gets a chance to clobber it. *)
        let bt = Printexc.get_raw_backtrace () in
        Mutex.unlock t;
        Printexc.raise_with_backtrace e bt
end

(* Run [f ()] in a newly created thread; the thread handle is discarded. *)
let async f =
  let (_ : Thread.t) = Thread.create f () in
  ()
end

module Make (K : Index.Key) (V : Index.Value) = Index.Make (K) (V) (IO)
Expand Down
2 changes: 1 addition & 1 deletion test/search.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ let interpolation_unique () =
Search.interpolation_search array i
~low:Int64.(zero)
~high:Int64.(pred length)
|> Alcotest.(check (option string)) "" (Some v))
|> Alcotest.(check string) "" v)
array

let () =
Expand Down
1 change: 1 addition & 0 deletions test/unix/common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ let reporter ?(prefix = "") () =
{ Logs.report }

(* Configure logging for the tests: enable [Logs_threaded], set the log level
to [Debug], and install the reporter built by [reporter ()]. *)
let report () =
Logs_threaded.enable ();
Logs.set_level (Some Logs.Debug);
Logs.set_reporter (reporter ())

Expand Down
Loading