diff --git a/Cargo.lock b/Cargo.lock index 28d15cc2c..67a1fa9e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10776,6 +10776,7 @@ dependencies = [ name = "vector-core" version = "0.1.0" dependencies = [ + "assay", "async-graphql", "async-trait", "base64 0.21.5", diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index d67bb000b..f14fee48c 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -80,6 +80,7 @@ schannel = "0.1.22" prost-build = "0.12" [dev-dependencies] +assay = "0.1.1" base64 = "0.21.5" chrono-tz = { version = "0.8.4", default-features = false } criterion = { version = "0.5.1", features = ["html_reports"] } diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index c95969900..63b1c322b 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc; use tokio_util::sync::ReusableBoxFuture; use vector_buffers::topology::channel::BufferSender; -use crate::{config::ComponentKey, event::EventArray}; +use crate::{config::ComponentKey, event::EventArray, usage_metrics::OutputUsageTracker}; pub enum ControlMessage { /// Adds a new sink to the fanout. @@ -170,10 +170,13 @@ impl Fanout { pub async fn send_stream( &mut self, events: impl Stream, + usage_tracker: Box, ) -> crate::Result<()> { tokio::pin!(events); while let Some(event_array) = events.next().await { + let usage_profile = usage_tracker.get_size_and_profile(&event_array); self.send(event_array).await?; + usage_tracker.track_output(usage_profile); } Ok(()) } diff --git a/lib/vector-core/src/usage_metrics/mod.rs b/lib/vector-core/src/usage_metrics/mod.rs index 965c67cef..3748d5318 100644 --- a/lib/vector-core/src/usage_metrics/mod.rs +++ b/lib/vector-core/src/usage_metrics/mod.rs @@ -126,6 +126,7 @@ impl UsageMetricsKey { /// Determines whether the component with this key should be tracked for profiling. /// Currently only during the analysis phase. fn is_tracked_for_profiling(&self) -> bool { + // The absence of a pipeline id means that the component is part of the analysis phase if self.pipeline_id.is_some() { return false; } @@ -134,6 +135,8 @@ impl UsageMetricsKey { return false; }; + // Only internal instances of the log classification component should be tracked + // for profiling. !internal && self.component_type == "mezmo_log_classification" } @@ -299,7 +302,10 @@ fn track_usage( }); } - if is_profile_enabled() && !usage.usage_by_annotation.is_empty() { + if is_profile_enabled() + && key.is_tracked_for_profiling() + && !usage.usage_by_annotation.is_empty() + { usage_by_annotation = Some(usage.usage_by_annotation); } @@ -817,6 +823,7 @@ pub fn get_db_config(endpoint_url: &str) -> Result UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel::(); + let usage_profile = get_size_and_profile(&event.into()); + + track_usage(&tx, key, usage_profile); + rx + } + + fn annotation_path(parts: &[&str]) -> String { + log_schema().annotations_key().to_string() + "." + parts.join(".").as_str() + } + #[test] fn usage_metrics_key_parse() { assert_parse_ok!( @@ -972,6 +991,79 @@ mod tests { ); } + #[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])] + fn track_usage_key_not_tracked_either() { + let key: UsageMetricsKey = "v1:filter-by-field:transform:comp1:pipe1:account1" + .parse() + .unwrap(); + let mut event_map: BTreeMap = BTreeMap::new(); + event_map.insert( + log_schema().message_key().unwrap().to_string(), + "foo".into(), + ); + let event: LogEvent = event_map.into(); + + let mut rx = track_usage_test(&key, event); + assert!(rx.try_recv().is_err()); + } + + #[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])] + fn track_usage_key_billing_only() { + let key: UsageMetricsKey = "v1:remap:internal_transform:comp1:pipe1:account1" + .parse() + .unwrap(); + let mut event_map: BTreeMap = BTreeMap::new(); + event_map.insert( + log_schema().message_key().unwrap().to_string(), + "the message".into(), + ); + + let mut event: LogEvent = event_map.into(); + event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1"); + + let mut rx = track_usage_test(&key, event); + let tracked = rx.try_recv(); + assert!(tracked.is_ok()); + + let tracked = tracked.unwrap(); + assert_eq!(tracked.key, key, "should be the same key"); + assert!(tracked.billing.is_some(), "should contain billing metrics"); + assert!( + tracked.usage_by_annotation.is_none(), + "should NOT contain profiling metrics" + ); + } + + #[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])] + fn track_usage_key_profiling_only() { + let key: UsageMetricsKey = "v1:mezmo_log_classification:transform:comp1:account1" + .parse() + .unwrap(); + let mut event_map: BTreeMap = BTreeMap::new(); + event_map.insert( + log_schema().message_key().unwrap().to_string(), + "the message".into(), + ); + + let mut event: LogEvent = event_map.into(); + event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1"); + + let mut rx = track_usage_test(&key, event); + let tracked = rx.try_recv(); + assert!(tracked.is_ok()); + + let tracked = tracked.unwrap(); + assert_eq!(tracked.key, key, "should be the same key"); + assert!( + tracked.billing.is_none(), + "should NOT contain billing metrics" + ); + assert!( + tracked.usage_by_annotation.is_some(), + "should contain profiling metrics" + ); + } + #[test] fn target_key_test() { let value: UsageMetricsKey = "v1:s3:source:comp1:pipe1:account1".parse().unwrap(); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 314b7f86d..dec774e7f 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -798,6 +798,7 @@ fn build_transform( node.typetag, &node.key, &node.outputs, + usage_tracker, ), } } @@ -1001,6 +1002,7 @@ fn build_task_transform( typetag: &str, key: &ComponentKey, outputs: &[TransformOutput], + usage_tracker: Box, ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); @@ -1049,7 +1051,7 @@ fn build_task_transform( let transform = async move { debug!("Task transform starting."); - match fanout.send_stream(stream).await { + match fanout.send_stream(stream, usage_tracker).await { Ok(()) => { debug!("Task transform finished normally."); Ok(TaskOutput::Transform)