Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scraper for logs #11799

Merged
merged 8 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/add-scraper-for-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: receiver/scraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add scraper for logs

# One or more tracking issues or pull requests related to the change
issues: [11238]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 2 additions & 2 deletions scraper/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ A scraper defines how to connect and scrape telemetry data from an external sour
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: metrics |
| Stability | [development]: metrics, logs |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Apkg%2F%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Apkg%2F) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Apkg%2F%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Apkg%2F) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
<!-- end autogenerated section -->
44 changes: 44 additions & 0 deletions scraper/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraper // import "go.opentelemetry.io/collector/scraper"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
)

// Logs is the base interface for logs scrapers.
type Logs interface {
component.Component

// ScrapeLogs is the base interface to indicate that how should logs be scraped.
ScrapeLogs(context.Context) (plog.Logs, error)
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}

// ScrapeLogsFunc is a helper function that is similar to Logs.ScrapeLogs.
type ScrapeLogsFunc ScrapeFunc[plog.Logs]

// ScrapeLogs invokes the function itself, letting a bare ScrapeLogsFunc
// satisfy the scraping portion of the Logs interface.
func (f ScrapeLogsFunc) ScrapeLogs(ctx context.Context) (plog.Logs, error) {
	return f(ctx)
}

type logs struct {
baseScraper
ScrapeLogsFunc
}

// NewLogs creates a new Logs scraper.
//
// The scrape function is required; passing nil yields an error. Optional
// Option values configure lifecycle hooks such as start and shutdown.
func NewLogs(scrape ScrapeLogsFunc, options ...Option) (Logs, error) {
	if scrape == nil {
		return nil, errNilFunc
	}
	return &logs{
		baseScraper:    newBaseScraper(options),
		ScrapeLogsFunc: scrape,
	}, nil
}
79 changes: 79 additions & 0 deletions scraper/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraper

import (
"context"
"errors"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
)

// TestNewLogs verifies that a scraper built from a plain scrape function
// starts, returns empty logs from ScrapeLogs, and shuts down cleanly.
func TestNewLogs(t *testing.T) {
	// Names reflect logs (not the metrics scraper this test was adapted from).
	scrp, err := NewLogs(newTestScrapeLogsFunc(nil))
	require.NoError(t, err)

	require.NoError(t, scrp.Start(context.Background(), componenttest.NewNopHost()))
	ld, err := scrp.ScrapeLogs(context.Background())
	require.NoError(t, err)
	assert.Equal(t, plog.NewLogs(), ld)
	require.NoError(t, scrp.Shutdown(context.Background()))
}

// TestNewLogs_WithOptions checks that start and shutdown hooks supplied
// via options are wired through to Start and Shutdown.
func TestNewLogs_WithOptions(t *testing.T) {
	wantErr := errors.New("my_error")
	scrp, err := NewLogs(newTestScrapeLogsFunc(nil),
		WithStart(func(context.Context, component.Host) error { return wantErr }),
		WithShutdown(func(context.Context) error { return wantErr }))
	require.NoError(t, err)

	assert.Equal(t, wantErr, scrp.Start(context.Background(), componenttest.NewNopHost()))
	assert.Equal(t, wantErr, scrp.Shutdown(context.Background()))
}

// TestNewLogs_NilRequiredFields checks that constructing a scraper without
// a scrape function is rejected with the package's errNilFunc sentinel.
func TestNewLogs_NilRequiredFields(t *testing.T) {
	_, err := NewLogs(nil)
	// Assert the specific sentinel rather than just any error, so an
	// unrelated failure cannot mask a missing nil-check.
	require.ErrorIs(t, err, errNilFunc)
}

// TestNewLogs_ProcessLogsError checks that an error returned by the scrape
// function is propagated unchanged to the ScrapeLogs caller.
func TestNewLogs_ProcessLogsError(t *testing.T) {
	wantErr := errors.New("my_error")
	scrp, err := NewLogs(newTestScrapeLogsFunc(wantErr))
	require.NoError(t, err)

	_, err = scrp.ScrapeLogs(context.Background())
	require.ErrorIs(t, err, wantErr)
}

// TestLogsConcurrency hammers ScrapeLogs from many goroutines at once so
// the race detector (go test -race) can surface unsynchronized state.
func TestLogsConcurrency(t *testing.T) {
	scrp, err := NewLogs(newTestScrapeLogsFunc(nil))
	require.NoError(t, err)
	require.NoError(t, scrp.Start(context.Background(), componenttest.NewNopHost()))

	const (
		workers    = 10
		iterations = 10000
	)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for n := 0; n < iterations; n++ {
				_, scrapeErr := scrp.ScrapeLogs(context.Background())
				assert.NoError(t, scrapeErr)
			}
		}()
	}
	wg.Wait()
	require.NoError(t, scrp.Shutdown(context.Background()))
}

// newTestScrapeLogsFunc returns a stub scrape function that always yields
// empty logs together with the provided error (which may be nil).
func newTestScrapeLogsFunc(retError error) ScrapeLogsFunc {
	return func(context.Context) (plog.Logs, error) {
		return plog.NewLogs(), retError
	}
}
2 changes: 1 addition & 1 deletion scraper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ github_project: open-telemetry/opentelemetry-collector
status:
class: pkg
stability:
development: [metrics]
development: [metrics, logs]
2 changes: 1 addition & 1 deletion scraper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var errNilFunc = errors.New("nil scrape func")

// ScrapeFunc scrapes metrics.
// ScrapeFunc scrapes data.
type ScrapeFunc[T any] func(context.Context) (T, error)

// Option applies changes to internal options.
Expand Down
4 changes: 2 additions & 2 deletions scraper/scrapererror/partialscrapeerror.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package scrapererror // import "go.opentelemetry.io/collector/scraper/scrapererr
import "errors"

// PartialScrapeError is an error to represent
// that a subset of metrics were failed to be scraped.
// that a subset of the data failed to be scraped.
type PartialScrapeError struct {
error
Failed int
}

// NewPartialScrapeError creates PartialScrapeError for failed metrics.
// NewPartialScrapeError creates PartialScrapeError for failed data.
// Use this error type only when a subset of the data failed to be scraped.
func NewPartialScrapeError(err error, failed int) PartialScrapeError {
return PartialScrapeError{
Expand Down
Loading