Skip to content

Commit

Permalink
Group generation and offset in one read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
icristescu committed Jun 28, 2020
1 parent 8d0f15b commit a973d87
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 54 deletions.
23 changes: 8 additions & 15 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,16 @@ struct
Mutex.with_lock t.merge_lock (fun () ->
t.generation <- Int64.succ t.generation;
let log = assert_and_get t.log in
IO.set_generation log.io t.generation;
IO.clear ~keep_generation:true log.io;
IO.clear ~generation:t.generation log.io;
Tbl.clear log.mem;
may
(fun l ->
IO.set_generation l.io t.generation;
IO.clear ~keep_generation:true l.io;
IO.clear ~generation:t.generation log.io;
IO.close l.io)
t.log_async;
may
(fun (i : index) ->
IO.set_generation i.io t.generation;
IO.clear ~keep_generation:true i.io;
IO.clear ~generation:t.generation i.io;
IO.close i.io)
t.index;
t.index <- None;
Expand Down Expand Up @@ -262,10 +259,9 @@ struct
match t.log with
| None -> sync_log_async ()
| Some log ->
let generation = IO.get_generation log.io in
let log_offset = IO.offset log.io in
may (fun f -> f `Before_offset_read) hook;
let new_log_offset = IO.force_offset log.io in
let new_log_offset, generation = IO.get_header log.io in
let add_log_entry e = add_log_entry log e in
sync_log_async ~generation_change:(t.generation <> generation) ();
if t.generation <> generation then (
Expand Down Expand Up @@ -375,7 +371,7 @@ struct
append_key_value log.io e.key e.value)
io;
IO.sync log.io;
IO.clear io)
IO.clear ~generation:0L io)
log;
IO.close io );
let generation =
Expand Down Expand Up @@ -593,10 +589,8 @@ struct
Mutex.with_lock t.rename_lock (fun () ->
IO.rename ~src:merge ~dst:index.io;
t.index <- Some index;
IO.set_generation log.io generation;
t.generation <- generation;
may (fun f -> f `After_generation_change) hook;
IO.clear ~keep_generation:true log.io;
IO.clear ~generation log.io;
Tbl.clear log.mem;
may (fun f -> f `After_clear) hook;
let log_async = assert_and_get t.log_async in
Expand All @@ -608,7 +602,7 @@ struct
IO.sync log.io;
t.log_async <- None);
may (fun f -> f `After) hook;
IO.clear log_async.io;
IO.clear ~generation log_async.io;
IO.close log_async.io;
Mutex.unlock t.merge_lock;
`Completed
Expand Down Expand Up @@ -764,8 +758,7 @@ module Private = struct
val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit
val force_merge :
?hook:
[ `After | `After_clear | `After_generation_change | `Before ] Hook.t ->
?hook:[ `After | `After_clear | `Before ] Hook.t ->
t ->
[ `Completed | `Aborted ] async
Expand Down
11 changes: 4 additions & 7 deletions src/index_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ module type S = sig

val ro_sync : t -> unit
(** [ro_sync t] syncs a read-only index with the files on disk. Raises
[RW_not_allowed] if called by an read-write index. *)
[RW_not_allowed] if called by a read-write index. *)
end

module type Index = sig
Expand Down Expand Up @@ -250,8 +250,7 @@ module type Index = sig
(** The type of asynchronous computation. *)

val force_merge :
?hook:
[ `After | `After_clear | `After_generation_change | `Before ] Hook.t ->
?hook:[ `After | `After_clear | `Before ] Hook.t ->
t ->
[ `Completed | `Aborted ] async
(** [force_merge t] forces a merge for [t]. Optionally, a hook can be
Expand All @@ -261,8 +260,6 @@ module type Index = sig
lock);
- [`After_clear]: immediately after clearing the log, at the end of a
merge;
- [`After_generation_change]: immediately after increasing the
generation, at the end of a merge;
- [`After]: immediately after merging (while holding the merge lock). *)

val await : 'a async -> ('a, [ `Async_exn of exn ]) result
Expand All @@ -279,8 +276,8 @@ module type Index = sig
(** Time ro_sync operations. *)

val ro_sync' : ?hook:[ `Before_offset_read ] Hook.t -> t -> unit
(** [`Before_offset_read]: after reading the generation number but before
reading the offset. *)
(** [`Before_offset_read]: after reading the generation number and the
offset. *)
end

module Make (K : Key) (V : Value) (IO : IO) (M : MUTEX) (T : THREAD) :
Expand Down
6 changes: 5 additions & 1 deletion src/io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module type S = sig

val read : t -> off:int64 -> len:int -> bytes -> int

val clear : ?keep_generation:bool -> t -> unit
val clear : generation:int64 -> t -> unit

val sync : ?with_fsync:bool -> t -> unit

Expand All @@ -61,4 +61,8 @@ module type S = sig
val lock : string -> lock

val unlock : lock -> unit

val set_header : t -> offset:int64 -> generation:int64 -> unit

val get_header : t -> int64 * int64
end
17 changes: 14 additions & 3 deletions src/unix/index_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ module IO : Index.IO = struct
assert (Int64.equal (Int64.of_int (String.length buf)) t.fan_size);
Raw.Fan.set t.raw buf

let get_header t =
let h = Raw.Header.get t.raw in
t.offset <- h.offset;
Log.debug (fun m ->
m "[%s] get_header, generation = %Ld" t.file h.generation);
(h.offset, h.generation)

let set_header t ~offset ~generation =
let version = version () in
Log.debug (fun m -> m "[%s] set_header: generation = %Ld" t.file generation);
Raw.Header.set t.raw { offset; version; generation }

let readonly t = t.readonly

let protect_unix_exn = function
Expand All @@ -132,11 +144,10 @@ module IO : Index.IO = struct
in
(aux [@tailcall]) dirname (fun () -> ())

let clear ?(keep_generation = false) t =
let clear ~generation t =
t.offset <- 0L;
t.flushed <- t.header;
if not keep_generation then Raw.Generation.set t.raw 0L;
Raw.Offset.set t.raw t.offset;
set_header ~offset:t.offset ~generation t;
Raw.Fan.set t.raw "";
Buffer.clear t.buf

Expand Down
25 changes: 25 additions & 0 deletions src/unix/raw.ml
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,28 @@ module Fan = struct
assert (n = size);
Bytes.unsafe_to_string buf
end

module Header = struct
type h = { offset : int64; version : string; generation : int64 }

(* offset + version + generation*)
let len = 8 + 8 + 8

let get t =
let buf = Bytes.create len in
let n = unsafe_read t ~off:0L ~len buf in
assert (n = len);
let buf = Bytes.unsafe_to_string buf in
let offset = String.sub buf 0 8 |> decode_int64 in
let version = String.sub buf 8 8 in
let generation = String.sub buf 16 8 |> decode_int64 in
{ offset; version; generation }

let set t h =
let buf = Buffer.create len in
encode_int64 h.offset |> Buffer.add_string buf;
h.version |> Buffer.add_string buf;
encode_int64 h.generation |> Buffer.add_string buf;
let buf = Buffer.contents buf in
unsafe_write t ~off:0L buf
end
8 changes: 8 additions & 0 deletions src/unix/raw.mli
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ module Fan : sig

val set_size : t -> int64 -> unit
end

module Header : sig
type h = { offset : int64; version : string; generation : int64 }

val get : t -> h

val set : t -> h -> unit
end
27 changes: 0 additions & 27 deletions test/unix/force_merge.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ let after f = Hook.v (function `After -> f () | _ -> ())

let after_clear f = Hook.v (function `After_clear -> f () | _ -> ())

let after_generation_change f =
Hook.v (function `After_generation_change -> f () | _ -> ())

let before f = Hook.v (function `Before -> f () | _ -> ())

let before_offset_read f =
Expand Down Expand Up @@ -323,27 +320,6 @@ let sync_after_clear_log () =
Index.close rw;
Index.close ro

(** The same test as above, but with a hook after the generation changed.*)
let sync_after_generation_change () =
let Context.{ rw; clone; _ } = Context.empty_index () in
let ro = clone ~readonly:true () in
let k1, v1 = (Key.v (), Value.v ()) in
Index.replace rw k1 v1;
Index.flush rw;
let hook = after_generation_change (fun () -> Index.ro_sync ro) in
let t = Index.force_merge ~hook rw in
Index.await t |> check_completed;
test_one_entry ro k1 v1;
let k2, v2 = (Key.v (), Value.v ()) in
Index.replace rw k2 v2;
Index.flush rw;
Index.ro_sync ro;
let hook = after_generation_change (fun () -> test_one_entry ro k1 v1) in
let t = Index.force_merge ~hook rw in
Index.await t |> check_completed;
Index.close rw;
Index.close ro

(** during a merge RO sync can miss a value if it reads the generation before
the generation is updated. *)
let merge_during_sync () =
Expand Down Expand Up @@ -373,8 +349,5 @@ let tests =
("find in async without log", `Quick, find_in_async_generation_change);
("find in async with log", `Quick, find_in_async_same_generation);
("sync and find after log cleared", `Quick, sync_after_clear_log);
( "sync and find after generation change",
`Quick,
sync_after_generation_change );
("merge during ro sync", `Quick, merge_during_sync);
]
2 changes: 1 addition & 1 deletion test/unix/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ module Close = struct
| `Before ->
Fmt.pr "Child: issuing request to close the index\n%!";
Mutex.unlock close_request
| `After_clear | `After_generation_change | `After ->
| `After_clear | `After ->
Alcotest.fail "Merge completed despite concurrent close"
in
let merge_promise : _ Index.async =
Expand Down

0 comments on commit a973d87

Please sign in to comment.