diff --git a/Cargo.lock b/Cargo.lock index 80dae84..b595d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,7 +346,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax 0.8.4", + "regex-syntax 0.8.5", ] [[package]] @@ -3002,9 +3002,12 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "opcua" @@ -3789,6 +3792,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -4197,14 +4206,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.6" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" +checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.7", - "regex-syntax 0.8.4", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", ] [[package]] @@ -4218,13 +4227,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38caf58cc5ef2fed281f89292ef23f6365465ed9a41b7a7754eb4e26496c92df" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.4", + "regex-syntax 0.8.5", ] [[package]] @@ -4235,9 +4244,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = 
"regex-syntax" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "rend" @@ -4645,6 +4654,7 @@ version = "0.2.0" dependencies = [ "anyhow", "async-broadcast", + "async-stream", "async-trait", "axum", "base64 0.22.1", @@ -4670,6 +4680,7 @@ dependencies = [ "hybridmap", "influxdb-line-protocol", "iso8601", + "itertools 0.13.0", "nom", "num-traits", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 47870f7..c44167c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,11 +61,11 @@ prost = "0.13" snap = "1.1" hex = "0.4" blake3 = "1.5" -regex = "1.10" +regex = "1.11" influxdb-line-protocol = "2.0" flate2 = "1.0" smallvec = "1.13.2" -once_cell = "1.19" +once_cell = "1.20" urlencoding = "2.1" hybridmap = "0.1" clap = { version = "4.5", features = ["derive"] } @@ -103,3 +103,5 @@ time = "0.3" zstd = "0.13" prometheus-parser = "0.4" rusty-chunkenc = "0.1.1" +async-stream = "0.3" +itertools = "0.13" diff --git a/src/ingestors/http/app_error.rs b/src/ingestors/http/app_error.rs index c220495..bbd251d 100644 --- a/src/ingestors/http/app_error.rs +++ b/src/ingestors/http/app_error.rs @@ -3,6 +3,7 @@ use axum::response::IntoResponse; use axum::response::Response; use axum::Json; use serde_json::json; +use std::fmt; // Anyhow error handling with axum // https://github.com/tokio-rs/axum/blob/d3112a40d55f123bc5e65f995e2068e245f12055/examples/anyhow-error-response/src/main.rs @@ -38,3 +39,13 @@ where Self::InternalServerError(err.into()) } } + +impl fmt::Display for AppError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AppError::InternalServerError(error) => write!(f, "Internal Server Error: {}", error), + AppError::NotFound(error) => write!(f, "Not Found: {}", error), + AppError::BadRequest(error) => write!(f, "Bad 
Request: {}", error), + } + } +} diff --git a/src/ingestors/http/prometheus.rs b/src/ingestors/http/prometheus.rs index 02488b2..289420a 100644 --- a/src/ingestors/http/prometheus.rs +++ b/src/ingestors/http/prometheus.rs @@ -1,17 +1,17 @@ -use std::io::Write; +use std::sync::Arc; use super::{app_error::AppError, state::HttpServerState}; use crate::{ datamodel::{ - batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt, SensAppDateTime, + batch::SingleSensorBatch, batch_builder::BatchBuilder, + sensapp_datetime::SensAppDateTimeExt, SensAppDateTime, TypedSamples, }, parsing::{ prometheus::{ - remote_read_request_models::ResponseType, + remote_read_request_models::{Query, ResponseType}, remote_read_request_parser::parse_remote_read_request, remote_read_response::{ - chunk, Chunk, ChunkedReadResponse, ChunkedSeries, Label, QueryResult, ReadResponse, - Sample, TimeSeries, + ChunkedReadResponse, ChunkedSeries, QueryResult, ReadResponse, TimeSeries, }, PrometheusParser, }, @@ -19,13 +19,16 @@ use crate::{ }, }; use anyhow::Result; +use async_stream::stream; use axum::{ + body::Body, debug_handler, extract::State, http::{HeaderMap, HeaderValue, StatusCode}, }; +use futures::stream::StreamExt; +use futures::Stream; use prost::Message; -use rusty_chunkenc::{crc32c::write_crc32c, uvarint::write_uvarint, xor}; use tokio_util::bytes::Bytes; #[derive(Debug, PartialEq)] @@ -187,7 +190,7 @@ pub async fn prometheus_remote_read( State(state): State, headers: HeaderMap, bytes: Bytes, -) -> Result<(StatusCode, HeaderMap, Bytes), AppError> { +) -> Result<(StatusCode, HeaderMap, Body), AppError> { println!("Prometheus Remote Write API"); println!("bytes: {:?}", bytes); println!("headers: {:?}", headers); @@ -203,123 +206,126 @@ pub async fn prometheus_remote_read( *accepted_response_type == ResponseType::StreamedXorChunks as i32 }); - println!("xor_response_type: {:?}", xor_response_type); + let stream = prometheus_read_stream(read_request.queries); - // Write status header 
- // content-type: application/x-protobuf - - let mut headers = HeaderMap::new(); if xor_response_type { - headers.insert( - "Content-Type", - HeaderValue::from_static( - "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", - ), - ); - headers.insert("Content-Encoding", HeaderValue::from_static("")); + prometheus_read_xor(Box::pin(stream)) } else { - headers.insert( - "Content-Type", - HeaderValue::from_static("application/x-protobuf"), - ); - headers.insert("Content-Encoding", HeaderValue::from_static("snappy")); + prometheus_read_protobuf(Box::pin(stream)).await } +} - for (i, query) in read_request.queries.iter().enumerate() { - println!("query matcher: {:?}", query.to_sensor_matcher()); - let start = query.start_timestamp_ms; - let end = query.end_timestamp_ms; - - // create 100 samples - let n_samples = 10_000_i64; - let mut samples = Vec::with_capacity(n_samples as usize); - let step = (end - start) / n_samples; - for j in 0..n_samples { - let timestamp = start + step * j; - let value = (i + 1) as f64 * j as f64; - samples.push(crate::datamodel::sample::Sample { - datetime: SensAppDateTime::from_unix_milliseconds_i64(timestamp), - value, - }); - } +fn prometheus_read_stream(queries: Vec) -> impl Stream { + stream! 
{ + for (i, query) in queries.into_iter().enumerate() { + let start = query.start_timestamp_ms; + let end = query.end_timestamp_ms; + + // create 10,000 samples + let n_samples = 10_000_i64; + let mut samples = Vec::with_capacity(n_samples as usize); + let step = (end - start) / n_samples; + for j in 0..n_samples { + let timestamp = start + step * j; + let value = (i + 1) as f64 * j as f64; + samples.push(crate::datamodel::sample::Sample { + datetime: SensAppDateTime::from_unix_milliseconds_i64(timestamp), + value, + }); + } - if xor_response_type { - let mut chunk_buffer: Vec = Vec::new(); - let chunk = rusty_chunkenc::xor::XORChunk::new( - samples - .into_iter() - .map(|sample| rusty_chunkenc::XORSample { - timestamp: sample.datetime.to_unix_milliseconds().floor() as i64, - value: sample.value, - }) - .collect(), + // create the single sensor batch + let batch = SingleSensorBatch::new( + Arc::new(crate::datamodel::Sensor::new_without_uuid( + "canard".to_string(), + crate::datamodel::SensorType::Float, + None, + None, + ) + .unwrap()), + TypedSamples::Float(samples.into()), ); - chunk.write(&mut chunk_buffer).unwrap(); - - println!("buffer: {}", base64::encode(&chunk_buffer)); - - let chunked_read_response = ChunkedReadResponse { - chunked_series: vec![ChunkedSeries { - labels: vec![Label { - name: "__name__".to_string(), - value: "canard".to_string(), - }], - chunks: vec![Chunk { - min_time_ms: start, - max_time_ms: end, - r#type: chunk::Encoding::Xor as i32, - data: chunk_buffer, - }], - }], - query_index: i as i64, - }; - // convert to bytes - let mut proto_buffer: Vec = Vec::new(); - chunked_read_response.encode(&mut proto_buffer).unwrap(); + yield (i, batch); + } + } +} - let mut buffer: Vec = Vec::new(); - write_uvarint(proto_buffer.len() as u64, &mut buffer).unwrap(); +async fn prometheus_read_protobuf( + mut chunk_stream: S, +) -> Result<(StatusCode, HeaderMap, Body), AppError> +where + S: Stream + Unpin, +{ + let mut headers = HeaderMap::new(); + 
headers.insert( + "Content-Type", + HeaderValue::from_static("application/x-protobuf"), + ); + headers.insert("Content-Encoding", HeaderValue::from_static("snappy")); - write_crc32c(&proto_buffer, &mut buffer).unwrap(); - buffer.write_all(&proto_buffer).unwrap(); + let mut current_query_index = 0; + let mut current_query_result = QueryResult { + timeseries: Vec::new(), + }; + let mut read_response = ReadResponse { results: vec![] }; - // just to let it go - if buffer.len() > 0 { - return Ok((StatusCode::OK, headers.clone(), buffer.into())); - } - } else { - let read_response = ReadResponse { - results: vec![QueryResult { - timeseries: vec![TimeSeries { - labels: vec![Label { - name: "__name__".to_string(), - value: "canard".to_string(), - }], - samples: samples - .into_iter() - .map(|sample| Sample { - timestamp: sample.datetime.to_unix_milliseconds().floor() as i64, - value: sample.value, - }) - .collect(), - }], - }], + while let Some((query_index, batch)) = chunk_stream.next().await { + if query_index != current_query_index { + read_response.results.push(current_query_result); + current_query_index = query_index; + current_query_result = QueryResult { + timeseries: Vec::new(), }; + } + let timeserie = TimeSeries::from_single_sensor_batch(&batch).await; + current_query_result.timeseries.push(timeserie); + } + read_response.results.push(current_query_result); - // convert to bytes - let mut proto_buffer: Vec = Vec::new(); - read_response.encode(&mut proto_buffer).unwrap(); + // Serialise to protobuf binary + let mut proto_buffer: Vec = Vec::new(); + read_response.encode(&mut proto_buffer).unwrap(); - // snappy it - let mut encoder = snap::raw::Encoder::new(); - let buffer = encoder.compress_vec(&proto_buffer).unwrap(); + // Snappy it + let mut encoder = snap::raw::Encoder::new(); + let buffer = encoder.compress_vec(&proto_buffer).unwrap(); - if buffer.len() > 0 { - return Ok((StatusCode::OK, headers.clone(), buffer.into())); - } - } - } + // It could be possible 
to have some performance gains by writing the + // buffer directly to the stream instead of a temporary buffer. + // But the XOR stream should be preferred if performance is a concern. + + Ok((StatusCode::OK, headers, buffer.into())) +} + +fn prometheus_read_xor(chunk_stream: S) -> Result<(StatusCode, HeaderMap, Body), AppError> +where + S: Stream + Unpin + Send + 'static, +{ + let body_stream = chunk_stream.then(|(query_index, batch)| async move { + let chunked_serie = ChunkedSeries::from_single_sensor_batch(&batch).await; + + let chunked_read_response = ChunkedReadResponse { + chunked_series: vec![chunked_serie], + query_index: query_index as i64, + }; + + let mut buffer: Vec = Vec::new(); + chunked_read_response.promotheus_stream_encode(&mut buffer)?; + + Ok::, anyhow::Error>(buffer) + }); + + let mut headers = HeaderMap::new(); + headers.insert( + "Content-Type", + HeaderValue::from_static( + "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse", + ), + ); + headers.insert("Content-Encoding", HeaderValue::from_static("")); + + let body = Body::from_stream(body_stream); - Ok((StatusCode::NO_CONTENT, headers, Bytes::new())) + Ok((StatusCode::OK, headers, body)) } diff --git a/src/parsing/prometheus/remote_read_response.rs b/src/parsing/prometheus/remote_read_response.rs index 0261aa7..c143f67 100644 --- a/src/parsing/prometheus/remote_read_response.rs +++ b/src/parsing/prometheus/remote_read_response.rs @@ -1,4 +1,11 @@ +use std::io::Write; + +use anyhow::Result; +use itertools::Itertools; +use rusty_chunkenc::{crc32c::write_crc32c, uvarint::write_uvarint, xor::XORChunk, XORSample}; + // Note that this file is not automatically generated from the .proto file. +use crate::datamodel::{batch::SingleSensorBatch, Sensor, TypedSamples}; #[derive(prost::Message)] pub struct ReadResponse { @@ -74,3 +81,271 @@ pub mod chunk { FloatHistogram = 3, } } + +impl From<&Sensor> for Vec