Skip to content

Commit

Permalink
Merge pull request #431 from answerbook/mdeltito/LOG-19502
Browse files Browse the repository at this point in the history
fix(profiling): only track profiling annotations from classification
  • Loading branch information
mdeltito authored Mar 19, 2024
2 parents a8b82eb + c3106a9 commit c267715
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ schannel = "0.1.22"
prost-build = "0.12"

[dev-dependencies]
assay = "0.1.1"
base64 = "0.21.5"
chrono-tz = { version = "0.8.4", default-features = false }
criterion = { version = "0.5.1", features = ["html_reports"] }
Expand Down
5 changes: 4 additions & 1 deletion lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

use crate::{config::ComponentKey, event::EventArray};
use crate::{config::ComponentKey, event::EventArray, usage_metrics::OutputUsageTracker};

pub enum ControlMessage {
/// Adds a new sink to the fanout.
Expand Down Expand Up @@ -170,10 +170,13 @@ impl Fanout {
pub async fn send_stream(
&mut self,
events: impl Stream<Item = EventArray>,
usage_tracker: Box<dyn OutputUsageTracker>,
) -> crate::Result<()> {
tokio::pin!(events);
while let Some(event_array) = events.next().await {
let usage_profile = usage_tracker.get_size_and_profile(&event_array);
self.send(event_array).await?;
usage_tracker.track_output(usage_profile);
}
Ok(())
}
Expand Down
94 changes: 93 additions & 1 deletion lib/vector-core/src/usage_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl UsageMetricsKey {
/// Determines whether the component with this key should be tracked for profiling.
/// Currently only during the analysis phase.
fn is_tracked_for_profiling(&self) -> bool {
// The absence of a pipeline id means that the component is part of the analysis phase
if self.pipeline_id.is_some() {
return false;
}
Expand All @@ -134,6 +135,8 @@ impl UsageMetricsKey {
return false;
};

// Only internal instances of the log classification component should be tracked
// for profiling.
!internal && self.component_type == "mezmo_log_classification"
}

Expand Down Expand Up @@ -299,7 +302,10 @@ fn track_usage(
});
}

if is_profile_enabled() && !usage.usage_by_annotation.is_empty() {
if is_profile_enabled()
&& key.is_tracked_for_profiling()
&& !usage.usage_by_annotation.is_empty()
{
usage_by_annotation = Some(usage.usage_by_annotation);
}

Expand Down Expand Up @@ -817,6 +823,7 @@ pub fn get_db_config(endpoint_url: &str) -> Result<Config, MetricsPublishingErro

#[cfg(test)]
mod tests {
use assay::assay;
use std::collections::{BTreeMap, HashMap};
use tokio::sync::mpsc;
use vrl::value::Value;
Expand Down Expand Up @@ -861,6 +868,18 @@ mod tests {
};
}

fn track_usage_test(key: &UsageMetricsKey, event: LogEvent) -> UnboundedReceiver<UsageMetrics> {
let (tx, rx) = mpsc::unbounded_channel::<UsageMetrics>();
let usage_profile = get_size_and_profile(&event.into());

track_usage(&tx, key, usage_profile);
rx
}

fn annotation_path(parts: &[&str]) -> String {
log_schema().annotations_key().to_string() + "." + parts.join(".").as_str()
}

#[test]
fn usage_metrics_key_parse() {
assert_parse_ok!(
Expand Down Expand Up @@ -972,6 +991,79 @@ mod tests {
);
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_not_tracked_either() {
let key: UsageMetricsKey = "v1:filter-by-field:transform:comp1:pipe1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"foo".into(),
);
let event: LogEvent = event_map.into();

let mut rx = track_usage_test(&key, event);
assert!(rx.try_recv().is_err());
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_billing_only() {
let key: UsageMetricsKey = "v1:remap:internal_transform:comp1:pipe1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"the message".into(),
);

let mut event: LogEvent = event_map.into();
event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1");

let mut rx = track_usage_test(&key, event);
let tracked = rx.try_recv();
assert!(tracked.is_ok());

let tracked = tracked.unwrap();
assert_eq!(tracked.key, key, "should be the same key");
assert!(tracked.billing.is_some(), "should contain billing metrics");
assert!(
tracked.usage_by_annotation.is_none(),
"should NOT contain profiling metrics"
);
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_profiling_only() {
let key: UsageMetricsKey = "v1:mezmo_log_classification:transform:comp1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"the message".into(),
);

let mut event: LogEvent = event_map.into();
event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1");

let mut rx = track_usage_test(&key, event);
let tracked = rx.try_recv();
assert!(tracked.is_ok());

let tracked = tracked.unwrap();
assert_eq!(tracked.key, key, "should be the same key");
assert!(
tracked.billing.is_none(),
"should NOT contain billing metrics"
);
assert!(
tracked.usage_by_annotation.is_some(),
"should contain profiling metrics"
);
}

#[test]
fn target_key_test() {
let value: UsageMetricsKey = "v1:s3:source:comp1:pipe1:account1".parse().unwrap();
Expand Down
4 changes: 3 additions & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ fn build_transform(
node.typetag,
&node.key,
&node.outputs,
usage_tracker,
),
}
}
Expand Down Expand Up @@ -1001,6 +1002,7 @@ fn build_task_transform(
typetag: &str,
key: &ComponentKey,
outputs: &[TransformOutput],
usage_tracker: Box<dyn OutputUsageTracker>,
) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
let (mut fanout, control) = Fanout::new();

Expand Down Expand Up @@ -1049,7 +1051,7 @@ fn build_task_transform(
let transform = async move {
debug!("Task transform starting.");

match fanout.send_stream(stream).await {
match fanout.send_stream(stream, usage_tracker).await {
Ok(()) => {
debug!("Task transform finished normally.");
Ok(TaskOutput::Transform)
Expand Down

0 comments on commit c267715

Please sign in to comment.