Skip to content

Commit

Permalink
Merge pull request #292 from talex5/cap-run
Browse files Browse the repository at this point in the history
Replace CapTP.listen with CapTP.run
  • Loading branch information
talex5 authored Nov 20, 2024
2 parents 8c273a3 + 2780d95 commit ff70ce4
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 31 deletions.
10 changes: 8 additions & 2 deletions capnp-rpc-net/capTP_capnp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ module Make (Network : S.NETWORK) = struct
listen t

let send_abort t ex =
Endpoint.send t.endpoint (Serialise.message (`Abort ex));
Endpoint.flush t.endpoint (* We're probably about to disconnect *)
Endpoint.send t.endpoint (Serialise.message (`Abort ex))

let disconnect t ex =
if not t.disconnecting then (
Expand Down Expand Up @@ -113,5 +112,12 @@ module Make (Network : S.NETWORK) = struct
);
Fiber.check ()

let run t =
(* When [run_writer] finishes it shuts down the socket, causing [listen] to read end-of-stream.
When [listen] finishes, calling [shutdown_send] causes [run_writer] to return. *)
Fiber.both
(fun () -> Endpoint.run_writer ~tags:(tags t) t.endpoint)
(fun () -> listen t; Endpoint.shutdown_send t.endpoint)

let dump f t = Conn.dump f t.conn
end
4 changes: 2 additions & 2 deletions capnp-rpc-net/capTP_capnp.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ module Make : S.NETWORK -> sig
You must call {!listen} to run the loop handling messages.
@param sw Used to run methods and to run the transmit thread. *)

val listen : t -> unit
(** [listen t] reads and handles incoming messages until the connection is finished. *)
val run : t -> unit
(** [run t] reads and handles incoming messages until the connection is finished. *)

val bootstrap : t -> string -> 'a Capnp_rpc.Capability.t
(** [bootstrap t object_id] is the peer's bootstrap object [object_id], if any.
Expand Down
8 changes: 2 additions & 6 deletions capnp-rpc-net/endpoint.ml
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,9 @@ let disconnect t =
(* TCP connection already shut down, so TLS shutdown failed. Ignore. *)
()

let flush t =
let shutdown_send t =
Write.unpause t.writer;
(* Give the writer a chance to send the last of the data.
We could use [Write.flush] to be sure the data got sent, but this code is
only used to send aborts, which isn't very important and it's probably
better to drop the buffered messages if one yield isn't enough. *)
Fiber.yield ()
Write.close t.writer

let rec run_writer ~tags t =
let bufs = Write.await_batch t.writer in
Expand Down
6 changes: 3 additions & 3 deletions capnp-rpc-net/endpoint.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ val peer_id : t -> Auth.Digest.t
(** [peer_id t] is the fingerprint of the peer's public key,
or [Auth.Digest.insecure] if TLS isn't being used. *)

val flush : t -> unit
(** [flush t] is useful to try to send any buffered data before disconnecting.
Otherwise, the final abort message is likely to get lost. *)
val shutdown_send : t -> unit
(** [shutdown_send t] closes the writer, causing [run_writer] to return once
all buffered data has been written. *)

val disconnect : t -> unit
(** [disconnect t] shuts down the underlying flow. *)
6 changes: 3 additions & 3 deletions capnp-rpc-net/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ module type VAT_NETWORK = sig
receives messages using [endpoint].
[restore] is used to respond to "Bootstrap" messages.
If the connection fails then [endpoint] will be disconnected.
You must call {!listen} to run the loop handling messages.
You must call {!run} to run the loop handling messages.
@param sw Used to run methods and to run the transmit thread. *)

val listen : t -> unit
(** [listen t] reads and handles incoming messages until the connection is finished. *)
val run : t -> unit
(** [run t] reads and handles incoming messages until the connection is finished. *)

val bootstrap : t -> service_id -> 'a capability
(** [bootstrap t object_id] is the peer's bootstrap object [object_id], if any.
Expand Down
6 changes: 1 addition & 5 deletions capnp-rpc-net/vat.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,7 @@ module Make (Network : S.NETWORK) = struct
let run_connection_generic t ~add ~remove endpoint =
let conn = CapTP.connect ~sw:t.sw ~tags:t.tags ~restore:t.restore endpoint in
add conn;
Fun.protect (fun () ->
Fiber.both
(fun () -> Endpoint.run_writer ~tags:t.tags endpoint)
(fun () -> CapTP.listen conn)
)
Fun.protect (fun () -> CapTP.run conn)
~finally:(fun () ->
remove conn;
Eio.Condition.broadcast t.connection_removed
Expand Down
14 changes: 6 additions & 8 deletions test-bin/calc_direct.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ module Logging = struct
Logs.set_reporter (reporter id)
end

let run_connection conn endpoint =
Fiber.all [
let run_connection conn =
Fiber.both
(* Normally the vat runs a leak handler to free resources that get GC'd
with a non-zero reference count. We're not using a vat, so run it ourselves. *)
Capnp_rpc.Leak_handler.run;
(fun () -> Capnp_rpc_unix.CapTP.listen conn);
(fun () -> Capnp_rpc_net.Endpoint.run_writer ~tags:Logs.Tag.empty endpoint);
]
Capnp_rpc.Leak_handler.run
(fun () -> Capnp_rpc_unix.CapTP.run conn)

module Parent = struct
let run socket =
Expand All @@ -46,7 +44,7 @@ module Parent = struct
let p = Capnp_rpc_net.Endpoint.of_flow socket ~peer_id:Capnp_rpc_net.Auth.Digest.insecure in
Logs.info (fun f -> f "Connecting to child process...");
let conn = Capnp_rpc_unix.CapTP.connect ~sw ~restore:Capnp_rpc_net.Restorer.none p in
Fiber.fork_daemon ~sw (fun () -> run_connection conn p; `Stop_daemon);
Fiber.fork_daemon ~sw (fun () -> run_connection conn; `Stop_daemon);
(* Get the child's service object: *)
let calc = Capnp_rpc_unix.CapTP.bootstrap conn service_name in
(* Use the service: *)
Expand All @@ -69,7 +67,7 @@ module Child = struct
let endpoint = Capnp_rpc_net.Endpoint.of_flow socket ~peer_id:Capnp_rpc_net.Auth.Digest.insecure in
let conn = Capnp_rpc_unix.CapTP.connect ~sw ~restore endpoint in
Logs.info (fun f -> f "Serving requests...");
run_connection conn endpoint
run_connection conn
end

let () =
Expand Down
4 changes: 2 additions & 2 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ let test_crossed_calls ~net =
Capability.dec_ref to_server

(* Run test_crossed_calls several times to try to trigger the various behaviours. *)
let test_crossed_calls ~net =
let test_crossed_calls ~net () =
for _ = 1 to 10 do
test_crossed_calls ~net
done
Expand Down Expand Up @@ -744,7 +744,7 @@ let rpc_tests ~net ~dir =
run "Broken ref 4" test_broken4;
run_eio "Parallel connect" test_parallel_connect;
run_eio "Parallel fails" test_parallel_fails;
run_eio "Crossed calls" test_crossed_calls;
run "Crossed calls" (test_crossed_calls ~net); (* Aborted connections can log warnings *)
run_eio "Store" test_store;
run_eio "File store" (test_file_store ~dir);
run_eio "Await settled" test_await_settled;
Expand Down

0 comments on commit ff70ce4

Please sign in to comment.