diff --git a/CHANGES.md b/CHANGES.md index 650fe6e363..6e972821a3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,12 @@ +### Unreleased + +#### Changed +- **irmin-pack**: + - `sync` has to be called by the read-only instance to synchronise with the + files on disk. (#1008, @icristescu) + - Renamed `sync` to `flush` for the operation that flushes to disk all buffers + of a read-write instance. (#1008, @icristescu) + ### 2.2.0 (2020-06-26) #### Added diff --git a/irmin-pack.opam b/irmin-pack.opam index 52dbd9c3c0..b7c18d68da 100644 --- a/irmin-pack.opam +++ b/irmin-pack.opam @@ -22,4 +22,8 @@ depends: [ "alcotest-lwt" {with-test} ] +pin-depends: [ + "index.dev" "git+https://github.com/mirage/index#9717194feb57c27cac9ad993aaf24a64dc5b64a3" +] + synopsis: "Irmin backend which stores values in a pack file" diff --git a/src/irmin-pack/IO.ml b/src/irmin-pack/IO.ml index 9cbd4624b4..1779bc46ce 100644 --- a/src/irmin-pack/IO.ml +++ b/src/irmin-pack/IO.ml @@ -43,7 +43,7 @@ module type S = sig val version : t -> string - val sync : t -> unit + val flush : t -> unit val close : t -> unit end @@ -71,9 +71,9 @@ module Unix : S = struct let header = 16L (* offset + version *) - let sync t = + let flush t = if t.readonly then raise RO_Not_Allowed; - Log.debug (fun l -> l "IO sync %s" t.file); + Log.debug (fun l -> l "IO flush %s" t.file); let buf = Buffer.contents t.buf in let offset = t.offset in Buffer.clear t.buf; @@ -96,11 +96,11 @@ module Unix : S = struct Buffer.add_string t.buf buf; let len = Int64.of_int (String.length buf) in t.offset <- t.offset ++ len; - if t.offset -- t.flushed > auto_flush_limit then sync t + if t.offset -- t.flushed > auto_flush_limit then flush t let set t ~off buf = if t.readonly then raise RO_Not_Allowed; - sync t; + flush t; Raw.unsafe_write t.raw ~off:(header ++ off) buf; let len = Int64.of_int (String.length buf) in let off = header ++ off ++ len in diff --git a/src/irmin-pack/IO.mli b/src/irmin-pack/IO.mli index cbeaf07b7f..820bb8ebda 100644 --- a/src/irmin-pack/IO.mli +++ b/src/irmin-pack/IO.mli @@ -39,7 +39,7 @@ module type S = sig val version : t -> string - val sync : t -> unit + val flush : t -> unit val close : t -> unit end diff --git a/src/irmin-pack/closeable.ml b/src/irmin-pack/closeable.ml index 95d6dcbc34..4cbe13e926 100644 --- a/src/irmin-pack/closeable.ml +++ b/src/irmin-pack/closeable.ml @@ -65,6 +65,10 @@ module Pack (S : Pack.S) = struct check_not_closed t; S.unsafe_find t.t k + let flush ?index t = + check_not_closed t; + S.flush ?index t.t + let sync t = check_not_closed t; S.sync t.t diff --git a/src/irmin-pack/dict.ml b/src/irmin-pack/dict.ml index 20a43e5339..eaea121ebb 100644 --- a/src/irmin-pack/dict.ml +++ b/src/irmin-pack/dict.ml @@ -30,6 +30,8 @@ module type S = sig val index : t -> string -> int option + val flush : t -> unit + val sync : t -> unit val v : ?fresh:bool -> ?readonly:bool -> ?capacity:int -> string -> t @@ -78,11 +80,14 @@ module Make (IO : IO.S) : S = struct let log_offset = IO.force_offset t.io in if log_offset > former_log_offset then refill ~from:former_log_offset t - let sync t = IO.sync t.io + let sync t = + if IO.readonly t.io then sync_offset t + else invalid_arg "only a readonly instance should call this function" + + let flush t = IO.flush t.io let index t v = Log.debug (fun l -> l "[dict] index %S" v); - if IO.readonly t.io then sync_offset t; try Some (Hashtbl.find t.cache v) with Not_found -> let id = Hashtbl.length t.cache in @@ -95,7 +100,6 @@ module Make (IO : IO.S) : S = struct Some id ) let find t id = - if IO.readonly t.io then sync_offset t; Log.debug (fun l -> l "[dict] find %d" id); let v = try Some (Hashtbl.find t.index id) with Not_found -> None in v @@ -116,7 +120,7 @@ module Make (IO : IO.S) : S = struct let close t = t.open_instances <- t.open_instances - 1; if t.open_instances = 0 then ( - if not (IO.readonly t.io) then sync t; + if not (IO.readonly t.io) then flush t; IO.close t.io; Hashtbl.reset t.cache; Hashtbl.reset t.index ) diff --git a/src/irmin-pack/dict.mli b/src/irmin-pack/dict.mli index 6f8748f0b7..9c2fe98596 100644 --- a/src/irmin-pack/dict.mli +++ b/src/irmin-pack/dict.mli @@ -21,6 +21,8 @@ module type S = sig val index : t -> string -> int option + val flush : t -> unit + val sync : t -> unit val v : ?fresh:bool -> ?readonly:bool -> ?capacity:int -> string -> t diff --git a/src/irmin-pack/inode.ml b/src/irmin-pack/inode.ml index 6be64bd0a3..6c0f5d4bf6 100644 --- a/src/irmin-pack/inode.ml +++ b/src/irmin-pack/inode.ml @@ -46,6 +46,8 @@ module type S = sig offset:int64 -> length:int -> key -> 'a t -> (unit, integrity_error) result val close : 'a t -> unit Lwt.t + + val sync : 'a t -> unit end module type CONFIG = sig @@ -800,4 +802,6 @@ struct let integrity_check = Inode.integrity_check let close = Inode.close + + let sync = Inode.sync end diff --git a/src/irmin-pack/inode.mli b/src/irmin-pack/inode.mli index ab4f291ac5..cbae08627f 100644 --- a/src/irmin-pack/inode.mli +++ b/src/irmin-pack/inode.mli @@ -45,6 +45,8 @@ module type S = sig offset:int64 -> length:int -> key -> 'a t -> (unit, integrity_error) result val close : 'a t -> unit Lwt.t + + val sync : 'a t -> unit end module Make diff --git a/src/irmin-pack/irmin_pack.ml b/src/irmin-pack/irmin_pack.ml index d75c9d43b4..30cd3c7483 100644 --- a/src/irmin-pack/irmin_pack.ml +++ b/src/irmin-pack/irmin_pack.ml @@ -287,7 +287,7 @@ module Atomic_write (K : Irmin.Type.S) (V : Irmin.Hash.S) = struct if t.open_instances = 0 then ( Tbl.reset t.index; Tbl.reset t.cache; - if not (IO.readonly t.block) then IO.sync t.block; + if not (IO.readonly t.block) then IO.flush t.block; IO.close t.block; W.clear t.w ) else Lwt.return_unit @@ -297,6 +297,20 @@ end module type CONFIG = Inode.CONFIG +module type Stores_extra = sig + type repo + + val integrity_check : + ?ppf:Format.formatter -> + auto_repair:bool -> + repo -> + ( [> `Fixed of int | `No_error ], + [> `Cannot_fix of string | `Corrupted of int ] ) + result + + val sync : repo -> unit +end + module Make_ext (Config : CONFIG) (M : Irmin.Metadata.S) @@ -436,11 +450,22 @@ struct let lru_size = lru_size config in let readonly = readonly config in let log_size = index_log_size config in - let index = Index.v ~fresh ~readonly ~log_size root in + let f = ref (fun () -> ()) in + let index = + Index.v + ~auto_flush_callback:(fun () -> !f ()) + (* backpatching to add pack flush before an index flush *) + ~fresh ~readonly ~log_size root + in Contents.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun contents -> Node.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun node -> Commit.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun commit -> Branch.v ~fresh ~readonly root >|= fun branch -> + (* Stores share instances in memory, one flush is enough. In case of a + system crash, the auto_flush_callback might not make with the disk. + In this case, when the store is reopened, [integrity_check] needs to + be called to repair the store. *) + (f := fun () -> Contents.CA.flush ~index:false contents); { contents; node; commit; branch; config; index } let close t = @@ -448,6 +473,9 @@ struct Contents.CA.close (contents_t t) >>= fun () -> Node.CA.close (snd (node_t t)) >>= fun () -> Commit.CA.close (snd (commit_t t)) >>= fun () -> Branch.close t.branch + + (** stores share instances in memory, one sync is enough *) + let sync t = Contents.CA.sync (contents_t t) end end @@ -517,6 +545,8 @@ struct else Error (`Corrupted (!nb_corrupted + !nb_absent)) ) include Irmin.Of_private (X) + + let sync = X.Repo.sync end module Hash = Irmin.Hash.BLAKE2B diff --git a/src/irmin-pack/irmin_pack.mli b/src/irmin-pack/irmin_pack.mli index f11818857a..c0d09f24b2 100644 --- a/src/irmin-pack/irmin_pack.mli +++ b/src/irmin-pack/irmin_pack.mli @@ -34,6 +34,26 @@ module type CONFIG = sig val stable_hash : int end +module type Stores_extra = sig + type repo + + val integrity_check : + ?ppf:Format.formatter -> + auto_repair:bool -> + repo -> + ( [> `Fixed of int | `No_error ], + [> `Cannot_fix of string | `Corrupted of int ] ) + result + (** Checks the integrity of the repository. if [auto_repair] is [true], will + also try to fix the issues. [ppf] is a formatter for progressive + reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted + entries. *) + + val sync : repo -> unit + (** [sync t] syncs a readonly pack with the files on disk. Raises + [invalid_argument] if called by a read-write pack.*) +end + module Make_ext (Config : CONFIG) (Metadata : Irmin.Metadata.S) @@ -57,17 +77,7 @@ module Make_ext and type Key.step = Path.step and type Private.Sync.endpoint = unit - val integrity_check : - ?ppf:Format.formatter -> - auto_repair:bool -> - repo -> - ( [> `Fixed of int | `No_error ], - [> `Cannot_fix of string | `Corrupted of int ] ) - result - (** Checks the integrity of the repository. if [auto_repair] is [true], will - also try to fix the issues. [ppf] is a formatter for progressive - reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted - entries. *) + include Stores_extra with type repo := repo end module Make @@ -87,17 +97,7 @@ module Make and type hash = H.t and type Private.Sync.endpoint = unit - val integrity_check : - ?ppf:Format.formatter -> - auto_repair:bool -> - repo -> - ( [> `Fixed of int | `No_error ], - [> `Cannot_fix of string | `Corrupted of int ] ) - result - (** Checks the integrity of the repository. if [auto_repair] is [true], will - also try to fix the issues. [ppf] is a formatter for progressive - reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted - entries. *) + include Stores_extra with type repo := repo end module KV (Config : CONFIG) : Irmin.KV_MAKER diff --git a/src/irmin-pack/pack.ml b/src/irmin-pack/pack.ml index 08781003d6..73bba6e6c9 100644 --- a/src/irmin-pack/pack.ml +++ b/src/irmin-pack/pack.ml @@ -64,6 +64,8 @@ module type S = sig val unsafe_find : 'a t -> key -> value option + val flush : ?index:bool -> 'a t -> unit + val sync : 'a t -> unit type integrity_error = [ `Wrong_hash | `Absent_value ] @@ -149,7 +151,7 @@ struct let close t = t.open_instances <- t.open_instances - 1; if t.open_instances = 0 then ( - if not (IO.readonly t.block) then IO.sync t.block; + if not (IO.readonly t.block) then IO.flush t.block; IO.close t.block; Dict.close t.dict ) @@ -187,6 +189,12 @@ struct true ) else false + let flush ?(index = true) t = + Dict.flush t.pack.dict; + IO.flush t.pack.block; + if index then Index.flush t.pack.index; + Tbl.clear t.staging + let unsafe_v_no_cache ~fresh ~readonly ~lru_size ~index root = let pack = v index ~fresh ~readonly root in let staging = Tbl.create 127 in @@ -281,12 +289,6 @@ struct let cast t = (t :> [ `Read | `Write ] t) - let sync t = - Dict.sync t.pack.dict; - IO.sync t.pack.block; - Index.flush t.pack.index; - Tbl.clear t.staging - type integrity_error = [ `Wrong_hash | `Absent_value ] let integrity_check ~offset ~length k t = @@ -301,7 +303,7 @@ struct f (cast t) >>= fun r -> if Tbl.length t.staging = 0 then Lwt.return r else ( - sync t; + flush t; Lwt.return r ) let auto_flush = 1024 @@ -325,7 +327,7 @@ struct V.encode_bin ~offset ~dict v k (IO.append t.pack.block); let len = Int64.to_int (IO.offset t.pack.block -- off) in Index.add t.pack.index k (off, len, V.magic v); - if Tbl.length t.staging >= auto_flush then sync t + if Tbl.length t.staging >= auto_flush then flush t else Tbl.add t.staging k v; Lru.add t.lru k v @@ -352,5 +354,9 @@ struct Lwt_mutex.with_lock t.pack.lock (fun () -> unsafe_close t; Lwt.return_unit) + + let sync t = + Dict.sync t.pack.dict; + Index.sync t.pack.index end end diff --git a/src/irmin-pack/pack.mli b/src/irmin-pack/pack.mli index 2bbdf53346..761e12fba4 100644 --- a/src/irmin-pack/pack.mli +++ b/src/irmin-pack/pack.mli @@ -56,6 +56,8 @@ module type S = sig val unsafe_find : 'a t -> key -> value option + val flush : ?index:bool -> 'a t -> unit + val sync : 'a t -> unit type integrity_error = [ `Wrong_hash | `Absent_value ] diff --git a/test/irmin-pack/common.ml b/test/irmin-pack/common.ml index ca2dd7f6ec..a7ca39db2a 100644 --- a/test/irmin-pack/common.ml +++ b/test/irmin-pack/common.ml @@ -68,8 +68,12 @@ struct let get_pack () = let name = fresh_name "dict" in - let index = Index.v ~log_size ~fresh:true name in + let f = ref (fun () -> ()) in + let index = + Index.v ~auto_flush_callback:(fun () -> !f ()) ~log_size ~fresh:true name + in Pack.v ~fresh:true ~lru_size:0 ~index name >|= fun pack -> + (f := fun () -> Pack.flush ~index:false pack); let clone_pack ~readonly = Pack.v ~fresh:false ~readonly ~index name in let clone_index_pack ~readonly = let index = Index.v ~log_size ~fresh:false ~readonly name in diff --git a/test/irmin-pack/multiple_instances.ml b/test/irmin-pack/multiple_instances.ml index 5ee5b4461a..689bf0f33c 100644 --- a/test/irmin-pack/multiple_instances.ml +++ b/test/irmin-pack/multiple_instances.ml @@ -45,8 +45,57 @@ let open_ro_after_rw_closed () = Alcotest.(check (option string)) "RO find" (Some "x") x; S.Repo.close ro +let ro_sync_after_add () = + let check ro c k v = + S.Commit.of_hash ro (S.Commit.hash c) >>= function + | None -> Alcotest.failf "commit not found" + | Some commit -> + let tree = S.Commit.tree commit in + S.Tree.find tree [ k ] >|= fun x -> + Alcotest.(check (option string)) "RO find" (Some v) x + in + rm_dir root; + S.Repo.v (config ~readonly:false ~fresh:true root) >>= fun rw -> + S.Repo.v (config ~readonly:true ~fresh:false root) >>= fun ro -> + S.Tree.add S.Tree.empty [ "a" ] "x" >>= fun tree -> + S.Commit.v rw ~parents:[] ~info:(info ()) tree >>= fun c1 -> + S.sync ro; + check ro c1 "a" "x" >>= fun () -> + S.Tree.add S.Tree.empty [ "a" ] "y" >>= fun tree -> + S.Commit.v rw ~parents:[] ~info:(info ()) tree >>= fun c2 -> + check ro c1 "a" "x" >>= fun () -> + (S.Commit.of_hash ro (S.Commit.hash c2) >|= function + | None -> () + | Some _ -> Alcotest.failf "should not find branch by") + >>= fun () -> + S.sync ro; + check ro c2 "a" "y" >>= fun () -> + S.Repo.close ro >>= fun () -> S.Repo.close rw + +let ro_sync_after_close () = + let check ro c k v = + S.Commit.of_hash ro (S.Commit.hash c) >>= function + | None -> Alcotest.failf "commit not found" + | Some commit -> + let tree = S.Commit.tree commit in + S.Tree.find tree [ k ] >|= fun x -> + Alcotest.(check (option string)) "RO find" (Some v) x + in + rm_dir root; + S.Repo.v (config ~readonly:false ~fresh:true root) >>= fun rw -> + S.Repo.v (config ~readonly:true ~fresh:false root) >>= fun ro -> + S.Tree.add S.Tree.empty [ "a" ] "x" >>= fun tree -> + S.Commit.v rw ~parents:[] ~info:(info ()) tree >>= fun c1 -> + S.Repo.close rw >>= fun () -> + S.sync ro; + check ro c1 "a" "x" >>= fun () -> S.Repo.close ro + let tests = [ Alcotest.test_case "Test open ro after rw closed" `Quick (fun () -> Lwt_main.run (open_ro_after_rw_closed ())); + Alcotest.test_case "Test ro sync after add" `Quick (fun () -> + Lwt_main.run (ro_sync_after_add ())); + Alcotest.test_case "Test ro sync after close" `Quick (fun () -> + Lwt_main.run (ro_sync_after_close ())); ] diff --git a/test/irmin-pack/test_pack.ml b/test/irmin-pack/test_pack.ml index 7f891dcd44..6bde73210b 100644 --- a/test/irmin-pack/test_pack.ml +++ b/test/irmin-pack/test_pack.ml @@ -110,7 +110,8 @@ module Dict = struct ignore_int (Dict.index dict "toto"); ignore_int (Dict.index dict "titiabc"); ignore_int (Dict.index dict "foo"); - Dict.sync dict; + Dict.flush dict; + Dict.sync r; check_index "titiabc" 3; check_index "bar" 1; check_index "toto" 2; @@ -119,7 +120,8 @@ module Dict = struct ignore_int (Dict.index dict "hello"); check_raise "hello"; check_none "hello" 4; - Dict.sync dict; + Dict.flush dict; + Dict.sync r; check_find "hello" 4; Dict.close dict; Dict.close r @@ -175,7 +177,8 @@ module Pack = struct adds [ (h1, x1); (h2, x2) ]; Pack.find r h2 >>= fun y2 -> Alcotest.(check (option string)) "before sync" None y2; - Pack.sync w; + Pack.flush w; + Pack.sync r; Pack.find r h2 >>= fun y2 -> Alcotest.(check (option string)) "after sync" (Some x2) y2; let x3 = "otoo" in @@ -183,7 +186,8 @@ module Pack = struct let h3 = sha1 x3 in let h4 = sha1 x4 in adds [ (h3, x3); (h4, x4) ]; - Pack.sync w; + Pack.flush w; + Pack.sync r; Pack.find r h2 >>= fun y2 -> Alcotest.(check (option string)) "y2" (Some x2) y2; Pack.find r h3 >>= fun y3 -> @@ -212,7 +216,7 @@ module Pack = struct let x1 = "foo" in let h1 = sha1 x1 in Pack.unsafe_append w h1 x1; - Pack.sync w; + Pack.flush w; Index.close t.index; Pack.close w >>= fun () -> (*open and close in ro*) @@ -280,6 +284,66 @@ module Pack = struct Alcotest.fail "Add after closing the index should not be allowed") (function I.Closed -> Lwt.return_unit | exn -> Lwt.fail exn) + (** Index can be flushed to disk independently of pack, we simulate this in + the tests using [Index.filter] and [Index.flush]. Regression test for PR + 1008 in which values were indexed before being reachable in pack. *) + let readonly_sync_index_flush () = + Context.get_pack () >>= fun t -> + t.clone_index_pack ~readonly:true >>= fun (i, r) -> + let test w = + let x1 = "foo" in + let h1 = sha1 x1 in + Pack.unsafe_append w h1 x1; + Pack.sync r; + Pack.find r h1 >>= fun y1 -> + Alcotest.(check (option string)) "sync before filter" None y1; + Index.filter t.index (fun _ -> true); + Pack.sync r; + Pack.find r h1 >>= fun y1 -> + Alcotest.(check (option string)) "sync after filter" (Some x1) y1; + let x2 = "foo" in + let h2 = sha1 x2 in + Pack.unsafe_append w h2 x2; + Index.flush t.index; + Pack.find r h2 >|= fun y2 -> + Alcotest.(check (option string)) "sync after flush" (Some x2) y2 + in + test t.pack >>= fun () -> + Context.close t.index t.pack >>= fun () -> Context.close i r + + let readonly_find_index_flush () = + Context.get_pack () >>= fun t -> + t.clone_index_pack ~readonly:true >>= fun (i, r) -> + let check h x msg = + Pack.find r h >|= fun y -> Alcotest.(check (option string)) msg (Some x) y + in + let test w = + let x1 = "foo" in + let h1 = sha1 x1 in + Pack.unsafe_append w h1 x1; + Pack.flush t.pack; + Pack.sync r; + check h1 x1 "find before filter" >>= fun () -> + Index.filter t.index (fun _ -> true); + check h1 x1 "find after filter" >>= fun () -> + let x2 = "bar" in + let h2 = sha1 x2 in + Pack.unsafe_append w h2 x2; + Pack.flush t.pack; + Pack.sync r; + check h2 x2 "find before flush" >>= fun () -> + let x3 = "toto" in + let h3 = sha1 x3 in + Pack.unsafe_append w h3 x3; + Index.flush t.index; + check h2 x2 "find after flush" >>= fun () -> + Pack.flush t.pack; + Pack.sync r; + check h3 x3 "find after flush new values" + in + test t.pack >>= fun () -> + Context.close t.index t.pack >>= fun () -> Context.close i r + let tests = [ Alcotest.test_case "pack" `Quick (fun () -> Lwt_main.run (test_pack ())); @@ -291,6 +355,10 @@ module Pack = struct Lwt_main.run (test_close_pack ())); Alcotest.test_case "close readonly" `Quick (fun () -> Lwt_main.run (test_close_pack_more ())); + Alcotest.test_case "readonly sync, index flush" `Quick (fun () -> + Lwt_main.run (readonly_sync_index_flush ())); + Alcotest.test_case "readonly find, index flush" `Quick (fun () -> + Lwt_main.run (readonly_find_index_flush ())); ] end