Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RO sync #175

Merged
merged 11 commits into from
Jul 8, 2020
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

- `Index.close` will now abort an ongoing asynchronous merge operation, rather
than waiting for it to finish. (#185)
- `sync` has to be called by the read-only instance to synchronise with the
files on disk. (#175)

## Fixed

Expand Down
212 changes: 112 additions & 100 deletions src/index.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ let assert_and_get = function None -> assert false | Some e -> e

exception RO_not_allowed

exception RW_not_allowed

exception Closed

module Make_private
Expand Down Expand Up @@ -105,19 +107,16 @@ struct
Mutex.with_lock t.merge_lock (fun () ->
t.generation <- Int64.succ t.generation;
let log = assert_and_get t.log in
IO.set_generation log.io t.generation;
IO.clear ~keep_generation:true log.io;
IO.clear ~generation:t.generation log.io;
Tbl.clear log.mem;
may
(fun l ->
IO.set_generation l.io t.generation;
IO.clear ~keep_generation:true l.io;
IO.clear ~generation:t.generation log.io;
IO.close l.io)
t.log_async;
may
(fun (i : index) ->
IO.set_generation i.io t.generation;
IO.clear ~keep_generation:true i.io;
IO.clear ~generation:t.generation i.io;
IO.close i.io)
t.index;
t.index <- None;
Expand Down Expand Up @@ -219,6 +218,82 @@ struct
Int64.of_float rounded
end)

(* [try_load_log t path] looks for a log file at [path]. When the file
   exists it is opened read-only and every entry found by [iter_io] is
   replayed into a fresh in-memory table; the resulting log is returned as
   [Some _]. When no file exists at [path] the result is [None]. [t] is only
   used to prefix the debug message with the basename of its root. *)
let try_load_log t path =
  Log.debug (fun m ->
      m "[%s] checking on-disk %s file" (Filename.basename t.root)
        (Filename.basename path));
  match Sys.file_exists path with
  | false -> None
  | true ->
      let io =
        IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path
      in
      let mem = Tbl.create 0 in
      iter_io (fun entry -> Tbl.replace mem entry.key entry.value) io;
      Some { io; mem }

(* [sync_log ?hook t] refreshes the in-memory state of [t] from the files
   on disk.

   Steps, in order:
   - if [t.log] is absent, try to load it from [log_path t.root];
   - if there is still no log, only the async log is synchronised;
   - otherwise the generation and offset are read from the log header and
   compared with [t.generation] and the previously known offset:
   - generation changed: reload the whole log and reopen the index;
   - offset grew: replay only the entries after the old offset;
   - anything else: nothing to do.

   [hook] is called with [`Before_offset_read] just before the header is
   read; it defaults to a function that does nothing. *)
let sync_log ?(hook = fun _ -> ()) t =
Log.debug (fun l ->
l "[%s] checking for changes on disk" (Filename.basename t.root));
let no_changes () =
Log.debug (fun l ->
l "[%s] no changes detected" (Filename.basename t.root))
in
(* Insert (or overwrite) one on-disk entry in the in-memory table of [log]. *)
let add_log_entry log e = Tbl.replace log.mem e.key e.value in
(* Synchronise [t.log_async]: load it if it is not known yet, otherwise
   rebuild its table from scratch when the generation changed or when its
   offset moved. NOTE(review): [IO.force_offset] presumably re-reads the
   offset from disk; confirm against the IO module. *)
let sync_log_async ?(generation_change = false) () =
match t.log_async with
| None -> t.log_async <- try_load_log t (log_async_path t.root)
| Some log ->
let offset = IO.offset log.io in
let new_offset = IO.force_offset log.io in
if generation_change || offset <> new_offset then (
Tbl.clear log.mem;
iter_io (add_log_entry log) log.io )
else ()
in
(* Make a first attempt at loading the main log if it is not loaded yet. *)
( match t.log with
| None -> t.log <- try_load_log t (log_path t.root)
| Some _ -> () );
match t.log with
| None -> sync_log_async ()
| Some log ->
(* Offset known before looking at the header; used below as the starting
   point when only new entries have to be replayed. *)
let log_offset = IO.offset log.io in
hook `Before_offset_read;
(* Generation and offset come from one header read. NOTE(review): this is
   presumably what keeps the two values consistent with each other when a
   writer is active; confirm in the IO implementation. *)
let IO.Header.{ generation; offset = new_log_offset } =
IO.Header.get_header log.io
in
let add_log_entry e = add_log_entry log e in
sync_log_async ~generation_change:(t.generation <> generation) ();
if t.generation <> generation then (
Log.debug (fun l ->
l "[%s] generation has changed, reading log and index from disk"
(Filename.basename t.root));
t.generation <- generation;
Tbl.clear log.mem;
iter_io add_log_entry log.io;
(* Close the current index (if any) before reopening it from disk. *)
may (fun (i : index) -> IO.close i.io) t.index;
let index_path = index_path t.root in
if not (Sys.file_exists index_path) then t.index <- None
else
let io =
IO.v ~fresh:false ~readonly:true ~generation ~fan_size:0L
index_path
in
let fan_out =
Fan.import ~hash_size:K.hash_size (IO.get_fanout io)
in
(* An index file whose offset is 0 is treated as having no index. *)
if IO.offset io = 0L then t.index <- None
else t.index <- Some { fan_out; io } )
else if log_offset < new_log_offset then (
Log.debug (fun l ->
l "[%s] new entries detected, reading log from disk"
(Filename.basename t.root));
iter_io add_log_entry log.io ~min:log_offset )
else if log_offset > new_log_offset then
(* In that case the log has probably been emptied and is being
refilled with async_log contents. *)
no_changes ()
else no_changes ()

let with_cache ~v ~clear =
let roots = Hashtbl.create 0 in
let f ?auto_flush_callback ?(fresh = false) ?(readonly = false) ~log_size
Expand All @@ -238,6 +313,7 @@ struct
if t.open_instances <> 0 then (
Log.debug (fun l -> l "[%s] found in cache" (Filename.basename root));
t.open_instances <- t.open_instances + 1;
if readonly then sync_log t;
let t = ref (Some t) in
if fresh then clear t;
t )
Expand All @@ -247,6 +323,7 @@ struct
with Not_found ->
let instance = v ?auto_flush_callback ~fresh ~readonly ~log_size root in
Hashtbl.add roots (root, readonly) instance;
if readonly then sync_log instance;
ref (Some instance)
in
`Staged f
Expand Down Expand Up @@ -275,6 +352,9 @@ struct
iter_io (fun e -> Tbl.replace mem e.key e.value) io;
Some { io; mem }
in
let generation =
match log with None -> 0L | Some log -> IO.get_generation log.io
in
let log_async_path = log_async_path root in
(* If we are in readonly mode, the log_async will be read during sync_log so
there is no need to do it here. *)
Expand All @@ -298,12 +378,9 @@ struct
append_key_value log.io e.key e.value)
io;
IO.sync log.io;
IO.clear io)
IO.clear ~generation io)
log;
IO.close io );
let generation =
match log with None -> 0L | Some log -> IO.get_generation log.io
in
let index =
let index_path = index_path root in
if Sys.file_exists index_path then
Expand Down Expand Up @@ -348,80 +425,6 @@ struct
in
Search.interpolation_search (IOArray.v index.io) key ~low ~high

(* [try_load_log t path] returns [None] when there is no file at [path].
   Otherwise it opens that file read-only, fills a new in-memory table with
   the entries yielded by [iter_io], and returns the log as [Some _]. The
   instance [t] only contributes the root basename shown in the debug
   message. *)
let try_load_log t path =
  Log.debug (fun m ->
      m "[%s] checking on-disk %s file" (Filename.basename t.root)
        (Filename.basename path));
  if not (Sys.file_exists path) then None
  else
    let io =
      IO.v ~fresh:false ~readonly:true ~generation:0L ~fan_size:0L path
    in
    let mem = Tbl.create 0 in
    iter_io (fun entry -> Tbl.replace mem entry.key entry.value) io;
    Some { io; mem }

(* [sync_log t] refreshes the in-memory state of [t] from the files on disk.

   - If [t.log] is absent, it is loaded from [log_path t.root] when that
     file exists.
   - If there is still no log, only the async log is synchronised.
   - Otherwise the generation and offset found in the log header are
     compared with [t.generation] and the previously known offset:
     a generation change reloads the whole log and reopens the index, a
     larger offset replays only the new entries, anything else is a no-op.

   The generation and the offset are taken from a single
   [IO.Header.get_header] call instead of two separate reads
   ([IO.get_generation] followed by [IO.force_offset]). With separate reads,
   a merge that both bumps the generation and clears the log in between
   could leave this function unable to detect that anything changed. *)
let sync_log t =
  Log.debug (fun l ->
      l "[%s] checking for changes on disk" (Filename.basename t.root));
  let no_changes () =
    Log.debug (fun l ->
        l "[%s] no changes detected" (Filename.basename t.root))
  in
  (* Insert (or overwrite) one on-disk entry in the table of [log]. *)
  let add_log_entry log e = Tbl.replace log.mem e.key e.value in
  (* Load the async log if it is unknown; otherwise rebuild its table when
     the generation changed or its offset moved. *)
  let sync_log_async ?(generation_change = false) () =
    match t.log_async with
    | None -> t.log_async <- try_load_log t (log_async_path t.root)
    | Some log ->
        let offset = IO.offset log.io in
        let new_offset = IO.force_offset log.io in
        if generation_change || offset <> new_offset then (
          Tbl.clear log.mem;
          iter_io (add_log_entry log) log.io )
        else ()
  in
  ( match t.log with
  | None -> t.log <- try_load_log t (log_path t.root)
  | Some _ -> () );
  match t.log with
  | None -> sync_log_async ()
  | Some log ->
      (* Offset known before the header is consulted; starting point for
         replaying new entries below. *)
      let log_offset = IO.offset log.io in
      (* One header read yields both values, so they describe the same
         on-disk state. *)
      let IO.Header.{ generation; offset = new_log_offset } =
        IO.Header.get_header log.io
      in
      let add_log_entry e = add_log_entry log e in
      sync_log_async ~generation_change:(t.generation <> generation) ();
      if t.generation <> generation then (
        Log.debug (fun l ->
            l "[%s] generation has changed, reading log and index from disk"
              (Filename.basename t.root));
        t.generation <- generation;
        Tbl.clear log.mem;
        iter_io add_log_entry log.io;
        (* Close the current index (if any) before reopening it. *)
        may (fun (i : index) -> IO.close i.io) t.index;
        let index_path = index_path t.root in
        if not (Sys.file_exists index_path) then t.index <- None
        else
          let io =
            IO.v ~fresh:false ~readonly:true ~generation ~fan_size:0L
              index_path
          in
          let fan_out =
            Fan.import ~hash_size:K.hash_size (IO.get_fanout io)
          in
          (* An index file whose offset is 0 is treated as no index. *)
          if IO.offset io = 0L then t.index <- None
          else t.index <- Some { fan_out; io } )
      else if log_offset < new_log_offset then (
        Log.debug (fun l ->
            l "[%s] new entries detected, reading log from disk"
              (Filename.basename t.root));
        iter_io add_log_entry log.io ~min:log_offset )
      else if log_offset > new_log_offset then
        (* In that case the log has probably been emptied and is being
           refilled with async_log contents. *)
        no_changes ()
      else no_changes ()

let find_instance t key =
let find_if_exists ~name ~find db () =
match db with
Expand All @@ -441,16 +444,10 @@ struct
@~ find_if_exists ~name:"index" ~find:interpolation_search t.index
in
Mutex.with_lock t.rename_lock (fun () ->
if t.config.readonly then sync_log t;
find_if_exists ~name:"log_async"
~find:(fun log -> Tbl.find log.mem)
t.log_async
@~ fun () ->
find_log_index @~ fun () ->
if t.config.readonly then (
sync_log t;
find_log_index () )
else raise Not_found)
@~ find_log_index)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case that was handled by this retry is now completely omitted.
That means it is still possible, in principle, to call ro_sync and hit the same interleaving with merge as before, causing sync_log to miss the on-disk modifications (which is why we retried before).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and it's tricky, because no test fails (I ran them 500 times).

I looked at which interleaving was causing this bug (#118 (comment)). This particular bug can be fixed at the end of a merge by first setting the generation number and then clearing the log. This can cause a ro_sync to refill a log even if there are no new values, but it can no longer lead to a value not being found. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline: we can test this properly with a couple of extra hooks, and we probably should.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the hooks and tested them in commit 8d0f15b.
One test fails in that commit: the RO instance reads the generation number and the offset separately to decide whether a change occurred. If, between the two reads, both the generation changed and the log was cleared, then the RO instance cannot detect the change (see test 8d0f15b#diff-81aec069a46f58216cc2e7046f8d183dR349).
The solution I propose in 0af8704 is to group the generation and offset reads/writes into a single read/write.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch. Your proposed fix sounds good to me.

You may be interested in craigfe@6fe96bd (now on CraigFe/atomic-reads), which was an experiment as part of #177 (comment) to have batch header operations. At the time, it seemed not worth it, but if we're going to have correctness issues with reading headers individually, it may be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew you had this somewhere, but didn't know where :) I added some of your modifications in the commit, and added you as co-author.
I did not add the fan_size in the headers, as it is not necessary for the correctness of this PR.


let find t key =
let t = check_open t in
Expand Down Expand Up @@ -527,7 +524,8 @@ struct
in
(go [@tailcall]) 0L 0 0

let merge ?(blocking = false) ?(filter = fun _ -> true) ?hook ~witness t =
let merge ?(blocking = false) ?(filter = fun _ -> true) ?(hook = fun _ -> ())
~witness t =
let yield () = check_pending_cancel t in
Mutex.lock t.merge_lock;
Log.info (fun l -> l "[%s] merge" (Filename.basename t.root));
Expand All @@ -545,7 +543,7 @@ struct
t.log_async <- Some log_async;

let go () =
may (fun f -> f `Before) hook;
hook `Before;
let log = assert_and_get t.log in
let generation = Int64.succ t.generation in
let log_array =
Expand Down Expand Up @@ -599,10 +597,10 @@ struct
Mutex.with_lock t.rename_lock (fun () ->
IO.rename ~src:merge ~dst:index.io;
t.index <- Some index;
IO.clear ~keep_generation:true log.io;
Tbl.clear log.mem;
IO.set_generation log.io generation;
t.generation <- generation;
IO.clear ~generation log.io;
Tbl.clear log.mem;
hook `After_clear;
let log_async = assert_and_get t.log_async in
Tbl.iter
(fun key value ->
Expand All @@ -611,8 +609,8 @@ struct
log_async.mem;
IO.sync log.io;
t.log_async <- None);
may (fun f -> f `After) hook;
IO.clear log_async.io;
hook `After;
IO.clear ~generation log_async.io;
IO.close log_async.io;
Mutex.unlock t.merge_lock;
`Completed
Expand Down Expand Up @@ -698,7 +696,6 @@ struct
let iter f t =
let t = check_open t in
Log.info (fun l -> l "[%s] iter" (Filename.basename t.root));
if t.config.readonly then sync_log t;
match t.log with
| None -> ()
| Some log ->
Expand Down Expand Up @@ -736,6 +733,17 @@ struct
may (fun lock -> IO.unlock lock) t.writer_lock ))

(* [close] is [close'] applied to a hook that ignores every event. *)
let close = close' ~hook:(fun _ -> ())

(* [sync' ?hook t] synchronises a read-only instance with the files on disk.
   The whole operation runs inside [Stats.sync_with_timer]: the sync counter
   is incremented, [t] is checked to be open, and [sync_log] is called with
   the optional [hook] forwarded as is.
   Raises [RW_not_allowed] when the instance is not read-only. *)
let sync' ?hook t =
  Stats.sync_with_timer (fun () ->
      Stats.incr_nb_sync ();
      let instance = check_open t in
      Log.info (fun m -> m "[%s] ro_sync" (Filename.basename instance.root));
      match instance.config.readonly with
      | true -> sync_log ?hook instance
      | false -> raise RW_not_allowed)

(* [sync t] is [sync'] called without any hook. *)
let sync t = sync' ?hook:None t
end

module Make = Make_private
Expand All @@ -759,11 +767,15 @@ module Private = struct
val close' : hook:[ `Abort_signalled ] Hook.t -> t -> unit

val force_merge :
?hook:[ `After | `Before ] Hook.t -> t -> [ `Completed | `Aborted ] async
?hook:[ `After | `After_clear | `Before ] Hook.t ->
t ->
[ `Completed | `Aborted ] async

val await : 'a async -> ('a, [ `Async_exn of exn ]) result

val replace_with_timer : ?sampling_interval:int -> t -> key -> value -> unit

val sync' : ?hook:[ `Before_offset_read ] Hook.t -> t -> unit
end

module Make = Make_private
Expand Down
Loading