From e2c31ec59679f2d92df18b734c75b602086adcd3 Mon Sep 17 00:00:00 2001 From: Chao Date: Mon, 16 Dec 2024 16:27:18 +0800 Subject: [PATCH 1/4] Add logs scraper in scraperhelper --- .../add-logs-scraper-in-scraperhelper.yaml | 25 + component/componenttest/obsreporttest.go | 6 + component/componenttest/otelchecker.go | 7 + receiver/scraperhelper/alias.go | 9 + receiver/scraperhelper/documentation.md | 16 + .../internal/metadata/generated_telemetry.go | 14 + .../scraperhelper/logs_scrapercontroller.go | 206 ++++++++ .../logs_scrapercontroller_test.go | 452 ++++++++++++++++++ receiver/scraperhelper/metadata.yaml | 22 +- ...roller.go => metrics_scrapercontroller.go} | 46 +- ...t.go => metrics_scrapercontroller_test.go} | 0 receiver/scraperhelper/obs_logs.go | 83 ++++ receiver/scraperhelper/obs_logs_test.go | 96 ++++ 13 files changed, 958 insertions(+), 24 deletions(-) create mode 100644 .chloggen/add-logs-scraper-in-scraperhelper.yaml create mode 100644 receiver/scraperhelper/alias.go create mode 100644 receiver/scraperhelper/logs_scrapercontroller.go create mode 100644 receiver/scraperhelper/logs_scrapercontroller_test.go rename receiver/scraperhelper/{scrapercontroller.go => metrics_scrapercontroller.go} (77%) rename receiver/scraperhelper/{scrapercontroller_test.go => metrics_scrapercontroller_test.go} (100%) create mode 100644 receiver/scraperhelper/obs_logs.go create mode 100644 receiver/scraperhelper/obs_logs_test.go diff --git a/.chloggen/add-logs-scraper-in-scraperhelper.yaml b/.chloggen/add-logs-scraper-in-scraperhelper.yaml new file mode 100644 index 00000000000..8db333e7999 --- /dev/null +++ b/.chloggen/add-logs-scraper-in-scraperhelper.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: receiver/scraperhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add logs scraper in receiver/scraperhelper + +# One or more tracking issues or pull requests related to the change +issues: [11899] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This change adds a new logs scraper in receiver/scraperhelper, also introduced new metrics for scraping logs. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index 7c2875aa0dc..730170f26be 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -92,6 +92,12 @@ func (tts *TestTelemetry) CheckReceiverMetrics(protocol string, acceptedMetricPo return checkReceiverMetrics(tts.reader, tts.id, protocol, acceptedMetricPoints, droppedMetricPoints) } +// CheckScraperLogs checks that for the current exported values for logs scraper metrics match given values. +// Note: SetupTelemetry must be called before this function. 
+func (tts *TestTelemetry) CheckScraperLogs(receiver component.ID, scraper component.ID, scrapedLogRecords, erroredLogRecords int64) error { + return checkScraperLogs(tts.reader, receiver, scraper, scrapedLogRecords, erroredLogRecords) +} + // CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckScraperMetrics(receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error { diff --git a/component/componenttest/otelchecker.go b/component/componenttest/otelchecker.go index c3196f7d66e..842aba02968 100644 --- a/component/componenttest/otelchecker.go +++ b/component/componenttest/otelchecker.go @@ -15,6 +15,13 @@ import ( "go.opentelemetry.io/collector/component" ) +func checkScraperLogs(reader *sdkmetric.ManualReader, receiver component.ID, scraper component.ID, scrapedLogRecords, erroredLogRecords int64) error { + scraperAttrs := attributesForScraperMetrics(receiver, scraper) + return multierr.Combine( + checkIntSum(reader, "otelcol_scraper_scraped_log_records", scrapedLogRecords, scraperAttrs), + checkIntSum(reader, "otelcol_scraper_errored_log_records", erroredLogRecords, scraperAttrs)) +} + func checkScraperMetrics(reader *sdkmetric.ManualReader, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error { scraperAttrs := attributesForScraperMetrics(receiver, scraper) return multierr.Combine( diff --git a/receiver/scraperhelper/alias.go b/receiver/scraperhelper/alias.go new file mode 100644 index 00000000000..ffa84867fde --- /dev/null +++ b/receiver/scraperhelper/alias.go @@ -0,0 +1,9 @@ +package scraperhelper + +var ( + AddScraper = AddMetricsScraper + NewScraperControllerReceiver = NewMetricsScraperControllerReceiver + WithTickerChannel = WithMetricsTickerChannel +) + +type ScraperControllerOption = MetricsScraperControllerOption diff --git a/receiver/scraperhelper/documentation.md b/receiver/scraperhelper/documentation.md index 48591f4afcc..abe567b3b78 100644 --- a/receiver/scraperhelper/documentation.md +++ b/receiver/scraperhelper/documentation.md @@ -6,6 +6,14 @@ The following telemetry is emitted by this component. +### otelcol_scraper_errored_log_records + +Number of log records that were unable to be scraped. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoints} | Sum | Int | true | + ### otelcol_scraper_errored_metric_points Number of metric points that were unable to be scraped. [alpha] @@ -14,6 +22,14 @@ Number of metric points that were unable to be scraped. [alpha] | ---- | ----------- | ---------- | --------- | | {datapoints} | Sum | Int | true | +### otelcol_scraper_scraped_log_records + +Number of log records successfully scraped. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoints} | Sum | Int | true | + ### otelcol_scraper_scraped_metric_points Number of metric points successfully scraped. 
[alpha] diff --git a/receiver/scraperhelper/internal/metadata/generated_telemetry.go b/receiver/scraperhelper/internal/metadata/generated_telemetry.go index 8e01bc1d74b..9fad800ce8d 100644 --- a/receiver/scraperhelper/internal/metadata/generated_telemetry.go +++ b/receiver/scraperhelper/internal/metadata/generated_telemetry.go @@ -25,7 +25,9 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // as defined in metadata and user config. type TelemetryBuilder struct { meter metric.Meter + ScraperErroredLogRecords metric.Int64Counter ScraperErroredMetricPoints metric.Int64Counter + ScraperScrapedLogRecords metric.Int64Counter ScraperScrapedMetricPoints metric.Int64Counter } @@ -49,12 +51,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error + builder.ScraperErroredLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( + "otelcol_scraper_errored_log_records", + metric.WithDescription("Number of log records that were unable to be scraped. [alpha]"), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) builder.ScraperErroredMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_scraper_errored_metric_points", metric.WithDescription("Number of metric points that were unable to be scraped. [alpha]"), metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) + builder.ScraperScrapedLogRecords, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( + "otelcol_scraper_scraped_log_records", + metric.WithDescription("Number of log records successfully scraped. [alpha]"), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) builder.ScraperScrapedMetricPoints, err = getLeveledMeter(builder.meter, configtelemetry.LevelBasic, settings.MetricsLevel).Int64Counter( "otelcol_scraper_scraped_metric_points", metric.WithDescription("Number of metric points successfully scraped. [alpha]"), diff --git a/receiver/scraperhelper/logs_scrapercontroller.go b/receiver/scraperhelper/logs_scrapercontroller.go new file mode 100644 index 00000000000..bd4663be0ba --- /dev/null +++ b/receiver/scraperhelper/logs_scrapercontroller.go @@ -0,0 +1,206 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper" + +import ( + "context" + "sync" + "time" + + "go.opentelemetry.io/collector/pdata/plog" + + "go.uber.org/multierr" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" +) + +// LogsScraperControllerOption apply changes to internal options. +type LogsScraperControllerOption interface { + apply(*logsController) +} + +type logsScraperControllerOptionFunc func(*logsController) + +func (of logsScraperControllerOptionFunc) apply(e *logsController) { + of(e) +} + +// AddLogsScraper configures the provided scrape function to be called +// with the specified options, and at the specified collection interval. +// +// Observability information will be reported, and the scraped logs +// will be passed to the next consumer. 
+func AddLogsScraper(t component.Type, scraper scraper.Logs) LogsScraperControllerOption { + return logsScraperControllerOptionFunc(func(o *logsController) { + o.scrapers = append(o.scrapers, logsScraperWithID{ + Logs: scraper, + id: component.NewID(t), + }) + }) +} + +// WithLogsTickerChannel allows you to override the scraper controller's ticker +// channel to specify when scrape is called. This is only expected to be +// used by tests. +func WithLogsTickerChannel(tickerCh <-chan time.Time) LogsScraperControllerOption { + return logsScraperControllerOptionFunc(func(o *logsController) { + o.tickerCh = tickerCh + }) +} + +type logsController struct { + collectionInterval time.Duration + initialDelay time.Duration + timeout time.Duration + nextConsumer consumer.Logs + + scrapers []logsScraperWithID + obsScrapers []scraper.Logs + + tickerCh <-chan time.Time + + done chan struct{} + wg sync.WaitGroup + + obsrecv *receiverhelper.ObsReport +} +type logsScraperWithID struct { + scraper.Logs + id component.ID +} + +// NewLogsScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers. +func NewLogsScraperControllerReceiver( + cfg *ControllerConfig, + set receiver.Settings, + nextConsumer consumer.Logs, + options ...LogsScraperControllerOption, +) (component.Component, error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "", + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + + sc := &logsController{ + collectionInterval: cfg.CollectionInterval, + initialDelay: cfg.InitialDelay, + timeout: cfg.Timeout, + nextConsumer: nextConsumer, + done: make(chan struct{}), + obsrecv: obsrecv, + } + + for _, op := range options { + op.apply(sc) + } + + sc.obsScrapers = make([]scraper.Logs, len(sc.scrapers)) + for i := range sc.scrapers { + telSet := set.TelemetrySettings + telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String())) + var obsScrp scraper.ScrapeLogsFunc + obsScrp, err = newObsLogs(sc.scrapers[i].ScrapeLogs, set.ID, sc.scrapers[i].id, telSet) + if err != nil { + return nil, err + } + sc.obsScrapers[i], err = scraper.NewLogs(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown)) + if err != nil { + return nil, err + } + } + + return sc, nil +} + +// Start the receiver, invoked during service start. +func (sc *logsController) Start(ctx context.Context, host component.Host) error { + for _, scrp := range sc.obsScrapers { + if err := scrp.Start(ctx, host); err != nil { + return err + } + } + + sc.startScraping() + return nil +} + +// Shutdown the receiver, invoked during service shutdown. +func (sc *logsController) Shutdown(ctx context.Context) error { + // Signal the goroutine to stop. + close(sc.done) + sc.wg.Wait() + var errs error + for _, scrp := range sc.obsScrapers { + errs = multierr.Append(errs, scrp.Shutdown(ctx)) + } + + return errs +} + +// startScraping initiates a ticker that calls Scrape based on the configured +// collection interval. 
+func (sc *logsController) startScraping() { + sc.wg.Add(1) + go func() { + defer sc.wg.Done() + if sc.initialDelay > 0 { + select { + case <-time.After(sc.initialDelay): + case <-sc.done: + return + } + } + + if sc.tickerCh == nil { + ticker := time.NewTicker(sc.collectionInterval) + defer ticker.Stop() + + sc.tickerCh = ticker.C + } + // Call scrape method on initialization to ensure + // that scrapers start from when the component starts + // instead of waiting for the full duration to start. + sc.scrapeLogsAndReport() + for { + select { + case <-sc.tickerCh: + sc.scrapeLogsAndReport() + case <-sc.done: + return + } + } + }() +} + +// scrapeLogsAndReport calls the Scrape function for each of the configured +// Scrapers, records observability information, and passes the scraped logs +// to the next component. +func (sc *logsController) scrapeLogsAndReport() { + ctx, done := withScrapeContext(sc.timeout) + defer done() + + logs := plog.NewLogs() + for i := range sc.obsScrapers { + md, err := sc.obsScrapers[i].ScrapeLogs(ctx) + if err != nil && !scrapererror.IsPartialScrapeError(err) { + continue + } + md.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + } + + logRecordCount := logs.LogRecordCount() + ctx = sc.obsrecv.StartLogsOp(ctx) + err := sc.nextConsumer.ConsumeLogs(ctx, logs) + sc.obsrecv.EndLogsOp(ctx, "", logRecordCount, err) +} diff --git a/receiver/scraperhelper/logs_scrapercontroller_test.go b/receiver/scraperhelper/logs_scrapercontroller_test.go new file mode 100644 index 00000000000..40600de7bc6 --- /dev/null +++ b/receiver/scraperhelper/logs_scrapercontroller_test.go @@ -0,0 +1,452 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper + +import ( + "context" + "errors" + "testing" + "time" + + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.uber.org/multierr" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/scraper" + "go.opentelemetry.io/collector/scraper/scrapererror" +) + +//type testInitialize struct { +// ch chan bool +// err error +//} + +//func (ts *testInitialize) start(context.Context, component.Host) error { +// ts.ch <- true +// return ts.err +//} +// +//type testClose struct { +// ch chan bool +// err error +//} + +//func (ts *testClose) shutdown(context.Context) error { +// ts.ch <- true +// return ts.err +//} + +type testScrapeLogs struct { + ch chan int + timesScrapeCalled int + err error +} + +func (ts *testScrapeLogs) scrape(context.Context) (plog.Logs, error) { + ts.timesScrapeCalled++ + ts.ch <- ts.timesScrapeCalled + + if ts.err != nil { + return plog.Logs{}, ts.err + } + + md := plog.NewLogs() + md.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test") + return md, nil +} + +//func newTestNoDelaySettings() *ControllerConfig { +// return &ControllerConfig{ +// CollectionInterval: time.Second, +// InitialDelay: 0, +// } +//} + +type logsTestCase struct { + name string + + scrapers int + scraperControllerSettings *ControllerConfig + scrapeErr error + expectScraped bool + + initialize bool + close bool + initializeErr error + closeErr error +} + +func 
TestLogsScrapeController(t *testing.T) { + testCases := []logsTestCase{ + { + name: "NoScrapers", + }, + { + name: "AddLogsScrapersWithCollectionInterval", + scrapers: 2, + expectScraped: true, + }, + { + name: "AddLogsScrapers_ScrapeError", + scrapers: 2, + scrapeErr: errors.New("err1"), + }, + { + name: "AddLogsScrapersWithInitializeAndClose", + scrapers: 2, + initialize: true, + expectScraped: true, + close: true, + }, + { + name: "AddLogsScrapersWithInitializeAndCloseErrors", + scrapers: 2, + initialize: true, + close: true, + initializeErr: errors.New("err1"), + closeErr: errors.New("err2"), + }, + } + + for _, tt := range testCases { + test := tt + t.Run(test.name, func(t *testing.T) { + receiverID := component.MustNewID("receiver") + tt, err := componenttest.SetupTelemetry(receiverID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + initializeChs := make([]chan bool, test.scrapers) + scrapeLogsChs := make([]chan int, test.scrapers) + closeChs := make([]chan bool, test.scrapers) + options := configureLogOptions(t, test, initializeChs, scrapeLogsChs, closeChs) + + tickerCh := make(chan time.Time) + options = append(options, WithLogsTickerChannel(tickerCh)) + + sink := new(consumertest.LogsSink) + cfg := newTestNoDelaySettings() + if test.scraperControllerSettings != nil { + cfg = test.scraperControllerSettings + } + + mr, err := NewLogsScraperControllerReceiver(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) + require.NoError(t, err) + + err = mr.Start(context.Background(), componenttest.NewNopHost()) + expectedStartErr := getLogsExpectedStartErr(test) + if expectedStartErr != nil { + assert.Equal(t, expectedStartErr, err) + } else if test.initialize { + assertChannelsCalled(t, initializeChs, "start was not called") + } + + const iterations = 5 + + if test.expectScraped || test.scrapeErr != nil { + // validate that scrape is called at least N times for each configured scraper + for _, ch := range scrapeLogsChs { + <-ch + } + // Consume the initial scrapes on start + for i := 0; i < iterations; i++ { + tickerCh <- time.Now() + + for _, ch := range scrapeLogsChs { + <-ch + } + } + + // wait until all calls to scrape have completed + if test.scrapeErr == nil { + require.Eventually(t, func() bool { + return sink.LogRecordCount() == (1+iterations)*(test.scrapers) + }, time.Second, time.Millisecond) + } + + if test.expectScraped { + assert.GreaterOrEqual(t, sink.LogRecordCount(), iterations) + } + + spans := tt.SpanRecorder.Ended() + assertLogsReceiverSpan(t, spans) + assertLogsReceiverViews(t, tt, sink) + assertLogsScraperSpan(t, test.scrapeErr, spans) + assertLogsScraperViews(t, tt, test.scrapeErr, sink) + } + + err = mr.Shutdown(context.Background()) + expectedShutdownErr := getLogsExpectedShutdownErr(test) + if expectedShutdownErr != nil { + assert.EqualError(t, err, expectedShutdownErr.Error()) + } else if test.close { + assertChannelsCalled(t, closeChs, "shutdown was not called") + } + }) + } +} + +func configureLogOptions(t *testing.T, test logsTestCase, initializeChs []chan bool, scrapeLogsChs []chan int, closeChs []chan bool) []LogsScraperControllerOption { + var logOptions []LogsScraperControllerOption + + for i := 0; i < test.scrapers; i++ { + var scraperOptions []scraper.Option + if test.initialize { + initializeChs[i] = make(chan bool, 1) + ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr} + scraperOptions 
= append(scraperOptions, scraper.WithStart(ti.start)) + } + if test.close { + closeChs[i] = make(chan bool, 1) + tc := &testClose{ch: closeChs[i], err: test.closeErr} + scraperOptions = append(scraperOptions, scraper.WithShutdown(tc.shutdown)) + } + + scrapeLogsChs[i] = make(chan int) + tsm := &testScrapeLogs{ch: scrapeLogsChs[i], err: test.scrapeErr} + scp, err := scraper.NewLogs(tsm.scrape, scraperOptions...) + require.NoError(t, err) + + logOptions = append(logOptions, AddLogsScraper(component.MustNewType("scraper"), scp)) + } + + return logOptions +} + +func getLogsExpectedStartErr(test logsTestCase) error { + return test.initializeErr +} + +func getLogsExpectedShutdownErr(test logsTestCase) error { + var errs error + + if test.closeErr != nil { + for i := 0; i < test.scrapers; i++ { + errs = multierr.Append(errs, test.closeErr) + } + } + + return errs +} + +func assertLogsChannelsCalled(t *testing.T, chs []chan bool, message string) { + for _, ic := range chs { + assertChannelCalled(t, ic, message) + } +} + +func assertLogsChannelCalled(t *testing.T, ch chan bool, message string) { + select { + case <-ch: + default: + assert.Fail(t, message) + } +} + +func assertLogsReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) { + receiverSpan := false + for _, span := range spans { + if span.Name() == "receiver/receiver/LogsReceived" { + receiverSpan = true + break + } + } + assert.True(t, receiverSpan) +} + +func assertLogsReceiverViews(t *testing.T, tt componenttest.TestTelemetry, sink *consumertest.LogsSink) { + logCount := 0 + for _, md := range sink.AllLogs() { + logCount += md.LogRecordCount() + } + require.NoError(t, tt.CheckReceiverLogs("", int64(logCount), 0)) +} + +func assertLogsScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan) { + expectedStatusCode := codes.Unset + expectedStatusMessage := "" + if expectedErr != nil { + expectedStatusCode = codes.Error + expectedStatusMessage = expectedErr.Error() + } + + scraperSpan := false + for _, span := range spans { + if span.Name() == "scraper/scraper/ScrapeLogs" { + scraperSpan = true + assert.Equal(t, expectedStatusCode, span.Status().Code) + assert.Equal(t, expectedStatusMessage, span.Status().Description) + break + } + } + assert.True(t, scraperSpan) +} + +func assertLogsScraperViews(t *testing.T, tt componenttest.TestTelemetry, expectedErr error, sink *consumertest.LogsSink) { + expectedScraped := int64(sink.LogRecordCount()) + expectedErrored := int64(0) + if expectedErr != nil { + var partialError scrapererror.PartialScrapeError + if errors.As(expectedErr, &partialError) { + expectedErrored = int64(partialError.Failed) + } else { + expectedScraped = int64(0) + expectedErrored = int64(sink.LogRecordCount()) + } + } + + require.NoError(t, tt.CheckScraperLogs(component.MustNewID("receiver"), component.MustNewID("scraper"), expectedScraped, expectedErrored)) +} + +func TestLogsSingleScrapePerInterval(t *testing.T) { + scrapeLogsCh := make(chan int, 10) + tsm := &testScrapeLogs{ch: scrapeLogsCh} + + cfg := newTestNoDelaySettings() + + tickerCh := make(chan time.Time) + + scp, err := scraper.NewLogs(tsm.scrape) + require.NoError(t, err) + + recv, err := NewLogsScraperControllerReceiver( + cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + AddLogsScraper(component.MustNewType("scaper"), scp), + WithLogsTickerChannel(tickerCh), + ) + require.NoError(t, err) + + require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, 
recv.Shutdown(context.Background())) }() + + tickerCh <- time.Now() + + assert.Eventually( + t, + func() bool { + return <-scrapeLogsCh == 2 + }, + 300*time.Millisecond, + 100*time.Millisecond, + "Make sure the scraper channel is called twice", + ) + + select { + case <-scrapeLogsCh: + assert.Fail(t, "Scrape was called more than twice") + case <-time.After(100 * time.Millisecond): + return + } +} + +func TestLogsScrapeControllerStartsOnInit(t *testing.T) { + t.Parallel() + + tsm := &testScrapeLogs{ + ch: make(chan int, 1), + } + + scp, err := scraper.NewLogs(tsm.scrape) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsScraperControllerReceiver( + &ControllerConfig{ + CollectionInterval: time.Hour, + InitialDelay: 0, + }, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + AddLogsScraper(component.MustNewType("scaper"), scp), + ) + require.NoError(t, err, "Must not error when creating scrape controller") + + assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start") + <-time.After(500 * time.Nanosecond) + require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown") + assert.Equal(t, 1, tsm.timesScrapeCalled, "Must have been called as soon as the controller started") +} + +func TestLogsScrapeControllerInitialDelay(t *testing.T) { + if testing.Short() { + t.Skip("This requires real time to pass, skipping") + return + } + + t.Parallel() + + var ( + elapsed = make(chan time.Time, 1) + cfg = ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 300 * time.Millisecond, + } + ) + + scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { + elapsed <- time.Now() + return plog.NewLogs(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsScraperControllerReceiver( + &cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + AddLogsScraper(component.MustNewType("scaper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + + t0 := time.Now() + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error when starting") + t1 := <-elapsed + + assert.GreaterOrEqual(t, t1.Sub(t0), 300*time.Millisecond, "Must have had 300ms pass as defined by initial delay") + + assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down") +} + +func TestLogsShutdownBeforeScrapeCanStart(t *testing.T) { + cfg := ControllerConfig{ + CollectionInterval: time.Second, + InitialDelay: 5 * time.Second, + } + + scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) { + // make the scraper wait for long enough it would disrupt a shutdown. 
+ time.Sleep(30 * time.Second) + return plog.NewLogs(), nil + }) + require.NoError(t, err, "Must not error when creating scraper") + + r, err := NewLogsScraperControllerReceiver( + &cfg, + receivertest.NewNopSettings(), + new(consumertest.LogsSink), + AddLogsScraper(component.MustNewType("scaper"), scp), + ) + require.NoError(t, err, "Must not error when creating receiver") + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) + shutdown := make(chan struct{}, 1) + go func() { + assert.NoError(t, r.Shutdown(context.Background())) + close(shutdown) + }() + timer := time.NewTicker(10 * time.Second) + select { + case <-timer.C: + require.Fail(t, "shutdown should not wait for scraping") + case <-shutdown: + } +} diff --git a/receiver/scraperhelper/metadata.yaml b/receiver/scraperhelper/metadata.yaml index 3fd8990d23c..2690f5362a7 100644 --- a/receiver/scraperhelper/metadata.yaml +++ b/receiver/scraperhelper/metadata.yaml @@ -27,4 +27,24 @@ telemetry: unit: "{datapoints}" sum: value_type: int - monotonic: true \ No newline at end of file + monotonic: true + + scraper_scraped_log_records: + enabled: true + stability: + level: alpha + description: Number of log records successfully scraped. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true + + scraper_errored_log_records: + enabled: true + stability: + level: alpha + description: Number of log records that were unable to be scraped. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/metrics_scrapercontroller.go similarity index 77% rename from receiver/scraperhelper/scrapercontroller.go rename to receiver/scraperhelper/metrics_scrapercontroller.go index 78e5f2a3e58..e207d91ddd4 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/metrics_scrapercontroller.go @@ -20,47 +20,47 @@ import ( "go.opentelemetry.io/collector/scraper/scrapererror" ) -// ScraperControllerOption apply changes to internal options. -type ScraperControllerOption interface { - apply(*controller) +// MetricsScraperControllerOption apply changes to internal options. +type MetricsScraperControllerOption interface { + apply(*metricsController) } -type scraperControllerOptionFunc func(*controller) +type metricsScraperControllerOptionFunc func(*metricsController) -func (of scraperControllerOptionFunc) apply(e *controller) { +func (of metricsScraperControllerOptionFunc) apply(e *metricsController) { of(e) } -// AddScraper configures the provided scrape function to be called +// AddMetricsScraper configures the provided scrape function to be called // with the specified options, and at the specified collection interval. // // Observability information will be reported, and the scraped metrics // will be passed to the next consumer. -func AddScraper(t component.Type, scraper scraper.Metrics) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { - o.scrapers = append(o.scrapers, scraperWithID{ +func AddMetricsScraper(t component.Type, scraper scraper.Metrics) MetricsScraperControllerOption { + return metricsScraperControllerOptionFunc(func(o *metricsController) { + o.scrapers = append(o.scrapers, metricsScraperWithID{ Metrics: scraper, id: component.NewID(t), }) }) } -// WithTickerChannel allows you to override the scraper controller's ticker +// WithMetricsTickerChannel allows you to override the scraper controller's ticker // channel to specify when scrape is called. 
This is only expected to be // used by tests. -func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption { - return scraperControllerOptionFunc(func(o *controller) { +func WithMetricsTickerChannel(tickerCh <-chan time.Time) MetricsScraperControllerOption { + return metricsScraperControllerOptionFunc(func(o *metricsController) { o.tickerCh = tickerCh }) } -type controller struct { +type metricsController struct { collectionInterval time.Duration initialDelay time.Duration timeout time.Duration nextConsumer consumer.Metrics - scrapers []scraperWithID + scrapers []metricsScraperWithID obsScrapers []scraper.Metrics tickerCh <-chan time.Time @@ -71,17 +71,17 @@ type controller struct { obsrecv *receiverhelper.ObsReport } -type scraperWithID struct { +type metricsScraperWithID struct { scraper.Metrics id component.ID } -// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers. -func NewScraperControllerReceiver( +// NewMetricsScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers. +func NewMetricsScraperControllerReceiver( cfg *ControllerConfig, set receiver.Settings, nextConsumer consumer.Metrics, - options ...ScraperControllerOption, + options ...MetricsScraperControllerOption, ) (component.Component, error) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, @@ -92,7 +92,7 @@ func NewScraperControllerReceiver( return nil, err } - sc := &controller{ + sc := &metricsController{ collectionInterval: cfg.CollectionInterval, initialDelay: cfg.InitialDelay, timeout: cfg.Timeout, @@ -124,7 +124,7 @@ func NewScraperControllerReceiver( } // Start the receiver, invoked during service start. -func (sc *controller) Start(ctx context.Context, host component.Host) error { +func (sc *metricsController) Start(ctx context.Context, host component.Host) error { for _, scrp := range sc.obsScrapers { if err := scrp.Start(ctx, host); err != nil { return err @@ -136,7 +136,7 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error { } // Shutdown the receiver, invoked during service shutdown. -func (sc *controller) Shutdown(ctx context.Context) error { +func (sc *metricsController) Shutdown(ctx context.Context) error { // Signal the goroutine to stop. close(sc.done) sc.wg.Wait() @@ -150,7 +150,7 @@ func (sc *controller) Shutdown(ctx context.Context) error { // startScraping initiates a ticker that calls Scrape based on the configured // collection interval. -func (sc *controller) startScraping() { +func (sc *metricsController) startScraping() { sc.wg.Add(1) go func() { defer sc.wg.Done() @@ -186,7 +186,7 @@ func (sc *controller) startScraping() { // scrapeMetricsAndReport calls the Scrape function for each of the configured // Scrapers, records observability information, and passes the scraped metrics // to the next component. 
-func (sc *controller) scrapeMetricsAndReport() {
+func (sc *metricsController) scrapeMetricsAndReport() {
 	ctx, done := withScrapeContext(sc.timeout)
 	defer done()
 
diff --git a/receiver/scraperhelper/scrapercontroller_test.go b/receiver/scraperhelper/metrics_scrapercontroller_test.go
similarity index 100%
rename from receiver/scraperhelper/scrapercontroller_test.go
rename to receiver/scraperhelper/metrics_scrapercontroller_test.go
diff --git a/receiver/scraperhelper/obs_logs.go b/receiver/scraperhelper/obs_logs.go
new file mode 100644
index 00000000000..6c21cf523fb
--- /dev/null
+++ b/receiver/scraperhelper/obs_logs.go
@@ -0,0 +1,83 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper"
+
+import (
+	"context"
+	"errors"
+
+	"go.opentelemetry.io/collector/pdata/plog"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/metric"
+	"go.uber.org/zap"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pipeline"
+	"go.opentelemetry.io/collector/receiver/internal"
+	"go.opentelemetry.io/collector/receiver/scraperhelper/internal/metadata"
+	"go.opentelemetry.io/collector/scraper"
+	"go.opentelemetry.io/collector/scraper/scrapererror"
+)
+
+const (
+	// scrapedLogRecordsKey used to identify log records scraped by the
+	// Collector.
+	scrapedLogRecordsKey = "scraped_log_records"
+	// erroredLogRecordsKey used to identify log records errored (i.e.
+	// unable to be scraped) by the Collector.
+	erroredLogRecordsKey = "errored_log_records"
+)
+
+func newObsLogs(delegate scraper.ScrapeLogsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeLogsFunc, error) {
+	telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings)
+	if errBuilder != nil {
+		return nil, errBuilder
+	}
+
+	tracer := metadata.Tracer(telSettings)
+	spanName := scraperKey + internal.SpanNameSep + scraperID.String() + internal.SpanNameSep + "ScrapeLogs"
+	otelAttrs := metric.WithAttributeSet(attribute.NewSet(
+		attribute.String(internal.ReceiverKey, receiverID.String()),
+		attribute.String(scraperKey, scraperID.String()),
+	))
+
+	return func(ctx context.Context) (plog.Logs, error) {
+		ctx, span := tracer.Start(ctx, spanName)
+		defer span.End()
+
+		md, err := delegate(ctx)
+		numScrapedLogs := 0
+		numErroredLogs := 0
+		if err != nil {
+			telSettings.Logger.Error("Error scraping logs", zap.Error(err))
+			var partialErr scrapererror.PartialScrapeError
+			if errors.As(err, &partialErr) {
+				numErroredLogs = partialErr.Failed
+				numScrapedLogs = md.LogRecordCount()
+			}
+		} else {
+			numScrapedLogs = md.LogRecordCount()
+		}
+
+		telemetryBuilder.ScraperScrapedLogRecords.Add(ctx, int64(numScrapedLogs), otelAttrs)
+		telemetryBuilder.ScraperErroredLogRecords.Add(ctx, int64(numErroredLogs), otelAttrs)
+
+		// end span according to errors
+		if span.IsRecording() {
+			span.SetAttributes(
+				attribute.String(internal.FormatKey, pipeline.SignalLogs.String()),
+				attribute.Int64(scrapedLogRecordsKey, int64(numScrapedLogs)),
+				attribute.Int64(erroredLogRecordsKey, int64(numErroredLogs)),
+			)
+
+			if err != nil {
+				span.SetStatus(codes.Error, err.Error())
+			}
+		}
+
+		return md, err
+	}, nil
+}
diff --git a/receiver/scraperhelper/obs_logs_test.go b/receiver/scraperhelper/obs_logs_test.go
new file mode 100644
index 00000000000..95acfe5b645
--- /dev/null
+++ b/receiver/scraperhelper/obs_logs_test.go
@@ -0,0 +1,96 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper + +import ( + "context" + "errors" + "testing" + + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/scraper" +) + +func TestScrapeLogsDataOp(t *testing.T) { + tt, err := componenttest.SetupTelemetry(receiverID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + parentCtx, parentSpan := tt.TelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 23, err: partialErrFake}, + {items: 29, err: errFake}, + {items: 15, err: nil}, + } + for i := range params { + var sf scraper.ScrapeLogsFunc + sf, err = newObsLogs(func(context.Context) (plog.Logs, error) { + return testdata.GenerateLogs(params[i].items), params[i].err + }, receiverID, scraperID, tt.TelemetrySettings()) + require.NoError(t, err) + _, err = sf.ScrapeLogs(parentCtx) + require.ErrorIs(t, err, params[i].err) + } + + spans := tt.SpanRecorder.Ended() + require.Equal(t, len(params), len(spans)) + + var scrapedLogRecords, erroredLogRecords int + for i, span := range spans { + assert.Equal(t, "scraper/"+scraperID.String()+"/ScrapeLogs", span.Name()) + switch { + case params[i].err == nil: + scrapedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: scrapedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: erroredLogRecordsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case errors.Is(params[i].err, errFake): + // Since we get an error, we cannot record any metrics because we don't know if the returned plog.Logs is valid instance. 
+ require.Contains(t, span.Attributes(), attribute.KeyValue{Key: scrapedLogRecordsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: erroredLogRecordsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + case errors.Is(params[i].err, partialErrFake): + scrapedLogRecords += params[i].items + erroredLogRecords += 2 + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: scrapedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: erroredLogRecordsKey, Value: attribute.Int64Value(2)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected err param: %v", params[i].err) + } + } + + require.NoError(t, tt.CheckScraperLogs(receiverID, scraperID, int64(scrapedLogRecords), int64(erroredLogRecords))) +} + +func TestCheckScraperLogs(t *testing.T) { + tt, err := componenttest.SetupTelemetry(receiverID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + var sf scraper.ScrapeLogsFunc + sf, err = newObsLogs(func(context.Context) (plog.Logs, error) { + return testdata.GenerateLogs(7), nil + }, receiverID, scraperID, tt.TelemetrySettings()) + require.NoError(t, err) + _, err = sf.ScrapeLogs(context.Background()) + assert.NoError(t, err) + + require.NoError(t, tt.CheckScraperLogs(receiverID, scraperID, 7, 0)) + require.Error(t, tt.CheckScraperLogs(receiverID, scraperID, 7, 7)) + require.Error(t, tt.CheckScraperLogs(receiverID, scraperID, 0, 0)) + require.Error(t, tt.CheckScraperLogs(receiverID, scraperID, 0, 7)) +} From 6450529135b488fe66c194a0ea11af2270fa2cc2 Mon Sep 17 00:00:00 2001 From: Chao Date: Mon, 16 Dec 2024 17:04:24 +0800 Subject: [PATCH 2/4] Update --- receiver/scraperhelper/alias.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/receiver/scraperhelper/alias.go b/receiver/scraperhelper/alias.go index ffa84867fde..3f50550874d 100644 --- a/receiver/scraperhelper/alias.go +++ b/receiver/scraperhelper/alias.go @@ -1,4 +1,7 @@ -package scraperhelper +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper" var ( AddScraper = AddMetricsScraper From 9bbcedcea8203b8a80c3d5798f73ba49b7f1daf1 Mon Sep 17 00:00:00 2001 From: Chao Date: Mon, 16 Dec 2024 17:19:42 +0800 Subject: [PATCH 3/4] Update --- .../logs_scrapercontroller_test.go | 41 ------------------- 1 file changed, 41 deletions(-) diff --git a/receiver/scraperhelper/logs_scrapercontroller_test.go b/receiver/scraperhelper/logs_scrapercontroller_test.go index 40600de7bc6..44c86b51395 100644 --- a/receiver/scraperhelper/logs_scrapercontroller_test.go +++ b/receiver/scraperhelper/logs_scrapercontroller_test.go @@ -26,26 +26,6 @@ import ( "go.opentelemetry.io/collector/scraper/scrapererror" ) -//type testInitialize struct { -// ch chan bool -// err error -//} - -//func (ts *testInitialize) start(context.Context, component.Host) error { -// ts.ch <- true -// return ts.err -//} -// -//type testClose struct { -// ch chan bool -// err error -//} - -//func (ts *testClose) shutdown(context.Context) error { -// ts.ch <- true -// return ts.err -//} - type testScrapeLogs struct { ch chan int timesScrapeCalled int @@ -65,13 +45,6 @@ func 
(ts *testScrapeLogs) scrape(context.Context) (plog.Logs, error) { return md, nil } -//func newTestNoDelaySettings() *ControllerConfig { -// return &ControllerConfig{ -// CollectionInterval: time.Second, -// InitialDelay: 0, -// } -//} - type logsTestCase struct { name string @@ -239,20 +212,6 @@ func getLogsExpectedShutdownErr(test logsTestCase) error { return errs } -func assertLogsChannelsCalled(t *testing.T, chs []chan bool, message string) { - for _, ic := range chs { - assertChannelCalled(t, ic, message) - } -} - -func assertLogsChannelCalled(t *testing.T, ch chan bool, message string) { - select { - case <-ch: - default: - assert.Fail(t, message) - } -} - func assertLogsReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) { receiverSpan := false for _, span := range spans { From 180a68d0980699c0788c05006e90450c6186563a Mon Sep 17 00:00:00 2001 From: Chao Date: Mon, 16 Dec 2024 17:39:48 +0800 Subject: [PATCH 4/4] Update --- receiver/scraperhelper/logs_scrapercontroller.go | 3 +-- receiver/scraperhelper/logs_scrapercontroller_test.go | 3 +-- receiver/scraperhelper/obs_logs.go | 3 +-- receiver/scraperhelper/obs_logs_test.go | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/receiver/scraperhelper/logs_scrapercontroller.go b/receiver/scraperhelper/logs_scrapercontroller.go index bd4663be0ba..a561d1162ef 100644 --- a/receiver/scraperhelper/logs_scrapercontroller.go +++ b/receiver/scraperhelper/logs_scrapercontroller.go @@ -8,13 +8,12 @@ import ( "sync" "time" - "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/scraper" diff --git a/receiver/scraperhelper/logs_scrapercontroller_test.go b/receiver/scraperhelper/logs_scrapercontroller_test.go index 44c86b51395..a5ee81e01d4 100644 --- a/receiver/scraperhelper/logs_scrapercontroller_test.go +++ b/receiver/scraperhelper/logs_scrapercontroller_test.go @@ -9,8 +9,6 @@ import ( "testing" "time" - "go.opentelemetry.io/collector/pdata/plog" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/codes" @@ -20,6 +18,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/scraper" diff --git a/receiver/scraperhelper/obs_logs.go b/receiver/scraperhelper/obs_logs.go index 6c21cf523fb..e64f04e0244 100644 --- a/receiver/scraperhelper/obs_logs.go +++ b/receiver/scraperhelper/obs_logs.go @@ -7,14 +7,13 @@ import ( "context" "errors" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver/internal" "go.opentelemetry.io/collector/receiver/scraperhelper/internal/metadata" diff --git a/receiver/scraperhelper/obs_logs_test.go b/receiver/scraperhelper/obs_logs_test.go index 95acfe5b645..1d06bab94aa 100644 --- a/receiver/scraperhelper/obs_logs_test.go +++ 
b/receiver/scraperhelper/obs_logs_test.go @@ -8,14 +8,13 @@ import ( "errors" "testing" - "go.opentelemetry.io/collector/pdata/plog" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/scraper" )
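
Usage sketch for the API added above: a ScrapeLogsFunc is wrapped with scraper.NewLogs, registered under a component type via AddLogsScraper, and driven by NewLogsScraperControllerReceiver on the configured collection interval. The mylogsreceiver package, the scrapeLogs function, and the "mylogs" component type are hypothetical names used only for illustration; the scraperhelper and scraper calls follow the signatures introduced in this patch series, so treat this as a minimal sketch rather than code from the change itself.

package mylogsreceiver // hypothetical package, not part of this patch

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/scraperhelper"
	"go.opentelemetry.io/collector/scraper"
)

// scrapeLogs is a stand-in scrape function; a real receiver would read log
// records from its source here and return them as plog.Logs.
func scrapeLogs(context.Context) (plog.Logs, error) {
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("example record")
	return ld, nil
}

// newLogsReceiver wires the scrape function into the logs scraper controller:
// scraper.NewLogs wraps the function, AddLogsScraper registers it under a
// component type, and NewLogsScraperControllerReceiver scrapes on the
// configured interval, emitting the scraper/receiver telemetry defined above.
func newLogsReceiver(set receiver.Settings, next consumer.Logs) (component.Component, error) {
	scp, err := scraper.NewLogs(scrapeLogs)
	if err != nil {
		return nil, err
	}

	cfg := &scraperhelper.ControllerConfig{
		CollectionInterval: time.Minute,
		InitialDelay:       time.Second,
	}

	return scraperhelper.NewLogsScraperControllerReceiver(
		cfg,
		set,
		next,
		scraperhelper.AddLogsScraper(component.MustNewType("mylogs"), scp),
	)
}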