Skip to content

Commit

Permalink
Merge pull request #306 from talex5/table-sw
Browse files Browse the repository at this point in the history
Restorer.Table.create now takes a switch
  • Loading branch information
talex5 authored Dec 3, 2024
2 parents 8ac82a0 + 4f53cd9 commit 6d62d84
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 30 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ let make_service ~config ~services name =
let start_server ~sw net =
let config = Capnp_rpc_unix.Vat_config.create ~secret_key ~net listen_address in
let make_sturdy = Capnp_rpc_unix.Vat_config.sturdy_uri config in
let services = Restorer.Table.create make_sturdy in
let services = Restorer.Table.create ~sw make_sturdy in
let restore = Restorer.of_table services in
let services = List.map (make_service ~config ~services) ["alice"; "bob"] in
let vat = Capnp_rpc_unix.serve ~sw ~restore config in
Expand Down
10 changes: 6 additions & 4 deletions capnp-rpc-net/capnp_rpc_net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,17 @@ module Restorer : sig
type t
(** A restorer that keeps a hashtable mapping IDs to capabilities in memory. *)

val create : (Id.t -> Uri.t) -> t
(** [create make_sturdy] is a new in-memory-only table.
val create : sw:Eio.Switch.t -> (Id.t -> Uri.t) -> t
(** [create ~sw make_sturdy] is a new in-memory-only table.
[make_sturdy id] converts an ID to a full URI, by adding the
hosting vat's address and fingerprint. *)
hosting vat's address and fingerprint.
@param sw {!clear} is called when [sw] is turned off. *)

val of_loader : sw:Eio.Switch.t -> (module LOADER with type t = 'loader) -> 'loader -> t
(** [of_loader ~sw (module Loader) l] is a new caching table that uses
[Loader.load l sr (Loader.hash id)] to restore services that aren't in the cache.
The load function runs in a new fiber in [sw]. *)
@param sw The [load] function runs in a new fiber in [sw].
{!clear} is called when [sw] is turned off. *)

val add : t -> Id.t -> 'a Capability.t -> unit
(** [add t id cap] adds a mapping to [t].
Expand Down
37 changes: 19 additions & 18 deletions capnp-rpc-net/restorer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ module Table = struct
| Manual of Core_types.cap (* We hold a ref on the cap *)

type t = {
sw : Switch.t;
hash : Digestif.hash';
cache : (digest, entry) Hashtbl.t;
load : Id.t -> digest -> resolution Promise.or_exn;
Expand All @@ -86,11 +87,21 @@ module Table = struct

(* [cache] contains promises or capabilities with positive ref-counts. *)

let create make_sturdy =
let release = function
| Manual cap -> Core_types.dec_ref cap;
| Cached _ -> ()

let clear t =
Hashtbl.iter (fun _ v -> release v) t.cache;
Hashtbl.clear t.cache

let create ~sw make_sturdy =
let hash = `SHA256 in
let cache = Hashtbl.create 53 in
let load _ _ = Promise.create_resolved (Ok unknown_service_id) in
{ hash; cache; load; make_sturdy }
let t = { sw; hash; cache; load; make_sturdy } in
Switch.on_release sw (fun () -> clear t);
t

let hash t id =
Id.digest t.hash id
Expand All @@ -102,13 +113,9 @@ module Table = struct
Core_types.inc_ref cap;
Ok cap
| Cached res ->
begin match Promise.await_exn res with
| Error _ as e -> e
| Ok cap ->
Core_types.inc_ref cap;
Fiber.yield ();
Ok cap
end
let res = Promise.await_exn res in
Result.iter Core_types.inc_ref res;
res
| exception Not_found ->
let cap = t.load id digest in
Hashtbl.add t.cache digest (Cached cap);
Expand Down Expand Up @@ -138,13 +145,15 @@ module Table = struct
end in
L.load loader (Cast.sturdy_of_raw sr) digest
)
and t = { hash; cache; load; make_sturdy = L.make_sturdy loader } in
and t = { sw; hash; cache; load; make_sturdy = L.make_sturdy loader } in
Switch.on_release sw (fun () -> clear t);
t

let add t id cap =
let cap = Cast.cap_to_raw cap in
let id = hash t id in
assert (not (Hashtbl.mem t.cache id));
Switch.check t.sw;
Hashtbl.add t.cache id (Manual cap)

let sturdy_ref t id =
Expand All @@ -153,21 +162,13 @@ module Table = struct
method to_uri_with_secrets = t.make_sturdy id
end

let release = function
| Manual cap -> Core_types.dec_ref cap;
| Cached _ -> ()

let remove t id =
let id = hash t id in
match Hashtbl.find t.cache id with
| exception Not_found -> failwith "Service ID not in restorer table"
| value ->
release value;
Hashtbl.remove t.cache id

let clear t =
Hashtbl.iter (fun _ v -> release v) t.cache;
Hashtbl.clear t.cache
end

let of_table = Table.resolve
2 changes: 1 addition & 1 deletion examples/sturdy-refs-2/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ let or_fail = function
let start_server ~sw net =
let config = Capnp_rpc_unix.Vat_config.create ~secret_key ~net listen_address in
let make_sturdy = Capnp_rpc_unix.Vat_config.sturdy_uri config in
let services = Restorer.Table.create make_sturdy in
let services = Restorer.Table.create ~sw make_sturdy in
let restore = Restorer.of_table services in
let root_id = Capnp_rpc_unix.Vat_config.derived_id config "root" in
let root = Logger.local "root" in
Expand Down
3 changes: 1 addition & 2 deletions examples/sturdy-refs-3/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ let or_fail = function
let start_server ~sw net =
let config = Capnp_rpc_unix.Vat_config.create ~secret_key ~net listen_address in
let make_sturdy = Capnp_rpc_unix.Vat_config.sturdy_uri config in
let services = Restorer.Table.create make_sturdy in
Switch.on_release sw (fun () -> Restorer.Table.clear services);
let services = Restorer.Table.create ~sw make_sturdy in
let restore = Restorer.of_table services in
(* $MDX part-begin=root *)
let root_id = Capnp_rpc_unix.Vat_config.derived_id config "root" in
Expand Down
2 changes: 1 addition & 1 deletion examples/sturdy-refs/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ let make_service ~config ~services name =
let start_server ~sw net =
let config = Capnp_rpc_unix.Vat_config.create ~secret_key ~net listen_address in
let make_sturdy = Capnp_rpc_unix.Vat_config.sturdy_uri config in
let services = Restorer.Table.create make_sturdy in
let services = Restorer.Table.create ~sw make_sturdy in
let restore = Restorer.of_table services in
let services = List.map (make_service ~config ~services) ["alice"; "bob"] in
let vat = Capnp_rpc_unix.serve ~sw ~restore config in
Expand Down
8 changes: 5 additions & 3 deletions test/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,13 @@ let with_vats ?serve_tls ~net ~service fn =
(* Generic runner for Alcotest. *)
let run_eio ~net name ?(expected_warnings=0) fn =
Alcotest.test_case name `Quick @@ fun () ->
Switch.run @@ fun sw ->
Fiber.fork_daemon ~sw Capnp_rpc.Leak_handler.run;
let warnings_at_start = Logs.(err_count () + warn_count ()) in
Logs.info (fun f -> f "Start test-case");
fn ~net;
Gc.full_major ();
Fiber.yield (); (* Allow leak-detector to run *)
let warnings_at_end = Logs.(err_count () + warn_count ()) in
Alcotest.(check int) "Check log for warnings" expected_warnings (warnings_at_end - warnings_at_start)

Expand Down Expand Up @@ -403,7 +406,7 @@ let except_ty = Alcotest.testable Capnp_rpc.Exception.pp_ty (=)
let test_table_restorer ~net:_ =
Switch.run @@ fun sw ->
let make_sturdy id = Uri.make ~path:(Restorer.Id.to_string id) () in
let table = Restorer.Table.create make_sturdy in
let table = Restorer.Table.create ~sw make_sturdy in
let echo_id = Restorer.Id.public "echo" in
let registry_id = Restorer.Id.public "registry" in
let broken_id = Restorer.Id.public "broken" in
Expand All @@ -428,8 +431,7 @@ let test_table_restorer ~net:_ =
Capability.dec_ref a1;
Capability.dec_ref a2;
Capability.dec_ref r1;
Restorer.Table.remove table echo_id;
Restorer.Table.clear table
Restorer.Table.remove table echo_id

module Loader = struct
type t = string -> Restorer.resolution
Expand Down

0 comments on commit 6d62d84

Please sign in to comment.