Skip to content

Commit

Permalink
Document Odin
Browse files Browse the repository at this point in the history
  • Loading branch information
amab8901 committed Nov 15, 2023
1 parent ae1e109 commit 18ac07a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 1 deletion.
3 changes: 3 additions & 0 deletions drasil-hugin/src/protocol/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub trait IntoFrame {
fn into_frame(self) -> Frame;
}

/// Instruction types that the user can give to Drasil.
#[derive(Debug)]
pub enum Command {
BuildContract(BuildContract),
Expand All @@ -49,6 +50,7 @@ pub enum Command {
}

impl Command {
/// Parses frame into a command.
pub fn from_frame(frame: Frame) -> crate::Result<Command> {
let mut parse = Parse::new(frame)?;
log::debug!("FromFrame: {:?}", &parse);
Expand Down Expand Up @@ -87,6 +89,7 @@ impl Command {
Ok(command)
}

/// Executes command.
pub async fn apply(self, dst: &mut Connection, _shutdown: &mut Shutdown) -> crate::Result<()> {
match self {
Command::BuildContract(cmd) => cmd.apply(dst).await?,
Expand Down
3 changes: 3 additions & 0 deletions drasil-hugin/src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ use tokio::io::BufWriter;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

/// Buffered TCP connection.
#[derive(Debug)]
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}

impl Connection {
/// Creates a buffered TCP connection from a TCP stream.
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(50 * 1024),
}
}

/// Reads frame from TCP connection.
pub async fn read_frame(&mut self) -> crate::Result<Option<Frame>> {
loop {
if let Some(frame) = self.parse_frame().await? {
Expand Down
3 changes: 3 additions & 0 deletions drasil-hugin/src/protocol/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use std::io::Cursor;
use std::num::TryFromIntError;
use std::string::FromUtf8Error;

/// Data package that can be sent and received through TCP connection.
/// A peer can receive a frame by reading the TCP connection, and send
/// a frame by writing to the TCP connection.
#[derive(Clone, Debug)]
pub enum Frame {
Simple(String),
Expand Down
3 changes: 3 additions & 0 deletions drasil-hugin/src/protocol/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use tokio::sync::broadcast;

/// Specifies the state of a given TCP connection. Can be used as
/// a trigger mechanism for whether the server should continue
/// listening to a TCP connection or drop it.
#[derive(Debug)]
pub struct Shutdown {
shutdown: bool,
Expand Down
31 changes: 30 additions & 1 deletion services/odin/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/// Odin is a server that receives transaction requests from Heimdallr clients. Odin can establish a TCP
/// connection with a given number of Heimdallr clients. Each Heimdallr client can send a transaction
/// request through the TCP connection to Odin server, which will cause the Odin server to
/// parse the transaction request from compressed form into human-readable form (also the form expected by
/// other parts of Drasil). The parsed transaction request is then passed to Hugin for further processing.
extern crate pretty_env_logger;
use drasil_hugin::protocol::{connection::Connection, Shutdown};
use drasil_hugin::Command;
Expand All @@ -11,6 +17,7 @@ use tokio::time::{self, Duration};
use std::env;
use std::str;

/// TCP listener that can limit the number of active connections.
struct Listener {
listener: TcpListener,
limit_connections: Arc<Semaphore>,
Expand All @@ -20,6 +27,7 @@ struct Listener {
shutdown_complete_tx: mpsc::Sender<()>,
}

/// A buffered TCP connection that keeps track on the number of TCP connections left on the TCP listener.
struct Handler {
connection: Connection,
limit_connections: Arc<Semaphore>,
Expand All @@ -28,10 +36,12 @@ struct Handler {
_shutdown_complete: mpsc::Sender<()>,
}

/// Default address exposed by Odin server if another address isn't specified.
const DEFAULT_HOST: &str = "127.0.0.1";
const DEFAULT_PORT: &str = "6142";
const MAX_CONNECTIONS: usize = 1000;

/// Run the Odin server until Odin receives ctrl_c shutdown command.
pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<()> {
let (notify_shutdown, _) = broadcast::channel(1);
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
Expand All @@ -43,24 +53,27 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
shutdown_complete_rx,
};

// Run Odin server, until Odin receives ctrl+C which causes the server to turn off
tokio::select! {
res = server.run() => {
if let Err(err) = res {
log::error!("failed to accept: {:?}",err);
log::error!("failed to accept: {:?}", err);
}
}
_ = shutdown => {
log::info!("shutting down")
}
}

// Separate out Odin's channels in preparation for shutdown
let Listener {
mut shutdown_complete_rx,
shutdown_complete_tx,
notify_shutdown,
..
} = server;

// Shut down the broadcast & mpsc channels
drop(notify_shutdown);
drop(shutdown_complete_tx);
let _ = shutdown_complete_rx.recv().await;
Expand All @@ -69,12 +82,14 @@ pub async fn run(listener: TcpListener, shutdown: impl Future) -> crate::Result<
}

impl Listener {
/// Establishes connections with one or more remote TCP clients.
async fn run(&mut self) -> crate::Result<()> {
log::info!(
"accepting inbound connections at {:?}",
self.listener.local_addr()?
);

// each iteration represents a single connection
loop {
self.limit_connections.acquire().await?.forget();
let socket = self.accept().await?;
Expand All @@ -94,6 +109,12 @@ impl Listener {
}
}

/// Accepts incoming connection from remote client.
///
/// This function checks for connection requests over and over. If remote client
/// hasn't made any connection requests, this function will wait twice as long as
/// last time before checking again. If no connection requests were made for some
/// time, then it will stop checking.
async fn accept(&mut self) -> crate::Result<TcpStream> {
log::info!("accepted connection");
let mut backoff = 1;
Expand All @@ -113,6 +134,8 @@ impl Listener {
}

impl Handler {
/// Continuously listens for incoming instructions from remote client to Odin server
/// until the connection is shut down.
async fn run(&mut self) -> crate::Result<()> {
log::debug!("started new handler");
while !self.shutdown.is_shutdown() {
Expand All @@ -135,12 +158,18 @@ impl Handler {
}
}

/// Increases the number of allowed connections to Odin server by 1 when
/// current connection is terminated.
impl Drop for Handler {
fn drop(&mut self) {
self.limit_connections.add_permits(1);
}
}

/// Runs the Odin server.
///
/// It specifies the address that Odin server will expose, creates a TCP listener on that address
/// and then runs that TCP listener.
use tokio::signal;
#[tokio::main]
pub async fn main() -> crate::Result<()> {
Expand Down

0 comments on commit 18ac07a

Please sign in to comment.