Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ocaml/dune
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3c2d8bb8592d7c5b0fe1651f7ca5ab7bbcc099f9
Choose a base ref
..
head repository: ocaml/dune
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: c2865cdb9681de01907a689ac9d18cda12c02167
Choose a head ref
Showing with 59 additions and 36 deletions.
  1. +57 −32 src/dune_rpc_server/dune_rpc_server.ml
  2. +2 −4 src/dune_rpc_server/dune_rpc_server.mli
89 changes: 57 additions & 32 deletions src/dune_rpc_server/dune_rpc_server.ml
Original file line number Diff line number Diff line change
@@ -33,12 +33,36 @@ end)
module Session = struct
module Id = Session_id

module Close = struct
type t =
{ ivar : unit Fiber.Ivar.t
; mutable state : [ `Open | `Closed ]
}

let create () = { ivar = Fiber.Ivar.create (); state = `Open }

let to_dyn { state; ivar = _ } =
let name =
match state with
| `Open -> "Open"
| `Closed -> "Closed"
in
Dyn.variant name []

let close t =
match t.state with
| `Closed -> Fiber.return ()
| `Open ->
t.state <- `Closed;
Fiber.Ivar.fill t.ivar ()
end

type 'a state =
| Uninitialized
| Uninitialized of Close.t
| Initialized of
{ init : Initialize.Request.t
; state : 'a
; closed : bool
; close : Close.t
}

module Stage1 = struct
@@ -54,27 +78,23 @@ module Session = struct
let set t state =
match t.state with
| Initialized s -> t.state <- Initialized { s with state }
| Uninitialized -> Code_error.raise "set: state not available" []
| Uninitialized _ -> Code_error.raise "set: state not available" []

let get t =
match t.state with
| Initialized s -> s.state
| Uninitialized -> Code_error.raise "get: state not available" []

let active t =
match t.state with
| Uninitialized -> true
| Initialized s -> s.closed
| Uninitialized _ -> Code_error.raise "get: state not available" []

let initialize t =
match t.state with
| Initialized s -> s.init
| Uninitialized -> Code_error.raise "initialize: request not available" []
| Uninitialized _ ->
Code_error.raise "initialize: request not available" []

let create ~queries ~send =
{ queries
; send
; state = Uninitialized
; state = Uninitialized (Close.create ())
; id = Id.gen ()
; on_upgrade = None
; pool = Fiber.Pool.create ()
@@ -84,14 +104,7 @@ module Session = struct

let close t =
match t.state with
| Uninitialized -> assert false
| Initialized s -> t.state <- Initialized { s with closed = true }

let closed t =
match t.state with
| Uninitialized ->
Code_error.raise "closed: called on uninitialized session" []
| Initialized { closed; _ } -> closed
| Uninitialized c | Initialized { close = c; _ } -> Close.close c

let id t = t.id

@@ -102,13 +115,13 @@ module Session = struct
let dyn_of_state f =
let open Dyn in
function
| Uninitialized -> variant "Uninitialized" []
| Initialized { init; state; closed } ->
| Uninitialized close -> variant "Uninitialized" [ Close.to_dyn close ]
| Initialized { init; state; close } ->
let record =
record
[ ("init", opaque init)
; ("state", f state)
; ("closed", bool closed)
; ("close", Close.to_dyn close)
]
in
variant "Initialized" [ record ]
@@ -131,15 +144,16 @@ module Session = struct

let set t = Stage1.set t.base

let active t = Stage1.active t.base

let initialize t = Stage1.initialize t.base

let close t = Stage1.close t.base

let request_close t = Stage1.request_close t.base
let closed t =
match t.base.state with
| Uninitialized close | Initialized { close; _ } ->
Fiber.Ivar.read close.ivar

let closed t = Stage1.closed t.base
let request_close t = Stage1.request_close t.base

let compare x y = Stage1.compare x.base y.base

@@ -318,8 +332,14 @@ module H = struct
Event.emit
(Message { kind; meth_; stage = Stop })
stats (Session.id session);
if Session.closed session then Fiber.return ()
else Session.send session (Some [ Response (id, response) ])
match
(match session.base.state with
| Initialized { close; _ } -> close
| Uninitialized close -> close)
.state
with
| `Closed -> Fiber.return ()
| `Open -> Session.send session (Some [ Response (id, response) ])

let run_session (type a) (t : a t) stats (session : a Session.t) =
let open Fiber.O in
@@ -337,7 +357,7 @@ module H = struct
~f:(dispatch_request t stats session r.method_ r id))
in
let* () = Session.request_close session in
let+ () = t.base.on_terminate session in
let* () = t.base.on_terminate session in
Session.close session

let negotiate_version (type a) (t : a stage1) stats
@@ -381,6 +401,13 @@ module H = struct

let handle (type a) (t : a stage1) stats (session : a Session.Stage1.t) =
let open Fiber.O in
let* () = Fiber.return () in
let close =
match session.state with
| Uninitialized c -> c
| Initialized _ -> assert false
in
Fiber.finalize ~finally:(fun () -> Session.Close.close close) @@ fun () ->
let* query = Fiber.Stream.In.read session.queries in
match query with
| None -> session.send None
@@ -401,9 +428,7 @@ module H = struct
~message:"The server and client use incompatible protocols."
else
let* a = t.base.on_init session init in
let () =
session.state <- Initialized { init; state = a; closed = false }
in
let () = session.state <- Initialized { init; state = a; close } in
let* () =
let response =
Ok
6 changes: 2 additions & 4 deletions src/dune_rpc_server/dune_rpc_server.mli
Original file line number Diff line number Diff line change
@@ -30,8 +30,6 @@ module Session : sig
(** [get session a] sets the current state to [a].*)
val set : 'a t -> 'a -> unit

val active : _ t -> bool

(** [notification session n a] Send notification [a] defined by [n] to
[session] *)
val notification : _ t -> 'a Decl.Notification.witness -> 'a -> unit Fiber.t
@@ -44,6 +42,8 @@ module Session : sig

val has_poller : _ t -> Poller.t -> bool

val closed : _ t -> unit Fiber.t

(** A ['a Session.Stage1.t] represents a session prior to version negotiation.
Used during initialization. *)
@@ -58,8 +58,6 @@ module Session : sig

val set : 'a t -> 'a -> unit

val active : _ t -> bool

val compare : 'a t -> 'a t -> Ordering.t

val request_close : 'a t -> unit Fiber.t