From 4f82d5538632c3e5fa09dc06c282504483c4fe36 Mon Sep 17 00:00:00 2001 From: Daniel Barnes Date: Thu, 29 Aug 2024 18:31:51 +0900 Subject: [PATCH 1/4] more response statuses --- src/main.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1ef478a..46416c6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ mod types; use crate::drivers::{CloudWatchDriver, LokiDriver}; use crate::types::LogDriver; -use anyhow::Result; use axum::{ body::{Body, Bytes}, extract::State, @@ -61,7 +60,7 @@ struct AppState { } #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> anyhow::Result<()> { let args = Args::parse(); tracing_subscriber::fmt() .json() @@ -170,18 +169,16 @@ async fn ingest( body: Bytes, ) -> impl IntoResponse { debug!("received payload"); - let response = Response::builder() - .status(StatusCode::OK) - .header("x-vercel-verify", state.vercel_verify) - .body(Body::empty()) - .unwrap(); let signature = match headers.get("x-vercel-signature") { Some(signature) => signature.to_str().unwrap(), None => { warn!("received payload without signature"); counter!("drain_recv_invalid_signature").increment(1); - return response; + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); } }; let body_string = match String::from_utf8(body.to_vec()) { @@ -189,7 +186,10 @@ async fn ingest( Err(e) => { error!("received bad utf-8: {:?}", e); counter!("drain_recv_bad_utf8").increment(1); - return response; + return Response::builder() + .status(StatusCode::NOT_ACCEPTABLE) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); } }; let mut sig_bytes = [0u8; 20]; @@ -199,7 +199,11 @@ async fn ingest( Err(e) => { error!("failed verifying signature: {:?}", e); counter!("drain_failed_verify_signature").increment(1); - return response; + return Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .header("x-vercel-verify", state.vercel_verify) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); } } match serde_json::from_str::(&body_string) { @@ -218,7 +222,11 @@ async fn ingest( error!(payload = ?body_string, "failed parsing: {:?}", e); } } - return response; + return Response::builder() + .status(StatusCode::OK) + .header("x-vercel-verify", state.vercel_verify) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); } async fn health_check() -> impl IntoResponse { From 6e1377354512299f67ba94d72cad20e6cf260138 Mon Sep 17 00:00:00 2001 From: Daniel Barnes Date: Thu, 29 Aug 2024 18:51:31 +0900 Subject: [PATCH 2/4] refactor to seperate files --- src/handlers.rs | 87 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 108 +++++------------------------------------------- src/types.rs | 7 ++++ 3 files changed, 105 insertions(+), 97 deletions(-) create mode 100644 src/handlers.rs diff --git a/src/handlers.rs b/src/handlers.rs new file mode 100644 index 0000000..6d31214 --- /dev/null +++ b/src/handlers.rs @@ -0,0 +1,87 @@ +use crate::types; +use axum::{ + body::{Body, Bytes}, + extract::State, + http::{header::HeaderMap, Response, StatusCode}, + response::IntoResponse, +}; +use axum_prometheus::metrics::counter; +use ring::hmac; +use tracing::{debug, error, warn}; + +pub async fn root() -> impl IntoResponse { + Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap() +} + +pub async fn health_check() -> impl IntoResponse { + return StatusCode::OK; +} + +pub async fn ingest( + State(state): State, + headers: HeaderMap, + body: Bytes, +) -> impl IntoResponse { + debug!("received payload"); + + let signature = match headers.get("x-vercel-signature") { + Some(signature) => signature.to_str().unwrap(), + None => { + warn!("received payload without signature"); + counter!("drain_recv_invalid_signature").increment(1); + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); + } + }; + let body_string = match String::from_utf8(body.to_vec()) { + Ok(body_string) => body_string, + Err(e) => { + error!("received bad utf-8: {:?}", e); + counter!("drain_recv_bad_utf8").increment(1); + return Response::builder() + .status(StatusCode::NOT_ACCEPTABLE) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); + } + }; + let mut sig_bytes = [0u8; 20]; + hex::decode_to_slice(signature, &mut sig_bytes).unwrap(); + match hmac::verify(&state.vercel_secret, body_string.as_bytes(), &sig_bytes) { + Ok(_) => {} + Err(e) => { + error!("failed verifying signature: {:?}", e); + counter!("drain_failed_verify_signature").increment(1); + return Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .header("x-vercel-verify", state.vercel_verify) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); + } + } + match serde_json::from_str::(&body_string) { + Ok(payload) => { + debug!("parsed payload, OK"); + for message in payload.0 { + match state.log_queue.send(message) { + Ok(_) => {} + Err(e) => { + error!("failed to queue log message to be sent to outputs: {:?}", e); + } + } + } + } + Err(e) => { + error!(payload = ?body_string, "failed parsing: {:?}", e); + } + } + return Response::builder() + .status(StatusCode::OK) + .header("x-vercel-verify", state.vercel_verify) + .body(Body::empty()) + .expect("Defined Responses to be infalliable."); +} diff --git a/src/main.rs b/src/main.rs index 46416c6..fcf72f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,23 +1,17 @@ mod controller; mod drivers; +mod handlers; mod types; use crate::drivers::{CloudWatchDriver, LokiDriver}; use crate::types::LogDriver; -use axum::{ - body::{Body, Bytes}, - extract::State, - http::{header::HeaderMap, Response, StatusCode}, - response::IntoResponse, - routing::get, -}; -use axum_prometheus::metrics::counter; +use axum::routing::get; use axum_prometheus::PrometheusMetricLayerBuilder; use clap::Parser; use ring::hmac; use tokio::signal::{unix, unix::SignalKind}; use tokio::sync::mpsc; -use tracing::{debug, error, info, warn, Level}; +use tracing::{debug, info, Level}; #[derive(Debug, Parser)] #[command(author, version, about, long_about = None)] @@ -52,13 +46,6 @@ struct Args { loki_basic_auth_pass: String, } -#[derive(Debug, Clone)] -struct AppState { - vercel_verify: String, - vercel_secret: hmac::Key, - log_queue: mpsc::UnboundedSender, -} - #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); @@ -94,7 +81,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { controller.run().await; }); - let state = AppState { + let state = types::AppState { vercel_verify: args.vercel_verify, vercel_secret: hmac::Key::new( hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, @@ -106,11 +93,7 @@ async fn main() -> anyhow::Result<()> { let listen_address = format!("{}:{}", args.ip, args.port); let listener = tokio::net::TcpListener::bind(listen_address.clone()).await?; - let mut app = axum::Router::new() - .route("/", axum::routing::post(root)) - .route("/health", axum::routing::get(health_check)) - .route("/vercel", axum::routing::post(ingest)) - .with_state(state); + let mut app = create_app(state); if args.enable_metrics { let (prometheus_layer, metric_handle) = PrometheusMetricLayerBuilder::new() @@ -156,79 +139,10 @@ async fn shutdown_for_signals() { } } -async fn root() -> impl IntoResponse { - Response::builder() - .status(StatusCode::OK) - .body(Body::empty()) - .unwrap() -} - -async fn ingest( - State(state): State, - headers: HeaderMap, - body: Bytes, -) -> impl IntoResponse { - debug!("received payload"); - - let signature = match headers.get("x-vercel-signature") { - Some(signature) => signature.to_str().unwrap(), - None => { - warn!("received payload without signature"); - counter!("drain_recv_invalid_signature").increment(1); - return Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::empty()) - .expect("Defined Responses to be infalliable."); - } - }; - let body_string = match String::from_utf8(body.to_vec()) { - Ok(body_string) => body_string, - Err(e) => { - error!("received bad utf-8: {:?}", e); - counter!("drain_recv_bad_utf8").increment(1); - return Response::builder() - .status(StatusCode::NOT_ACCEPTABLE) - .body(Body::empty()) - .expect("Defined Responses to be infalliable."); - } - }; - let mut sig_bytes = [0u8; 20]; - hex::decode_to_slice(signature, &mut sig_bytes).unwrap(); - match hmac::verify(&state.vercel_secret, body_string.as_bytes(), &sig_bytes) { - Ok(_) => {} - Err(e) => { - error!("failed verifying signature: {:?}", e); - counter!("drain_failed_verify_signature").increment(1); - return Response::builder() - .status(StatusCode::UNPROCESSABLE_ENTITY) - .header("x-vercel-verify", state.vercel_verify) - .body(Body::empty()) - .expect("Defined Responses to be infalliable."); - } - } - match serde_json::from_str::(&body_string) { - Ok(payload) => { - debug!("parsed payload, OK"); - for message in payload.0 { - match state.log_queue.send(message) { - Ok(_) => {} - Err(e) => { - error!("failed to queue log message to be sent to outputs: {:?}", e); - } - } - } - } - Err(e) => { - error!(payload = ?body_string, "failed parsing: {:?}", e); - } - } - return Response::builder() - .status(StatusCode::OK) - .header("x-vercel-verify", state.vercel_verify) - .body(Body::empty()) - .expect("Defined Responses to be infalliable."); -} - -async fn health_check() -> impl IntoResponse { - return StatusCode::OK; +fn create_app(state: types::AppState) -> axum::Router { + return axum::Router::new() + .route("/", axum::routing::post(handlers::root)) + .route("/health", axum::routing::get(handlers::health_check)) + .route("/vercel", axum::routing::post(handlers::ingest)) + .with_state(state); } diff --git a/src/types.rs b/src/types.rs index 1a6bb29..c70143f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -3,6 +3,13 @@ use async_trait::async_trait; use serde::{Deserialize, Deserializer, Serialize}; use std::str::FromStr; +#[derive(Debug, Clone)] +pub struct AppState { + pub vercel_verify: String, + pub vercel_secret: ring::hmac::Key, + pub log_queue: tokio::sync::mpsc::UnboundedSender, +} + #[derive(Deserialize, Debug)] pub struct VercelPayload(pub Vec); From 5dca6c407f13406b856c2fae217eb50ab1c49941 Mon Sep 17 00:00:00 2001 From: Daniel Barnes Date: Thu, 29 Aug 2024 20:58:30 +0900 Subject: [PATCH 3/4] testing refactor --- Cargo.lock | 19 +++++-- Cargo.toml | 1 + src/app.rs | 158 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 11 +--- 4 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 src/app.rs diff --git a/Cargo.lock b/Cargo.lock index 1845f71..9ef42b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -457,7 +457,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -501,7 +501,7 @@ dependencies = [ "mime", "pin-project-lite", "serde", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -524,7 +524,7 @@ dependencies = [ "once_cell", "pin-project", "tokio", - "tower", + "tower 0.4.13", "tower-http", ] @@ -1107,7 +1107,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -2123,6 +2123,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36b837f86b25d7c0d7988f00a54e74739be6477f2aac6201b8f429a7569991b7" +dependencies = [ + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -2314,6 +2324,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tower 0.5.0", "tracing", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index 2a7579e..d79db76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,6 @@ ring = "0.17.7" serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.112" tokio = { version = "1.35.1", features = ["full"] } +tower = "0.5.0" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["json"] } diff --git a/src/app.rs b/src/app.rs new file mode 100644 index 0000000..65b3806 --- /dev/null +++ b/src/app.rs @@ -0,0 +1,158 @@ +use crate::{handlers, types}; + +pub fn create_app(state: types::AppState) -> axum::Router { + return axum::Router::new() + .route("/", axum::routing::post(handlers::root)) + .route("/health", axum::routing::get(handlers::health_check)) + .route("/vercel", axum::routing::post(handlers::ingest)) + .with_state(state); +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Result; + use axum::{ + body::Body, + http::{Request, StatusCode}, + }; + use tower::Service; + + #[tokio::test] + async fn health_check() -> Result<()> { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let state = types::AppState { + vercel_verify: String::from(""), + vercel_secret: ring::hmac::Key::new( + ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + "".as_bytes(), + ), + log_queue: tx, + }; + let mut app = create_app(state); + + let request = Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(); + let response = app.as_service().call(request).await?; + + assert_eq!(response.status(), StatusCode::OK); + return Ok(()); + } + #[tokio::test] + async fn root_check() -> Result<()> { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let state = types::AppState { + vercel_verify: String::from(""), + vercel_secret: ring::hmac::Key::new( + ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + "".as_bytes(), + ), + log_queue: tx, + }; + let mut app = create_app(state); + + let request = Request::builder() + .method("POST") + .uri("/") + .body(Body::empty()) + .unwrap(); + let response = app.as_service().call(request).await?; + + assert_eq!(response.status(), StatusCode::OK); + return Ok(()); + } + #[tokio::test] + async fn ingest_check_samples() -> Result<()> { + let test_data = vec![ + include_str!("fixtures/sample_1.json"), + include_str!("fixtures/sample_2.json"), + include_str!("fixtures/sample_3.json"), + include_str!("fixtures/sample_4.json"), + include_str!("fixtures/sample_5.json"), + ]; + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + let key = ring::hmac::Key::new( + ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + "deadbeef1234dacb4321".as_bytes(), + ); + + let state = types::AppState { + vercel_verify: String::from("test"), + vercel_secret: key.clone(), + log_queue: tx, + }; + let mut app = create_app(state); + let mut app_service = app.as_service(); + + for data in test_data { + let sig = ring::hmac::sign(&key, data.as_bytes()); + let request = Request::builder() + .method("POST") + .header("x-vercel-signature", hex::encode(sig.as_ref())) + .uri("/vercel") + .body(Body::from(data)) + .unwrap(); + let response = app_service.call(request).await?; + assert_eq!( + response + .headers() + .get("x-vercel-verify") + .unwrap() + .to_str() + .unwrap(), + "test" + ); + assert_eq!(response.status(), StatusCode::OK); + } + assert_eq!(rx.len(), 8); + return Ok(()); + } + #[tokio::test] + async fn ingest_check_structured_messages() -> Result<()> { + let test_data = vec![ + include_str!("fixtures/structured_message_1.json"), + include_str!("fixtures/structured_message_2.json"), + include_str!("fixtures/sample_1.json"), + ]; + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + let key = ring::hmac::Key::new( + ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY, + "deadbeef1234dacb4321".as_bytes(), + ); + + let state = types::AppState { + vercel_verify: String::from("test"), + vercel_secret: key.clone(), + log_queue: tx, + }; + let mut app = create_app(state); + let mut app_service = app.as_service(); + + for data in test_data { + let sig = ring::hmac::sign(&key, data.as_bytes()); + let request = Request::builder() + .method("POST") + .header("x-vercel-signature", hex::encode(sig.as_ref())) + .uri("/vercel") + .body(Body::from(data)) + .unwrap(); + let response = app_service.call(request).await?; + assert_eq!( + response + .headers() + .get("x-vercel-verify") + .unwrap() + .to_str() + .unwrap(), + "test" + ); + assert_eq!(response.status(), StatusCode::OK); + } + assert_eq!(rx.len(), 3); + return Ok(()); + } +} diff --git a/src/main.rs b/src/main.rs index fcf72f6..3c799d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod app; mod controller; mod drivers; mod handlers; @@ -93,7 +94,7 @@ async fn main() -> anyhow::Result<()> { let listen_address = format!("{}:{}", args.ip, args.port); let listener = tokio::net::TcpListener::bind(listen_address.clone()).await?; - let mut app = create_app(state); + let mut app = app::create_app(state); if args.enable_metrics { let (prometheus_layer, metric_handle) = PrometheusMetricLayerBuilder::new() @@ -138,11 +139,3 @@ async fn shutdown_for_signals() { } => {} } } - -fn create_app(state: types::AppState) -> axum::Router { - return axum::Router::new() - .route("/", axum::routing::post(handlers::root)) - .route("/health", axum::routing::get(handlers::health_check)) - .route("/vercel", axum::routing::post(handlers::ingest)) - .with_state(state); -} From fb25a3f58d6c4a15f2bfda99e0d8fdf7413d4f4b Mon Sep 17 00:00:00 2001 From: Daniel Barnes Date: Thu, 29 Aug 2024 21:03:17 +0900 Subject: [PATCH 4/4] add test workflow --- .github/workflows/test.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/test.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..b053dbb --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,24 @@ +name: Test and Lint +on: + pull_request: + branches: + - main + +env: + CARGO_TERM_COLOR: always + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Format + run: cargo fmt && git diff --exit-code + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: test + run: cargo test