Skip to content

Commit

Permalink
Improved responses and testing refactor. (#9)
Browse files Browse the repository at this point in the history
* more response statuses

* refactor to seperate files

* testing refactor

* add test workflow
  • Loading branch information
dacbd authored Aug 29, 2024
1 parent fafe68b commit 77629c8
Show file tree
Hide file tree
Showing 7 changed files with 299 additions and 96 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Test and Lint
on:
pull_request:
branches:
- main

env:
CARGO_TERM_COLOR: always

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Format
run: cargo fmt && git diff --exit-code
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: test
run: cargo test
19 changes: 15 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ ring = "0.17.7"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.112"
tokio = { version = "1.35.1", features = ["full"] }
tower = "0.5.0"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["json"] }
158 changes: 158 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{handlers, types};

pub fn create_app(state: types::AppState) -> axum::Router {
return axum::Router::new()
.route("/", axum::routing::post(handlers::root))
.route("/health", axum::routing::get(handlers::health_check))
.route("/vercel", axum::routing::post(handlers::ingest))
.with_state(state);
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use tower::Service;

#[tokio::test]
async fn health_check() -> Result<()> {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let state = types::AppState {
vercel_verify: String::from(""),
vercel_secret: ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"".as_bytes(),
),
log_queue: tx,
};
let mut app = create_app(state);

let request = Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap();
let response = app.as_service().call(request).await?;

assert_eq!(response.status(), StatusCode::OK);
return Ok(());
}
#[tokio::test]
async fn root_check() -> Result<()> {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let state = types::AppState {
vercel_verify: String::from(""),
vercel_secret: ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"".as_bytes(),
),
log_queue: tx,
};
let mut app = create_app(state);

let request = Request::builder()
.method("POST")
.uri("/")
.body(Body::empty())
.unwrap();
let response = app.as_service().call(request).await?;

assert_eq!(response.status(), StatusCode::OK);
return Ok(());
}
#[tokio::test]
async fn ingest_check_samples() -> Result<()> {
let test_data = vec![
include_str!("fixtures/sample_1.json"),
include_str!("fixtures/sample_2.json"),
include_str!("fixtures/sample_3.json"),
include_str!("fixtures/sample_4.json"),
include_str!("fixtures/sample_5.json"),
];

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let key = ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"deadbeef1234dacb4321".as_bytes(),
);

let state = types::AppState {
vercel_verify: String::from("test"),
vercel_secret: key.clone(),
log_queue: tx,
};
let mut app = create_app(state);
let mut app_service = app.as_service();

for data in test_data {
let sig = ring::hmac::sign(&key, data.as_bytes());
let request = Request::builder()
.method("POST")
.header("x-vercel-signature", hex::encode(sig.as_ref()))
.uri("/vercel")
.body(Body::from(data))
.unwrap();
let response = app_service.call(request).await?;
assert_eq!(
response
.headers()
.get("x-vercel-verify")
.unwrap()
.to_str()
.unwrap(),
"test"
);
assert_eq!(response.status(), StatusCode::OK);
}
assert_eq!(rx.len(), 8);
return Ok(());
}
#[tokio::test]
async fn ingest_check_structured_messages() -> Result<()> {
let test_data = vec![
include_str!("fixtures/structured_message_1.json"),
include_str!("fixtures/structured_message_2.json"),
include_str!("fixtures/sample_1.json"),
];

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let key = ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"deadbeef1234dacb4321".as_bytes(),
);

let state = types::AppState {
vercel_verify: String::from("test"),
vercel_secret: key.clone(),
log_queue: tx,
};
let mut app = create_app(state);
let mut app_service = app.as_service();

for data in test_data {
let sig = ring::hmac::sign(&key, data.as_bytes());
let request = Request::builder()
.method("POST")
.header("x-vercel-signature", hex::encode(sig.as_ref()))
.uri("/vercel")
.body(Body::from(data))
.unwrap();
let response = app_service.call(request).await?;
assert_eq!(
response
.headers()
.get("x-vercel-verify")
.unwrap()
.to_str()
.unwrap(),
"test"
);
assert_eq!(response.status(), StatusCode::OK);
}
assert_eq!(rx.len(), 3);
return Ok(());
}
}
87 changes: 87 additions & 0 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::types;
use axum::{
body::{Body, Bytes},
extract::State,
http::{header::HeaderMap, Response, StatusCode},
response::IntoResponse,
};
use axum_prometheus::metrics::counter;
use ring::hmac;
use tracing::{debug, error, warn};

pub async fn root() -> impl IntoResponse {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}

pub async fn health_check() -> impl IntoResponse {
return StatusCode::OK;
}

pub async fn ingest(
State(state): State<types::AppState>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
debug!("received payload");

let signature = match headers.get("x-vercel-signature") {
Some(signature) => signature.to_str().unwrap(),
None => {
warn!("received payload without signature");
counter!("drain_recv_invalid_signature").increment(1);
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
};
let body_string = match String::from_utf8(body.to_vec()) {
Ok(body_string) => body_string,
Err(e) => {
error!("received bad utf-8: {:?}", e);
counter!("drain_recv_bad_utf8").increment(1);
return Response::builder()
.status(StatusCode::NOT_ACCEPTABLE)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
};
let mut sig_bytes = [0u8; 20];
hex::decode_to_slice(signature, &mut sig_bytes).unwrap();
match hmac::verify(&state.vercel_secret, body_string.as_bytes(), &sig_bytes) {
Ok(_) => {}
Err(e) => {
error!("failed verifying signature: {:?}", e);
counter!("drain_failed_verify_signature").increment(1);
return Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.header("x-vercel-verify", state.vercel_verify)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
}
match serde_json::from_str::<types::VercelPayload>(&body_string) {
Ok(payload) => {
debug!("parsed payload, OK");
for message in payload.0 {
match state.log_queue.send(message) {
Ok(_) => {}
Err(e) => {
error!("failed to queue log message to be sent to outputs: {:?}", e);
}
}
}
}
Err(e) => {
error!(payload = ?body_string, "failed parsing: {:?}", e);
}
}
return Response::builder()
.status(StatusCode::OK)
.header("x-vercel-verify", state.vercel_verify)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
Loading

0 comments on commit 77629c8

Please sign in to comment.