Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs scraper in scraperhelper #11899

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/add-logs-scraper-in-scraperhelper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: receiver/scraperhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add logs scraper in receiver/scraperhelper

# One or more tracking issues or pull requests related to the change
issues: [11899]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This change adds a new logs scraper in receiver/scraperhelper and introduces new metrics for scraping logs.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 6 additions & 0 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
return checkReceiverMetrics(tts.reader, tts.id, protocol, acceptedMetricPoints, droppedMetricPoints)
}

// CheckScraperLogs checks that for the current exported values for logs scraper metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckScraperLogs(receiver component.ID, scraper component.ID, scrapedLogRecords, erroredLogRecords int64) error {
return checkScraperLogs(tts.reader, receiver, scraper, scrapedLogRecords, erroredLogRecords)

Check warning on line 98 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}

// CheckScraperMetrics checks that for the current exported values for metrics scraper metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckScraperMetrics(receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
Expand Down
7 changes: 7 additions & 0 deletions component/componenttest/otelchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
"go.opentelemetry.io/collector/component"
)

func checkScraperLogs(reader *sdkmetric.ManualReader, receiver component.ID, scraper component.ID, scrapedLogRecords, erroredLogRecords int64) error {
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
return multierr.Combine(
checkIntSum(reader, "otelcol_scraper_scraped_log_records", scrapedLogRecords, scraperAttrs),
checkIntSum(reader, "otelcol_scraper_errored_log_records", erroredLogRecords, scraperAttrs))

Check warning on line 22 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L18-L22

Added lines #L18 - L22 were not covered by tests
}

func checkScraperMetrics(reader *sdkmetric.ManualReader, receiver component.ID, scraper component.ID, scrapedMetricPoints, erroredMetricPoints int64) error {
scraperAttrs := attributesForScraperMetrics(receiver, scraper)
return multierr.Combine(
Expand Down
12 changes: 12 additions & 0 deletions receiver/scraperhelper/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper"

// Backward-compatibility aliases: the previously metrics-only scraper
// controller API now has Metrics-specific names; these aliases keep the old
// identifiers compiling for existing callers.
// NOTE(review): consider adding "// Deprecated: use ..." markers if the old
// names are intended to be phased out — confirm with maintainers.
var (
	// AddScraper is an alias for AddMetricsScraper.
	AddScraper = AddMetricsScraper
	// NewScraperControllerReceiver is an alias for NewMetricsScraperControllerReceiver.
	NewScraperControllerReceiver = NewMetricsScraperControllerReceiver
	// WithTickerChannel is an alias for WithMetricsTickerChannel.
	WithTickerChannel = WithMetricsTickerChannel
)

// ScraperControllerOption is an alias for MetricsScraperControllerOption.
type ScraperControllerOption = MetricsScraperControllerOption
16 changes: 16 additions & 0 deletions receiver/scraperhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

The following telemetry is emitted by this component.

### otelcol_scraper_errored_log_records

Number of log records that were unable to be scraped. [alpha]

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_scraper_errored_metric_points

Number of metric points that were unable to be scraped. [alpha]
Expand All @@ -14,6 +22,14 @@ Number of metric points that were unable to be scraped. [alpha]
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_scraper_scraped_log_records

Number of log records successfully scraped. [alpha]

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |

### otelcol_scraper_scraped_metric_points

Number of metric points successfully scraped. [alpha]
Expand Down
14 changes: 14 additions & 0 deletions receiver/scraperhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

205 changes: 205 additions & 0 deletions receiver/scraperhelper/logs_scrapercontroller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperhelper"

import (
"context"
"sync"
"time"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scrapererror"
)

// LogsScraperControllerOption applies a change to the internal options of
// the logs scraper controller.
type LogsScraperControllerOption interface {
	apply(*logsController)
}

// logsScraperControllerOptionFunc adapts a plain function to the
// LogsScraperControllerOption interface.
type logsScraperControllerOptionFunc func(*logsController)

// apply invokes the wrapped function on the controller.
func (of logsScraperControllerOptionFunc) apply(e *logsController) {
	of(e)
}

// AddLogsScraper configures the provided scrape function to be called
// with the specified options, and at the specified collection interval.
//
// Observability information will be reported, and the scraped logs
// will be passed to the next consumer.
func AddLogsScraper(t component.Type, scraper scraper.Logs) LogsScraperControllerOption {
	return logsScraperControllerOptionFunc(func(lc *logsController) {
		entry := logsScraperWithID{
			Logs: scraper,
			id:   component.NewID(t),
		}
		lc.scrapers = append(lc.scrapers, entry)
	})
}

// WithLogsTickerChannel allows you to override the scraper controller's
// ticker channel to specify when scrape is called. This is only expected
// to be used by tests.
func WithLogsTickerChannel(tickerCh <-chan time.Time) LogsScraperControllerOption {
	return logsScraperControllerOptionFunc(func(lc *logsController) {
		lc.tickerCh = tickerCh
	})
}

// logsController is the scraper-controller receiver for logs: it
// periodically invokes each configured scraper and forwards the scraped
// logs to the next consumer.
type logsController struct {
	collectionInterval time.Duration // interval between scrape cycles
	initialDelay       time.Duration // wait before the first scrape cycle
	timeout            time.Duration // per-cycle scrape context timeout
	nextConsumer       consumer.Logs // destination for scraped logs

	// scrapers holds the scrapers registered via AddLogsScraper, each with
	// its component ID.
	scrapers []logsScraperWithID
	// obsScrapers wraps each registered scraper with observability
	// reporting; populated in NewLogsScraperControllerReceiver.
	obsScrapers []scraper.Logs

	// tickerCh drives the scrape loop; normally an internal ticker, but can
	// be injected via WithLogsTickerChannel (test hook).
	tickerCh <-chan time.Time

	done chan struct{}  // closed in Shutdown to stop the scrape goroutine
	wg   sync.WaitGroup // tracks the scrape-loop goroutine

	obsrecv *receiverhelper.ObsReport // receiver-level observability reporting
}

// logsScraperWithID pairs a logs scraper with the component ID used to tag
// its telemetry and logger.
type logsScraperWithID struct {
	scraper.Logs
	id component.ID
}

// NewLogsScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
func NewLogsScraperControllerReceiver(
cfg *ControllerConfig,
set receiver.Settings,
nextConsumer consumer.Logs,
options ...LogsScraperControllerOption,
) (component.Component, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: "",
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}

Check warning on line 92 in receiver/scraperhelper/logs_scrapercontroller.go

View check run for this annotation

Codecov / codecov/patch

receiver/scraperhelper/logs_scrapercontroller.go#L91-L92

Added lines #L91 - L92 were not covered by tests

sc := &logsController{
collectionInterval: cfg.CollectionInterval,
initialDelay: cfg.InitialDelay,
timeout: cfg.Timeout,
nextConsumer: nextConsumer,
done: make(chan struct{}),
obsrecv: obsrecv,
}

for _, op := range options {
op.apply(sc)
}

sc.obsScrapers = make([]scraper.Logs, len(sc.scrapers))
for i := range sc.scrapers {
telSet := set.TelemetrySettings
telSet.Logger = telSet.Logger.With(zap.String("scraper", sc.scrapers[i].id.String()))
var obsScrp scraper.ScrapeLogsFunc
obsScrp, err = newObsLogs(sc.scrapers[i].ScrapeLogs, set.ID, sc.scrapers[i].id, telSet)
if err != nil {
return nil, err
}

Check warning on line 115 in receiver/scraperhelper/logs_scrapercontroller.go

View check run for this annotation

Codecov / codecov/patch

receiver/scraperhelper/logs_scrapercontroller.go#L114-L115

Added lines #L114 - L115 were not covered by tests
sc.obsScrapers[i], err = scraper.NewLogs(obsScrp, scraper.WithStart(sc.scrapers[i].Start), scraper.WithShutdown(sc.scrapers[i].Shutdown))
if err != nil {
return nil, err
}

Check warning on line 119 in receiver/scraperhelper/logs_scrapercontroller.go

View check run for this annotation

Codecov / codecov/patch

receiver/scraperhelper/logs_scrapercontroller.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}

return sc, nil
}

// Start starts each wrapped scraper and then launches the periodic scrape
// loop. Invoked during service start.
func (sc *logsController) Start(ctx context.Context, host component.Host) error {
	for _, s := range sc.obsScrapers {
		if err := s.Start(ctx, host); err != nil {
			return err
		}
	}

	sc.startScraping()
	return nil
}

// Shutdown stops the scrape loop, waits for it to exit, and then shuts down
// every wrapped scraper, combining any shutdown errors into one.
// Invoked during service shutdown.
func (sc *logsController) Shutdown(ctx context.Context) error {
	close(sc.done) // signal the scrape goroutine to stop
	sc.wg.Wait()   // wait until it has fully exited

	var errs error
	for _, s := range sc.obsScrapers {
		errs = multierr.Append(errs, s.Shutdown(ctx))
	}
	return errs
}

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval. The loop runs in a dedicated goroutine tracked by
// sc.wg and exits when sc.done is closed (see Shutdown).
func (sc *logsController) startScraping() {
	sc.wg.Add(1)
	go func() {
		defer sc.wg.Done()
		// Honor the configured initial delay, but return promptly if the
		// controller is shut down while waiting.
		if sc.initialDelay > 0 {
			select {
			case <-time.After(sc.initialDelay):
			case <-sc.done:
				return
			}
		}

		// Only create an internal ticker if no channel was injected via
		// WithLogsTickerChannel (test hook).
		if sc.tickerCh == nil {
			ticker := time.NewTicker(sc.collectionInterval)
			defer ticker.Stop()

			sc.tickerCh = ticker.C
		}
		// Call scrape method on initialization to ensure
		// that scrapers start from when the component starts
		// instead of waiting for the full duration to start.
		sc.scrapeLogsAndReport()
		for {
			select {
			case <-sc.tickerCh:
				sc.scrapeLogsAndReport()
			case <-sc.done:
				return
			}
		}
	}()
}

// scrapeLogsAndReport calls the Scrape function for each of the configured
// Scrapers, merges the results into a single plog.Logs, records
// observability information, and passes the scraped logs to the next
// component.
func (sc *logsController) scrapeLogsAndReport() {
	ctx, done := withScrapeContext(sc.timeout)
	defer done()

	logs := plog.NewLogs()
	for _, scrp := range sc.obsScrapers {
		ld, err := scrp.ScrapeLogs(ctx)
		if err != nil && !scrapererror.IsPartialScrapeError(err) {
			// Drop this scraper's output on a hard failure; partial-scrape
			// errors still carry usable data and are kept.
			continue
		}
		ld.ResourceLogs().MoveAndAppendTo(logs.ResourceLogs())
	}

	recordCount := logs.LogRecordCount()
	ctx = sc.obsrecv.StartLogsOp(ctx)
	err := sc.nextConsumer.ConsumeLogs(ctx, logs)
	sc.obsrecv.EndLogsOp(ctx, "", recordCount, err)
}
Loading
Loading