Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RO syncs #1008

Merged
merged 4 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
### Unreleased

#### Changed
- **irmin-pack**:
- `sync` has to be called by the read-only instance to synchronise with the
files on disk. (#1008, @icristescu)
- Renamed `sync` to `flush` for the operation that flushes to disk all buffers
of a read-write instance. (#1008, @icristescu)

### 2.2.0 (2020-06-26)

#### Added
Expand Down
4 changes: 4 additions & 0 deletions irmin-pack.opam
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ depends: [
"alcotest-lwt" {with-test}
]

pin-depends: [
"index.dev" "git+https://github.com/mirage/index#9717194feb57c27cac9ad993aaf24a64dc5b64a3"
]

synopsis: "Irmin backend which stores values in a pack file"
10 changes: 5 additions & 5 deletions src/irmin-pack/IO.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module type S = sig

val version : t -> string

val sync : t -> unit
val flush : t -> unit

val close : t -> unit
end
Expand Down Expand Up @@ -71,9 +71,9 @@ module Unix : S = struct

let header = 16L (* offset + version *)

let sync t =
let flush t =
if t.readonly then raise RO_Not_Allowed;
Log.debug (fun l -> l "IO sync %s" t.file);
Log.debug (fun l -> l "IO flush %s" t.file);
let buf = Buffer.contents t.buf in
let offset = t.offset in
Buffer.clear t.buf;
Expand All @@ -96,11 +96,11 @@ module Unix : S = struct
Buffer.add_string t.buf buf;
let len = Int64.of_int (String.length buf) in
t.offset <- t.offset ++ len;
if t.offset -- t.flushed > auto_flush_limit then sync t
if t.offset -- t.flushed > auto_flush_limit then flush t

let set t ~off buf =
if t.readonly then raise RO_Not_Allowed;
sync t;
flush t;
Raw.unsafe_write t.raw ~off:(header ++ off) buf;
let len = Int64.of_int (String.length buf) in
let off = header ++ off ++ len in
Expand Down
2 changes: 1 addition & 1 deletion src/irmin-pack/IO.mli
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module type S = sig

val version : t -> string

val sync : t -> unit
val flush : t -> unit

val close : t -> unit
end
Expand Down
4 changes: 4 additions & 0 deletions src/irmin-pack/closeable.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ module Pack (S : Pack.S) = struct
check_not_closed t;
S.unsafe_find t.t k

let flush ?index t =
check_not_closed t;
S.flush ?index t.t

let sync t =
check_not_closed t;
S.sync t.t
Expand Down
12 changes: 8 additions & 4 deletions src/irmin-pack/dict.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ module type S = sig

val index : t -> string -> int option

val flush : t -> unit

val sync : t -> unit

val v : ?fresh:bool -> ?readonly:bool -> ?capacity:int -> string -> t
Expand Down Expand Up @@ -78,11 +80,14 @@ module Make (IO : IO.S) : S = struct
let log_offset = IO.force_offset t.io in
if log_offset > former_log_offset then refill ~from:former_log_offset t

let sync t = IO.sync t.io
let sync t =
if IO.readonly t.io then sync_offset t
else invalid_arg "only a readonly instance should call this function"

let flush t = IO.flush t.io

let index t v =
Log.debug (fun l -> l "[dict] index %S" v);
if IO.readonly t.io then sync_offset t;
try Some (Hashtbl.find t.cache v)
with Not_found ->
let id = Hashtbl.length t.cache in
Expand All @@ -95,7 +100,6 @@ module Make (IO : IO.S) : S = struct
Some id )

let find t id =
if IO.readonly t.io then sync_offset t;
Log.debug (fun l -> l "[dict] find %d" id);
let v = try Some (Hashtbl.find t.index id) with Not_found -> None in
v
Expand All @@ -116,7 +120,7 @@ module Make (IO : IO.S) : S = struct
let close t =
t.open_instances <- t.open_instances - 1;
if t.open_instances = 0 then (
if not (IO.readonly t.io) then sync t;
if not (IO.readonly t.io) then flush t;
IO.close t.io;
Hashtbl.reset t.cache;
Hashtbl.reset t.index )
Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/dict.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module type S = sig

val index : t -> string -> int option

val flush : t -> unit

val sync : t -> unit

val v : ?fresh:bool -> ?readonly:bool -> ?capacity:int -> string -> t
Expand Down
4 changes: 4 additions & 0 deletions src/irmin-pack/inode.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ module type S = sig
offset:int64 -> length:int -> key -> 'a t -> (unit, integrity_error) result

val close : 'a t -> unit Lwt.t

val sync : 'a t -> unit
end

module type CONFIG = sig
Expand Down Expand Up @@ -800,4 +802,6 @@ struct
let integrity_check = Inode.integrity_check

let close = Inode.close

let sync = Inode.sync
end
2 changes: 2 additions & 0 deletions src/irmin-pack/inode.mli
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ module type S = sig
offset:int64 -> length:int -> key -> 'a t -> (unit, integrity_error) result

val close : 'a t -> unit Lwt.t

val sync : 'a t -> unit
end

module Make
Expand Down
34 changes: 32 additions & 2 deletions src/irmin-pack/irmin_pack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ module Atomic_write (K : Irmin.Type.S) (V : Irmin.Hash.S) = struct
if t.open_instances = 0 then (
Tbl.reset t.index;
Tbl.reset t.cache;
if not (IO.readonly t.block) then IO.sync t.block;
if not (IO.readonly t.block) then IO.flush t.block;
IO.close t.block;
W.clear t.w )
else Lwt.return_unit
Expand All @@ -297,6 +297,20 @@ end

module type CONFIG = Inode.CONFIG

module type Stores_extra = sig
type repo

val integrity_check :
?ppf:Format.formatter ->
auto_repair:bool ->
repo ->
( [> `Fixed of int | `No_error ],
[> `Cannot_fix of string | `Corrupted of int ] )
result

val sync : repo -> unit
end

module Make_ext
(Config : CONFIG)
(M : Irmin.Metadata.S)
Expand Down Expand Up @@ -436,18 +450,32 @@ struct
let lru_size = lru_size config in
let readonly = readonly config in
let log_size = index_log_size config in
let index = Index.v ~fresh ~readonly ~log_size root in
let f = ref (fun () -> ()) in
let index =
Index.v
~auto_flush_callback:(fun () -> !f ())
(* backpatching to add pack flush before an index flush *)
~fresh ~readonly ~log_size root
in
Contents.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun contents ->
Node.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun node ->
Commit.CA.v ~fresh ~readonly ~lru_size ~index root >>= fun commit ->
Branch.v ~fresh ~readonly root >|= fun branch ->
(* Stores share instances in memory, one flush is enough. In case of a
system crash, the auto_flush_callback might not make with the disk.
In this case, when the store is reopened, [integrity_check] needs to
be called to repair the store. *)
(f := fun () -> Contents.CA.flush ~index:false contents);
{ contents; node; commit; branch; config; index }

let close t =
Index.close t.index;
Contents.CA.close (contents_t t) >>= fun () ->
Node.CA.close (snd (node_t t)) >>= fun () ->
Commit.CA.close (snd (commit_t t)) >>= fun () -> Branch.close t.branch

(** stores share instances in memory, one sync is enough *)
let sync t = Contents.CA.sync (contents_t t)
end
end

Expand Down Expand Up @@ -517,6 +545,8 @@ struct
else Error (`Corrupted (!nb_corrupted + !nb_absent)) )

include Irmin.Of_private (X)

let sync = X.Repo.sync
end

module Hash = Irmin.Hash.BLAKE2B
Expand Down
44 changes: 22 additions & 22 deletions src/irmin-pack/irmin_pack.mli
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ module type CONFIG = sig
val stable_hash : int
end

module type Stores_extra = sig
type repo

val integrity_check :
?ppf:Format.formatter ->
auto_repair:bool ->
repo ->
( [> `Fixed of int | `No_error ],
[> `Cannot_fix of string | `Corrupted of int ] )
result
(** Checks the integrity of the repository. if [auto_repair] is [true], will
also try to fix the issues. [ppf] is a formatter for progressive
reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted
entries. *)

val sync : repo -> unit
(** [sync t] syncs a readonly pack with the files on disk. Raises
[invalid_argument] if called by a read-write pack.*)
end

module Make_ext
(Config : CONFIG)
(Metadata : Irmin.Metadata.S)
Expand All @@ -57,17 +77,7 @@ module Make_ext
and type Key.step = Path.step
and type Private.Sync.endpoint = unit

val integrity_check :
?ppf:Format.formatter ->
auto_repair:bool ->
repo ->
( [> `Fixed of int | `No_error ],
[> `Cannot_fix of string | `Corrupted of int ] )
result
(** Checks the integrity of the repository. if [auto_repair] is [true], will
also try to fix the issues. [ppf] is a formatter for progressive
reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted
entries. *)
include Stores_extra with type repo := repo
end

module Make
Expand All @@ -87,17 +97,7 @@ module Make
and type hash = H.t
and type Private.Sync.endpoint = unit

val integrity_check :
?ppf:Format.formatter ->
auto_repair:bool ->
repo ->
( [> `Fixed of int | `No_error ],
[> `Cannot_fix of string | `Corrupted of int ] )
result
(** Checks the integrity of the repository. if [auto_repair] is [true], will
also try to fix the issues. [ppf] is a formatter for progressive
reporting. [`Fixed] and [`Corrupted] report the number of fixed/corrupted
entries. *)
include Stores_extra with type repo := repo
end

module KV (Config : CONFIG) : Irmin.KV_MAKER
Expand Down
24 changes: 15 additions & 9 deletions src/irmin-pack/pack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ module type S = sig

val unsafe_find : 'a t -> key -> value option

val flush : ?index:bool -> 'a t -> unit

val sync : 'a t -> unit

type integrity_error = [ `Wrong_hash | `Absent_value ]
Expand Down Expand Up @@ -149,7 +151,7 @@ struct
let close t =
t.open_instances <- t.open_instances - 1;
if t.open_instances = 0 then (
if not (IO.readonly t.block) then IO.sync t.block;
if not (IO.readonly t.block) then IO.flush t.block;
IO.close t.block;
Dict.close t.dict )

Expand Down Expand Up @@ -187,6 +189,12 @@ struct
true )
else false

let flush ?(index = true) t =
Dict.flush t.pack.dict;
IO.flush t.pack.block;
if index then Index.flush t.pack.index;
Tbl.clear t.staging

let unsafe_v_no_cache ~fresh ~readonly ~lru_size ~index root =
let pack = v index ~fresh ~readonly root in
let staging = Tbl.create 127 in
Expand Down Expand Up @@ -281,12 +289,6 @@ struct

let cast t = (t :> [ `Read | `Write ] t)

let sync t =
Dict.sync t.pack.dict;
IO.sync t.pack.block;
Index.flush t.pack.index;
Tbl.clear t.staging

type integrity_error = [ `Wrong_hash | `Absent_value ]

let integrity_check ~offset ~length k t =
Expand All @@ -301,7 +303,7 @@ struct
f (cast t) >>= fun r ->
if Tbl.length t.staging = 0 then Lwt.return r
else (
sync t;
flush t;
Lwt.return r )

let auto_flush = 1024
Expand All @@ -325,7 +327,7 @@ struct
V.encode_bin ~offset ~dict v k (IO.append t.pack.block);
let len = Int64.to_int (IO.offset t.pack.block -- off) in
Index.add t.pack.index k (off, len, V.magic v);
if Tbl.length t.staging >= auto_flush then sync t
if Tbl.length t.staging >= auto_flush then flush t
else Tbl.add t.staging k v;
Lru.add t.lru k v

Expand All @@ -352,5 +354,9 @@ struct
Lwt_mutex.with_lock t.pack.lock (fun () ->
unsafe_close t;
Lwt.return_unit)

let sync t =
Dict.sync t.pack.dict;
Index.sync t.pack.index
end
end
2 changes: 2 additions & 0 deletions src/irmin-pack/pack.mli
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ module type S = sig

val unsafe_find : 'a t -> key -> value option

val flush : ?index:bool -> 'a t -> unit

val sync : 'a t -> unit

type integrity_error = [ `Wrong_hash | `Absent_value ]
Expand Down
6 changes: 5 additions & 1 deletion test/irmin-pack/common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ struct

let get_pack () =
let name = fresh_name "dict" in
let index = Index.v ~log_size ~fresh:true name in
let f = ref (fun () -> ()) in
let index =
Index.v ~auto_flush_callback:(fun () -> !f ()) ~log_size ~fresh:true name
in
Pack.v ~fresh:true ~lru_size:0 ~index name >|= fun pack ->
(f := fun () -> Pack.flush ~index:false pack);
let clone_pack ~readonly = Pack.v ~fresh:false ~readonly ~index name in
let clone_index_pack ~readonly =
let index = Index.v ~log_size ~fresh:false ~readonly name in
Expand Down
Loading