Skip to content

Commit

Permalink
feat: 🌈 Prometheus read API work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Sep 30, 2024
1 parent fca825b commit 0cb653c
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 28 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ rot13 = "0.1"
time = "0.3"
zstd = "0.13"
prometheus-parser = "0.4"
rusty-chunkenc = "0.1.1"
11 changes: 11 additions & 0 deletions src/datamodel/matchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub struct LabelMatcher {
}

impl LabelMatcher {
pub fn new(name: String, matcher: StringMatcher) -> Self {
Self { name, matcher }
}

fn from_prometheus_label(label: PrometheusLabel) -> Self {
let name = label.key;
let matcher = StringMatcher::from_prometheus_label_op(label.op, label.value);
Expand Down Expand Up @@ -82,6 +86,13 @@ pub struct SensorMatcher {
}

impl SensorMatcher {
pub fn new(name_matcher: StringMatcher, label_matchers: Option<Vec<LabelMatcher>>) -> Self {
Self {
name_matcher,
label_matchers,
}
}

/// Create a new sensor matcher from a prometheus query.
///
/// Please note that this is a subset of the Prometheus query language.
Expand Down
221 changes: 198 additions & 23 deletions src/ingestors/http/prometheus.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,40 @@
use std::io::Write;

use super::{app_error::AppError, state::HttpServerState};
use crate::{
datamodel::batch_builder::BatchBuilder,
parsing::{prometheus::PrometheusParser, ParseData},
datamodel::{
batch_builder::BatchBuilder, sensapp_datetime::SensAppDateTimeExt, SensAppDateTime,
},
parsing::{
prometheus::{
remote_read_request_models::ResponseType,
remote_read_request_parser::parse_remote_read_request,
remote_read_response::{
chunk, Chunk, ChunkedReadResponse, ChunkedSeries, Label, QueryResult, ReadResponse,
Sample, TimeSeries,
},
PrometheusParser,
},
ParseData,
},
};
use anyhow::Result;
use axum::{
debug_handler,
extract::State,
http::{HeaderMap, StatusCode},
http::{HeaderMap, HeaderValue, StatusCode},
};
use prost::Message;
use rusty_chunkenc::{crc32c::write_crc32c, uvarint::write_uvarint, xor};
use tokio_util::bytes::Bytes;

fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
/// Which Prometheus remote API an incoming request targets.
///
/// Used by `verify_headers` to decide which version header to validate:
/// `x-prometheus-remote-read-version` for `Read`,
/// `x-prometheus-remote-write-version` for `Write`.
#[derive(Debug, PartialEq)]
enum VerifyHeadersMode {
    // Remote read request.
    Read,
    // Remote write request.
    Write,
}

fn verify_headers(headers: &HeaderMap, mode: VerifyHeadersMode) -> Result<(), AppError> {
// Check that we have the right content encoding, that must be snappy
match headers.get("content-encoding") {
Some(content_encoding) => match content_encoding.to_str() {
Expand Down Expand Up @@ -46,20 +69,39 @@ fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
}
}

// Check that the remote write version is supported
match headers.get("x-prometheus-remote-write-version") {
Some(version) => match version.to_str() {
Ok("0.1.0") => {}
_ => {
if mode == VerifyHeadersMode::Write {
// Check that the remote write version is supported
match headers.get("x-prometheus-remote-write-version") {
Some(version) => match version.to_str() {
Ok("0.1.0") => {}
_ => {
return Err(AppError::BadRequest(anyhow::anyhow!(
"Unsupported x-prometheus-remote-write-version, must be 0.1.0"
)));
}
},
None => {
return Err(AppError::BadRequest(anyhow::anyhow!(
"Unsupported x-prometheus-remote-write-version, must be 0.1.0"
"Missing x-prometheus-remote-write-version header"
)));
}
}
} else {
// Check that the remote read version is supported
match headers.get("x-prometheus-remote-read-version") {
Some(version) => match version.to_str() {
Ok("0.1.0") => {}
_ => {
return Err(AppError::BadRequest(anyhow::anyhow!(
"Unsupported x-prometheus-remote-read-version, must be 0.1.0"
)));
}
},
None => {
return Err(AppError::BadRequest(anyhow::anyhow!(
"Missing x-prometheus-remote-read-version header"
)));
}
},
None => {
return Err(AppError::BadRequest(anyhow::anyhow!(
"Missing x-prometheus-remote-write-version header"
)));
}
}

Expand All @@ -78,7 +120,7 @@ fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
request_body(
content = Bytes,
content_type = "application/x-protobuf",
description = "Prometheus Remote Write endpoint. [Reference](https://prometheus.io/docs/concepts/remote_write_spec/)",
description = "Prometheus Remote Write data. [Reference](https://prometheus.io/docs/concepts/remote_write_spec/)",
),
params(
("content-encoding" = String, Header, format = "snappy", description = "Content encoding, must be snappy"),
Expand All @@ -92,12 +134,12 @@ fn verify_headers(headers: &HeaderMap) -> Result<(), AppError> {
)
)]
#[debug_handler]
pub async fn publish_prometheus(
pub async fn prometheus_remote_write(
State(state): State<HttpServerState>,
headers: HeaderMap,
bytes: Bytes,
) -> Result<StatusCode, AppError> {
verify_headers(&headers)?;
verify_headers(&headers, VerifyHeadersMode::Write)?;

let mut batch_builder = BatchBuilder::new()?;
let parser = PrometheusParser;
Expand Down Expand Up @@ -128,23 +170,156 @@ pub async fn publish_prometheus(
request_body(
content = String,
content_type = "application/x-protobuf",
description = "Prometheus Remote Read endpoint. [Reference](https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/).",
description = "Prometheus Remote Read query. [Reference](https://prometheus.io/docs/prometheus/latest/querying/remote_read_api/).",
),
params(
("content-encoding" = String, Header, format = "snappy", description = "Content encoding, must be snappy"),
("content-type" = String, Header, format = "application/x-protobuf", description = "Content type, must be application/x-protobuf"),
),
responses(
(status = 200, description = "Prometheus Remote Read endpoint"),
(status = 200, description = "Prometheus Remote Read data"),
(status = 400, description = "Bad Request", body = AppError),
(status = 500, description = "Internal Server Error", body = AppError),
)
)]
#[debug_handler]
pub async fn read(
pub async fn prometheus_remote_read(
State(state): State<HttpServerState>,
headers: HeaderMap,
bytes: Bytes,
) -> Result<StatusCode, AppError> {
unimplemented!();
) -> Result<(StatusCode, HeaderMap, Bytes), AppError> {
println!("Prometheus Remote Write API");
println!("bytes: {:?}", bytes);
println!("headers: {:?}", headers);
verify_headers(&headers, VerifyHeadersMode::Read)?;

let read_request = parse_remote_read_request(&bytes)?;
println!("read_request: {:?}", read_request);
let xor_response_type =
read_request
.accepted_response_types
.iter()
.any(|accepted_response_type| {
*accepted_response_type == ResponseType::StreamedXorChunks as i32
});

println!("xor_response_type: {:?}", xor_response_type);

// Write status header
// content-type: application/x-protobuf

let mut headers = HeaderMap::new();
if xor_response_type {
headers.insert(
"Content-Type",
HeaderValue::from_static(
"application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse",
),
);
headers.insert("Content-Encoding", HeaderValue::from_static(""));
} else {
headers.insert(
"Content-Type",
HeaderValue::from_static("application/x-protobuf"),
);
headers.insert("Content-Encoding", HeaderValue::from_static("snappy"));
}

for (i, query) in read_request.queries.iter().enumerate() {
println!("query matcher: {:?}", query.to_sensor_matcher());
let start = query.start_timestamp_ms;
let end = query.end_timestamp_ms;

// create 100 samples
let n_samples = 10_000_i64;
let mut samples = Vec::with_capacity(n_samples as usize);
let step = (end - start) / n_samples;
for j in 0..n_samples {
let timestamp = start + step * j;
let value = (i + 1) as f64 * j as f64;
samples.push(crate::datamodel::sample::Sample {
datetime: SensAppDateTime::from_unix_milliseconds_i64(timestamp),
value,
});
}

if xor_response_type {
let mut chunk_buffer: Vec<u8> = Vec::new();
let chunk = rusty_chunkenc::xor::XORChunk::new(
samples
.into_iter()
.map(|sample| rusty_chunkenc::XORSample {
timestamp: sample.datetime.to_unix_milliseconds().floor() as i64,
value: sample.value,
})
.collect(),
);
chunk.write(&mut chunk_buffer).unwrap();

println!("buffer: {}", base64::encode(&chunk_buffer));

let chunked_read_response = ChunkedReadResponse {
chunked_series: vec![ChunkedSeries {
labels: vec![Label {
name: "__name__".to_string(),
value: "canard".to_string(),
}],
chunks: vec![Chunk {
min_time_ms: start,
max_time_ms: end,
r#type: chunk::Encoding::Xor as i32,
data: chunk_buffer,
}],
}],
query_index: i as i64,
};

// convert to bytes
let mut proto_buffer: Vec<u8> = Vec::new();
chunked_read_response.encode(&mut proto_buffer).unwrap();

let mut buffer: Vec<u8> = Vec::new();
write_uvarint(proto_buffer.len() as u64, &mut buffer).unwrap();

write_crc32c(&proto_buffer, &mut buffer).unwrap();
buffer.write_all(&proto_buffer).unwrap();

// just to let it go
if buffer.len() > 0 {
return Ok((StatusCode::OK, headers.clone(), buffer.into()));
}
} else {
let read_response = ReadResponse {
results: vec![QueryResult {
timeseries: vec![TimeSeries {
labels: vec![Label {
name: "__name__".to_string(),
value: "canard".to_string(),
}],
samples: samples
.into_iter()
.map(|sample| Sample {
timestamp: sample.datetime.to_unix_milliseconds().floor() as i64,
value: sample.value,
})
.collect(),
}],
}],
};

// convert to bytes
let mut proto_buffer: Vec<u8> = Vec::new();
read_response.encode(&mut proto_buffer).unwrap();

// snappy it
let mut encoder = snap::raw::Encoder::new();
let buffer = encoder.compress_vec(&proto_buffer).unwrap();

if buffer.len() > 0 {
return Ok((StatusCode::OK, headers.clone(), buffer.into()));
}
}
}

Ok((StatusCode::NO_CONTENT, headers, Bytes::new()))
}
Loading

0 comments on commit 0cb653c

Please sign in to comment.