Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

253 implement fault tolerance mechanisms eg retries dead letter queues circuit breakers e5 #280

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ thiserror.workspace = true
didcomm = { workspace = true, features = ["uniffi"] }
hyper = { workspace = true, features = ["full"] }
axum = { workspace = true, features = ["macros"] }
tokio = "1.27.0"

[dev-dependencies]
keystore = { workspace = true, features = ["test-utils"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub(crate) enum ForwardError {
UncoordinatedSender,
#[error("Internal server error")]
InternalServerError,
#[error("Service unavailable")]
CircuitOpen,
}

impl IntoResponse for ForwardError {
Expand All @@ -18,6 +20,7 @@ impl IntoResponse for ForwardError {
ForwardError::MalformedBody => StatusCode::BAD_REQUEST,
ForwardError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
ForwardError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
ForwardError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
110 changes: 77 additions & 33 deletions crates/web-plugins/didcomm-messaging/protocols/forward/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,84 @@ use didcomm::{AttachmentData, Message};
use mongodb::bson::doc;
use serde_json::{json, Value};
use shared::{
circuit_breaker::CircuitBreaker,
repository::entity::{Connection, RoutedMessage},
retry::{retry_async, RetryOptions},
state::{AppState, AppStateRepository},
};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Mediator receives forwarded messages, extract the next field in the message body, and the attachments in the message
/// then stores the attachment with the next field as key for pickup
pub(crate) async fn mediator_forward_process(
state: Arc<AppState>,
message: Message,
circuit_breaker: Arc<Mutex<CircuitBreaker>>,
) -> Result<Option<Message>, ForwardError> {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or(ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
};

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did: next.as_ref().unwrap().to_owned(),
})
.await
.map_err(|_| ForwardError::InternalServerError)?;
let mut cb = circuit_breaker.lock().await;

let result = cb
.call_async(|| {
let state = Arc::clone(&state);
let message = message.clone();
async move {
let AppStateRepository {
message_repository,
connection_repository,
..
} = state
.repository
.as_ref()
.ok_or_else(|| ForwardError::InternalServerError)?;

let next = match checks(&message, connection_repository).await.ok() {
Some(next) => Ok(next),
None => Err(ForwardError::InternalServerError),
}?;

let attachments = message.attachments.unwrap_or_default();
for attachment in attachments {
let attached = match attachment.data {
AttachmentData::Json { value: data } => data.json,
AttachmentData::Base64 { value: data } => json!(data.base64),
AttachmentData::Links { value: data } => json!(data.links),
};
retry_async(
|| {
let attached = attached.clone();
let recipient_did = next.to_owned();

async move {
message_repository
.store(RoutedMessage {
id: None,
message: attached,
recipient_did,
})
.await
}
},
RetryOptions::new()
.retries(5)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(1)),
)
.await
.map_err(|_| ForwardError::InternalServerError)?;
}
Ok::<Option<Message>, ForwardError>(None)
}
})
.await;

match result {
Some(Ok(None)) => Ok(None),
Some(Ok(Some(_))) => Err(ForwardError::InternalServerError),
Some(Err(err)) => Err(err),
None => Err(ForwardError::CircuitOpen),
}
Ok(None)
}

async fn checks(
Expand Down Expand Up @@ -83,6 +119,7 @@ mod test {
use keystore::Secrets;
use serde_json::json;
use shared::{
circuit_breaker,
repository::{
entity::Connection,
tests::{MockConnectionRepository, MockMessagesRepository},
Expand Down Expand Up @@ -166,9 +203,16 @@ mod test {
.await
.expect("Unable unpack");

let msg = mediator_forward_process(Arc::new(state.clone()), msg)
.await
.unwrap();
// Wrap the CircuitBreaker in Arc and Mutex
let circuit_breaker = circuit_breaker::CircuitBreaker::new(3, Duration::from_secs(3));

let msg: Option<Message> = mediator_forward_process(
Arc::new(state.clone()),
msg,
Arc::new(circuit_breaker.into()),
)
.await
.unwrap();

println!("Mediator1 is forwarding message \n{:?}\n", msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use async_trait::async_trait;
use axum::response::{IntoResponse, Response};
use didcomm::Message;
use message_api::{MessageHandler, MessagePlugin, MessageRouter};
use shared::state::AppState;
use std::sync::Arc;
use shared::{circuit_breaker::CircuitBreaker, state::AppState};
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Represents the routing protocol plugin.
pub struct RoutingProtocol;
Expand All @@ -18,7 +19,13 @@ impl MessageHandler for ForwardHandler {
state: Arc<AppState>,
msg: Message,
) -> Result<Option<Message>, Response> {
crate::handler::mediator_forward_process(state, msg)
let circuit_breaker = Arc::new(Mutex::new(CircuitBreaker::new(
2,
Duration::from_millis(5000),
)));

// Pass the state, msg, and the circuit_breaker as arguments
crate::handler::mediator_forward_process(state, msg, circuit_breaker)
.await
.map_err(|e| e.into_response())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) enum MediationError {
UnexpectedMessageFormat,
#[error("internal server error")]
InternalServerError,
#[error("service unavailable")]
CircuitOpen,
}

impl IntoResponse for MediationError {
Expand All @@ -26,6 +28,7 @@ impl IntoResponse for MediationError {
MediationError::UncoordinatedSender => StatusCode::UNAUTHORIZED,
MediationError::UnexpectedMessageFormat => StatusCode::BAD_REQUEST,
MediationError::InternalServerError => StatusCode::INTERNAL_SERVER_ERROR,
MediationError::CircuitOpen => StatusCode::SERVICE_UNAVAILABLE,
};

let body = Json(serde_json::json!({
Expand Down
Loading
Loading