-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add buffering of outgoing messages #287
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,27 @@ | ||
open Eio.Std | ||
|
||
module Metrics = struct | ||
open Prometheus | ||
|
||
let namespace = "capnp" | ||
|
||
let subsystem = "net" | ||
|
||
let connections = | ||
let help = "Number of live capnp-rpc connections" in | ||
Gauge.v ~help ~namespace ~subsystem "connections" | ||
|
||
let messages_inbound_received_total = | ||
let help = "Total number of messages received" in | ||
Counter.v ~help ~namespace ~subsystem "messages_inbound_received_total" | ||
|
||
let messages_outbound_enqueued_total = | ||
let help = "Total number of messages enqueued to be transmitted" in | ||
Counter.v ~help ~namespace ~subsystem "messages_outbound_enqueued_total" | ||
end | ||
|
||
module Write = Eio.Buf_write | ||
|
||
let src = Logs.Src.create "endpoint" ~doc:"Send and receive Cap'n'Proto messages" | ||
module Log = (val Logs.src_log src: Logs.LOG) | ||
|
||
|
@@ -11,16 +33,20 @@ type flow = Eio.Flow.two_way_ty r | |
|
||
type t = { | ||
flow : flow; | ||
writer : Write.t; | ||
decoder : Capnp.Codecs.FramedStream.t; | ||
peer_id : Auth.Digest.t; | ||
recv_buf : Cstruct.t; | ||
} | ||
|
||
let peer_id t = t.peer_id | ||
|
||
let of_flow ~peer_id flow = | ||
let decoder = Capnp.Codecs.FramedStream.empty compression in | ||
let flow = (flow :> flow) in | ||
{ flow; decoder; peer_id } | ||
let writer = Write.create 4096 in | ||
let recv_buf = Cstruct.create 4096 in | ||
{ flow; writer; decoder; peer_id; recv_buf } | ||
|
||
let dump_msg = | ||
let next = ref 0 in | ||
|
@@ -33,42 +59,78 @@ let dump_msg = | |
close_out ch | ||
|
||
let send t msg = | ||
let data = Capnp.Codecs.serialize ~compression msg in | ||
if record_sent_messages then dump_msg data; | ||
match Eio.Flow.copy_string data t.flow with | ||
| () | ||
| exception End_of_file -> Ok () | ||
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) -> | ||
Log.info (fun f -> f "%a" Eio.Exn.pp ex); | ||
Error `Closed | ||
| exception ex -> | ||
Eio.Fiber.check (); | ||
Error (`Msg (Printexc.to_string ex)) | ||
Log.debug (fun f -> | ||
let module M = Capnp_rpc.Private.Schema.MessageWrapper.Message in | ||
f "queue_send: %d/%d allocated bytes in %d segs" | ||
(M.total_size msg) | ||
(M.total_alloc_size msg) | ||
(M.num_segments msg)); | ||
Capnp.Codecs.serialize_iter_copyless ~compression msg ~f:(fun x len -> Write.string t.writer x ~len); | ||
Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total; | ||
if record_sent_messages then dump_msg (Capnp.Codecs.serialize ~compression msg) | ||
|
||
let rec recv t = | ||
let rec recv ~tags t = | ||
match Capnp.Codecs.FramedStream.get_next_frame t.decoder with | ||
| Ok msg -> Ok (Capnp.BytesMessage.Message.readonly msg) | ||
| Ok msg -> | ||
Prometheus.Counter.inc_one Metrics.messages_inbound_received_total; | ||
(* We often want to send multiple response messages while processing a batch of requests, | ||
so pause the writer to collect them. We'll unpause on the next [single_read]. *) | ||
Write.pause t.writer; | ||
Ok (Capnp.BytesMessage.Message.readonly msg) | ||
| Error Capnp.Codecs.FramingError.Unsupported -> failwith "Unsupported Cap'n'Proto frame received" | ||
| Error Capnp.Codecs.FramingError.Incomplete -> | ||
Log.debug (fun f -> f "Incomplete; waiting for more data..."); | ||
let buf = Cstruct.create 4096 in (* TODO: make this efficient *) | ||
match Eio.Flow.single_read t.flow buf with | ||
Log.debug (fun f -> f ~tags "Incomplete; waiting for more data..."); | ||
(* We probably scheduled one or more application fibers to run while handling the last | ||
batch of messages. Give them a chance to run now while the writer is paused, because | ||
they might want to send more messages immediately. *) | ||
Fiber.yield (); | ||
Write.unpause t.writer; | ||
match Eio.Flow.single_read t.flow t.recv_buf with | ||
| got -> | ||
Log.debug (fun f -> f "Read %d bytes" got); | ||
Capnp.Codecs.FramedStream.add_fragment t.decoder (Cstruct.to_string buf ~len:got); | ||
recv t | ||
Log.debug (fun f -> f ~tags "Read %d bytes" got); | ||
Capnp.Codecs.FramedStream.add_fragment t.decoder (Cstruct.to_string t.recv_buf ~len:got); | ||
recv ~tags t | ||
| exception End_of_file -> | ||
Log.info (fun f -> f "Connection closed"); | ||
Log.info (fun f -> f ~tags "Received end-of-stream"); | ||
Error `Closed | ||
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) -> | ||
Log.info (fun f -> f "%a" Eio.Exn.pp ex); | ||
Log.info (fun f -> f ~tags "Receive failed: %a" Eio.Exn.pp ex); | ||
Error `Closed | ||
|
||
let disconnect t = | ||
try | ||
Eio.Flow.shutdown t.flow `All | ||
with | ||
| Invalid_argument _ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We now don't close the socket until both send and receive fibers are done, so |
||
| Eio.Io (Eio.Net.E Connection_reset _, _) -> | ||
with Eio.Io (Eio.Net.E Connection_reset _, _) -> | ||
(* TCP connection already shut down, so TLS shutdown failed. Ignore. *) | ||
() | ||
|
||
let flush t = | ||
Write.unpause t.writer; | ||
(* Give the writer a chance to send the last of the data. | ||
We could use [Write.flush] to be sure the data got sent, but this code is | ||
only used to send aborts, which isn't very important and it's probably | ||
better to drop the buffered messages if one yield isn't enough. *) | ||
Fiber.yield () | ||
|
||
let rec run_writer ~tags t = | ||
let bufs = Write.await_batch t.writer in | ||
match Eio.Flow.single_write t.flow bufs with | ||
| n -> Write.shift t.writer n; run_writer ~tags t | ||
| exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) -> | ||
Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex) | ||
| exception ex -> | ||
Eio.Fiber.check (); | ||
Log.warn (fun f -> f ~tags "Error sending messages: %a (will shutdown connection)" Fmt.exn ex) | ||
|
||
let run_writer ~tags t = | ||
let cleanup () = | ||
Prometheus.Gauge.dec_one Metrics.connections; | ||
disconnect t (* The listen fiber will read end-of-stream soon *) | ||
in | ||
Prometheus.Gauge.inc_one Metrics.connections; | ||
match run_writer ~tags t with | ||
| () -> cleanup () | ||
| exception ex -> | ||
let bt = Printexc.get_raw_backtrace () in | ||
cleanup (); | ||
Printexc.raise_with_backtrace ex bt |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Endpont already reports when the connection is closed, and with a better message.