diff --git a/.gitmodules b/.gitmodules index 35bfcda..0bad1a5 100644 --- a/.gitmodules +++ b/.gitmodules @@ -9,4 +9,7 @@ [submodule "ocaml-dockerfile"] path = ocaml-dockerfile url = https://github.com/ocurrent/ocaml-dockerfile.git - branch = master \ No newline at end of file + branch = master +[submodule "opam-0install-solver"] + path = opam-0install-solver + url = https://github.com/ocaml-opam/opam-0install-solver.git diff --git a/bin/dune b/bin/dune new file mode 100644 index 0000000..422706e --- /dev/null +++ b/bin/dune @@ -0,0 +1,7 @@ +(executable + (name main) + (package solver-service) + (public_name solver-service) + (preprocess + (pps ppx_deriving.std ppx_deriving_yojson)) + (libraries solver-service logs.cli capnp-rpc-unix eio_main dune-build-info logs.fmt logs.threaded fmt.cli fmt.tty)) diff --git a/service/main.ml b/bin/main.ml similarity index 77% rename from service/main.ml rename to bin/main.ml index 6c161bb..071e981 100644 --- a/service/main.ml +++ b/bin/main.ml @@ -1,7 +1,5 @@ -open Solver_service -open Lwt.Syntax -module Service = Service.Make (Opam_repository) -module Worker_process = Internal_worker.Worker_process +open Eio.Std +module Service = Solver_service let pp_timestamp f x = let open Unix in @@ -37,6 +35,7 @@ let setup_log style_renderer level = () let export service ~on:socket = + let open Lwt.Syntax in let restore = Capnp_rpc_net.Restorer.single (Capnp_rpc_net.Restorer.Id.public "solver") @@ -60,38 +59,36 @@ let export service ~on:socket = in crashed -let start_server address ~n_workers = +let start_server ~sw ~process_mgr ~domain_mgr address ~n_workers = + let open Lwt.Syntax in + Lwt_eio.run_lwt @@ fun () -> let config = Capnp_rpc_unix.Vat_config.create ~secret_key:(`File "server.pem") address in let service_id = Capnp_rpc_unix.Vat_config.derived_id config "solver-service" in - let create_worker commits = - let cmd = - ("", [| Sys.argv.(0); "--worker"; Remote_commit.list_to_string commits |]) - in - Worker_process.create cmd - in - let* service = Service.v ~n_workers ~create_worker in + let service = Service.create ~sw ~domain_mgr ~process_mgr ~n_workers in + let service = Service.capnp_service service in let restore = Capnp_rpc_net.Restorer.single service_id service in let+ vat = Capnp_rpc_unix.serve config ~restore in Capnp_rpc_unix.Vat.sturdy_uri vat service_id -let main () hash address sockpath n_workers = - match hash with - | Some commits_str -> Solver.main (Remote_commit.list_of_string_or_fail commits_str) +let main () address sockpath n_workers = + Eio_main.run @@ fun env -> + Switch.run @@ fun sw -> + let process_mgr = env#process_mgr in + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + match address with + | Some address -> + (* Run with a capnp address as the endpoint *) + let uri = start_server ~sw ~process_mgr ~domain_mgr:env#domain_mgr address ~n_workers in + Fmt.pr "Solver service running at: %a@." Uri.pp_hum uri; + Fiber.await_cancel () | None -> - Eio_main.run @@ fun env -> - Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> - Lwt_eio.run_lwt @@ fun () -> - match address with - | Some address -> - (* Run with a capnp address as the endpoint *) - let* uri = start_server address ~n_workers in - Fmt.pr "Solver service running at: %a@." Uri.pp_hum uri; - fst @@ Lwt.wait () - | None -> + ignore (sockpath, export); + assert false +(* let socket = match sockpath with | Some path -> @@ -112,6 +109,7 @@ let main () hash address sockpath n_workers = in let* service = Service.v ~n_workers ~create_worker in export service ~on:socket +*) (* Command-line parsing *) @@ -122,15 +120,9 @@ let setup_log = Term.( const setup_log $ Fmt_cli.style_renderer ~docs () $ Logs_cli.level ~docs ()) -let worker_commits = - Arg.value - @@ Arg.opt Arg.(some string) None - @@ Arg.info ~doc:"The hash commits of the worker." ~docv:"COMMITS" - [ "worker" ] - let internal_workers = Arg.value - @@ Arg.opt Arg.int 20 + @@ Arg.opt Arg.int (Domain.recommended_domain_count () - 1) @@ Arg.info ~doc:"The number of sub-process solving requests in parallel" ~docv:"N" [ "internal-workers" ] @@ -164,7 +156,6 @@ let cmd = Term.( const main $ setup_log - $ worker_commits $ address $ sockpath $ internal_workers) diff --git a/service/remote_commit.ml b/bin/remote_commit.ml similarity index 100% rename from service/remote_commit.ml rename to bin/remote_commit.ml diff --git a/service/remote_commit.mli b/bin/remote_commit.mli similarity index 100% rename from service/remote_commit.mli rename to bin/remote_commit.mli diff --git a/dune b/dune index 108af19..be0ebd0 100644 --- a/dune +++ b/dune @@ -1,3 +1,3 @@ (dirs :standard \ var) -(vendored_dirs ocluster ocurrent ocaml-dockerfile) +(vendored_dirs ocluster ocurrent ocaml-dockerfile opam-0install-solver) diff --git a/examples/dune b/examples/dune index e3c386c..e9ee584 100644 --- a/examples/dune +++ b/examples/dune @@ -4,6 +4,7 @@ (names main submit) (package solver-service) (libraries + eio_main current_git capnp-rpc-unix solver-service-api diff --git a/examples/main.ml b/examples/main.ml index d321357..e8a5bfb 100644 --- a/examples/main.ml +++ b/examples/main.ml @@ -36,10 +36,11 @@ let opam_template arch = |} arch -let get_vars ~ocaml_package_name ~ocaml_version ?arch () = - let+ vars = - Solver_service.Process.pread - ("", [| "opam"; "config"; "expand"; opam_template arch |]) +let get_vars ~process_mgr ~ocaml_package_name ~ocaml_version ?arch () = + Lwt_eio.run_eio @@ fun () -> + let vars = + Eio.Process.parse_out process_mgr Eio.Buf_read.take_all + ["opam"; "config"; "expand"; opam_template arch] in let json = match Yojson.Safe.from_string vars with @@ -55,15 +56,16 @@ let get_vars ~ocaml_package_name ~ocaml_version ?arch () = in Result.get_ok @@ Solver_service_api.Worker.Vars.of_yojson json -let get_opam_file pv = - Solver_service.Process.pread ("", [| "opam"; "show"; "--raw"; pv |]) +let get_opam_file ~process_mgr pv = + Lwt_eio.run_eio @@ fun () -> + Eio.Process.parse_out process_mgr Eio.Buf_read.take_all ["opam"; "show"; "--raw"; pv] -let run_client ~package ~version ~ocaml_version ~opam_commit service = +let run_client ~process_mgr ~package ~version ~ocaml_version ~opam_commit service = let pv = package ^ "." ^ version in let* platform = - get_vars ~ocaml_package_name:"ocaml-base-compiler" ~ocaml_version () + get_vars ~process_mgr ~ocaml_package_name:"ocaml-base-compiler" ~ocaml_version () in - let* opam_file = get_opam_file pv in + let* opam_file = get_opam_file ~process_mgr pv in let request = Solver_service_api.Worker.Solve_request. { @@ -88,11 +90,14 @@ let run_client ~package ~version ~ocaml_version ~opam_commit service = | Error `Cancelled -> Fmt.failwith "Job Cancelled" let connect package version ocaml_version opam_commit uri = - Lwt_main.run - (let client_vat = Capnp_rpc_unix.client_only_vat () in - let sr = Capnp_rpc_unix.Vat.import_exn client_vat uri in - Capnp_rpc_unix.with_cap_exn sr - (run_client ~package ~version ~ocaml_version ~opam_commit)) + Eio_main.run @@ fun env -> + let process_mgr = env#process_mgr in + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + Lwt_eio.run_lwt @@ fun () -> + let client_vat = Capnp_rpc_unix.client_only_vat () in + let sr = Capnp_rpc_unix.Vat.import_exn client_vat uri in + Capnp_rpc_unix.with_cap_exn sr + (run_client ~process_mgr ~package ~version ~ocaml_version ~opam_commit) open Cmdliner diff --git a/service/dune b/service/dune index b7e0830..c03495e 100644 --- a/service/dune +++ b/service/dune @@ -1,36 +1,14 @@ (library (name solver_service) (public_name solver-service) - (preprocess - (pps ppx_deriving.std ppx_deriving_yojson)) (libraries - eio_main + eio lwt_eio solver-service-api ppx_deriving_yojson.runtime + prometheus-app opam-0install - capnp-rpc-unix + capnp-rpc-net git-unix ocaml-version - dune-build-info - str - fmt.cli - fmt.tty) - (modules - epoch_lock - git_context - opam_repository - opam_repository_intf - process - remote_commit - internal_worker - service - solver - solver_service)) - -(executable - (name main) - (package solver-service) - (public_name solver-service) - (libraries solver-service logs.cli) - (modules main)) + ocluster-api)) diff --git a/service/epoch_lock.ml b/service/epoch_lock.ml deleted file mode 100644 index 9c4bd15..0000000 --- a/service/epoch_lock.ml +++ /dev/null @@ -1,60 +0,0 @@ -open Lwt.Infix - -type ('a, 'key) t = { - mutable current : - [ `Idle - | `Activating of unit Lwt.t (* Promise resolves after moving to [`Active] *) - | `Active of 'key * 'a - | `Draining of - unit Lwt.t * unit Lwt_condition.t - (* Promise resolves after moving back to [`Active] *) ]; - mutable users : int; (* Zero unless active or draining *) - create : 'key -> 'a Lwt.t; - dispose : 'a -> unit Lwt.t; -} - -let activate t epoch ~ready ~set_ready = - Lwt.finalize - (fun () -> - t.current <- `Activating ready; - t.create epoch >|= fun v -> - t.current <- `Active (epoch, v); - Lwt.wakeup_later set_ready ()) - (fun () -> - match t.current with - | `Activating _ -> Lwt.return (t.current <- `Idle) - | _ -> Lwt.return_unit) - -let rec with_epoch t epoch fn = - match t.current with - | `Active (current_epoch, v) when current_epoch = epoch -> - t.users <- t.users + 1; - Lwt.finalize - (fun () -> fn v) - (fun () -> - t.users <- t.users - 1; - (match t.current with - | `Active _ -> () - | `Draining (_, cond) -> - if t.users = 0 then Lwt_condition.broadcast cond () - | `Idle | `Activating _ -> assert false); - Lwt.return_unit) - | `Active (_, old_v) -> - let cond = Lwt_condition.create () in - let ready, set_ready = Lwt.wait () in - t.current <- `Draining (ready, cond); - (* After this point, no new users can start. *) - let rec drain () = - if t.users = 0 then Lwt.return_unit - else Lwt_condition.wait cond >>= drain - in - drain () >>= fun () -> - t.dispose old_v >>= fun () -> - activate t epoch ~ready ~set_ready >>= fun () -> with_epoch t epoch fn - | `Draining (ready, _) | `Activating ready -> - ready >>= fun () -> with_epoch t epoch fn - | `Idle -> - let ready, set_ready = Lwt.wait () in - activate t epoch ~ready ~set_ready >>= fun () -> with_epoch t epoch fn - -let v ~create ~dispose () = { current = `Idle; users = 0; create; dispose } diff --git a/service/epoch_lock.mli b/service/epoch_lock.mli deleted file mode 100644 index 01f9905..0000000 --- a/service/epoch_lock.mli +++ /dev/null @@ -1,22 +0,0 @@ -(** Divide jobs up into distinct epochs. Any number of jobs can run at the same - time within an epoch, but changing epoch requires first draining the - existing jobs, finishing the epoch, and then creating the new one. The - solver uses this to handle updates to opam-repository (each commit is a - separate epoch). *) - -type ('a, 'key) t - -val v : - create:('key -> 'a Lwt.t) -> - dispose:('a -> unit Lwt.t) -> - unit -> - ('a, 'key) t -(** [v ~create ~dispose ()] is an epoch lock that calls [create] to start a new - epoch and [dispose] to finish one. A new epoch doesn't start until the old - one has been disposed. *) - -val with_epoch : ('a, 'key) t -> 'key -> ('a -> 'b Lwt.t) -> 'b Lwt.t -(** [with_epoch t epoch fn] runs [fn v] with the [v] for [epoch]. If we are - already in [epoch], [fn] runs immediately. If we are already in another - epoch then we wait for all users in the previous epoch to finish, then - create a new one, then run [fn]. *) diff --git a/service/git_context.ml b/service/git_context.ml index 1ab7800..72b5d52 100644 --- a/service/git_context.ml +++ b/service/git_context.ml @@ -2,11 +2,15 @@ module Store = Git_unix.Store module Search = Git.Search.Make (Digestif.SHA1) (Store) open Lwt.Infix +type index = OpamFile.OPAM.t OpamPackage.Version.Map.t Lazy.t OpamPackage.Name.Map.t + +let empty_index = OpamPackage.Name.Map.empty + type rejection = UserConstraint of OpamFormula.atom | Unavailable type t = { env : string -> OpamVariable.variable_contents option; - packages : OpamFile.OPAM.t OpamPackage.Version.Map.t OpamPackage.Name.Map.t; + packages : index; pins : (OpamPackage.Version.t * OpamFile.OPAM.t) OpamPackage.Name.Map.t; constraints : OpamFormula.version_constraint OpamTypes.name_map; (* User-provided constraints *) @@ -15,6 +19,8 @@ type t = { lower_bound : bool; } +let git_lock = Eio.Mutex.create () + let ocaml_beta_pkg = OpamPackage.of_string "ocaml-beta.enabled" (* From https://github.com/ocaml/ocaml-beta-repository/blob/master/packages/ocaml-beta/ocaml-beta.enabled/opam *) @@ -91,35 +97,36 @@ let candidates t name = (OpamPackage.Name.to_string name); [] | Some versions -> - let versions = - if - t.with_beta_remote - && OpamPackage.Name.compare name (OpamPackage.name ocaml_beta_pkg) - = 0 - then - OpamPackage.Version.Map.add - (OpamPackage.version ocaml_beta_pkg) - ocaml_beta_opam versions - else versions - in - let user_constraints = user_restrictions t name in - OpamPackage.Version.Map.bindings versions - |> List.fast_sort (version_compare t.lower_bound) - |> List.rev_map (fun (v, opam) -> - match user_constraints with - | Some test - when not - (OpamFormula.check_version_formula - (OpamFormula.Atom test) v) -> - (v, Error (UserConstraint (name, Some test))) - | _ -> - let pkg = OpamPackage.create name v in - (v, filter_available t pkg opam))) + let versions = Eio.Mutex.use_ro git_lock (fun () -> Lazy.force versions) in + let versions = + if + t.with_beta_remote + && OpamPackage.Name.compare name (OpamPackage.name ocaml_beta_pkg) + = 0 + then + OpamPackage.Version.Map.add + (OpamPackage.version ocaml_beta_pkg) + ocaml_beta_opam versions + else versions + in + let user_constraints = user_restrictions t name in + OpamPackage.Version.Map.bindings versions + |> List.fast_sort (version_compare t.lower_bound) + |> List.rev_map (fun (v, opam) -> + match user_constraints with + | Some test + when not + (OpamFormula.check_version_formula + (OpamFormula.Atom test) v) -> + (v, Error (UserConstraint (name, Some test))) + | _ -> + let pkg = OpamPackage.create name v in + (v, filter_available t pkg opam))) let pp_rejection f = function | UserConstraint x -> - Fmt.pf f "Rejected by user-specified constraint %s" - (OpamFormula.string_of_atom x) + Fmt.pf f "Rejected by user-specified constraint %s" + (OpamFormula.string_of_atom x) | Unavailable -> Fmt.string f "Availability condition not satisfied" let read_dir store hash = @@ -128,75 +135,77 @@ let read_dir store hash = | Ok (Git.Value.Tree tree) -> Some tree | Ok _ -> None +let opam_lock = Mutex.create () (* https://github.com/ocaml/opam/issues/5591 *) + +let opam_read_from_string_threadsafe str = + Mutex.lock opam_lock; + let x = OpamFile.OPAM.read_from_string str in + Mutex.unlock opam_lock; + x + let read_package store pkg hash = Search.find store hash (`Path [ "opam" ]) >>= function | None -> - Fmt.failwith "opam file not found for %s" (OpamPackage.to_string pkg) + Fmt.failwith "opam file not found for %s" (OpamPackage.to_string pkg) | Some hash -> ( Store.read store hash >|= function - | Ok (Git.Value.Blob blob) -> - OpamFile.OPAM.read_from_string (Store.Value.Blob.to_string blob) + | Ok (Git.Value.Blob blob) -> opam_read_from_string_threadsafe (Store.Value.Blob.to_string blob) | _ -> - Fmt.failwith "Bad Git object type for %s!" (OpamPackage.to_string pkg) - ) + Fmt.failwith "Bad Git object type for %s!" (OpamPackage.to_string pkg) + ) (* Get a map of the versions inside [entry] (an entry under "packages") *) let read_versions store (entry : Store.Value.Tree.entry) = read_dir store entry.node >>= function - | None -> Lwt.return_none + | None -> Lwt.return OpamPackage.Version.Map.empty | Some tree -> - Store.Value.Tree.to_list tree - |> Lwt_list.fold_left_s - (fun acc (entry : Store.Value.Tree.entry) -> - match OpamPackage.of_string_opt entry.name with - | Some pkg -> - read_package store pkg entry.node >|= fun opam -> - OpamPackage.Version.Map.add pkg.version opam acc - | None -> - OpamConsole.log "opam-0install" "Invalid package name %S" - entry.name; - Lwt.return acc) - OpamPackage.Version.Map.empty - >|= fun versions -> Some versions - -let merge_versions vs1 vs2 = - OpamPackage.Version.Map.merge - (fun _ v1 v2 -> - match (v1, v2) with - | None, _ -> v2 - | Some _, None -> v1 - | Some _, Some _ -> - (* Overwrite the v1 entry. This gives the semantics that that second - repo given to read_packages is an overlay on the first one. *) - v2) - vs1 vs2 - -let add_versions name versions packages_by_name = - OpamPackage.Name.Map.update name - (fun prev_versions -> merge_versions prev_versions versions) - OpamPackage.Version.Map.empty packages_by_name - -let read_packages ?acc:(result_acc = OpamPackage.Name.Map.empty) store commit = + Store.Value.Tree.to_list tree + |> Lwt_list.fold_left_s + (fun acc (entry : Store.Value.Tree.entry) -> + match OpamPackage.of_string_opt entry.name with + | Some pkg -> + read_package store pkg entry.node >|= fun opam -> + OpamPackage.Version.Map.add pkg.version opam acc + | None -> + OpamConsole.log "opam-0install" "Invalid package name %S" + entry.name; + Lwt.return acc) + OpamPackage.Version.Map.empty + +let read_packages ~store tree = + Store.Value.Tree.to_list tree + |> List.filter_map (fun (entry : Store.Value.Tree.entry) -> + match OpamPackage.Name.of_string entry.name with + | exception ex -> + OpamConsole.log "opam-0install" + "Invalid package name %S: %s" entry.name + (Printexc.to_string ex); + None + | name -> + Some (name, lazy (Lwt_eio.run_lwt_in_main (fun () -> read_versions store entry))) + ) + |> OpamPackage.Name.Map.of_list + +let overlay _name v1 v2 = + match (v1, v2) with + | None, _ -> v2 + | Some _, None -> v1 + | Some _, Some _ -> + (* Overwrite the v1 entry. This gives the semantics that that second + repo given to read_packages is an overlay on the first one. *) + v2 + +let read_packages ?acc:(super=empty_index) store commit : index = + Lwt_eio.run_lwt_in_main @@ fun () -> Search.find store commit (`Commit (`Path [ "packages" ])) >>= function | None -> Fmt.failwith "Failed to find packages directory!" | Some tree_hash -> ( read_dir store tree_hash >>= function | None -> Fmt.failwith "'packages' is not a directory!" | Some tree -> - Store.Value.Tree.to_list tree - |> Lwt_list.fold_left_s - (fun acc (entry : Store.Value.Tree.entry) -> - match OpamPackage.Name.of_string entry.name with - | exception ex -> - OpamConsole.log "opam-0install" - "Invalid package name %S: %s" entry.name - (Printexc.to_string ex); - Lwt.return acc - | name -> ( - read_versions store entry >|= function - | None -> acc - | Some versions -> add_versions name versions acc)) - result_acc) + let packages = read_packages ~store tree in + Lwt.return (OpamPackage.Name.Map.merge overlay super packages) + ) let create ?(test = OpamPackage.Name.Set.empty) ?(pins = OpamPackage.Name.Map.empty) ?(lower_bound = false) ~constraints diff --git a/service/git_context.mli b/service/git_context.mli index 10d892e..07eaee0 100644 --- a/service/git_context.mli +++ b/service/git_context.mli @@ -1,10 +1,14 @@ include Opam_0install.S.CONTEXT +type index + +val empty_index : index + val read_packages : - ?acc:OpamFile.OPAM.t OpamPackage.Version.Map.t OpamPackage.Name.Map.t -> + ?acc:index -> Git_unix.Store.t -> Git_unix.Store.Hash.t -> - OpamFile.OPAM.t OpamPackage.Version.Map.t OpamPackage.Name.Map.t Lwt.t + index (** [read_packages store commit] is an index of the opam files in [store] at [commit]. *) @@ -14,7 +18,9 @@ val create : ?lower_bound:bool -> constraints:OpamFormula.version_constraint OpamPackage.Name.Map.t -> env:(string -> OpamVariable.variable_contents option) -> - packages:OpamFile.OPAM.t OpamPackage.Version.Map.t OpamPackage.Name.Map.t -> + packages:index -> with_beta_remote:bool -> unit -> t + +val opam_read_from_string_threadsafe : string -> OpamFile.OPAM.t diff --git a/service/internal_worker.ml b/service/internal_worker.ml deleted file mode 100644 index 9325173..0000000 --- a/service/internal_worker.ml +++ /dev/null @@ -1,37 +0,0 @@ -open Lwt.Infix - -module Worker_process = struct - type state = - | Available - | Released - | Closed of Unix.process_status - | Failed of exn - - type t = { process : Lwt_process.process; mutable state : state } - - let create cmd = { process = Lwt_process.open_process cmd; state = Available } - let pid t = t.process#pid - - let state t = - match Lwt.state t.process#status with - | Lwt.Sleep -> t.state - | Lwt.Fail ex -> Failed ex - | Lwt.Return status -> Closed status - - let release t = t.state <- Released - - let close t = - release t; - t.process#terminate; - t.process#close >|= fun status -> - t.state <- Closed status; - status - - let read_line t = Lwt_io.read_line t.process#stdout - let write t msg = Lwt_io.write t.process#stdin msg - - let read_into t len = - let buf = Bytes.create len in - Lwt_io.read_into_exactly t.process#stdout buf 0 len >|= fun () -> - Bytes.unsafe_to_string buf -end diff --git a/service/internal_worker.mli b/service/internal_worker.mli deleted file mode 100644 index fdd0c65..0000000 --- a/service/internal_worker.mli +++ /dev/null @@ -1,18 +0,0 @@ -module Worker_process : sig - type state = - | Available - | Released - | Closed of Unix.process_status - | Failed of exn - - type t = { process : Lwt_process.process; mutable state : state } - - val create : Lwt_process.command -> t - val pid : t -> int - val state : t -> state - val read_line : t -> String.t Lwt.t - val write : t -> string -> unit Lwt.t - val read_into : t -> int -> string Lwt.t - val release : t -> unit - val close : t -> Unix.process_status Lwt.t -end diff --git a/service/opam_repository.ml b/service/opam_repository.ml index 963e8a3..707d292 100644 --- a/service/opam_repository.ml +++ b/service/opam_repository.ml @@ -1,9 +1,16 @@ -open Lwt.Infix +open Eio.Std module Log = Solver_service_api.Solver.Log module Store = Git_unix.Store let default_repo_url = "https://github.com/ocaml/opam-repository.git" -let sanitize_re = Str.regexp "[^A-Za-z0-9-]" + +let replace_special = + String.map @@ function + | 'A'..'Z' + | 'a'..'z' + | '0'..'9' + | '-' as c -> c + | _ -> '_' let rec mkdir_p path = try Unix.mkdir path 0o700 with @@ -20,7 +27,7 @@ let repo_url_to_clone_path repo_url = let uri = Uri.of_string repo_url in let sane_host = match Uri.host uri with - | Some host -> Str.global_replace sanitize_re "_" host + | Some host -> replace_special host | None -> "no_host" in let sane_path = @@ -28,53 +35,44 @@ let repo_url_to_clone_path repo_url = path uri |> pct_decode |> Filename.chop_extension - |> Str.global_replace sanitize_re "_") + |> replace_special) in Fpath.(v sane_host / sane_path) -let clone ?(repo_url = default_repo_url) () = +let clone ~process_mgr ?(repo_url = default_repo_url) () = let clone_path = repo_url_to_clone_path repo_url in let clone_parent = Fpath.parent clone_path |> Fpath.to_string in let clone_path_str = Fpath.to_string clone_path in match Unix.lstat clone_path_str with - | Unix.{ st_kind = S_DIR; _ } -> Lwt.return_unit + | Unix.{ st_kind = S_DIR; _ } -> () | _ -> Fmt.failwith "%S is not a directory!" clone_path_str | exception Unix.Unix_error (Unix.ENOENT, _, _) -> mkdir_p clone_parent; - Process.exec ("", [| "git"; "clone"; "--bare"; repo_url; clone_path_str |]) + Eio.Process.run process_mgr ["git"; "clone"; "--bare"; repo_url; clone_path_str] -let open_store ?(repo_url = default_repo_url) () = - clone ~repo_url () >>= fun () -> +let open_store ~process_mgr ?(repo_url = default_repo_url) () = + clone ~process_mgr ~repo_url (); let path = repo_url_to_clone_path repo_url in - Git_unix.Store.v ~dotgit:path path >|= function + match Lwt_eio.run_lwt (fun () -> Git_unix.Store.v ~dotgit:path path) with | Ok x -> x | Error e -> Fmt.failwith "Failed to open %a: %a" Fpath.pp path Store.pp_error e -let close_store store = Git_unix.Store.close_pack_files store - -let with_store ?repo_url f = - open_store ?repo_url () >>= fun store -> - Lwt.finalize (fun () -> f store) (fun () -> close_store store) - -let oldest_commit_with ~repo_url ~from paths = +let oldest_commit_with ~process_mgr ~repo_url ~from paths = let clone_path = repo_url_to_clone_path repo_url |> Fpath.to_string in let cmd = "git" - :: "-C" - :: clone_path + :: "-C" :: clone_path :: "log" - :: "-n" - :: "1" + :: "-n" :: "1" :: "--format=format:%H" :: from :: "--" :: paths in - let cmd = ("", Array.of_list cmd) in - Process.pread cmd >|= String.trim + Eio.Process.parse_out process_mgr Eio.Buf_read.take_all cmd |> String.trim -let oldest_commits_with ~from pkgs = +let oldest_commits_with ~process_mgr ~from pkgs = let paths = pkgs |> List.map (fun pkg -> @@ -83,10 +81,11 @@ let oldest_commits_with ~from pkgs = Printf.sprintf "packages/%s/%s.%s" name name version) in from - |> Lwt_list.map_p (fun (repo_url, hash) -> - Lwt.bind (oldest_commit_with ~repo_url ~from:hash paths) (fun commit -> - Lwt.return (repo_url, commit))) + |> Fiber.List.map (fun (repo_url, hash) -> + let commit = oldest_commit_with ~process_mgr ~repo_url ~from:hash paths in + (repo_url, commit) + ) -let fetch ?(repo_url = default_repo_url) () = +let fetch ~process_mgr ?(repo_url = default_repo_url) () = let clone_path = repo_url_to_clone_path repo_url |> Fpath.to_string in - Process.exec ("", [| "git"; "-C"; clone_path; "fetch"; "origin" |]) + Eio.Process.run process_mgr ["git"; "-C"; clone_path; "fetch"; "origin"] diff --git a/service/opam_repository.mli b/service/opam_repository.mli index ca65fa5..71a98ac 100644 --- a/service/opam_repository.mli +++ b/service/opam_repository.mli @@ -1,2 +1,25 @@ -include Opam_repository_intf.S -(** @inline *) +val open_store : process_mgr:#Eio.Process.mgr -> ?repo_url:string -> unit -> Git_unix.Store.t +(** Open the local clone of the repo at the given URL. If the local clone does + not yet exist, this clones it first. If repo_url is unspecified, it + defaults to ocaml/opam-repository on GitHub. *) + +val clone : process_mgr:#Eio.Process.mgr -> ?repo_url:string -> unit -> unit +(** [clone ()] ensures that a local clone of the specified repo exists. If + not, it clones it. If repo_url is unspecified, it defaults to + ocaml/opam-repository on GitHub. *) + +val oldest_commits_with : + process_mgr:#Eio.Process.mgr -> + from:(string * string) list -> + OpamPackage.t list -> + (string * string) list +(** Use "git-log" to find the oldest commits with these package versions. This + avoids invalidating the Docker build cache on every update to + opam-repository. + + @param from + The repo_url and commit hash for each opam_repository at which to begin + the search. *) + +val fetch : process_mgr:#Eio.Process.mgr -> ?repo_url:string -> unit -> unit +(* Does a "git fetch origin" to update the store. *) diff --git a/service/opam_repository_intf.ml b/service/opam_repository_intf.ml deleted file mode 100644 index 6cd04b1..0000000 --- a/service/opam_repository_intf.ml +++ /dev/null @@ -1,34 +0,0 @@ -module type S = sig - val open_store : ?repo_url:string -> unit -> Git_unix.Store.t Lwt.t - (** Open the local clone of the repo at the given URL. If the local clone does - not yet exist, this clones it first. If repo_url is unspecified, it - defaults to ocaml/opam-repository on GitHub. *) - - val close_store : Git_unix.Store.t -> unit Lwt.t - (** [close_store t] closes all file descriptors used by [t] *) - - val with_store : - ?repo_url:string -> (Git_unix.Store.t -> 'a Lwt.t) -> 'a Lwt.t - (** [with_store f] ensures that after [f] the store that being opened is - closed *) - - val clone : ?repo_url:string -> unit -> unit Lwt.t - (** [clone ()] ensures that a local clone of the specified repo exists. If - not, it clones it. If repo_url is unspecified, it defaults to - ocaml/opam-repository on GitHub. *) - - val oldest_commits_with : - from:(string * string) list -> - OpamPackage.t list -> - (string * string) list Lwt.t - (** Use "git-log" to find the oldest commits with these package versions. This - avoids invalidating the Docker build cache on every update to - opam-repository. - - @param from - The repo_url and commit hash for each opam_repository at which to begin - the search. *) - - val fetch : ?repo_url:string -> unit -> unit Lwt.t - (* Does a "git fetch origin" to update the store. *) -end diff --git a/service/process.ml b/service/process.ml deleted file mode 100644 index 8b38079..0000000 --- a/service/process.ml +++ /dev/null @@ -1,29 +0,0 @@ -(** Helper functions for Lwt process handling. *) - -open Lwt.Infix - -let pp_args = - let sep = Fmt.(const string) " " in - Fmt.(array ~sep (quote string)) - -let pp_cmd f = function - | "", args -> pp_args f args - | bin, args -> Fmt.pf f "(%S, %a)" bin pp_args args - -let pp_status f = function - | Unix.WEXITED x -> Fmt.pf f "exited with status %d" x - | Unix.WSIGNALED x -> Fmt.pf f "failed with signal %a" Fmt.Dump.signal x - | Unix.WSTOPPED x -> Fmt.pf f "stopped with signal %a" Fmt.Dump.signal x - -let check_status cmd = function - | Unix.WEXITED 0 -> () - | status -> Fmt.failwith "%a %a" pp_cmd cmd pp_status status - -let exec cmd = - Lwt_process.with_process_none cmd @@ fun proc -> - proc#status >|= check_status cmd - -let pread cmd = - Lwt_process.with_process_in cmd @@ fun proc -> - Lwt_io.read proc#stdout >>= fun output -> - proc#status >|= check_status cmd >|= fun () -> output diff --git a/service/service.ml b/service/service.ml deleted file mode 100644 index 8244e24..0000000 --- a/service/service.ml +++ /dev/null @@ -1,283 +0,0 @@ -open Lwt.Infix -open Capnp_rpc_lwt -module Worker = Solver_service_api.Worker -module Log = Solver_service_api.Solver.Log -module Selection = Worker.Selection -module Store = Git_unix.Store -module Worker_process = Internal_worker.Worker_process - -let oldest_commit = Lwt_pool.create 180 @@ fun _ -> Lwt.return_unit -(* we are using at most 360 pipes at the same time and that's enough to keep the current - * performance and prevent some jobs to fail because of file descriptors exceed the limit.*) - -module Make (Opam_repo : Opam_repository_intf.S) = struct - module Epoch : sig - type t - (* An Epoch handles all requests for a single opam-repository HEAD commit. *) - - val create : - n_workers:int -> - create_worker:(Remote_commit.t list -> Worker_process.t) -> - Remote_commit.t list -> - t Lwt.t - - val process : - switch:Lwt_switch.t -> - log:Log.X.t Capability.t -> - id:string -> - Worker.Solve_request.t -> - Worker_process.t -> - (string list, string) result Lwt.t - - val handle : - switch:Lwt_switch.t -> - log:Solver_service_api.Solver.Log.t -> - Worker.Solve_request.t -> - t -> - Selection.t list Lwt.t - - val dispose : t -> unit Lwt.t - end = struct - type t = Worker_process.t Lwt_pool.t - - let validate (worker : Worker_process.t) = - match Worker_process.state worker with - | Available -> Lwt.return true - | Released -> - Format.eprintf - "Worker %d is released - closing and removing from pool@." - (Worker_process.pid worker); - Lwt.return false - | Closed status -> - Format.eprintf "Worker %d is closed (%a) - removing from pool@." - (Worker_process.pid worker) - Process.pp_status status; - Lwt.return false - | Failed ex -> Lwt.fail ex - - let dispose (worker : Worker_process.t) = - let pid = Worker_process.pid worker in - Fmt.epr "Terminating worker %d@." pid; - Worker_process.close worker >|= function - | Unix.WEXITED code -> - Fmt.epr "Worker %d finished. Exited with code %d@." pid code - | Unix.WSIGNALED code -> - Fmt.epr "Worker %d finished. Killed by signal %a@." pid - Fmt.Dump.signal code - | Unix.WSTOPPED code -> - Fmt.epr "Worker %d finished. Stopped by signal %a@." pid - Fmt.Dump.signal code - - let update_opam_repository_to_commit commit = - let repo_url = commit.Remote_commit.repo in - let hash = Store.Hash.of_hex commit.Remote_commit.hash in - Opam_repo.with_store ~repo_url (fun store -> Store.mem store hash) - >>= fun r -> - if r then Lwt.return_unit - else ( - Fmt.pr "Need to update %s to get new commit %a@." repo_url Store.Hash.pp - hash; - Opam_repo.fetch ~repo_url () >>= fun () -> - Opam_repo.with_store ~repo_url (fun store -> Store.mem store hash) - >>= fun r -> - if r then Lwt.return_unit - else Fmt.failwith "Still missing commit after update!") - (*Closing the store after usage is necessary to prevent file descriptor leaks*) - - let create ~n_workers ~create_worker commits = - Lwt_list.iter_p update_opam_repository_to_commit commits >|= fun () -> - Lwt_pool.create n_workers ~validate ~dispose (fun () -> - Lwt.return (create_worker commits)) - - (* Send [request] to [worker] and read the reply. *) - let process ~switch ~log ~id request worker = - let request_str = - Worker.Solve_request.to_yojson request |> Yojson.Safe.to_string - in - let request_str = - Printf.sprintf "%d\n%s" (String.length request_str) request_str - in - let process = - Worker_process.write worker request_str >>= fun () -> - Worker_process.read_line worker >>= fun time -> - Worker_process.read_line worker >>= fun len -> - match Astring.String.to_int len with - | None -> Fmt.failwith "Bad frame from worker: time=%S len=%S" time len - | Some len -> ( - Worker_process.read_into worker len >|= fun results -> - match results.[0] with - | '+' -> - Log.info log "%s: found solution in %s s" id time; - let packages = - Astring.String.with_range ~first:1 results - |> Astring.String.cuts ~sep:" " - in - Ok packages - | '-' -> - Log.info log "%s: eliminated all possibilities in %s s" id time; - let msg = results |> Astring.String.with_range ~first:1 in - Error msg - | '!' -> - let msg = results |> Astring.String.with_range ~first:1 in - Fmt.failwith "BUG: solver worker failed: %s" msg - | _ -> Fmt.failwith "BUG: bad output: %s" results) - in - ( Lwt_switch.add_hook_or_exec (Some switch) @@ fun () -> - (* Release the worker before cancelling the promise of the request, in order to prevent the - * workers's pool choosing the worker for another processing.*) - if Lwt.state process = Lwt.Sleep then ( - Worker_process.release worker; - Lwt.cancel process; - dispose worker) - else Lwt.return_unit ) - >>= fun () -> process - - let dispose = Lwt_pool.clear - let ocaml = OpamPackage.Name.of_string "ocaml" - - (* If a local package has a literal constraint on OCaml's version and it doesn't match - the platform, we just remove that package from the set to test, so other packages - can still be tested. *) - let compatible_with ~ocaml_version (dep_name, filter) = - let check_ocaml = function - | OpamTypes.Constraint (op, OpamTypes.FString v) -> - let v = OpamPackage.Version.of_string v in - OpamFormula.eval_relop op ocaml_version v - | _ -> true - in - if OpamPackage.Name.equal dep_name ocaml then - OpamFormula.eval check_ocaml filter - else true - - let handle ~switch ~log request t = - let { - Worker.Solve_request.opam_repository_commits; - platforms; - root_pkgs; - pinned_pkgs; - lower_bound = _; - } = - request - in - let root_pkgs = List.map fst root_pkgs in - let pinned_pkgs = List.map fst pinned_pkgs in - let pins = - root_pkgs @ pinned_pkgs - |> List.map (fun pkg -> OpamPackage.name (OpamPackage.of_string pkg)) - |> OpamPackage.Name.Set.of_list - in - Log.info log "Solving for %a" Fmt.(list ~sep:comma string) root_pkgs; - platforms - |> Lwt_list.map_p (fun p -> - let id, vars = p in - let ocaml_version = - OpamPackage.Version.of_string vars.Worker.Vars.ocaml_version - in - let compatible_root_pkgs = - request.root_pkgs - |> List.filter (fun (_name, contents) -> - if String.equal "" contents then true - else - let opam = OpamFile.OPAM.read_from_string contents in - let deps = OpamFile.OPAM.depends opam in - OpamFormula.eval (compatible_with ~ocaml_version) deps) - in - (* If some packages are compatible but some aren't, just solve for the compatible ones. - Otherwise, try to solve for everything to get a suitable error. *) - let root_pkgs = - if compatible_root_pkgs = [] then request.root_pkgs - else compatible_root_pkgs - in - let slice = { request with platforms = [ p ]; root_pkgs } in - Lwt_pool.use t (process ~switch ~log ~id slice) >>= function - | Error _ as e -> Lwt.return (id, e) - | Ok packages -> - let repo_packages = - packages - |> List.filter_map (fun pkg -> - let pkg = OpamPackage.of_string pkg in - if OpamPackage.Name.Set.mem pkg.name pins then None - else Some pkg) - in - (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. - Make sure we have at least odoc 2.1.1 available, otherwise it won't work on OCaml 5.0. *) - let repo_packages = - OpamPackage.of_string "odoc.2.1.1" :: repo_packages - in - ( Lwt_pool.use oldest_commit @@ fun () -> - Opam_repo.oldest_commits_with repo_packages - ~from:opam_repository_commits ) - >|= fun commits -> - let compat_pkgs = List.map fst compatible_root_pkgs in - (id, Ok { Worker.Selection.id; compat_pkgs; packages; commits })) - >|= List.filter_map (fun (id, result) -> - Log.info log "= %s =" id; - match result with - | Ok result -> - Log.info log "-> @[%a@]" - Fmt.(list ~sep:sp string) - result.Selection.packages; - Log.info log "(valid since opam-repository commit(s): @[%a@])" - Fmt.(list ~sep:semi (pair ~sep:comma string string)) - result.Selection.commits; - Some result - | Error msg -> - Log.info log "%s" msg; - None) - end - - (* Handle a request by distributing it among the worker processes and then aggregating their responses. *) - let handle ~switch t ~log (request : Worker.Solve_request.t) = - let commits = - request.opam_repository_commits - |> List.map (fun (repo, hash) -> Remote_commit.v ~repo ~hash) - in - Epoch_lock.with_epoch t commits (Epoch.handle ~switch ~log request) - - let v ~n_workers ~create_worker = - let create commits = Epoch.create ~n_workers ~create_worker commits in - let t = Epoch_lock.v ~create ~dispose:Epoch.dispose () in - let module X = Solver_service_api.Raw.Service.Solver in - Lwt.return - @@ X.local - @@ object - inherit X.service - - method solve_impl params release_param_caps = - let open X.Solve in - let request = Params.request_get params in - let log = Params.log_get params in - release_param_caps (); - match log with - | None -> Service.fail "Missing log argument!" - | Some log -> ( - Capnp_rpc_lwt.Service.return_lwt @@ fun () -> - Capability.with_ref log @@ fun log -> - match - Worker.Solve_request.of_yojson - (Yojson.Safe.from_string request) - with - | Error msg -> - Lwt_result.fail - (`Capnp - (Capnp_rpc.Error.exn "Bad JSON in request: %s" msg)) - | Ok request -> - Lwt.catch - (fun () -> - handle t ~switch:(Lwt_switch.create ()) ~log request - >|= Result.ok) - (function - | Failure msg -> Lwt_result.fail (`Msg msg) - | ex -> Lwt.return (Fmt.error_msg "%a" Fmt.exn ex)) - >|= fun selections -> - let json = - Yojson.Safe.to_string - (Worker.Solve_response.to_yojson selections) - in - let response, results = - Capnp_rpc_lwt.Service.Response.create Results.init_pointer - in - Results.response_set results json; - Ok response) - end -end diff --git a/service/service.mli b/service/service.mli deleted file mode 100644 index 1138520..0000000 --- a/service/service.mli +++ /dev/null @@ -1,40 +0,0 @@ -module Make (_ : Opam_repository_intf.S) : sig - module Epoch : sig - type t - (* An Epoch handles all requests for a single opam-repository HEAD commit. *) - - val create : - n_workers:int -> - create_worker:(Remote_commit.t list -> Internal_worker.Worker_process.t) -> - Remote_commit.t list -> - t Lwt.t - - val process : - switch:Lwt_switch.t -> - log:Solver_service_api.Solver.Log.X.t Capnp_rpc_lwt.Capability.t -> - id:string -> - Solver_service_api.Worker.Solve_request.t -> - Internal_worker.Worker_process.t -> - (string list, string) result Lwt.t - (** [process ~log ~id request process] will write the [request] to the stdin - of [procress] and read [stdout] returning the packages. Information is - logged into [log] with [id]. *) - - val dispose : t -> unit Lwt.t - end - - val handle : - switch:Lwt_switch.t -> - (Epoch.t, Remote_commit.t list) Epoch_lock.t -> - log:Solver_service_api.Solver.Log.X.t Capnp_rpc_lwt.Capability.t -> - Solver_service_api.Worker.Solve_request.t -> - Solver_service_api.Worker.Selection.t list Lwt.t - - val v : - n_workers:int -> - create_worker:(Remote_commit.t list -> Internal_worker.Worker_process.t) -> - Solver_service_api.Solver.t Lwt.t - (** [v ~n_workers ~create_worker] is a solver service that distributes work to - up to [n_workers] subprocesses, using [create_worker hash] to spawn new - workers. *) -end diff --git a/service/solver.ml b/service/solver.ml index 2d44448..2694c58 100644 --- a/service/solver.ml +++ b/service/solver.ml @@ -1,15 +1,32 @@ +open Eio.Std + module Worker = Solver_service_api.Worker module Solver = Opam_0install.Solver.Make (Git_context) module Store = Git_unix.Store +type reply = (OpamPackage.t list, string) result * float + +type stream = (Solver_service_api.Worker.Solve_request.t * reply Promise.u) Eio.Stream.t + let env (vars : Worker.Vars.t) v = - Opam_0install.Dir_context.std_env ~arch:vars.arch ~os:vars.os - ~os_distribution:vars.os_distribution ~os_version:vars.os_version - ~os_family:vars.os_family ~opam_version:vars.opam_version () v + match v with + | "arch" -> Some (OpamTypes.S vars.arch) + | "os" -> Some (OpamTypes.S vars.os) + | "os-distribution" -> Some (OpamTypes.S vars.os_distribution) + | "os-version" -> Some (OpamTypes.S vars.os_version) + | "os-family" -> Some (OpamTypes.S vars.os_family) + | "opam-version" -> Some (OpamVariable.S vars.opam_version) + | "sys-ocaml-version" -> None + | "ocaml:native" -> Some (OpamTypes.B true) + | "enable-ocaml-beta-repository" -> None (* Fake variable? *) + | _ -> + (* Disabled, as not thread-safe! *) + (* OpamConsole.warning "Unknown variable %S" v; *) + None let parse_opam (name, contents) = let pkg = OpamPackage.of_string name in - let opam = OpamFile.OPAM.read_from_string contents in + let opam = Git_context.opam_read_from_string_threadsafe contents in (OpamPackage.name pkg, (OpamPackage.version pkg, opam)) let solve ~packages ~pins ~root_pkgs ~lower_bound (vars : Worker.Vars.t) = @@ -27,76 +44,55 @@ let solve ~packages ~pins ~root_pkgs ~lower_bound (vars : Worker.Vars.t) = let t0 = Unix.gettimeofday () in let r = Solver.solve context (ocaml_package :: root_pkgs) in let t1 = Unix.gettimeofday () in - Printf.printf "%.2f\n" (t1 -. t0); - match r with - | Ok sels -> - let pkgs = Solver.packages_of_result sels in - Ok (List.map OpamPackage.to_string pkgs) - | Error diagnostics -> Error (Solver.diagnostics diagnostics) + let r = + match r with + | Ok sels -> Ok (Solver.packages_of_result sels) + | Error diagnostics -> Error (Solver.diagnostics diagnostics) + in + r, (t1 -. t0) -let main commits = - let open Lwt.Infix in - let packages = - Lwt_main.run +let last_index = ref None + +let main ~stores (stream:stream) = + Logs.info (fun f -> f "solver.ml:main"); + let packages commits = + Eio.Mutex.use_ro Stores.git_lock @@ fun () -> + match !last_index with + | Some (k, v) when k = commits -> v + | _ -> (* Read all the package from all the given opam-repository repos, * and collate them into a single Map. *) - (Lwt_list.fold_left_s - (fun acc commit -> - let repo_url = commit.Remote_commit.repo in - let hash = Store.Hash.of_hex commit.Remote_commit.hash in - Opam_repository.open_store ~repo_url () >>= fun store -> - Git_context.read_packages ~acc store hash >>= fun packages -> - Opam_repository.close_store store >>= fun () -> Lwt.return packages) - OpamPackage.Name.Map.empty commits) - in - let rec aux () = - match input_line stdin with - | exception End_of_file -> () - | len -> - let len = int_of_string len in - let data = really_input_string stdin len in - let request = - Worker.Solve_request.of_yojson (Yojson.Safe.from_string data) - |> Result.get_ok - in - let { - Worker.Solve_request.opam_repository_commits; - root_pkgs; - pinned_pkgs; - platforms; - lower_bound; - } = - request - in - assert ( - List.for_all - (fun (repo, hash) -> List.mem (Remote_commit.v ~repo ~hash) commits) - opam_repository_commits); - let root_pkgs = List.map parse_opam root_pkgs in - let pinned_pkgs = List.map parse_opam pinned_pkgs in - let pins = root_pkgs @ pinned_pkgs |> OpamPackage.Name.Map.of_list in - let root_pkgs = List.map fst root_pkgs in - platforms - |> List.iter (fun (_id, platform) -> - let msg = - match - solve ~packages ~pins ~root_pkgs ~lower_bound platform - with - | Ok packages -> "+" ^ String.concat " " packages - | Error msg -> "-" ^ msg - in - Printf.printf "%d\n%s%!" (String.length msg) msg); - aux () + let v = + List.fold_left + (fun acc (repo_url, hash) -> + let store = Stores.get stores repo_url in + let hash = Store.Hash.of_hex hash in + Git_context.read_packages ~acc store hash + ) + Git_context.empty_index commits + in + last_index := Some (commits, v); + v in - aux () - -let main commit = - try main commit - with ex -> - Fmt.epr "solver bug: %a@." Fmt.exn ex; - let msg = - match ex with Failure msg -> msg | ex -> Printexc.to_string ex + while true do + let request, reply = Eio.Stream.take stream in + let { + Worker.Solve_request.opam_repository_commits; + root_pkgs; + pinned_pkgs; + platforms; + lower_bound; + } = + request in - let msg = "!" ^ msg in - Printf.printf "0.0\n%d\n%s%!" (String.length msg) msg; - raise ex + let packages = packages opam_repository_commits in + let root_pkgs = List.map parse_opam root_pkgs in + let pinned_pkgs = List.map parse_opam pinned_pkgs in + let pins = root_pkgs @ pinned_pkgs |> OpamPackage.Name.Map.of_list in + let root_pkgs = List.map fst root_pkgs in + platforms + |> List.iter (fun (_id, platform) -> + let r, time = solve ~packages ~pins ~root_pkgs ~lower_bound platform in + Promise.resolve reply (r, time) + ) + done diff --git a/service/solver.mli b/service/solver.mli index d30e865..3ae96e0 100644 --- a/service/solver.mli +++ b/service/solver.mli @@ -1,3 +1,8 @@ -val main : Remote_commit.t list -> unit -(** [main hash] runs a worker process that reads requests from stdin and writes - results to stdout, using the given commit(s) from opam-repository repos. *) +type reply = (OpamPackage.t list, string) result * float + +type stream = (Solver_service_api.Worker.Solve_request.t * reply Eio.Promise.u) Eio.Stream.t + +val main : + stores:Stores.t -> + stream -> unit +(** [main stream] runs a worker process that reads requests from [stream] and solves them. *) diff --git a/service/solver_service.ml b/service/solver_service.ml index 140db27..e1f70cb 100644 --- a/service/solver_service.ml +++ b/service/solver_service.ml @@ -1,8 +1,187 @@ -module Epoch_lock = Epoch_lock -module Git_context = Git_context -module Opam_repository = Opam_repository -module Process = Process -module Service = Service -module Solver = Solver -module Remote_commit = Remote_commit -module Internal_worker = Internal_worker +open Eio.Std +module Worker = Solver_service_api.Worker +module Log = Solver_service_api.Solver.Log +module Selection = Worker.Selection + +type t = { + pool : Solver.stream Lwt_pool.t; + stores : Stores.t; +} + +(* Send [request] to [worker] and read the reply. *) +let process ~log ~id request (worker:Solver.stream) = + let reply, set_reply = Promise.create () in + Eio.Stream.add worker (request, set_reply); + let results, time = Promise.await reply in + match results with + | Ok packages -> + Log.info log "%s: found solution in %f s" id time; + Ok packages + | Error msg -> + Log.info log "%s: eliminated all possibilities in %f s" id time; + Error msg + +let ocaml = OpamPackage.Name.of_string "ocaml" + +(* If a local package has a literal constraint on OCaml's version and it doesn't match + the platform, we just remove that package from the set to test, so other packages + can still be tested. *) +let compatible_with ~ocaml_version (dep_name, filter) = + let check_ocaml = function + | OpamTypes.Constraint (op, OpamTypes.FString v) -> + let v = OpamPackage.Version.of_string v in + OpamFormula.eval_relop op ocaml_version v + | _ -> true + in + if OpamPackage.Name.equal dep_name ocaml then + OpamFormula.eval check_ocaml filter + else true + +(* Handle a request by distributing it among the worker processes and then aggregating their responses. *) +let solve t ~log request = + let { + Worker.Solve_request.opam_repository_commits; + platforms; + root_pkgs; + pinned_pkgs; + lower_bound = _; + } = + request + in + Stores.fetch_commits t.stores opam_repository_commits; + let root_pkgs = List.map fst root_pkgs in + let pinned_pkgs = List.map fst pinned_pkgs in + let pins = + root_pkgs @ pinned_pkgs + |> List.map (fun pkg -> OpamPackage.name (OpamPackage.of_string pkg)) + |> OpamPackage.Name.Set.of_list + in + Log.info log "Solving for %a" Fmt.(list ~sep:comma string) root_pkgs; + platforms + |> Fiber.List.map (fun p -> + let id, vars = p in + let ocaml_version = + OpamPackage.Version.of_string vars.Worker.Vars.ocaml_version + in + let compatible_root_pkgs = + request.root_pkgs + |> List.filter (fun (_name, contents) -> + if String.equal "" contents then true + else + let opam = Git_context.opam_read_from_string_threadsafe contents in + let deps = OpamFile.OPAM.depends opam in + OpamFormula.eval (compatible_with ~ocaml_version) deps) + in + (* If some packages are compatible but some aren't, just solve for the compatible ones. + Otherwise, try to solve for everything to get a suitable error. *) + let root_pkgs = + if compatible_root_pkgs = [] then request.root_pkgs + else compatible_root_pkgs + in + let slice = { request with platforms = [ p ]; root_pkgs } in + match + Lwt_eio.run_lwt @@ fun () -> + Lwt_pool.use t.pool (fun worker -> + Lwt_eio.run_eio @@ fun () -> + process ~log ~id slice worker + ) + with + | Error _ as e -> (id, e) + | Ok packages -> + let repo_packages = + packages + |> List.filter_map (fun (pkg : OpamPackage.t) -> + if OpamPackage.Name.Set.mem pkg.name pins then None + else Some pkg) + in + (* Hack: ocaml-ci sometimes also installs odoc, but doesn't tell us about it. + Make sure we have at least odoc 2.1.1 available, otherwise it won't work on OCaml 5.0. *) + let repo_packages = + OpamPackage.of_string "odoc.2.1.1" :: repo_packages + in + let commits = Stores.oldest_commits_with t.stores repo_packages ~from:opam_repository_commits in + let compat_pkgs = List.map fst compatible_root_pkgs in + let packages = List.map OpamPackage.to_string packages in + (id, Ok { Worker.Selection.id; compat_pkgs; packages; commits })) + |> List.filter_map (fun (id, result) -> + Log.info log "= %s =" id; + match result with + | Ok result -> + Log.info log "-> @[%a@]" + Fmt.(list ~sep:sp string) + result.Selection.packages; + Log.info log "(valid since opam-repository commit(s): @[%a@])" + Fmt.(list ~sep:semi (pair ~sep:comma string string)) + result.Selection.commits; + Some result + | Error msg -> + Log.info log "%s" msg; + None) + +let solve t ~log request = + try Ok (solve t ~log request) + with + | Failure msg -> Error (`Msg msg) + | ex -> Fmt.error_msg "%a" Fmt.exn ex + +let create ~sw ~domain_mgr ~process_mgr ~n_workers = + let stores = Stores.create ~process_mgr in + let create_worker _commits = + Logs.info (fun f -> f "create_worker"); + try + let stream = Eio.Stream.create n_workers in + Fiber.fork ~sw (fun () -> + Eio.Domain_manager.run domain_mgr @@ fun () -> + Solver.main ~stores stream + ); + Logs.info (fun f -> f "create_worker done"); + stream + with ex -> + let bt = Printexc.get_raw_backtrace () in + Eio.traceln "service.ml:v: %a" Fmt.exn_backtrace (ex, bt); + raise ex + in + let pool = Lwt_pool.create n_workers (fun () -> Lwt_eio.run_eio create_worker) in + { + stores; + pool; + } + +let capnp_service t = + let open Capnp_rpc_lwt in + let module X = Solver_service_api.Raw.Service.Solver in + X.local + @@ object + inherit X.service + + method solve_impl params release_param_caps = + let open X.Solve in + let request = Params.request_get params in + let log = Params.log_get params in + release_param_caps (); + match log with + | None -> Service.fail "Missing log argument!" + | Some log -> + Capnp_rpc_lwt.Service.return_lwt @@ fun () -> + Capability.with_ref log @@ fun log -> + match + Worker.Solve_request.of_yojson + (Yojson.Safe.from_string request) + with + | Error msg -> + Lwt_result.fail + (`Capnp + (Capnp_rpc.Error.exn "Bad JSON in request: %s" msg)) + | Ok request -> + Lwt_eio.run_eio @@ fun () -> + let selections = solve t ~log request in + let json = + Yojson.Safe.to_string + (Worker.Solve_response.to_yojson selections) + in + let response, results = + Capnp_rpc_lwt.Service.Response.create Results.init_pointer + in + Results.response_set results json; + Ok response + end diff --git a/service/solver_service.mli b/service/solver_service.mli new file mode 100644 index 0000000..c031734 --- /dev/null +++ b/service/solver_service.mli @@ -0,0 +1,24 @@ +type t + +val create : + sw:Eio.Switch.t -> + domain_mgr:#Eio.Domain_manager.t -> + process_mgr:#Eio.Process.mgr -> + n_workers:int -> + t +(** [create ~sw ~domain_mgr ~process_mgr ~n_workers] is a solver service that + distributes work to up to [n_workers] domains. + + @param sw Holds the worker domains. + @param domain_mgr Used to spawn new domains. + @param process_mgr Used to run the "git" command. + @param n_workers Maximum number of worker domains. *) + +val solve : + t -> + log:Solver_service_api.Solver.Log.t -> + Solver_service_api.Worker.Solve_request.t -> + Solver_service_api.Worker.Solve_response.t + +val capnp_service : t -> Solver_service_api.Solver.t +(** [capnp_service t] is a Cap'n Proto service that handles requests using [t]. *) diff --git a/service/stores.ml b/service/stores.ml new file mode 100644 index 0000000..02aea8f --- /dev/null +++ b/service/stores.ml @@ -0,0 +1,57 @@ +open Eio.Std + +module Store = Git_unix.Store +module Store_map = Map.Make(String) + +let git_lock = Eio.Mutex.create () + +type t = { + stores_lock : Eio.Mutex.t; + mutable stores : Store.t Store_map.t; + process_mgr : Eio.Process.mgr; +} + +let oldest_commit = Eio.Semaphore.make 180 +(* we are using at most 360 pipes at the same time and that's enough to keep the current + * performance and prevent some jobs to fail because of file descriptors exceed the limit.*) + +let get t repo_url = + Eio.Mutex.use_rw ~protect:false t.stores_lock @@ fun () -> + match Store_map.find_opt repo_url t.stores with + | Some x -> x + | None -> + let store = Opam_repository.open_store ~process_mgr:t.process_mgr ~repo_url () in + t.stores <- Store_map.add repo_url store t.stores; + store + +let mem store hash = Lwt_eio.run_lwt (fun () -> Store.mem store hash) + +let update_opam_repository_to_commit t (repo_url, hash) = + let store = get t repo_url in + let hash = Store.Hash.of_hex hash in + if mem store hash then () + else ( + Fmt.pr "Need to update %s to get new commit %a@." repo_url Store.Hash.pp + hash; + Opam_repository.fetch ~process_mgr:t.process_mgr ~repo_url (); + if not (mem store hash) then + Fmt.failwith "Still missing commit after update!") + +let create ~process_mgr = + { + process_mgr = (process_mgr :> Eio.Process.mgr); + stores_lock = Eio.Mutex.create (); + stores = Store_map.empty; + } + +let oldest_commits_with t ~from repo_packages = + Eio.Semaphore.acquire oldest_commit; + Fun.protect ~finally:(fun () -> Eio.Semaphore.release oldest_commit) @@ fun () -> + Opam_repository.oldest_commits_with repo_packages + ~process_mgr:t.process_mgr + ~from + +let fetch_commits t commits = + Eio.Mutex.use_rw ~protect:true git_lock (fun () -> + Fiber.List.iter (update_opam_repository_to_commit t) commits + ) diff --git a/stress/dune b/stress/dune index 29a15fe..8ad6f08 100644 --- a/stress/dune +++ b/stress/dune @@ -1,7 +1,7 @@ ; No-op test to attach stress.exe to the solver-service package (tests - (names stress stress_submit) + (names stress) ; stress_submit) (package solver-worker) - (libraries solver-worker current_ocluster logs.cli logs.fmt fmt.cli) + (libraries solver-service current_ocluster logs.cli logs.fmt fmt.cli logs.threaded fmt.tty capnp-rpc-unix eio_main) (action (progn))) diff --git a/stress/utils.ml b/stress/utils.ml index c8691f2..9a7d9c2 100644 --- a/stress/utils.ml +++ b/stress/utils.ml @@ -1,5 +1,28 @@ +open Lwt.Infix open Lwt.Syntax +let pp_args = + let sep = Fmt.(const string) " " in + Fmt.(array ~sep (quote string)) + +let pp_cmd f = function + | "", args -> pp_args f args + | bin, args -> Fmt.pf f "(%S, %a)" bin pp_args args + +let pp_status f = function + | Unix.WEXITED x -> Fmt.pf f "exited with status %d" x + | Unix.WSIGNALED x -> Fmt.pf f "failed with signal %a" Fmt.Dump.signal x + | Unix.WSTOPPED x -> Fmt.pf f "stopped with signal %a" Fmt.Dump.signal x + +let check_status cmd = function + | Unix.WEXITED 0 -> () + | status -> Fmt.failwith "%a %a" pp_cmd cmd pp_status status + +let pread cmd = + Lwt_process.with_process_in cmd @@ fun proc -> + Lwt_io.read proc#stdout >>= fun output -> + proc#status >|= check_status cmd >|= fun () -> output + let opam_template arch = let arch = Option.value ~default:"%{arch}%" arch in Fmt.str @@ -17,7 +40,7 @@ let opam_template arch = let get_vars ~ocaml_package_name ~ocaml_version ?arch () = let+ vars = - Solver_service.Process.pread + pread ("", [| "opam"; "config"; "expand"; opam_template arch |]) in let json = @@ -35,11 +58,11 @@ let get_vars ~ocaml_package_name ~ocaml_version ?arch () = Result.get_ok @@ Solver_service_api.Worker.Vars.of_yojson json let get_opam_file pv = - Solver_service.Process.pread ("", [| "opam"; "show"; "--raw"; pv |]) + pread ("", [| "opam"; "show"; "--raw"; pv |]) let get_opam_packages () = let open Lwt.Infix in - Solver_service.Process.pread + pread ("", [| "opam"; "list"; "--short"; "--color=never" |]) >|= String.split_on_char '\n' >|= List.map String.trim diff --git a/test/dune b/test/dune deleted file mode 100644 index 02b5ed5..0000000 --- a/test/dune +++ /dev/null @@ -1,4 +0,0 @@ -(test - (name test) - (package solver-service) - (libraries alcotest-lwt solver-service)) diff --git a/test/mock_opam_repo.ml b/test/mock_opam_repo.ml deleted file mode 100644 index d2c1149..0000000 --- a/test/mock_opam_repo.ml +++ /dev/null @@ -1,64 +0,0 @@ -open Lwt.Syntax -module Log = Solver_service_api.Solver.Log -module P = Solver_service.Process - -let commits, set_commits = Lwt.wait () -let clone_path, set_clone_path = Lwt.wait () - -let get_sha clone_path = - let cmd = - "git" - :: "-C" - :: clone_path - :: "log" - :: "-n" - :: "1" - :: [ "--format=format:%H" ] - in - let cmd = ("", Array.of_list cmd) in - P.pread cmd - -let setup_store path = - match path with - | Error _ -> failwith "failed to create in-memory git store" - | Ok path -> ( - Lwt.wakeup set_clone_path path; - let* () = - P.exec ("git", [| "git"; "-C"; Fpath.to_string path; "init" |]) - in - let* () = - P.exec - ( "git", - [| - "git"; - "-C"; - Fpath.to_string path; - "commit"; - "-m"; - "'empty'"; - "--allow-empty"; - |] ) - in - let* store = Git_unix.Store.v path in - match store with - | Error err -> Fmt.failwith "%a" Git_unix.Store.pp_error err - | Ok store -> - let+ hash = get_sha (Fpath.to_string path) in - Lwt.wakeup set_commits [ ("github.com/ocaml/opam-repository", hash) ]; - store) - -let open_store ?repo_url:_ () = - let* clone_path in - let+ store = Git_unix.Store.v clone_path in - match store with - | Ok store -> store - | Error err -> Fmt.failwith "%a" Git_unix.Store.pp_error err - -let clone ?repo_url:_ () = Lwt.return () -let oldest_commits_with ~from:_ _pkgs = commits -let fetch ?repo_url:_ () = Lwt.return () -let close_store _ = Lwt.return () - -let with_store ?repo_url f = - let* store = open_store ~repo_url () in - f store diff --git a/test/test.ml b/test/test.ml deleted file mode 100644 index 7b0d9bd..0000000 --- a/test/test.ml +++ /dev/null @@ -1,7 +0,0 @@ -let () = - (* ignore user's git configuration *) - Unix.putenv "GIT_AUTHOR_NAME" "test"; - Unix.putenv "GIT_COMMITTER_NAME" "test"; - Unix.putenv "EMAIL" "test@example.com"; - Lwt_main.run - @@ Alcotest_lwt.run "solver-service" [ ("service", Test_service.tests) ] diff --git a/test/test_service.ml b/test/test_service.ml deleted file mode 100644 index f4eeb0c..0000000 --- a/test/test_service.ml +++ /dev/null @@ -1,159 +0,0 @@ -open Lwt.Syntax - -let job_log job = - let module L = Solver_service_api.Raw.Service.Log in - L.local - @@ object - inherit L.service - - method write_impl params release_param_caps = - let open L.Write in - release_param_caps (); - let msg = Params.msg_get params in - Buffer.add_string job msg; - Capnp_rpc_lwt.Service.(return (Response.create_empty ())) - end - -let const_response ~response : Lwt_process.process = - object - val std_out = - let time = 1000 in - let length = String.length response in - let output = Fmt.str "%i\n%d\n%s" time length response in - Lwt_io.of_bytes ~mode:Input (Lwt_bytes.of_string output) - - method pid = 10 - method rusage = Lwt.return Lwt_unix.{ ru_utime = 0.; ru_stime = 0. } - method state = Lwt_process.Running - method stdin = Lwt_io.null - method stdout = std_out - method terminate = () - method status = Lwt.return (Unix.WEXITED 0) - method close = Lwt.return (Unix.WEXITED 0) - method kill _i = () - end - -let create_proc ~response = - { - Solver_service.Internal_worker.Worker_process.process = - const_response ~response; - state = Available; - } - -module Service = Solver_service.Service.Make (Mock_opam_repo) - -let test_good_packages _sw () = - let proc = create_proc ~response:"+lwt.5.5.0 yaml.3.0.0" in - let log = Buffer.create 100 in - let req = - Solver_service_api.Worker.Solve_request. - { - opam_repository_commits = - [ - ( "github.com/ocaml/opam-repository", - "95d27ad970057f68179577594813dc1828324a2f" ); - ]; - root_pkgs = []; - pinned_pkgs = []; - platforms = []; - lower_bound = false; - } - in - let+ process = - let switch = Lwt_switch.create () in - Service.Epoch.process ~switch ~log:(job_log log) ~id:"unique-id" req proc - in - Alcotest.(check (result (list string) string)) - "Same packages" - (Ok [ "lwt.5.5.0"; "yaml.3.0.0" ]) - process - -let test_error _sw () = - let msg = "Something went wrong!" in - let proc = create_proc ~response:("-" ^ msg) in - let log = Buffer.create 100 in - let req = - Solver_service_api.Worker.Solve_request. - { - opam_repository_commits = - [ - ( "github.com/ocaml/opam-repository", - "95d27ad970057f68179577594813dc1828324a2f" ); - ]; - root_pkgs = []; - pinned_pkgs = []; - platforms = []; - lower_bound = false; - } - in - let+ process = - let switch = Lwt_switch.create () in - Service.Epoch.process ~switch ~log:(job_log log) ~id:"unique-id" req proc - in - Alcotest.(check (result (list string) string)) - "Same packages" (Error msg) process - -let solver_response = - Alcotest.of_pp (fun ppf t -> - Yojson.Safe.pp ppf (Solver_service_api.Worker.Solve_response.to_yojson t)) - -let test_e2e _sw () = - Lwt_io.with_temp_dir @@ fun dir -> - let* _store = Mock_opam_repo.setup_store (Ok (Fpath.v dir)) in - let* commits = Mock_opam_repo.commits in - let os_id = "testOS" in - let* vars = - Utils.get_vars ~ocaml_package_name:"ocaml" ~ocaml_version:"4.13.1" () - in - let create_worker _hash = create_proc ~response:"+lwt.5.5.0 yaml.3.0.0" in - let log = Buffer.create 100 in - let req = - Solver_service_api.Worker.Solve_request. - { - opam_repository_commits = commits; - root_pkgs = [ ("yaml.3.0.0", "") ]; - pinned_pkgs = []; - platforms = [ (os_id, vars) ]; - lower_bound = false; - } - in - let* service = Service.v ~n_workers:1 ~create_worker in - let* response = - Solver_service_api.Solver.solve ~log:(job_log log) service req - in - let req_lower_bound = { req with lower_bound = true } in - let+ response_lower_bound = - Solver_service_api.Solver.solve ~log:(job_log log) service req_lower_bound - in - Alcotest.(check solver_response) - "Same solve response" - (Ok - [ - { - id = os_id; - packages = [ "lwt.5.5.0"; "yaml.3.0.0" ]; - compat_pkgs = [ "yaml.3.0.0" ]; - commits; - }; - ]) - response; - Alcotest.(check solver_response) - "Same solve response lower bound" - (Ok - [ - { - id = os_id; - packages = [ "lwt.5.5.0"; "yaml.3.0.0" ]; - compat_pkgs = [ "yaml.3.0.0" ]; - commits; - }; - ]) - response_lower_bound - -let tests = - Alcotest_lwt. - [ - test_case "good-packages" `Quick test_good_packages; - test_case "error-handling" `Quick test_error; - test_case "end-to-end" `Quick test_e2e; - ] diff --git a/test/utils.ml b/test/utils.ml deleted file mode 100644 index 130bb18..0000000 --- a/test/utils.ml +++ /dev/null @@ -1,37 +0,0 @@ -open Lwt.Syntax - -let opam_template arch = - let arch = Option.value ~default:"%{arch}%" arch in - Fmt.str - {| - { - "arch": "%s", - "os": "%%{os}%%", - "os_family": "%%{os-family}%%", - "os_distribution": "%%{os-distribution}%%", - "os_version": "%%{os-version}%%", - "opam_version": "%%{opam-version}%%" - } -|} - arch - -let get_vars ~ocaml_package_name ~ocaml_version ?arch () = - let+ vars = - Solver_service.Process.pread - ("", [| "opam"; "config"; "expand"; opam_template arch |]) - in - let json = - match Yojson.Safe.from_string vars with - | `Assoc items -> - `Assoc - (("ocaml_package", `String ocaml_package_name) - :: ("ocaml_version", `String ocaml_version) - :: items) - | json -> - Fmt.failwith "Unexpected JSON: %a" - Yojson.Safe.(pretty_print ~std:true) - json - in - match Solver_service_api.Worker.Vars.of_yojson json with - | Ok x -> x - | Error m -> failwith m diff --git a/worker/dune b/worker/dune index 3668218..5a04841 100644 --- a/worker/dune +++ b/worker/dune @@ -7,11 +7,15 @@ (executable (name main) (libraries + eio_main + lwt_eio dune-build-info logs.cli + logs.threaded fmt.cli fmt.tty prometheus-app.unix + capnp-rpc-unix solver-worker) (modules main worker)) diff --git a/worker/main.ml b/worker/main.ml index c5b914e..e2033d9 100644 --- a/worker/main.ml +++ b/worker/main.ml @@ -1,23 +1,29 @@ +open Eio.Std + let setup_log style_renderer level = Fmt_tty.setup_std_outputs ?style_renderer (); Logs.set_level level; + Logs_threaded.enable (); Prometheus_unix.Logging.init ?default_level:level (); Logs.set_reporter (Logs_fmt.reporter ()); () let or_die = function Ok x -> x | Error (`Msg m) -> failwith m -let build ~solver ~switch ~log ~src:_ ~secrets:_ c = - Solver_worker.solve ~solver ~switch ~log c +let build ~solver ~switch:_ ~log ~src:_ ~secrets:_ c = + Lwt_eio.run_eio @@ fun () -> + Ok (Solver_worker.solve ~solver ~log c) let main () registration_path capacity internal_workers name state_dir = - Lwt_main.run - (let vat = Capnp_rpc_unix.client_only_vat () in - let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in - let solver = - Solver_worker.Solver_request.create ~n_workers:internal_workers () - in - Worker.run ~build:(build ~solver) ~capacity ~name ~state_dir sr) + Eio_main.run @@ fun env -> + Lwt_eio.with_event_loop ~clock:env#clock @@ fun () -> + let domain_mgr = env#domain_mgr in + let process_mgr = env#process_mgr in + let vat = Capnp_rpc_unix.client_only_vat () in + let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in + Switch.run @@ fun sw -> + let solver = Solver_service.create ~sw ~domain_mgr ~process_mgr ~n_workers:internal_workers in + Worker.run ~build:(build ~solver) ~capacity ~name ~state_dir sr open Cmdliner diff --git a/worker/solver_worker.ml b/worker/solver_worker.ml index e1ad059..59f10a4 100644 --- a/worker/solver_worker.ml +++ b/worker/solver_worker.ml @@ -1,50 +1,10 @@ (* Workers that can also solve opam jobs *) -open Lwt.Syntax module Log_data = Log_data module Context = Context module Log = Log module Process = Process -module Solver_request = struct - module Worker = Solver_service_api.Worker - module Log = Solver_service_api.Solver.Log - module Service = Solver_service.Service.Make (Solver_service.Opam_repository) - module Worker_process = Solver_service.Internal_worker.Worker_process - - type t = - ( Service.Epoch.t, - Solver_service.Remote_commit.t list ) - Solver_service.Epoch_lock.t - - let create ~n_workers () = - let create_worker commits = - let cmd = - ( "", - [| - "solver-service"; - "--worker"; - Solver_service.Remote_commit.list_to_string commits; - |] ) - in - Worker_process.create cmd - in - let create commits = - Service.Epoch.create ~n_workers ~create_worker commits - in - Solver_service.Epoch_lock.v ~create ~dispose:Service.Epoch.dispose () - - let solve t ~switch ~log ~request = - Lwt.catch - (fun () -> - let+ request = Service.handle ~switch t ~log request in - Result.ok request) - (function - | Failure msg -> Lwt_result.fail (`Msg msg) - | Lwt.Canceled -> Lwt_result.fail `Cancelled - | ex -> Lwt.return (Fmt.error_msg "%a" Fmt.exn ex)) -end - let solve_to_custom req = let open Cluster_api.Raw in let params = @@ -82,21 +42,20 @@ let cluster_worker_log log = Capnp_rpc_lwt.Service.(return (Response.create_empty ())) end -let solve ~solver ~switch ~log c = +let solve ~solver ~log c = match solve_of_custom c with | Error m -> failwith m - | Ok request -> ( - let log = cluster_worker_log log in - let+ selections = - Capnp_rpc_lwt.Capability.with_ref log @@ fun log -> - Solver_request.solve solver ~switch ~log ~request - in - match selections with - | Ok _ -> - let response = - Yojson.Safe.to_string - @@ Solver_service_api.Worker.Solve_response.to_yojson selections - in - Solver_service_api.Solver.Log.info log "%s" response; - Ok response - | Error _ as error -> error) + | Ok request -> + let log = cluster_worker_log log in + let selections = + Lwt_eio.run_lwt @@ fun () -> + Capnp_rpc_lwt.Capability.with_ref log @@ fun log -> + Lwt_eio.run_eio @@ fun () -> + Solver_service.solve solver ~log request + in + let response = + Yojson.Safe.to_string + @@ Solver_service_api.Worker.Solve_response.to_yojson selections + in + Solver_service_api.Solver.Log.info log "%s" response; + response diff --git a/worker/solver_worker.mli b/worker/solver_worker.mli index 4bd98c4..3d08145 100644 --- a/worker/solver_worker.mli +++ b/worker/solver_worker.mli @@ -6,25 +6,6 @@ module Context = Context module Log = Log module Process = Process -module Solver_request : sig - type t - (* A pool of subprocesses as internal-workers used for handling solver request *) - - val create : n_workers:int -> unit -> t - (** [create ~n_workers ()] will create a pool of [n_workers] subprocesses as - internal-workers *) - - val solve : - t -> - switch:Lwt_switch.t -> - log:Solver_service_api.Solver.Log.X.t Capnp_rpc_lwt.Capability.t -> - request:Solver_service_api.Worker.Solve_request.t -> - Solver_service_api.Worker.Solve_response.t Lwt.t - (**[solve t ~switch ~log ~request] will solve the [request] using the pool - [t], the [request] will be distributed among the internal-workers of the - pool.*) -end - val solve_to_custom : Solver_service_api.Worker.Solve_request.t -> Cluster_api.Custom.payload (** [solve_to_custom req] converts the solver request to a custom job @@ -37,10 +18,9 @@ val solve_of_custom : request. *) val solve : - solver:Solver_request.t -> - switch:Lwt_switch.t -> + solver:Solver_service.t -> log:Log_data.t -> Solver_service_api.Raw.Reader.pointer_t Cluster_api.Custom.t -> - (string, [ `Cancelled | `Msg of string ]) Lwt_result.t + string (** [handle ~solver ~switch ~log c] interprets [c] as a solver request and solves it using [solver]. *) diff --git a/worker/worker.ml b/worker/worker.ml index 67bb971..65a2fb8 100644 --- a/worker/worker.ml +++ b/worker/worker.ml @@ -203,15 +203,7 @@ let loop ~switch t queue = admin has updated the local package version. *) let update_normal () = Lwt.return (fun () -> Lwt.return ()) -let run ?switch ?prune_threshold ~build ~capacity ~name ~state_dir - registration_service = - (match prune_threshold with - | None -> - Log.info (fun f -> - f "Prune threshold not set. Will not check for low disk-space!") - | Some frac when frac < 0.0 || frac > 100.0 -> - Fmt.invalid_arg "prune_threshold must be in the range 0 to 100" - | Some _ -> ()); +let run ?switch ~build ~capacity ~name ~state_dir registration_service = let t = { name; @@ -224,6 +216,7 @@ let run ?switch ?prune_threshold ~build ~capacity ~name ~state_dir cancel = ignore; } in + Lwt_eio.run_lwt @@ fun () -> Lwt_switch.add_hook_or_exec switch (fun () -> Log.info (fun f -> f "Switch turned off. Will shut down."); t.cancel ();