diff --git a/.chloggen/add-scraper-for-logs.yaml b/.chloggen/add-scraper-for-logs.yaml
new file mode 100644
index 00000000000..07cc4f675d6
--- /dev/null
+++ b/.chloggen/add-scraper-for-logs.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: receiver/scraperhelper
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add scraper for logs
+
+# One or more tracking issues or pull requests related to the change
+issues: [11238]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
diff --git a/scraper/README.md b/scraper/README.md
index 9b57da2a58d..9a5b1ea5eea 100644
--- a/scraper/README.md
+++ b/scraper/README.md
@@ -5,8 +5,8 @@ A scraper defines how to connect and scrape telemetry data from an external sour
 
 | Status        |           |
 | ------------- |-----------|
-| Stability     | [development]: metrics |
+| Stability     | [development]: metrics, logs |
 | Issues        | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Apkg%2F%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Apkg%2F) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Apkg%2F%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Apkg%2F) |
 
 [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
-
\ No newline at end of file
+
diff --git a/scraper/logs.go b/scraper/logs.go
new file mode 100644
index 00000000000..9618001d0c2
--- /dev/null
+++ b/scraper/logs.go
@@ -0,0 +1,44 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package scraper // import "go.opentelemetry.io/collector/scraper"
+
+import (
+	"context"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pdata/plog"
+)
+
+// Logs is the base interface for logs scrapers.
+type Logs interface {
+	component.Component
+
+	// ScrapeLogs indicates how logs should be scraped.
+	ScrapeLogs(context.Context) (plog.Logs, error)
+}
+
+// ScrapeLogsFunc is a helper function that is similar to Logs.ScrapeLogs.
+type ScrapeLogsFunc ScrapeFunc[plog.Logs]
+
+func (sf ScrapeLogsFunc) ScrapeLogs(ctx context.Context) (plog.Logs, error) {
+	return sf(ctx)
+}
+
+type logs struct {
+	baseScraper
+	ScrapeLogsFunc
+}
+
+// NewLogs creates a new Logs scraper.
+func NewLogs(scrape ScrapeLogsFunc, options ...Option) (Logs, error) {
+	if scrape == nil {
+		return nil, errNilFunc
+	}
+	bs := &logs{
+		baseScraper:    newBaseScraper(options),
+		ScrapeLogsFunc: scrape,
+	}
+
+	return bs, nil
+}
diff --git a/scraper/logs_test.go b/scraper/logs_test.go
new file mode 100644
index 00000000000..89f0a6a6331
--- /dev/null
+++ b/scraper/logs_test.go
@@ -0,0 +1,79 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package scraper
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/pdata/plog"
+)
+
+func TestNewLogs(t *testing.T) {
+	mp, err := NewLogs(newTestScrapeLogsFunc(nil))
+	require.NoError(t, err)
+
+	require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
+	md, err := mp.ScrapeLogs(context.Background())
+	require.NoError(t, err)
+	assert.Equal(t, plog.NewLogs(), md)
+	require.NoError(t, mp.Shutdown(context.Background()))
+}
+
+func TestNewLogs_WithOptions(t *testing.T) {
+	want := errors.New("my_error")
+	mp, err := NewLogs(newTestScrapeLogsFunc(nil),
+		WithStart(func(context.Context, component.Host) error { return want }),
+		WithShutdown(func(context.Context) error { return want }))
+	require.NoError(t, err)
+
+	assert.Equal(t, want, mp.Start(context.Background(), componenttest.NewNopHost()))
+	assert.Equal(t, want, mp.Shutdown(context.Background()))
+}
+
+func TestNewLogs_NilRequiredFields(t *testing.T) {
+	_, err := NewLogs(nil)
+	require.Error(t, err)
+}
+
+func TestNewLogs_ProcessLogsError(t *testing.T) {
+	want := errors.New("my_error")
+	mp, err := NewLogs(newTestScrapeLogsFunc(want))
+	require.NoError(t, err)
+	_, err = mp.ScrapeLogs(context.Background())
+	require.ErrorIs(t, err, want)
+}
+
+func TestLogsConcurrency(t *testing.T) {
+	mp, err := NewLogs(newTestScrapeLogsFunc(nil))
+	require.NoError(t, err)
+	require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
+
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 10000; j++ {
+				_, errScrape := mp.ScrapeLogs(context.Background())
+				assert.NoError(t, errScrape)
+			}
+		}()
+	}
+	wg.Wait()
+	require.NoError(t, mp.Shutdown(context.Background()))
+}
+
+func newTestScrapeLogsFunc(retError error) ScrapeLogsFunc {
+	return func(_ context.Context) (plog.Logs, error) {
+		return plog.NewLogs(), retError
+	}
+}
diff --git a/scraper/metadata.yaml b/scraper/metadata.yaml
index 9cfa23ae35c..a1c93bd8540 100644
--- a/scraper/metadata.yaml
+++ b/scraper/metadata.yaml
@@ -4,4 +4,4 @@ github_project: open-telemetry/opentelemetry-collector
 status:
   class: pkg
   stability:
-    development: [metrics]
+    development: [metrics, logs]
diff --git a/scraper/scraper.go b/scraper/scraper.go
index ed506b6790c..385c65f3253 100644
--- a/scraper/scraper.go
+++ b/scraper/scraper.go
@@ -12,7 +12,7 @@ import (
 
 var errNilFunc = errors.New("nil scrape func")
 
-// ScrapeFunc scrapes metrics.
+// ScrapeFunc scrapes data.
 type ScrapeFunc[T any] func(context.Context) (T, error)
 
 // Option apply changes to internal options.
diff --git a/scraper/scrapererror/partialscrapeerror.go b/scraper/scrapererror/partialscrapeerror.go
index 21dfce899fb..7bdf69caebb 100644
--- a/scraper/scrapererror/partialscrapeerror.go
+++ b/scraper/scrapererror/partialscrapeerror.go
@@ -6,13 +6,13 @@ package scrapererror // import "go.opentelemetry.io/collector/scraper/scrapererr
 import "errors"
 
 // PartialScrapeError is an error to represent
-// that a subset of metrics were failed to be scraped.
+// that a subset of data failed to be scraped.
 type PartialScrapeError struct {
 	error
 	Failed int
 }
 
-// NewPartialScrapeError creates PartialScrapeError for failed metrics.
+// NewPartialScrapeError creates PartialScrapeError for failed data.
 // Use this error type only when a subset of data was failed to be scraped.
 func NewPartialScrapeError(err error, failed int) PartialScrapeError {
 	return PartialScrapeError{
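
For context, a minimal sketch of how the new logs scraper API added above could be consumed. Only NewLogs, ScrapeLogsFunc, and the Logs interface come from this change; the standalone main package, the nop host from componenttest, and the placeholder scrape body are illustrative assumptions, not part of the diff.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/pdata/plog"
	"go.opentelemetry.io/collector/scraper"
)

func main() {
	// Build a logs scraper from a plain function, mirroring newTestScrapeLogsFunc in the tests above.
	s, err := scraper.NewLogs(func(_ context.Context) (plog.Logs, error) {
		ld := plog.NewLogs()
		// Populate ld from the scraped source here (placeholder).
		return ld, nil
	})
	if err != nil {
		panic(err) // NewLogs only fails when the scrape func is nil.
	}

	ctx := context.Background()
	if err := s.Start(ctx, componenttest.NewNopHost()); err != nil {
		panic(err)
	}
	ld, err := s.ScrapeLogs(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println("scraped log records:", ld.LogRecordCount())
	_ = s.Shutdown(ctx)
}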