diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs index d15eb46..26d1772 100644 --- a/solar/src/actors/muxrpc/ebt.rs +++ b/solar/src/actors/muxrpc/ebt.rs @@ -93,6 +93,18 @@ where } // Handle a broker message. RpcInput::Message(msg) => match msg { + BrokerMessage::Ebt(EbtEvent::TerminateSession(conn_id, session_role)) => { + if conn_id == &connection_id { + let req_no = match session_role { + SessionRole::Requester => self.active_request, + SessionRole::Responder => -(self.active_request), + }; + + return self.send_cancelstream(api, req_no).await; + } + + Ok(false) + } BrokerMessage::Ebt(EbtEvent::SendClock(conn_id, req_no, clock, session_role)) => { // This is, regrettably, rather unintuitive. // @@ -324,8 +336,7 @@ where Ok(false) } - /// Remove the associated request from the map of active requests and close - /// the stream. + /// Receive close-stream request. async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { trace!(target: "ebt-handler", "Received cancel stream RPC response: {}", req_no); @@ -334,6 +345,15 @@ where Ok(true) } + /// Send close-stream request. + async fn send_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { + trace!(target: "ebt-handler", "Send cancel stream RPC response: {}", req_no); + + api.rpc().send_stream_eof(-req_no).await?; + + Ok(true) + } + /// Report a MUXRPC error and remove the associated request from the map of /// active requests. async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result { diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs index e9ac783..64a94d8 100644 --- a/solar/src/actors/replication/ebt/manager.rs +++ b/solar/src/actors/replication/ebt/manager.rs @@ -60,6 +60,7 @@ pub enum EbtEvent { ReceivedMessage(Message), SessionConcluded(ConnectionId, SsbId), SessionTimeout(ConnectionData, SsbId), + TerminateSession(ConnectionId, SessionRole), Error(ConnectionData, SsbId, ErrorMsg), } @@ -686,6 +687,10 @@ impl EbtManager { Ok(()) } + async fn handle_terminate_session(&mut self, connection_id: ConnectionId) { + trace!(target: "ebt-replication", "Terminating session for connection {}", connection_id); + } + async fn handle_error( &mut self, connection_data: ConnectionData, @@ -801,6 +806,9 @@ impl EbtManager { error!("Error while handling 'session timeout' event: {}", err) } } + EbtEvent::TerminateSession(connection_data, _session_role) => { + self.handle_terminate_session(connection_data).await; + } EbtEvent::Error(connection_data, peer_ssb_id, error_msg) => { if let Err(err) = self.handle_error(connection_data, peer_ssb_id, error_msg).await { error!("Error while handling 'error' event: {}", err) diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs index 1a7cf08..571d9b9 100644 --- a/solar/src/actors/replication/ebt/replicator.rs +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -15,7 +15,7 @@ use crate::{ network::connection::ConnectionData, replication::ebt::{EbtEvent, SessionRole}, }, - broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, + broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, Void, BROKER}, Error, Result, }; @@ -27,7 +27,9 @@ pub async fn run( // Register the EBT replication loop actor with the broker. let ActorEndpoint { ch_terminate, + ch_terminated, ch_msg, + mut ch_broker, .. } = BROKER .lock() @@ -69,9 +71,6 @@ pub async fn run( let rpc_recv_stream = rpc_reader.into_stream().fuse(); pin_mut!(rpc_recv_stream); - // Create channel to send messages to broker. - let mut ch_broker = BROKER.lock().await.create_sender(); - trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id); let mut session_initiated = false; @@ -98,13 +97,26 @@ pub async fn run( // ready, one will be selected in order of declaration. let input = select_biased! { _value = ch_terminate_fuse => { - break; + // Communicate stream termination to the session peer. + RpcInput::Message( + BrokerMessage::Ebt( + EbtEvent::TerminateSession(connection_id, session_role.to_owned()) + ) + ) }, packet = rpc_recv_stream.select_next_some() => { let (req_no, packet) = packet; RpcInput::Network(req_no, packet) }, msg = ch_msg.next().fuse() => { + // Listen for a 'session concluded' event and terminate the + // replicator if the connection ID of the event matches the + // ID of this instance of the replicator. + if let Some(BrokerMessage::Ebt(EbtEvent::SessionConcluded(conn_id, _))) = msg { + if connection_id == conn_id { + break + } + } // Listen for a 'session initiated' event. if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_connection_id, ref req_no, ref ssb_id, ref session_role))) = msg { if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder { @@ -188,5 +200,8 @@ pub async fn run( )) .await?; + // Send 'terminated' signal to broker. + let _ = ch_terminated.send(Void {}); + Ok(()) }