Skip to content

Commit

Permalink
Initial Eio port
Browse files Browse the repository at this point in the history
This switches capnp-rpc from Lwt to Eio. One particularly nice side
effect of this is that `Service.return_lwt` has gone, as there is no
distinction now between concurrent and non-concurrent service methods.
  • Loading branch information
talex5 committed Nov 9, 2024
1 parent b62d3ac commit c943b1f
Show file tree
Hide file tree
Showing 81 changed files with 1,488 additions and 1,680 deletions.
291 changes: 151 additions & 140 deletions README.md

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions capnp-rpc-net.opam
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ depends: [
"logs"
"asetmap"
"cstruct" {>= "6.0.0"}
"mirage-flow" {>= "4.0.2"}
"tls" {>= "1.0.2"}
"tls-eio" {>= "1.0.2"}
"base64" {>= "3.0.0"}
"uri" {>= "1.6.0"}
"ptime"
Expand Down
5 changes: 2 additions & 3 deletions capnp-rpc-net/auth.mli
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ module Secret_key : sig

val generate : unit -> t
(** [generate ()] is a fresh secret key.
You must call the relevant entropy initialization function
(e.g. {!Mirage_crypto_rng_lwt.initialize}) before using this, or it
will raise an error if you forget. *)
You must use e.g. {!Mirage_crypto_rng_eio.run} to set a source of
randomness before using this (it will raise an error if you forget). *)

val digest : ?hash:hash -> t -> Digest.t
(** [digest ~hash t] is the digest of [t]'s public key, using [hash]. *)
Expand Down
100 changes: 49 additions & 51 deletions capnp-rpc-net/capTP_capnp.ml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
open Lwt.Infix
open Eio.Std

module Metrics = struct
open Prometheus
Expand Down Expand Up @@ -42,6 +42,7 @@ module Make (Network : S.NETWORK) = struct
module Serialise = Serialise.Make(Endpoint_types)

type t = {
sw : Switch.t;
endpoint : Endpoint.t;
conn : Conn.t;
xmit_queue : Capnp.Message.rw Capnp.BytesMessage.Message.t Queue.t;
Expand All @@ -50,16 +51,6 @@ module Make (Network : S.NETWORK) = struct

let bootstrap t id = Conn.bootstrap t.conn id |> Capnp_rpc.Cast.cap_of_raw

let async_tagged label fn =
Lwt.async
(fun () ->
Lwt.catch fn
(fun ex ->
Log.warn (fun f -> f "Uncaught async exception in %S: %a" label Fmt.exn ex);
Lwt.return_unit
)
)

let pp_msg f call =
let open Reader in
let call = Capnp_rpc.Private.Msg.Request.readable call in
Expand All @@ -75,30 +66,32 @@ module Make (Network : S.NETWORK) = struct

(* [flush ~xmit_queue endpoint] writes each message in the queue until it is empty.
Invariant:
Whenever Lwt blocks or switches threads, a flush thread is running iff the
Whenever Eio blocks or switches threads, a flush thread is running iff the
queue is non-empty. *)
let rec flush ~xmit_queue endpoint =
(* We keep the item on the queue until it is transmitted, as the queue state
tells us whether there is a [flush] currently running. *)
let next = Queue.peek xmit_queue in
Endpoint.send endpoint next >>= function
match Endpoint.send endpoint next with
| Error `Closed ->
Endpoint.disconnect endpoint >|= fun () -> (* We'll read a close soon *)
Endpoint.disconnect endpoint; (* We'll read a close soon *)
drop_queue xmit_queue
| Error e ->
Log.warn (fun f -> f "Error sending messages: %a (will shutdown connection)" Endpoint.pp_error e);
Endpoint.disconnect endpoint >|= fun () ->
| Error (`Msg msg) ->
Log.warn (fun f -> f "Error sending messages: %s (will shutdown connection)" msg);
Endpoint.disconnect endpoint;
drop_queue xmit_queue
| Ok () ->
Prometheus.Counter.inc_one Metrics.messages_outbound_sent_total;
ignore (Queue.pop xmit_queue);
if not (Queue.is_empty xmit_queue) then
flush ~xmit_queue endpoint
else (* queue is empty and flush thread is done *)
Lwt.return_unit
(* else queue is empty and flush thread is done *)
| exception ex ->
drop_queue xmit_queue;
raise ex

(* Enqueue [message] in [xmit_queue] and ensure the flush thread is running. *)
let queue_send ~xmit_queue endpoint message =
let queue_send ~sw ~xmit_queue endpoint message =
Log.debug (fun f ->
let module M = Capnp_rpc.Private.Schema.MessageWrapper.Message in
f "queue_send: %d/%d allocated bytes in %d segs"
Expand All @@ -108,19 +101,19 @@ module Make (Network : S.NETWORK) = struct
let was_idle = Queue.is_empty xmit_queue in
Queue.add message xmit_queue;
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total;
if was_idle then async_tagged "Message sender thread" (fun () -> flush ~xmit_queue endpoint)
if was_idle then Eio.Fiber.fork ~sw (fun () -> flush ~xmit_queue endpoint)

let return_not_implemented t x =
Log.debug (fun f -> f ~tags:(tags t) "Returning Unimplemented");
let open Builder in
let m = Message.init_root () in
let _ : Builder.Message.t = Message.unimplemented_set_reader m x in
queue_send ~xmit_queue:t.xmit_queue t.endpoint (Message.to_message m)
queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Message.to_message m)

let listen t =
let rec loop () =
Endpoint.recv t.endpoint >>= function
| Error e -> Lwt.return e
match Endpoint.recv t.endpoint with
| Error e -> e
| Ok msg ->
let open Reader.Message in
let msg = of_message msg in
Expand All @@ -134,8 +127,8 @@ module Make (Network : S.NETWORK) = struct
| `Abort _ ->
t.disconnecting <- true;
Conn.handle_msg t.conn msg;
Endpoint.disconnect t.endpoint >>= fun () ->
Lwt.return `Aborted
Endpoint.disconnect t.endpoint;
`Aborted
| _ ->
Conn.handle_msg t.conn msg;
loop ()
Expand All @@ -153,48 +146,53 @@ module Make (Network : S.NETWORK) = struct
in
loop ()

let send_abort t ex =
queue_send ~sw:t.sw ~xmit_queue:t.xmit_queue t.endpoint (Serialise.message (`Abort ex))

let disconnect t ex =
if not t.disconnecting then (
t.disconnecting <- true;
queue_send ~xmit_queue:t.xmit_queue t.endpoint (Serialise.message (`Abort ex));
Endpoint.disconnect t.endpoint >|= fun () ->
send_abort t ex;
Endpoint.disconnect t.endpoint;
Conn.disconnect t.conn ex
) else (
Lwt.return_unit
)

let disconnecting t = t.disconnecting

let connect ~restore ?(tags=Logs.Tag.empty) endpoint =
let connect ~sw ~restore ?(tags=Logs.Tag.empty) endpoint =
let xmit_queue = Queue.create () in
let queue_send msg = queue_send ~xmit_queue endpoint (Serialise.message msg) in
let queue_send msg = queue_send ~sw ~xmit_queue endpoint (Serialise.message msg) in
let restore = Restorer.fn restore in
let conn = Conn.create ~restore ~tags ~queue_send in
let t = {
let fork = Fiber.fork ~sw in
let conn = Conn.create ~restore ~tags ~fork ~queue_send in
{
sw;
conn;
endpoint;
xmit_queue;
disconnecting = false;
} in
}

let listen t =
Prometheus.Gauge.inc_one Metrics.connections;
Lwt.async (fun () ->
Lwt.catch
(fun () ->
listen t >|= fun (`Closed | `Aborted) -> ()
)
(fun ex ->
Log.warn (fun f ->
f ~tags "Uncaught exception handling CapTP connection: %a (dropping connection)" Fmt.exn ex
);
queue_send @@ `Abort (Capnp_rpc.Exception.v ~ty:`Failed (Printexc.to_string ex));
Lwt.return_unit
)
>>= fun () ->
Log.info (fun f -> f ~tags "Connection closed");
Prometheus.Gauge.dec_one Metrics.connections;
let tags = Conn.tags t.conn in
begin
match listen t with
| `Closed | `Aborted -> ()
| exception Eio.Cancel.Cancelled ex ->
Log.debug (fun f -> f ~tags "Cancelled: %a" Fmt.exn ex)
| exception ex ->
Log.warn (fun f ->
f ~tags "Uncaught exception handling CapTP connection: %a (dropping connection)" Fmt.exn ex
);
send_abort t (Capnp_rpc.Exception.v ~ty:`Failed (Printexc.to_string ex))
end;
Log.info (fun f -> f ~tags "Connection closed");
Prometheus.Gauge.dec_one Metrics.connections;
Eio.Cancel.protect (fun () ->
disconnect t (Capnp_rpc.Exception.v ~ty:`Disconnected "Connection closed")
);
t
Fiber.check ()

let dump f t = Conn.dump f t.conn
end
13 changes: 9 additions & 4 deletions capnp-rpc-net/capTP_capnp.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@ module Make : S.NETWORK -> sig
type t
(** A Cap'n Proto RPC protocol handler. *)

val connect : restore:Restorer.t -> ?tags:Logs.Tag.set -> Endpoint.t -> t
(** [connect ~restore ~switch endpoint] is fresh CapTP protocol handler that sends and
val connect : sw:Eio.Switch.t -> restore:Restorer.t -> ?tags:Logs.Tag.set -> Endpoint.t -> t
(** [connect ~sw ~restore ~switch endpoint] is fresh CapTP protocol handler that sends and
receives messages using [endpoint].
[restore] is used to respond to "Bootstrap" messages.
If the connection fails then [endpoint] will be disconnected. *)
If the connection fails then [endpoint] will be disconnected.
You must call {!listen} to run the loop handling messages.
@param sw Used to run methods and to run the transmit thread. *)

val listen : t -> unit
(** [listen t] reads and handles incoming messages until the connection is finished. *)

val bootstrap : t -> string -> 'a Capnp_rpc.Capability.t
(** [bootstrap t object_id] is the peer's bootstrap object [object_id], if any.
Use [object_id = ""] for the main, public object. *)

val disconnect : t -> Capnp_rpc.Exception.t -> unit Lwt.t
val disconnect : t -> Capnp_rpc.Exception.t -> unit
(** [disconnect t reason] releases all resources used by the connection. *)

val disconnecting : t -> bool
Expand Down
6 changes: 2 additions & 4 deletions capnp-rpc-net/capnp_rpc_net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ module type VAT_NETWORK = S.VAT_NETWORK with
type service_id := Restorer.Id.t and
type 'a sturdy_ref := 'a Sturdy_ref.t

module Networking (N : S.NETWORK) (F : Mirage_flow.S) = struct
type flow = F.flow

module Networking (N : S.NETWORK) = struct
module Network = N
module Vat = Vat.Make (N) (F)
module Vat = Vat.Make (N)
module CapTP = Vat.CapTP
end

Expand Down
24 changes: 12 additions & 12 deletions capnp-rpc-net/capnp_rpc_net.mli
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(** This package adds networking support, including TLS. It contains code common
to capnp-rpc-unix and capnp-rpc-mirage. Libraries should not need to link against
this package (just use capnp-rpc-lwt instead), since they generally shouldn't
care whether services are local or remote. *)
(** This package adds networking support, including TLS.
Libraries should not need to link against this package (just use capnp-rpc
instead), since they generally shouldn't care whether services are local or
remote. *)

open Capnp_rpc.Std

Expand Down Expand Up @@ -90,7 +90,7 @@ module Restorer : sig
(** [make_sturdy t id] converts an ID to a full URI, by adding the
hosting vat's address and fingerprint. *)

val load : t -> 'a Sturdy_ref.t -> string -> resolution Lwt.t
val load : t -> 'a Sturdy_ref.t -> string -> resolution
(** [load t sr digest] is called to restore the service with key [digest].
[sr] is a sturdy ref that refers to the service, which the service
might want to hand out to clients.
Expand All @@ -109,9 +109,10 @@ module Restorer : sig
[make_sturdy id] converts an ID to a full URI, by adding the
hosting vat's address and fingerprint. *)

val of_loader : (module LOADER with type t = 'loader) -> 'loader -> t
(** [of_loader (module Loader) l] is a new caching table that uses
[Loader.load l sr (Loader.hash id)] to restore services that aren't in the cache. *)
val of_loader : sw:Eio.Switch.t -> (module LOADER with type t = 'loader) -> 'loader -> t
(** [of_loader ~sw (module Loader) l] is a new caching table that uses
[Loader.load l sr (Loader.hash id)] to restore services that aren't in the cache.
The load function runs in a new fiber in [sw]. *)

val add : t -> Id.t -> 'a Capability.t -> unit
(** [add t id cap] adds a mapping to [t].
Expand All @@ -130,7 +131,7 @@ module Restorer : sig

val of_table : Table.t -> t

val restore : t -> Id.t -> ('a Capability.t, Capnp_rpc.Exception.t) result Lwt.t
val restore : t -> Id.t -> ('a Capability.t, Capnp_rpc.Exception.t) result
(** [restore t id] restores [id] using [t].
You don't normally need to call this directly, as the Vat will do it automatically. *)
end
Expand All @@ -141,8 +142,7 @@ module type VAT_NETWORK = S.VAT_NETWORK with
type service_id := Restorer.Id.t and
type 'a sturdy_ref := 'a Sturdy_ref.t

module Networking (N : S.NETWORK) (Flow : Mirage_flow.S) : VAT_NETWORK with
module Network = N and
type flow = Flow.flow
module Networking (N : S.NETWORK) : VAT_NETWORK with
module Network = N

module Capnp_address = Capnp_address
4 changes: 2 additions & 2 deletions capnp-rpc-net/dune
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(library
(name capnp_rpc_net)
(public_name capnp-rpc-net)
(libraries astring capnp capnp-rpc fmt logs mirage-flow mirage-crypto mirage-crypto-rng
tls-mirage base64 uri ptime prometheus))
(libraries astring capnp capnp-rpc fmt logs mirage-crypto-rng
tls-eio base64 uri ptime prometheus))
Loading

0 comments on commit c943b1f

Please sign in to comment.