From 18ac07aea1f23b835caed6e98d43c57a62700923 Mon Sep 17 00:00:00 2001 From: Amir Abdin Date: Tue, 14 Nov 2023 12:45:40 +0100 Subject: [PATCH] Document Odin --- drasil-hugin/src/protocol/cmd/mod.rs | 3 +++ drasil-hugin/src/protocol/connection.rs | 3 +++ drasil-hugin/src/protocol/frame.rs | 3 +++ drasil-hugin/src/protocol/shutdown.rs | 3 +++ services/odin/src/main.rs | 31 ++++++++++++++++++++++++- 5 files changed, 42 insertions(+), 1 deletion(-) diff --git a/drasil-hugin/src/protocol/cmd/mod.rs b/drasil-hugin/src/protocol/cmd/mod.rs index c332948d..c6bfa4e3 100644 --- a/drasil-hugin/src/protocol/cmd/mod.rs +++ b/drasil-hugin/src/protocol/cmd/mod.rs @@ -34,6 +34,7 @@ pub trait IntoFrame { fn into_frame(self) -> Frame; } +/// Instruction types that the user can give to Drasil. #[derive(Debug)] pub enum Command { BuildContract(BuildContract), @@ -49,6 +50,7 @@ pub enum Command { } impl Command { + /// Parses frame into a command. pub fn from_frame(frame: Frame) -> crate::Result { let mut parse = Parse::new(frame)?; log::debug!("FromFrame: {:?}", &parse); @@ -87,6 +89,7 @@ impl Command { Ok(command) } + /// Executes command. pub async fn apply(self, dst: &mut Connection, _shutdown: &mut Shutdown) -> crate::Result<()> { match self { Command::BuildContract(cmd) => cmd.apply(dst).await?, diff --git a/drasil-hugin/src/protocol/connection.rs b/drasil-hugin/src/protocol/connection.rs index 521bbab0..5a2694ef 100644 --- a/drasil-hugin/src/protocol/connection.rs +++ b/drasil-hugin/src/protocol/connection.rs @@ -6,6 +6,7 @@ use tokio::io::BufWriter; use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; +/// Buffered TCP connection. #[derive(Debug)] pub struct Connection { stream: BufWriter, @@ -13,6 +14,7 @@ pub struct Connection { } impl Connection { + /// Creates a buffered TCP connection from a TCP stream. pub fn new(stream: TcpStream) -> Connection { Connection { stream: BufWriter::new(stream), @@ -20,6 +22,7 @@ impl Connection { } } + /// Reads frame from TCP connection. pub async fn read_frame(&mut self) -> crate::Result> { loop { if let Some(frame) = self.parse_frame().await? { diff --git a/drasil-hugin/src/protocol/frame.rs b/drasil-hugin/src/protocol/frame.rs index 13943ff3..dc5c9833 100644 --- a/drasil-hugin/src/protocol/frame.rs +++ b/drasil-hugin/src/protocol/frame.rs @@ -5,6 +5,9 @@ use std::io::Cursor; use std::num::TryFromIntError; use std::string::FromUtf8Error; +/// Data package that can be sent and received through TCP connection. +/// A peer can receive a frame by reading the TCP connection, and send +/// a frame by writing to the TCP connection. #[derive(Clone, Debug)] pub enum Frame { Simple(String), diff --git a/drasil-hugin/src/protocol/shutdown.rs b/drasil-hugin/src/protocol/shutdown.rs index 1be41cab..5418106c 100644 --- a/drasil-hugin/src/protocol/shutdown.rs +++ b/drasil-hugin/src/protocol/shutdown.rs @@ -1,5 +1,8 @@ use tokio::sync::broadcast; +/// Specifies the state of a given TCP connection. Can be used as +/// a trigger mechanism for whether the server should continue +/// listening to a TCP connection or drop it. #[derive(Debug)] pub struct Shutdown { shutdown: bool, diff --git a/services/odin/src/main.rs b/services/odin/src/main.rs index 29dcb50d..a787092d 100644 --- a/services/odin/src/main.rs +++ b/services/odin/src/main.rs @@ -1,3 +1,9 @@ +/// Odin is a server that receives transaction requests from Heimdallr clients. Odin can establish a TCP +/// connection with a given number of Heimdallr clients. Each Heimdallr client can send a transaction +/// request through the TCP connection to Odin server, which will cause the Odin server to +/// parse the transaction request from compressed form into human-readable form (also the form expected by +/// other parts of Drasil). The parsed transaction request is then passed to Hugin for further processing. + extern crate pretty_env_logger; use drasil_hugin::protocol::{connection::Connection, Shutdown}; use drasil_hugin::Command; @@ -11,6 +17,7 @@ use tokio::time::{self, Duration}; use std::env; use std::str; +/// TCP listener that can limit the number of active connections. struct Listener { listener: TcpListener, limit_connections: Arc, @@ -20,6 +27,7 @@ struct Listener { shutdown_complete_tx: mpsc::Sender<()>, } +/// A buffered TCP connection that keeps track on the number of TCP connections left on the TCP listener. struct Handler { connection: Connection, limit_connections: Arc, @@ -28,10 +36,12 @@ struct Handler { _shutdown_complete: mpsc::Sender<()>, } +/// Default address exposed by Odin server if another address isn't specified. const DEFAULT_HOST: &str = "127.0.0.1"; const DEFAULT_PORT: &str = "6142"; const MAX_CONNECTIONS: usize = 1000; +/// Run the Odin server until Odin receives ctrl_c shutdown command. pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> { let (notify_shutdown, _) = broadcast::channel(1); let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1); @@ -43,10 +53,11 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< shutdown_complete_rx, }; + // Run Odin server, until Odin receives ctrl+C which causes the server to turn off tokio::select! { res = server.run() => { if let Err(err) = res { - log::error!("failed to accept: {:?}",err); + log::error!("failed to accept: {:?}", err); } } _ = shutdown => { @@ -54,6 +65,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< } } + // Separate out Odin's channels in preparation for shutdown let Listener { mut shutdown_complete_rx, shutdown_complete_tx, @@ -61,6 +73,7 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< .. } = server; + // Shut down the broadcast & mpsc channels drop(notify_shutdown); drop(shutdown_complete_tx); let _ = shutdown_complete_rx.recv().await; @@ -69,12 +82,14 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result< } impl Listener { + /// Establishes connections with one or more remote TCP clients. async fn run(&mut self) -> crate::Result<()> { log::info!( "accepting inbound connections at {:?}", self.listener.local_addr()? ); + // each iteration represents a single connection loop { self.limit_connections.acquire().await?.forget(); let socket = self.accept().await?; @@ -94,6 +109,12 @@ impl Listener { } } + /// Accepts incoming connection from remote client. + /// + /// This function checks for connection requests over and over. If remote client + /// hasn't made any connection requests, this function will wait twice as long as + /// last time before checking again. If no connection requests were made for some + /// time, then it will stop checking. async fn accept(&mut self) -> crate::Result { log::info!("accepted connection"); let mut backoff = 1; @@ -113,6 +134,8 @@ impl Listener { } impl Handler { + /// Continuously listens for incoming instructions from remote client to Odin server + /// until the connection is shut down. async fn run(&mut self) -> crate::Result<()> { log::debug!("started new handler"); while !self.shutdown.is_shutdown() { @@ -135,12 +158,18 @@ impl Handler { } } +/// Increases the number of allowed connections to Odin server by 1 when +/// current connection is terminated. impl Drop for Handler { fn drop(&mut self) { self.limit_connections.add_permits(1); } } +/// Runs the Odin server. +/// +/// It specifies the address that Odin server will expose, creates a TCP listener on that address +/// and then runs that TCP listener. use tokio::signal; #[tokio::main] pub async fn main() -> crate::Result<()> {