Commit

Merge pull request #5 from mezmo/holmberg/LOG-19506
Fast forward mezmo/master
aholmberg authored Mar 19, 2024
2 parents a049b0c + c267715 commit 35fed6f
Showing 59 changed files with 3,980 additions and 775 deletions.
14 changes: 5 additions & 9 deletions .github/workflows/integration-test-mezmo.yml
@@ -2,18 +2,14 @@ name: Integration Test Suite (Mezmo)

on:
workflow_dispatch:
pull_request:
branches-ignore:
- 'dependabot/**'
paths-ignore:
- 'deployment/**'
- 'test/data/**'
- 'docs/**'

push:
branches:
- master
- ci-fix
paths-ignore:
- 'deployment/**'
- 'test/data/**'
- 'docs/**'

concurrency:
# For pull requests, cancel running workflows, for master, run all
@@ -48,7 +44,7 @@ jobs:
include:
# Integrations that we are not using/supporting are disabled
- test: 'aws'
-# - test: 'azure'
+- test: 'azure'
# - test: 'clickhouse'
# - test: 'datadog-agent'
# - test: 'datadog-logs'
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

96 changes: 96 additions & 0 deletions MEZMO_CHANGELOG.md
@@ -1,3 +1,99 @@
# [3.12.0](https://github.com/answerbook/vector/compare/v3.11.1...v3.12.0) (2024-03-13)


### Features

* **s3 sink**: add recursive directory consolidation [301ac96](https://github.com/answerbook/vector/commit/301ac96b00170af05aab9f3bfd4db1a0b4a896c8) - dominic-mcallister-logdna [LOG-19448](https://logdna.atlassian.net/browse/LOG-19448)


### Miscellaneous

* Merge pull request #428 from answerbook/dominic/LOG-19448 [f43e028](https://github.com/answerbook/vector/commit/f43e028b0e500a84997afb60ff0026dc38ba0fb1) - GitHub [LOG-19448](https://logdna.atlassian.net/browse/LOG-19448)

## [3.11.1](https://github.com/answerbook/vector/compare/v3.11.0...v3.11.1) (2024-03-12)


### Bug Fixes

* **classification**: `SYSLOGLINE` is having false positive matches [e908658](https://github.com/answerbook/vector/commit/e9086582b383f9a25178a6215333521f14ae66f2) - Darin Spivey [LOG-19416](https://logdna.atlassian.net/browse/LOG-19416)


### Miscellaneous

* Merge pull request #429 from answerbook/darinspivey/LOG-19416 [c129f60](https://github.com/answerbook/vector/commit/c129f605b54128ae46eb40ec682ac4920bee5cb5) - GitHub [LOG-19416](https://logdna.atlassian.net/browse/LOG-19416)

# [3.11.0](https://github.com/answerbook/vector/compare/v3.10.0...v3.11.0) (2024-03-01)


### Chores

* disable integration tests on pull requests [23251f2](https://github.com/answerbook/vector/commit/23251f290ac02e3ec8c34fe93082e6fe1a74041a) - Mike Del Tito [LOG-19433](https://logdna.atlassian.net/browse/LOG-19433)


### Features

* **classification**: identify and annotate json string messages [a837636](https://github.com/answerbook/vector/commit/a837636de79d9339b03d16c4e95e906f4cdca587) - Mike Del Tito [LOG-19433](https://logdna.atlassian.net/browse/LOG-19433)


### Miscellaneous

* Merge pull request #427 from answerbook/mdeltito/LOG-19433 [6ef9311](https://github.com/answerbook/vector/commit/6ef931137b5ae2600240c39d0fa111664f047d4a) - GitHub [LOG-19433](https://logdna.atlassian.net/browse/LOG-19433)

# [3.10.0](https://github.com/answerbook/vector/compare/v3.9.0...v3.10.0) (2024-02-29)


### Features

* **azure blob**: add tagging and file consolidation [d933d28](https://github.com/answerbook/vector/commit/d933d2800edfde28e2eb19d6c27a84b5431238e4) - dominic-mcallister-logdna [LOG-19336](https://logdna.atlassian.net/browse/LOG-19336) [LOG-19337](https://logdna.atlassian.net/browse/LOG-19337)


### Miscellaneous

* Merge pull request #426 from answerbook/dominic/LOG-19232 [a5acdec](https://github.com/answerbook/vector/commit/a5acdec594f76de949900f6f31dcd1f300f6f2dd) - GitHub [LOG-19232](https://logdna.atlassian.net/browse/LOG-19232)

# [3.9.0](https://github.com/answerbook/vector/compare/v3.8.1...v3.9.0) (2024-02-26)


### Features

* **aggregate-v2**: Track and expose metadata on event [e88c421](https://github.com/answerbook/vector/commit/e88c421ba9818b1ab7678be413064dfdc2d36915) - Dan Hable [LOG-19291](https://logdna.atlassian.net/browse/LOG-19291)

## [3.8.1](https://github.com/answerbook/vector/compare/v3.8.0...v3.8.1) (2024-02-23)


### Bug Fixes

* **sinks**: Enable user error logging for azure blob (#424) [fc3cd8a](https://github.com/answerbook/vector/commit/fc3cd8a84fe2893fe01e203ee4ece48fb5305047) - GitHub [LOG-19360](https://logdna.atlassian.net/browse/LOG-19360)


### Code Refactoring

* **sinks**: Restore healthcheck and response user error logging (#422) [4326471](https://github.com/answerbook/vector/commit/4326471d8ee7a406fe52f8c450c1d2ba8eaaa61b) - GitHub [LOG-19146](https://logdna.atlassian.net/browse/LOG-19146)

# [3.8.0](https://github.com/answerbook/vector/compare/v3.7.4...v3.8.0) (2024-02-20)


### Features

* spread vector nodes using topo spread constraints [df7f3f6](https://github.com/answerbook/vector/commit/df7f3f6cf9481766f53ab7cb4325693ce84825d4) - Adam Holmberg [LOG-18832](https://logdna.atlassian.net/browse/LOG-18832)


### Miscellaneous

* Merge pull request #421 from answerbook/holmberg/LOG-18832 [6aa2489](https://github.com/answerbook/vector/commit/6aa2489a43843266e54881fa869456fe4f46b0d2) - GitHub [LOG-18832](https://logdna.atlassian.net/browse/LOG-18832)

## [3.7.4](https://github.com/answerbook/vector/compare/v3.7.3...v3.7.4) (2024-02-14)


### Bug Fixes

* **mezmo-sink**: remove ARC, replace sleep timer with manual check [8b74d7f](https://github.com/answerbook/vector/commit/8b74d7f2272ddf561f20d4b93656d27ddc4ece4b) - Mike Del Tito [LOG-19184](https://logdna.atlassian.net/browse/LOG-19184)


### Miscellaneous

* Merge pull request #420 from answerbook/feature/LOG-19184 [273f6a5](https://github.com/answerbook/vector/commit/273f6a5f19585db8c1dfde9fe6684b133f2ad917) - GitHub [LOG-19184](https://logdna.atlassian.net/browse/LOG-19184)

## [3.7.3](https://github.com/answerbook/vector/compare/v3.7.2...v3.7.3) (2024-02-08)


7 changes: 7 additions & 0 deletions deployment/kubernetes/mtp-vector.yaml.envsubst
@@ -245,6 +245,13 @@ spec:
sources:
- configMap:
name: vector
topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: ScheduleAnyway
labelSelector:
matchLabels:
app: vector
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
1 change: 1 addition & 0 deletions lib/vector-core/Cargo.toml
@@ -80,6 +80,7 @@ schannel = "0.1.22"
prost-build = "0.12"

[dev-dependencies]
assay = "0.1.1"
base64 = "0.21.5"
chrono-tz = { version = "0.8.4", default-features = false }
criterion = { version = "0.5.1", features = ["html_reports"] }
5 changes: 4 additions & 1 deletion lib/vector-core/src/fanout.rs
@@ -7,7 +7,7 @@ use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

-use crate::{config::ComponentKey, event::EventArray};
+use crate::{config::ComponentKey, event::EventArray, usage_metrics::OutputUsageTracker};

pub enum ControlMessage {
/// Adds a new sink to the fanout.
@@ -170,10 +170,13 @@ impl Fanout {
pub async fn send_stream(
&mut self,
events: impl Stream<Item = EventArray>,
usage_tracker: Box<dyn OutputUsageTracker>,
) -> crate::Result<()> {
tokio::pin!(events);
while let Some(event_array) = events.next().await {
let usage_profile = usage_tracker.get_size_and_profile(&event_array);
self.send(event_array).await?;
usage_tracker.track_output(usage_profile);
}
Ok(())
}
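
The ordering in `send_stream` matters: `self.send(event_array)` takes ownership of the array, so the size and profile have to be computed beforehand, and the output is recorded only after the send succeeds. A minimal, synchronous sketch of that pattern follows. The `UsageTracker` trait, `CountingTracker`, `Batch`, and `send_all` are hypothetical stand-ins invented for illustration; they are not the actual `OutputUsageTracker` API, whose full signatures are not shown in this diff.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a batch of events (not the real `EventArray`).
type Batch = Vec<String>;

/// Hypothetical, simplified tracker trait (not the real `OutputUsageTracker`).
trait UsageTracker {
    /// Measure a batch while it is still only borrowed.
    fn measure(&self, batch: &Batch) -> usize;
    /// Record a measurement after the batch has been delivered.
    fn track_output(&self, size: usize);
}

/// Example tracker that sums the byte length of every delivered event.
struct CountingTracker {
    total_bytes: Cell<usize>,
}

impl UsageTracker for CountingTracker {
    fn measure(&self, batch: &Batch) -> usize {
        batch.iter().map(|event| event.len()).sum()
    }

    fn track_output(&self, size: usize) {
        self.total_bytes.set(self.total_bytes.get() + size);
    }
}

/// Passes each batch to `send`, which consumes it, and stops at the first error.
fn send_all<I, F>(batches: I, tracker: &dyn UsageTracker, mut send: F) -> Result<(), String>
where
    I: IntoIterator<Item = Batch>,
    F: FnMut(Batch) -> Result<(), String>,
{
    for batch in batches {
        // Measure first: after the next line the batch has been moved away.
        let size = tracker.measure(&batch);
        send(batch)?;
        // Only reached when the send succeeded, so failures are never counted.
        tracker.track_output(size);
    }
    Ok(())
}
```

With this shape, a failed send returns early through `?` and leaves the tracker untouched, which mirrors the `self.send(event_array).await?` line sitting between the two tracker calls in the hunk above.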
94 changes: 93 additions & 1 deletion lib/vector-core/src/usage_metrics/mod.rs
@@ -126,6 +126,7 @@ impl UsageMetricsKey {
/// Determines whether the component with this key should be tracked for profiling.
/// Currently only during the analysis phase.
fn is_tracked_for_profiling(&self) -> bool {
// The absence of a pipeline id means that the component is part of the analysis phase
if self.pipeline_id.is_some() {
return false;
}
@@ -134,6 +135,8 @@ impl UsageMetricsKey {
return false;
};

// Only internal instances of the log classification component should be tracked
// for profiling.
!internal && self.component_type == "mezmo_log_classification"
}

@@ -299,7 +302,10 @@ fn track_usage(
});
}

-if is_profile_enabled() && !usage.usage_by_annotation.is_empty() {
+if is_profile_enabled()
+&& key.is_tracked_for_profiling()
+&& !usage.usage_by_annotation.is_empty()
+{
usage_by_annotation = Some(usage.usage_by_annotation);
}

@@ -817,6 +823,7 @@ pub fn get_db_config(endpoint_url: &str) -> Result<Config, MetricsPublishingErro

#[cfg(test)]
mod tests {
use assay::assay;
use std::collections::{BTreeMap, HashMap};
use tokio::sync::mpsc;
use vrl::value::Value;
@@ -861,6 +868,18 @@ mod tests {
};
}

fn track_usage_test(key: &UsageMetricsKey, event: LogEvent) -> UnboundedReceiver<UsageMetrics> {
let (tx, rx) = mpsc::unbounded_channel::<UsageMetrics>();
let usage_profile = get_size_and_profile(&event.into());

track_usage(&tx, key, usage_profile);
rx
}

fn annotation_path(parts: &[&str]) -> String {
log_schema().annotations_key().to_string() + "." + parts.join(".").as_str()
}

#[test]
fn usage_metrics_key_parse() {
assert_parse_ok!(
@@ -972,6 +991,79 @@ mod tests {
);
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_not_tracked_either() {
let key: UsageMetricsKey = "v1:filter-by-field:transform:comp1:pipe1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"foo".into(),
);
let event: LogEvent = event_map.into();

let mut rx = track_usage_test(&key, event);
assert!(rx.try_recv().is_err());
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_billing_only() {
let key: UsageMetricsKey = "v1:remap:internal_transform:comp1:pipe1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"the message".into(),
);

let mut event: LogEvent = event_map.into();
event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1");

let mut rx = track_usage_test(&key, event);
let tracked = rx.try_recv();
assert!(tracked.is_ok());

let tracked = tracked.unwrap();
assert_eq!(tracked.key, key, "should be the same key");
assert!(tracked.billing.is_some(), "should contain billing metrics");
assert!(
tracked.usage_by_annotation.is_none(),
"should NOT contain profiling metrics"
);
}

#[assay(env = [("USAGE_METRICS_PROFILE_ENABLED", "true")])]
fn track_usage_key_profiling_only() {
let key: UsageMetricsKey = "v1:mezmo_log_classification:transform:comp1:account1"
.parse()
.unwrap();
let mut event_map: BTreeMap<String, Value> = BTreeMap::new();
event_map.insert(
log_schema().message_key().unwrap().to_string(),
"the message".into(),
);

let mut event: LogEvent = event_map.into();
event.insert(annotation_path(vec!["app"].as_ref()).as_str(), "app-1");

let mut rx = track_usage_test(&key, event);
let tracked = rx.try_recv();
assert!(tracked.is_ok());

let tracked = tracked.unwrap();
assert_eq!(tracked.key, key, "should be the same key");
assert!(
tracked.billing.is_none(),
"should NOT contain billing metrics"
);
assert!(
tracked.usage_by_annotation.is_some(),
"should contain profiling metrics"
);
}

#[test]
fn target_key_test() {
let value: UsageMetricsKey = "v1:s3:source:comp1:pipe1:account1".parse().unwrap();
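
Taken together, the `is_tracked_for_profiling` and `track_usage` changes in this file mean that annotation usage is attached only when profiling is enabled, the key qualifies, and there is something to report. The sketch below restates that gate on a simplified key. `Key`, its fields, and `profiling_output` are hypothetical names chosen for illustration; the real `UsageMetricsKey` derives its `internal` flag from the component kind in code that this diff elides, and the billing rules are not shown here at all.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `UsageMetricsKey`.
struct Key {
    component_type: String,
    pipeline_id: Option<String>,
    /// Assumed to be precomputed; the real code derives it from the component kind.
    internal: bool,
}

impl Key {
    /// Mirrors the predicate in the diff: no pipeline id, not internal,
    /// and the component is the log classification transform.
    fn is_tracked_for_profiling(&self) -> bool {
        if self.pipeline_id.is_some() {
            return false;
        }
        !self.internal && self.component_type == "mezmo_log_classification"
    }
}

/// Returns the per-annotation usage only when all three conditions hold.
fn profiling_output(
    profile_enabled: bool,
    key: &Key,
    usage_by_annotation: HashMap<String, usize>,
) -> Option<HashMap<String, usize>> {
    if profile_enabled && key.is_tracked_for_profiling() && !usage_by_annotation.is_empty() {
        Some(usage_by_annotation)
    } else {
        None
    }
}
```

Before this commit the middle condition was absent, so any component carrying annotations could emit profiling data whenever profiling was enabled; `track_usage_key_billing_only` above guards against exactly that case.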
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "vector",
-"version": "3.7.3",
+"version": "3.12.0",
"description": "Vector is a high-performance, end-to-end (agent & aggregator) observability data pipeline",
"repository": {
"type": "git",
2 changes: 1 addition & 1 deletion scripts/integration/azure/test.yaml
@@ -9,7 +9,7 @@ env:
LOGSTASH_ADDRESS: 0.0.0.0:8081

matrix:
-version: [3.14.0]
+version: [3.29.0]

# changes to these files/paths will invoke the integration test in CI
# expressions are evaluated using https://github.com/micromatch/picomatch
4 changes: 2 additions & 2 deletions src/sinks/aws_cloudwatch_logs/config.rs
@@ -180,12 +180,12 @@ impl SinkConfig for CloudwatchLogsSinkConfig {
self.clone(),
client.clone(),
std::sync::Arc::new(smithy_client),
-cx,
+cx.clone(),
));
let transformer = self.encoding.transformer();
let serializer = self.encoding.build()?;
let encoder = Encoder::<()>::new(serializer);
-let healthcheck = healthcheck(self.clone(), client).boxed();
+let healthcheck = healthcheck(self.clone(), client, cx.clone()).boxed();

let sink = CloudwatchSink {
batcher_settings,