Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved responses and testing refactor. #9

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Test and Lint
on:
pull_request:
branches:
- main

env:
CARGO_TERM_COLOR: always

jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Format
run: cargo fmt && git diff --exit-code
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: test
run: cargo test
19 changes: 15 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ ring = "0.17.7"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.112"
tokio = { version = "1.35.1", features = ["full"] }
tower = "0.5.0"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["json"] }
158 changes: 158 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{handlers, types};

pub fn create_app(state: types::AppState) -> axum::Router {
return axum::Router::new()
.route("/", axum::routing::post(handlers::root))
.route("/health", axum::routing::get(handlers::health_check))
.route("/vercel", axum::routing::post(handlers::ingest))
.with_state(state);
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use tower::Service;

#[tokio::test]
async fn health_check() -> Result<()> {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let state = types::AppState {
vercel_verify: String::from(""),
vercel_secret: ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"".as_bytes(),
),
log_queue: tx,
};
let mut app = create_app(state);

let request = Request::builder()
.uri("/health")
.body(Body::empty())
.unwrap();
let response = app.as_service().call(request).await?;

assert_eq!(response.status(), StatusCode::OK);
return Ok(());
}
#[tokio::test]
async fn root_check() -> Result<()> {
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let state = types::AppState {
vercel_verify: String::from(""),
vercel_secret: ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"".as_bytes(),
),
log_queue: tx,
};
let mut app = create_app(state);

let request = Request::builder()
.method("POST")
.uri("/")
.body(Body::empty())
.unwrap();
let response = app.as_service().call(request).await?;

assert_eq!(response.status(), StatusCode::OK);
return Ok(());
}
#[tokio::test]
async fn ingest_check_samples() -> Result<()> {
let test_data = vec![
include_str!("fixtures/sample_1.json"),
include_str!("fixtures/sample_2.json"),
include_str!("fixtures/sample_3.json"),
include_str!("fixtures/sample_4.json"),
include_str!("fixtures/sample_5.json"),
];

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let key = ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"deadbeef1234dacb4321".as_bytes(),
);

let state = types::AppState {
vercel_verify: String::from("test"),
vercel_secret: key.clone(),
log_queue: tx,
};
let mut app = create_app(state);
let mut app_service = app.as_service();

for data in test_data {
let sig = ring::hmac::sign(&key, data.as_bytes());
let request = Request::builder()
.method("POST")
.header("x-vercel-signature", hex::encode(sig.as_ref()))
.uri("/vercel")
.body(Body::from(data))
.unwrap();
let response = app_service.call(request).await?;
assert_eq!(
response
.headers()
.get("x-vercel-verify")
.unwrap()
.to_str()
.unwrap(),
"test"
);
assert_eq!(response.status(), StatusCode::OK);
}
assert_eq!(rx.len(), 8);
return Ok(());
}
#[tokio::test]
async fn ingest_check_structured_messages() -> Result<()> {
let test_data = vec![
include_str!("fixtures/structured_message_1.json"),
include_str!("fixtures/structured_message_2.json"),
include_str!("fixtures/sample_1.json"),
];

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<types::Message>();
let key = ring::hmac::Key::new(
ring::hmac::HMAC_SHA1_FOR_LEGACY_USE_ONLY,
"deadbeef1234dacb4321".as_bytes(),
);

let state = types::AppState {
vercel_verify: String::from("test"),
vercel_secret: key.clone(),
log_queue: tx,
};
let mut app = create_app(state);
let mut app_service = app.as_service();

for data in test_data {
let sig = ring::hmac::sign(&key, data.as_bytes());
let request = Request::builder()
.method("POST")
.header("x-vercel-signature", hex::encode(sig.as_ref()))
.uri("/vercel")
.body(Body::from(data))
.unwrap();
let response = app_service.call(request).await?;
assert_eq!(
response
.headers()
.get("x-vercel-verify")
.unwrap()
.to_str()
.unwrap(),
"test"
);
assert_eq!(response.status(), StatusCode::OK);
}
assert_eq!(rx.len(), 3);
return Ok(());
}
}
87 changes: 87 additions & 0 deletions src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use crate::types;
use axum::{
body::{Body, Bytes},
extract::State,
http::{header::HeaderMap, Response, StatusCode},
response::IntoResponse,
};
use axum_prometheus::metrics::counter;
use ring::hmac;
use tracing::{debug, error, warn};

pub async fn root() -> impl IntoResponse {
Response::builder()
.status(StatusCode::OK)
.body(Body::empty())
.unwrap()
}

pub async fn health_check() -> impl IntoResponse {
return StatusCode::OK;
}

pub async fn ingest(
State(state): State<types::AppState>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
debug!("received payload");

let signature = match headers.get("x-vercel-signature") {
Some(signature) => signature.to_str().unwrap(),
None => {
warn!("received payload without signature");
counter!("drain_recv_invalid_signature").increment(1);
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
};
let body_string = match String::from_utf8(body.to_vec()) {
Ok(body_string) => body_string,
Err(e) => {
error!("received bad utf-8: {:?}", e);
counter!("drain_recv_bad_utf8").increment(1);
return Response::builder()
.status(StatusCode::NOT_ACCEPTABLE)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
};
let mut sig_bytes = [0u8; 20];
hex::decode_to_slice(signature, &mut sig_bytes).unwrap();
match hmac::verify(&state.vercel_secret, body_string.as_bytes(), &sig_bytes) {
Ok(_) => {}
Err(e) => {
error!("failed verifying signature: {:?}", e);
counter!("drain_failed_verify_signature").increment(1);
return Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.header("x-vercel-verify", state.vercel_verify)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
}
match serde_json::from_str::<types::VercelPayload>(&body_string) {
Ok(payload) => {
debug!("parsed payload, OK");
for message in payload.0 {
match state.log_queue.send(message) {
Ok(_) => {}
Err(e) => {
error!("failed to queue log message to be sent to outputs: {:?}", e);
}
}
}
}
Err(e) => {
error!(payload = ?body_string, "failed parsing: {:?}", e);
}
}
return Response::builder()
.status(StatusCode::OK)
.header("x-vercel-verify", state.vercel_verify)
.body(Body::empty())
.expect("Defined Responses to be infalliable.");
}
Loading
Loading