diff --git a/src/index.ml b/src/index.ml index 2dc15dce..f02b077c 100644 --- a/src/index.ml +++ b/src/index.ml @@ -2,6 +2,12 @@ module Private = struct module Fan = Fan module Io_array = Io_array module Search = Search + + module Hook = struct + type 'a t = 'a -> unit + + let v f = f + end end module type Key = sig @@ -59,7 +65,7 @@ module type S = sig val iter : (key -> value -> unit) -> t -> unit - val force_merge : t -> unit + val force_merge : ?hook:[ `After | `Before ] Private.Hook.t -> t -> unit val flush : t -> unit @@ -119,8 +125,11 @@ module Make (K : Key) (V : Value) (IO : IO) = struct mutable generation : int64; mutable index : index option; mutable log : log option; + mutable log_async : log option; mutable open_instances : int; - lock : IO.lock option; + writer_lock : IO.lock option; + mutable merge_lock : IO.Mutex.t; + mutable rename_lock : IO.Mutex.t; } type t = instance option ref @@ -133,15 +142,34 @@ module Make (K : Key) (V : Value) (IO : IO) = struct Log.debug (fun l -> l "clear %S" t.root); if t.config.readonly then raise RO_not_allowed; t.generation <- 0L; - let log = assert_and_get t.log in - IO.clear log.io; - Tbl.clear log.mem; - may - (fun (i : index) -> - IO.clear i.io; - IO.close i.io) - t.index; - t.index <- None + IO.Mutex.with_lock t.merge_lock (fun () -> + let log = assert_and_get t.log in + IO.clear log.io; + Tbl.clear log.mem; + may + (fun l -> + IO.clear l.io; + IO.close l.io) + t.log_async; + may + (fun (i : index) -> + IO.clear i.io; + IO.close i.io) + t.index; + t.index <- None; + t.log_async <- None) + + let flush_instance instance = + Log.debug (fun l -> + l "[%s] flushing instance" (Filename.basename instance.root)); + if instance.config.readonly then raise RO_not_allowed; + may (fun log -> IO.sync log.io) instance.log; + may (fun log -> IO.sync log.io) instance.log_async + + let flush t = + let t = check_open t in + Log.info (fun l -> l "[%s] flush" (Filename.basename t.root)); + IO.Mutex.with_lock t.rename_lock (fun () -> 
flush_instance t) let ( // ) = Filename.concat @@ -149,6 +177,8 @@ module Make (K : Key) (V : Value) (IO : IO) = struct let log_path root = index_dir root // "log" + let log_async_path root = index_dir root // "log_async" + let index_path root = index_dir root // "data" let lock_path root = index_dir root // "lock" @@ -260,16 +290,13 @@ module Make (K : Key) (V : Value) (IO : IO) = struct Log.debug (fun l -> l "[%s] not found in cache, creating a new instance" (Filename.basename root)); - let lock = + let writer_lock = if not readonly then Some (IO.lock (lock_path root)) else None in let config = { log_size = log_size * entry_size; readonly; fresh } in let log_path = log_path root in let log = - if readonly && not (Sys.file_exists log_path) then ( - Log.debug (fun l -> - l "[%s] no log file detected." (Filename.basename root)); - None ) + if readonly then if fresh then raise RO_not_allowed else None else let io = IO.v ~fresh ~readonly ~generation:0L ~fan_size:0L log_path in let entries = Int64.div (IO.offset io) entry_sizeL in @@ -298,7 +325,18 @@ module Make (K : Key) (V : Value) (IO : IO) = struct l "[%s] no index file detected." 
(Filename.basename root)); None ) in - { config; generation; log; root; index; open_instances = 1; lock } + { + config; + generation; + log; + log_async = None; + root; + index; + open_instances = 1; + merge_lock = IO.Mutex.create (); + rename_lock = IO.Mutex.create (); + writer_lock; + } let (`Staged v) = with_cache ~v:v_no_cache ~clear @@ -310,20 +348,18 @@ module Make (K : Key) (V : Value) (IO : IO) = struct in Search.interpolation_search (IOArray.v index.io) key ~low ~high - let try_load_log t = + let try_load_log t path = Log.debug (fun l -> - l "[%s] checking for a newly created log file" - (Filename.basename t.root)); - let log_path = log_path t.root in - if Sys.file_exists log_path then ( + l "[%s] checking on-disk %s file" (Filename.basename t.root) + (Filename.basename path)); + if Sys.file_exists path then ( let io = - IO.v ~fresh:t.config.fresh ~readonly:true ~generation:0L ~fan_size:0L - log_path + IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path in - let mem = Tbl.create 1024 in + let mem = Tbl.create 0 in iter_io (fun e -> Tbl.replace mem e.key e.value) io; - t.generation <- IO.get_generation io; - t.log <- Some { io; mem } ) + Some { io; mem } ) + else None let sync_log t = Log.debug (fun l -> @@ -332,14 +368,38 @@ module Make (K : Key) (V : Value) (IO : IO) = struct Log.debug (fun l -> l "[%s] no changes detected" (Filename.basename t.root)) in - (match t.log with None -> try_load_log t | Some _ -> ()); + let add_log_entry log e = Tbl.replace log.mem e.key e.value in + ( match t.log with + | None -> t.log <- try_load_log t (log_path t.root) + | Some _ -> () ); + ( match t.log_async with + | None -> t.log_async <- try_load_log t (log_async_path t.root) + | Some log -> ( + try + let log_offset = IO.offset log.io in + IO.close log.io; + let path = log_async_path t.root in + if Sys.file_exists path then ( + let io = + IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path + in + t.log_async <- Some { log with io }; + let 
new_log_offset = IO.offset io in + if log_offset <> new_log_offset then ( + Tbl.clear log.mem; + iter_io (add_log_entry log) io ) ) + else () + with IO.Bad_Read -> + (* if log_async does not exist anymore, then its contents have been + moved to log and the generation has changed *) + () ) ); match t.log with | None -> no_changes () | Some log -> let generation = IO.get_generation log.io in let log_offset = IO.offset log.io in let new_log_offset = IO.force_offset log.io in - let add_log_entry e = Tbl.replace log.mem e.key e.value in + let add_log_entry e = add_log_entry log e in if t.generation <> generation then ( Log.debug (fun l -> l "[%s] generation has changed, reading log and index from disk" @@ -364,42 +424,51 @@ module Make (K : Key) (V : Value) (IO : IO) = struct l "[%s] new entries detected, reading log from disk" (Filename.basename t.root)); iter_io add_log_entry log.io ~min:log_offset ) - else if log_offset > new_log_offset then assert false + else if log_offset > new_log_offset then + (* In that case the log has probably been emptied and is being + refilled with async_log contents. 
*) + no_changes () else no_changes () + let find_instance t key = + let find_if_exists ~name ~find db () = + match db with + | None -> + Log.debug (fun l -> + l "[%s] %s is not present" (Filename.basename t.root) name); + raise Not_found + | Some e -> + let ans = find e key in + Log.debug (fun l -> + l "[%s] found in %s" (Filename.basename t.root) name); + ans + in + let ( @~ ) a b = try a () with Not_found -> b () in + let find_log_index () = + find_if_exists ~name:"log" ~find:(fun log -> Tbl.find log.mem) t.log + @~ find_if_exists ~name:"index" ~find:interpolation_search t.index + in + IO.Mutex.with_lock t.rename_lock (fun () -> + if t.config.readonly then sync_log t; + find_if_exists ~name:"log_async" + ~find:(fun log -> Tbl.find log.mem) + t.log_async + @~ fun () -> + find_log_index @~ fun () -> + if t.config.readonly then ( + sync_log t; + find_log_index () ) + else raise Not_found) + let find t key = let t = check_open t in Log.info (fun l -> l "[%s] find %a" (Filename.basename t.root) K.pp key); - if t.config.readonly then sync_log t; - match t.log with - | None -> raise Not_found - | Some log -> ( - try - let value = Tbl.find log.mem key in - Log.debug (fun l -> l "[%s] found in log" (Filename.basename t.root)); - value - with Not_found -> ( - match t.index with - | None -> - Log.debug (fun l -> - l "[%s] not found" (Filename.basename t.root)); - raise Not_found - | Some index -> ( - match interpolation_search index key with - | Some e -> - Log.debug (fun l -> - l "[%s] found in index" (Filename.basename t.root)); - e - | None -> - Log.debug (fun l -> - l "[%s] not found in index" (Filename.basename t.root)); - raise Not_found ) ) ) + find_instance t key let mem t key = - let instance = check_open t in - Log.info (fun l -> - l "[%s] mem %a" (Filename.basename instance.root) K.pp key); - match find t key with _ -> true | exception Not_found -> false + let t = check_open t in + Log.info (fun l -> l "[%s] mem %a" (Filename.basename t.root) K.pp key); + match 
find_instance t key with _ -> true | exception Not_found -> false let append_buf_fanout fan_out hash buf_str dst_io = Fan.update fan_out hash (IO.offset dst_io); @@ -413,7 +482,7 @@ module Make (K : Key) (V : Value) (IO : IO) = struct if log_i >= Array.length log then log_i else let v = log.(log_i) in - if v.key_hash > hash_e then log_i + if v.key_hash >= hash_e then log_i else ( append_entry_fanout fan_out v dst_io; (merge_from_log [@tailcall]) fan_out log (log_i + 1) hash_e dst_io ) @@ -458,57 +527,88 @@ module Make (K : Key) (V : Value) (IO : IO) = struct in (go [@tailcall]) 0L 0 0 - let merge ~witness t = + let merge ?hook ~witness t = + IO.Mutex.lock t.merge_lock; Log.info (fun l -> l "[%s] merge" (Filename.basename t.root)); - let log = assert_and_get t.log in - let merge_path = merge_path t.root in - let generation = Int64.succ t.generation in - let log_array = - let compare_entry e e' = compare e.key_hash e'.key_hash in - let b = Array.make (Tbl.length log.mem) witness in - Tbl.fold - (fun key value i -> - b.(i) <- { key; value; key_hash = K.hash key }; - i + 1) - log.mem 0 - |> ignore; - Array.fast_sort compare_entry b; - b - in - let fan_size = - match t.index with - | None -> Tbl.length log.mem - | Some index -> - (Int64.to_int (IO.offset index.io) / entry_size) + Tbl.length log.mem + flush_instance t; + let log_async = + let io = + let log_async_path = log_async_path t.root in + IO.v ~fresh:true ~readonly:false ~generation:(Int64.succ t.generation) + ~fan_size:0L log_async_path + in + let mem = Tbl.create 0 in + { io; mem } in - let fan_out = Fan.v ~hash_size:K.hash_size ~entry_size fan_size in - let merge = - IO.v ~readonly:false ~fresh:true ~generation - ~fan_size:(Int64.of_int (Fan.exported_size fan_out)) - merge_path + t.log_async <- Some log_async; + + let go () = + may (fun f -> f `Before) hook; + let log = assert_and_get t.log in + let generation = Int64.succ t.generation in + let log_array = + let compare_entry e e' = compare e.key_hash 
e'.key_hash in + let b = Array.make (Tbl.length log.mem) witness in + Tbl.fold + (fun key value i -> + b.(i) <- { key; key_hash = K.hash key; value }; + i + 1) + log.mem 0 + |> ignore; + Array.fast_sort compare_entry b; + b + in + let fan_size = + match t.index with + | None -> Tbl.length log.mem + | Some index -> + (Int64.to_int (IO.offset index.io) / entry_size) + + Array.length log_array + in + let fan_out = Fan.v ~hash_size:K.hash_size ~entry_size fan_size in + let merge = + let merge_path = merge_path t.root in + IO.v ~fresh:true ~readonly:false ~generation + ~fan_size:(Int64.of_int (Fan.exported_size fan_out)) + merge_path + in + let index = + match t.index with + | None -> + let io = + IO.v ~fresh:true ~readonly:false ~generation ~fan_size:0L + (index_path t.root) + in + append_remaining_log fan_out log_array 0 merge; + { io; fan_out } + | Some index -> + let index = { index with fan_out } in + merge_with log_array index merge; + index + in + Fan.finalize index.fan_out; + IO.set_fanout merge (Fan.export index.fan_out); + IO.Mutex.with_lock t.rename_lock (fun () -> + IO.rename ~src:merge ~dst:index.io; + t.index <- Some index; + IO.clear ~keep_generation:true log.io; + Tbl.clear log.mem; + IO.set_generation log.io generation; + t.generation <- generation; + let log_async = assert_and_get t.log_async in + Tbl.iter + (fun key value -> + Tbl.replace log.mem key value; + append_key_value log.io key value) + log_async.mem; + IO.sync log.io; + t.log_async <- None); + may (fun f -> f `After) hook; + IO.clear log_async.io; + IO.close log_async.io; + IO.Mutex.unlock t.merge_lock in - ( match t.index with - | None -> - let io = - IO.v ~fresh:true ~readonly:false ~generation:0L ~fan_size:0L - (index_path t.root) - in - append_remaining_log fan_out log_array 0 merge; - t.index <- Some { io; fan_out } - | Some index -> - let index = { index with fan_out } in - merge_with log_array index merge; - t.index <- Some index ); - match t.index with - | None -> assert false - | 
Some index -> - Fan.finalize index.fan_out; - IO.set_fanout merge (Fan.export index.fan_out); - IO.rename ~src:merge ~dst:index.io; - IO.clear log.io; - Tbl.clear log.mem; - IO.set_generation log.io generation; - t.generation <- generation + IO.async go let get_witness t = match t.log with @@ -531,24 +631,32 @@ module Make (K : Key) (V : Value) (IO : IO) = struct assert (n = entry_size); Some (decode_entry buf 0) ) ) - let force_merge t = + let force_merge ?hook t = let t = check_open t in Log.info (fun l -> l "[%s] forced merge" (Filename.basename t.root)); - match get_witness t with + let witness = IO.Mutex.with_lock t.rename_lock (fun () -> get_witness t) in + match witness with | None -> Log.debug (fun l -> l "[%s] index is empty" (Filename.basename t.root)) - | Some witness -> merge ~witness t + | Some witness -> merge ?hook ~witness t let replace t key value = let t = check_open t in Log.info (fun l -> l "[%s] replace %a %a" (Filename.basename t.root) K.pp key V.pp value); if t.config.readonly then raise RO_not_allowed; - let log = assert_and_get t.log in - append_key_value log.io key value; - Tbl.replace log.mem key value; - if Int64.compare (IO.offset log.io) (Int64.of_int t.config.log_size) > 0 - then merge ~witness:{ key; key_hash = K.hash key; value } t + let do_merge = + IO.Mutex.with_lock t.rename_lock (fun () -> + let log = + match t.log_async with + | Some async_log -> async_log + | None -> assert_and_get t.log + in + append_key_value log.io key value; + Tbl.replace log.mem key value; + Int64.compare (IO.offset log.io) (Int64.of_int t.config.log_size) > 0) + in + if do_merge then merge ~witness:{ key; key_hash = K.hash key; value } t let iter f t = let t = check_open t in @@ -560,34 +668,29 @@ module Make (K : Key) (V : Value) (IO : IO) = struct Tbl.iter f log.mem; may (fun (i : index) -> iter_io (fun e -> f e.key e.value) i.io) - t.index - - let flush_instance instance = - Log.debug (fun l -> - l "[%s] flushing instance" (Filename.basename 
instance.root)); - if instance.config.readonly then raise RO_not_allowed; - let log = assert_and_get instance.log in - IO.sync log.io - - let flush t = - let instance = check_open t in - Log.info (fun l -> l "[%s] flush" (Filename.basename instance.root)); - flush_instance instance + t.index; + IO.Mutex.with_lock t.rename_lock (fun () -> + ( match t.log_async with + | None -> () + | Some log -> Tbl.iter f log.mem ); + may + (fun (i : index) -> iter_io (fun e -> f e.key e.value) i.io) + t.index) let close it = match !it with | None -> Log.info (fun l -> l "close: instance already closed") | Some t -> - (* XXX This piece of code is not thread safe. *) Log.info (fun l -> l "[%s] close" (Filename.basename t.root)); - it := None; - t.open_instances <- t.open_instances - 1; - if t.open_instances = 0 then ( - Log.debug (fun l -> - l "[%s] last open instance: closing the file descriptor" - (Filename.basename t.root)); - if not t.config.readonly then flush_instance t; - may (fun l -> IO.close l.io) t.log; - may (fun (i : index) -> IO.close i.io) t.index; - may (fun lock -> IO.unlock lock) t.lock ) + IO.Mutex.with_lock t.merge_lock (fun () -> + it := None; + t.open_instances <- t.open_instances - 1; + if t.open_instances = 0 then ( + Log.debug (fun l -> + l "[%s] last open instance: closing the file descriptor" + (Filename.basename t.root)); + if not t.config.readonly then flush_instance t; + may (fun l -> IO.close l.io) t.log; + may (fun (i : index) -> IO.close i.io) t.index; + may (fun lock -> IO.unlock lock) t.writer_lock )) end diff --git a/src/index.mli b/src/index.mli index 02430b03..ef29e63b 100644 --- a/src/index.mli +++ b/src/index.mli @@ -49,6 +49,22 @@ module type Key = sig (** Formatter for keys *) end +(** These modules should not be used. They are exposed purely for testing + purposes. 
*) +module Private : sig + module Search : module type of Search + + module Io_array : module type of Io_array + + module Fan : module type of Fan + + module Hook : sig + type 'a t + + val v : ('a -> unit) -> 'a t + end +end + (** The input of [Make] for values. The same requirements as for [Key] apply. *) module type Value = sig type t @@ -114,7 +130,7 @@ module type S = sig recent replacements of existing values (after the last merge), this will hit both the new and old bindings. *) - val force_merge : t -> unit + val force_merge : ?hook:[ `After | `Before ] Private.Hook.t -> t -> unit (** [force_merge t] forces a merge for [t]. *) val flush : t -> unit @@ -126,13 +142,3 @@ end module Make (K : Key) (V : Value) (IO : IO) : S with type key = K.t and type value = V.t - -(** These modules should not be used. They are exposed purely for testing - purposes. *) -module Private : sig - module Search : module type of Search - - module Io_array : module type of Io_array - - module Fan : module type of Fan -end diff --git a/src/io.mli b/src/io.mli index 67f60465..182ed8f0 100644 --- a/src/io.mli +++ b/src/io.mli @@ -19,7 +19,7 @@ module type S = sig val read : t -> off:int64 -> len:int -> bytes -> int - val clear : t -> unit + val clear : ?keep_generation:bool -> t -> unit val sync : t -> unit @@ -44,4 +44,20 @@ module type S = sig val lock : string -> lock val unlock : lock -> unit + + exception Bad_Read + + module Mutex : sig + type t + + val create : unit -> t + + val lock : t -> unit + + val unlock : t -> unit + + val with_lock : t -> (unit -> 'a) -> 'a + end + + val async : (unit -> 'a) -> unit end diff --git a/src/io_array.ml b/src/io_array.ml index f3f79d62..1860bee7 100644 --- a/src/io_array.ml +++ b/src/io_array.ml @@ -78,7 +78,7 @@ module Make (IO : Io.S) (Elt : ELT) : Log.warn (fun m -> m "Requested pre-fetch region is empty: [%Ld, %Ld]" low high) else if range > max_buffer_size then - Log.debug (fun m -> + Log.warn (fun m -> m "Requested pre-fetch [%Ld, %Ld] is 
larger than %d" low high max_buffer_size) else @@ -96,7 +96,7 @@ module Make (IO : Io.S) (Elt : ELT) : pre-fetch [%Ld, %Ld]" low_buf high_buf low high) else ( - Log.debug (fun m -> + Log.warn (fun m -> m "Current buffer [%Ld, %Ld] insufficient. Prefetching in \ range [%Ld, %Ld]" diff --git a/src/search.ml b/src/search.ml index 794b4c1d..625cb709 100644 --- a/src/search.ml +++ b/src/search.ml @@ -52,7 +52,7 @@ module type S = sig module Array : ARRAY with type elt = Entry.t val interpolation_search : - Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t option + Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t end module Make @@ -83,17 +83,16 @@ module Make let look_around array key key_metric index = let rec search (op : int64 -> int64) curr = let i = op curr in - if i < 0L || i >= Array.length array then None + if i < 0L || i >= Array.length array then raise Not_found else let e = array.(i) in let e_metric = Metric.of_entry e in - if not Metric.(key_metric = e_metric) then None - else if Key.equal (Entry.to_key e) key then Some (Entry.to_value e) + if not Metric.(key_metric = e_metric) then raise Not_found + else if Key.equal (Entry.to_key e) key then Entry.to_value e else (search [@tailcall]) op i in - match search Int64.succ index with - | Some e -> Some e - | None -> (search [@tailcall]) Int64.pred index + try search Int64.pred index + with Not_found -> (search [@tailcall]) Int64.succ index (** Improves over binary search in cases where the values in some array are uniformly distributed according to some metric (such as a hash). 
*) @@ -101,21 +100,21 @@ module Make let key_metric = Metric.of_key key in (* The core of the search *) let rec search low high lowest_entry highest_entry = - if high < low then None + if high < low then raise Not_found else ( Array.pre_fetch array ~low ~high; let lowest_entry = Lazy.force lowest_entry in if high = low then if Key.(key = Entry.to_key lowest_entry) then - Some (Entry.to_value lowest_entry) - else None + Entry.to_value lowest_entry + else raise Not_found else let lowest_metric = Metric.of_entry lowest_entry in - if Metric.(lowest_metric > key_metric) then None + if Metric.(lowest_metric > key_metric) then raise Not_found else let highest_entry = Lazy.force highest_entry in let highest_metric = Metric.of_entry highest_entry in - if Metric.(highest_metric < key_metric) then None + if Metric.(highest_metric < key_metric) then raise Not_found else let next_index = Metric.linear_interpolate ~low:(low, lowest_metric) @@ -124,7 +123,7 @@ module Make let e = array.(next_index) in let e_metric = Metric.of_entry e in if Metric.(key_metric = e_metric) then - if Key.(key = Entry.to_key e) then Some (Entry.to_value e) + if Key.(key = Entry.to_key e) then Entry.to_value e else look_around array key key_metric next_index else if Metric.(key_metric > e_metric) then (search [@tailcall]) @@ -137,6 +136,6 @@ module Make (Lazy.from_val lowest_entry) (lazy array.(Int64.(pred next_index))) ) in - if high < 0L then None + if high < 0L then raise Not_found else (search [@tailcall]) low high (lazy array.(low)) (lazy array.(high)) end diff --git a/src/search.mli b/src/search.mli index 6fd6e81d..8ab54261 100644 --- a/src/search.mli +++ b/src/search.mli @@ -48,7 +48,7 @@ module type S = sig module Array : ARRAY with type elt = Entry.t val interpolation_search : - Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t option + Array.t -> Entry.Key.t -> low:int64 -> high:int64 -> Entry.Value.t end module Make diff --git a/src/unix/dune b/src/unix/dune index 
9ccc6583..29814900 100644 --- a/src/unix/dune +++ b/src/unix/dune @@ -2,4 +2,4 @@ (public_name index.unix) (name index_unix) (c_names pread pwrite) - (libraries logs unix index)) + (libraries index logs logs.threaded threads unix)) diff --git a/src/unix/index_unix.ml b/src/unix/index_unix.ml index 8b83f362..b274e1af 100644 --- a/src/unix/index_unix.ml +++ b/src/unix/index_unix.ml @@ -51,6 +51,8 @@ module IO : Index.IO = struct in get_uint64 buf 0 + exception Bad_Read + module Raw = struct type t = { fd : Unix.file_descr; mutable cursor : int64 } @@ -89,7 +91,10 @@ module IO : Index.IO = struct stats.nb_writes <- succ stats.nb_writes let unsafe_read t ~off ~len buf = - let n = really_read t.fd off len buf in + let n = + try really_read t.fd off len buf + with Unix.Unix_error (Unix.EBADF, "read", "") -> raise Bad_Read + in t.cursor <- off ++ Int64.of_int n; stats.bytes_read <- stats.bytes_read + n; stats.nb_reads <- succ stats.nb_reads; @@ -175,16 +180,8 @@ module IO : Index.IO = struct else ( Raw.unsafe_write t.raw ~off:t.flushed buf; Raw.Offset.set t.raw offset; - - (* concurrent append might happen so here t.offset might differ - from offset *) - if - not (t.flushed ++ Int64.of_int (String.length buf) = t.header ++ offset) - then - Fmt.failwith "sync error: %s flushed=%Ld buf=%Ld offset+header=%Ld\n%!" 
- t.file t.flushed - (Int64.of_int (String.length buf)) - (offset ++ t.header); + assert ( + t.flushed ++ Int64.of_int (String.length buf) = t.header ++ offset ); t.flushed <- offset ++ t.header ) let name t = t.file @@ -193,6 +190,7 @@ module IO : Index.IO = struct sync src; Unix.close dst.raw.fd; Unix.rename src.file dst.file; + Buffer.clear dst.buf; dst.header <- src.header; dst.fan_size <- src.fan_size; dst.offset <- src.offset; @@ -222,9 +220,14 @@ module IO : Index.IO = struct let version t = t.version - let get_generation t = Raw.Generation.get t.raw + let get_generation t = + let i = Raw.Generation.get t.raw in + Log.debug (fun m -> m "get_generation: %Ld" i); + i - let set_generation t = Raw.Generation.set t.raw + let set_generation t i = + Log.debug (fun m -> m "set_generation: %Ld" i); + Raw.Generation.set t.raw i let get_fanout t = Raw.Fan.get t.raw @@ -257,10 +260,10 @@ module IO : Index.IO = struct in (aux [@tailcall]) dirname (fun () -> ()) - let clear t = + let clear ?(keep_generation = false) t = t.offset <- 0L; t.flushed <- t.header; - Raw.Generation.set t.raw 0L; + if not keep_generation then Raw.Generation.set t.raw 0L; Raw.Offset.set t.raw t.offset; Raw.Fan.set t.raw ""; Buffer.clear t.buf @@ -366,6 +369,22 @@ module IO : Index.IO = struct let unlock { path; fd } = Log.debug (fun l -> l "Unlocking %s" path); Unix.close fd + + module Mutex = struct + include Mutex + + let with_lock t f = + Mutex.lock t; + try + let ans = f () in + Mutex.unlock t; + ans + with e -> + Mutex.unlock t; + raise e + end + + let async f = ignore (Thread.create f ()) end module Make (K : Index.Key) (V : Index.Value) = Index.Make (K) (V) (IO) diff --git a/test/search.ml b/test/search.ml index e459ede1..79be344f 100644 --- a/test/search.ml +++ b/test/search.ml @@ -65,7 +65,7 @@ let interpolation_unique () = Search.interpolation_search array i ~low:Int64.(zero) ~high:Int64.(pred length) - |> Alcotest.(check (option string)) "" (Some v)) + |> Alcotest.(check string) "" v) 
array let () = diff --git a/test/unix/common.ml b/test/unix/common.ml index 166d19e1..3bf9a596 100644 --- a/test/unix/common.ml +++ b/test/unix/common.ml @@ -18,6 +18,7 @@ let reporter ?(prefix = "") () = { Logs.report } let report () = + Logs_threaded.enable (); Logs.set_level (Some Logs.Debug); Logs.set_reporter (reporter ()) diff --git a/test/unix/force_merge.ml b/test/unix/force_merge.ml index fcb9d119..d4d28232 100644 --- a/test/unix/force_merge.ml +++ b/test/unix/force_merge.ml @@ -1,3 +1,4 @@ +module Hook = Index.Private.Hook open Common let root = Filename.concat "_tests" "unix.force_merge" @@ -6,6 +7,10 @@ module Context = Common.Make_context (struct let root = root end) +let after f = Hook.v (function `After -> f () | _ -> ()) + +let before f = Hook.v (function `Before -> f () | _ -> ()) + let test_find_present t tbl = Hashtbl.iter (fun k v -> @@ -117,6 +122,7 @@ let readonly_and_merge () = let k1 = Key.v () in let v1 = Value.v () in Index.replace w k1 v1; + Index.flush w; Index.force_merge w; test_one_entry r1 k1 v1; test_one_entry r2 k1 v1; @@ -125,6 +131,7 @@ let readonly_and_merge () = let k2 = Key.v () in let v2 = Value.v () in Index.replace w k2 v2; + Index.flush w; test_one_entry r1 k1 v1; Index.force_merge w; test_one_entry r2 k2 v2; @@ -136,15 +143,18 @@ let readonly_and_merge () = let v3 = Value.v () in test_one_entry r1 k1 v1; Index.replace w k2 v2; + Index.flush w; Index.force_merge w; test_one_entry r1 k1 v1; Index.replace w k3 v3; + Index.flush w; Index.force_merge w; test_one_entry r3 k3 v3; let k2 = Key.v () in let v2 = Value.v () in Index.replace w k2 v2; + Index.flush w; test_one_entry w k2 v2; Index.force_merge w; test_one_entry w k2 v2; @@ -154,26 +164,79 @@ let readonly_and_merge () = let k2 = Key.v () in let v2 = Value.v () in Index.replace w k2 v2; + Index.flush w; test_one_entry r2 k1 v1; Index.force_merge w; test_one_entry w k2 v2; test_one_entry r2 k2 v2; test_one_entry r3 k2 v2 in - let rec loop i = - if i = 0 then () - else ( - 
interleave (); - loop (i - 1) ) - in - loop 10; + for _ = 1 to 10 do + interleave () + done; test_fd () +(* A force merge has an implicit flush; however, if the replace occurs at the end of the merge, the value is not flushed. *) +let write_after_merge () = + let { Context.rw; clone; _ } = Context.full_index () in + let w = rw in + let r1 = clone ~readonly:true in + let k1 = Key.v () in + let v1 = Value.v () in + let k2 = Key.v () in + let v2 = Value.v () in + Index.replace w k1 v1; + let hook = after (fun () -> Index.replace w k2 v2) in + Index.force_merge ~hook w; + test_one_entry r1 k1 v1; + Alcotest.check_raises (Printf.sprintf "Absent value was found: %s." k2) + Not_found (fun () -> ignore (Index.find r1 k2)) + +let replace_while_merge () = + let { Context.rw; clone; _ } = Context.full_index () in + let w = rw in + let r1 = clone ~readonly:true in + let k1 = Key.v () in + let v1 = Value.v () in + let k2 = Key.v () in + let v2 = Value.v () in + Index.replace w k1 v1; + let hook = + before (fun () -> + Index.replace w k2 v2; + test_one_entry w k2 v2) + in + Index.force_merge ~hook w; + test_one_entry r1 k1 v1 + +(* note that here we cannot do + `test_one_entry r1 k2 v2` + as there is no way to guarantee that the latest value + added by a RW instance is found by a RO instance +*) + +let find_while_merge () = + let { Context.rw; clone; _ } = Context.full_index () in + let w = rw in + let k1 = Key.v () in + let v1 = Value.v () in + Index.replace w k1 v1; + let f () = test_one_entry w k1 v1 in + Index.force_merge ~hook:(after f) w; + Index.force_merge ~hook:(after f) w; + let r1 = clone ~readonly:true in + let f () = test_one_entry r1 k1 v1 in + Index.force_merge ~hook:(before f) w; + Index.force_merge ~hook:(before f) w + let tests = [ ("readonly in sequence", `Quick, readonly_s); ("readonly interleaved", `Quick, readonly); ("interleaved merge", `Quick, readonly_and_merge); + ("write at the end of merge", `Quick, write_after_merge); + ("write in log_async", `Quick,
replace_while_merge); + ("find while merging", `Quick, find_while_merge); ] (* Unix.sleep 10 *) diff --git a/test/unix/main.ml b/test/unix/main.ml index dd4f56ba..dd72c787 100644 --- a/test/unix/main.ml +++ b/test/unix/main.ml @@ -168,7 +168,7 @@ module DuplicateInstance = struct let fail_restart_ro_fresh () = let reuse_name = Context.fresh_name "empty_index" in let rw = Index.v ~fresh:true ~readonly:false ~log_size:4 reuse_name in - let exn = Failure "IO.v: cannot reset a readonly file" in + let exn = I.RO_not_allowed in Alcotest.check_raises "Index readonly cannot be fresh." exn (fun () -> ignore (Index.v ~fresh:true ~readonly:true ~log_size:4 reuse_name)); Index.close rw