Skip to content

Commit

Permalink
fix(profiling): only track profiling annotations from classification
Browse files Browse the repository at this point in the history
The existing implementation intended to only track after the
`mezmo_log_classification` transform, but was instead inadvertently
tracking only when the component was also being tracked for billing.

This fix implements usage tracking for "task" transforms which allows us
to track after the classification transform, and also tightens up the
logic such that we only track profiling data for component keys that
_should_ be tracked.

Ref: LOG-19502
  • Loading branch information
mdeltito committed Mar 19, 2024
1 parent a8b82eb commit c3106a9
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ schannel = "0.1.22"
prost-build = "0.12"

[dev-dependencies]
assay = "0.1.1"
base64 = "0.21.5"
chrono-tz = { version = "0.8.4", default-features = false }
criterion = { version = "0.5.1", features = ["html_reports"] }
Expand Down
5 changes: 4 additions & 1 deletion lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

use crate::{config::ComponentKey, event::EventArray};
use crate::{config::ComponentKey, event::EventArray, usage_metrics::OutputUsageTracker};

pub enum ControlMessage {
/// Adds a new sink to the fanout.
Expand Down Expand Up @@ -170,10 +170,13 @@ impl Fanout {
pub async fn send_stream(
    &mut self,
    events: impl Stream<Item = EventArray>,
    usage_tracker: Box<dyn OutputUsageTracker>,
) -> crate::Result<()> {
    tokio::pin!(events);
    loop {
        // The stream running dry is the normal way out of this loop.
        let Some(batch) = events.next().await else {
            return Ok(());
        };

        // Measure the batch up front: `send` takes it by value, so it is gone afterwards.
        let profile = usage_tracker.get_size_and_profile(&batch);

        // A failed send returns early via `?`, so usage is only recorded for batches
        // whose send completed without error.
        self.send(batch).await?;
        usage_tracker.track_output(profile);
    }
}
Expand Down
94 changes: 93 additions & 1 deletion lib/vector-core/src/usage_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl UsageMetricsKey {
/// Determines whether the component with this key should be tracked for profiling.
/// Currently only during the analysis phase.
fn is_tracked_for_profiling(&self) -> bool {
// The absence of a pipeline id means that the component is part of the analysis phase
if self.pipeline_id.is_some() {
return false;
}
Expand All @@ -134,6 +135,8 @@ impl UsageMetricsKey {
return false;
};

// Only internal instances of the log classification component should be tracked
// for profiling.
!internal && self.component_type == "mezmo_log_classification"
}

Expand Down Expand Up @@ -299,7 +302,10 @@ fn track_usage(
});
}

if is_profile_enabled() && !usage.usage_by_annotation.is_empty() {
if is_profile_enabled()
&& key.is_tracked_for_profiling()
&& !usage.usage_by_annotation.is_empty()
{
usage_by_annotation = Some(usage.usage_by_annotation);
}

Expand Down Expand Up @@ -817,6 +823,7 @@ pub fn get_db_config(endpoint_url: &str) -> Result<Config, MetricsPublishingErro

#[cfg(test)]
mod tests {
use assay::assay;
use std::collections::{BTreeMap, HashMap};
use tokio::sync::mpsc;
use vrl::value::Value;
Expand Down Expand Up @@ -861,6 +868,18 @@ mod tests {
};
}

/// Test helper: runs `track_usage` once for `event` under `key` and returns the receiving
/// half of the channel so the caller can inspect what, if anything, was published.
fn track_usage_test(key: &UsageMetricsKey, event: LogEvent) -> UnboundedReceiver<UsageMetrics> {
    let (sender, receiver) = mpsc::unbounded_channel::<UsageMetrics>();
    track_usage(&sender, key, get_size_and_profile(&event.into()));
    receiver
}

/// Test helper: returns the schema's annotations key followed by `.` and the given
/// `parts` joined with `.`.
fn annotation_path(parts: &[&str]) -> String {
    let mut path = log_schema().annotations_key().to_string();
    path.push('.');
    path.push_str(&parts.join("."));
    path
}

#[test]
fn usage_metrics_key_parse() {
assert_parse_ok!(
Expand Down Expand Up @@ -972,6 +991,79 @@ mod tests {
);
}

// With profiling enabled via the environment, a plain message-only event under this
// `filter-by-field` transform key must not produce any usage metrics on the channel.
#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_not_tracked_either() {
    let key = "v1:filter-by-field:transform:comp1:pipe1:account1"
        .parse::<UsageMetricsKey>()
        .unwrap();

    let message_field = log_schema().message_key().unwrap().to_string();
    let fields: BTreeMap<String, Value> = BTreeMap::from([(message_field, "foo".into())]);
    let event: LogEvent = fields.into();

    let mut receiver = track_usage_test(&key, event);
    assert!(receiver.try_recv().is_err());
}

// With profiling enabled via the environment, an annotated event under this
// `remap` internal-transform key must publish billing metrics but no profiling
// (per-annotation) metrics.
#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_billing_only() {
    let key: UsageMetricsKey = "v1:remap:internal_transform:comp1:pipe1:account1"
        .parse()
        .unwrap();
    let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
    event_map.insert(
        log_schema().message_key().unwrap().to_string(),
        "the message".into(),
    );

    let mut event: LogEvent = event_map.into();
    // Annotate the event so that there is profiling data that *could* have been tracked.
    // A borrowed array literal is enough here; no need to heap-allocate a `Vec`.
    event.insert(annotation_path(&["app"]).as_str(), "app-1");

    let mut rx = track_usage_test(&key, event);
    // `expect` surfaces the underlying `TryRecvError` if nothing was published.
    let tracked = rx
        .try_recv()
        .expect("usage metrics should have been published for this key");

    assert_eq!(tracked.key, key, "should be the same key");
    assert!(tracked.billing.is_some(), "should contain billing metrics");
    assert!(
        tracked.usage_by_annotation.is_none(),
        "should NOT contain profiling metrics"
    );
}

// With profiling enabled via the environment, an annotated event under this
// `mezmo_log_classification` transform key (which carries no pipeline id segment) must
// publish profiling (per-annotation) metrics but no billing metrics.
#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_profiling_only() {
    let key: UsageMetricsKey = "v1:mezmo_log_classification:transform:comp1:account1"
        .parse()
        .unwrap();
    let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
    event_map.insert(
        log_schema().message_key().unwrap().to_string(),
        "the message".into(),
    );

    let mut event: LogEvent = event_map.into();
    // Annotate the event so that there is profiling data to track.
    // A borrowed array literal is enough here; no need to heap-allocate a `Vec`.
    event.insert(annotation_path(&["app"]).as_str(), "app-1");

    let mut rx = track_usage_test(&key, event);
    // `expect` surfaces the underlying `TryRecvError` if nothing was published.
    let tracked = rx
        .try_recv()
        .expect("usage metrics should have been published for this key");

    assert_eq!(tracked.key, key, "should be the same key");
    assert!(
        tracked.billing.is_none(),
        "should NOT contain billing metrics"
    );
    assert!(
        tracked.usage_by_annotation.is_some(),
        "should contain profiling metrics"
    );
}

#[test]
fn target_key_test() {
let value: UsageMetricsKey = "v1:s3:source:comp1:pipe1:account1".parse().unwrap();
Expand Down
4 changes: 3 additions & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ fn build_transform(
node.typetag,
&node.key,
&node.outputs,
usage_tracker,
),
}
}
Expand Down Expand Up @@ -1001,6 +1002,7 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
usage_tracker: Box<dyn OutputUsageTracker>,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

Expand Down Expand Up @@ -1049,7 +1051,7 @@ fn build_task_transform(
let transform = async move {
debug!("Task transform starting.");

match fanout.send_stream(stream).await {
match fanout.send_stream(stream, usage_tracker).await {
Ok(()) => {
debug!("Task transform finished normally.");
Ok(TaskOutput::Transform)
Expand Down

0 comments on commit c3106a9

Please sign in to comment.