From 0a557ef771e3b8cbe4d556ae14f796b02914e9a4 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Mon, 5 Aug 2024 13:13:58 +1000 Subject: [PATCH 1/2] update metrics for otlp backend --- pkg/backends/otlp/backend.go | 50 +++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/pkg/backends/otlp/backend.go b/pkg/backends/otlp/backend.go index 1d6a30bc..8c12fec0 100644 --- a/pkg/backends/otlp/backend.go +++ b/pkg/backends/otlp/backend.go @@ -29,8 +29,14 @@ const ( // to export values as OTLP metrics. // The zero value is not safe to use. type Backend struct { - droppedMetrics uint64 - droppedEvents uint64 + batchesCreated uint64 // Accumulated number of batches created + batchesDropped uint64 // Accumulated number of batches aborted (data loss) + batchesSent uint64 // Accumulated number of batches successfully sent + seriesSent uint64 // Accumulated number of series successfully sent + seriesDropped uint64 // Accumulated number of series aborted (data loss) + + eventsSent uint64 // Accumulated number of events successfully sent + eventsDropped uint64 // Accumulated number of events aborted (data loss) metricsEndpoint string logsEndpoint string @@ -86,7 +92,14 @@ func (*Backend) Name() string { func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error { statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"}) defer func() { - statser.Gauge("backend.dropped_events", float64(atomic.LoadUint64(&b.droppedEvents)), nil) + statser.Gauge("backend.created", float64(atomic.LoadUint64(&b.batchesCreated)), nil) + statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&b.batchesDropped)), nil) + statser.Gauge("backend.sent", float64(atomic.LoadUint64(&b.batchesSent)), nil) + statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&b.seriesSent)), nil) + statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&b.seriesDropped)), nil) + + statser.Gauge("backend.dropped_events", float64(atomic.LoadUint64(&b.eventsDropped)), nil) + statser.Gauge("backend.sent_events", float64(atomic.LoadUint64(&b.eventsSent)), nil) }() se, err := data.NewOtlpEvent( @@ -100,7 +113,7 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error { req, err := data.NewEventsRequest(ctx, b.logsEndpoint, el, rt) if err != nil { - atomic.AddUint64(&b.droppedEvents, 1) + atomic.AddUint64(&b.eventsDropped, 1) return err } @@ -108,17 +121,23 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error { resp, err := b.client.Do(req) <-b.requestsBufferSem if err != nil { - atomic.AddUint64(&b.droppedEvents, 1) + atomic.AddUint64(&b.eventsDropped, 1) return err } err = data.ProcessEventsResponse(resp) - atomic.AddUint64(&b.droppedEvents, 1) + atomic.AddUint64(&b.eventsDropped, 1) return err } func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) { + statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"}) + defer func() { + statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&bd.batchesDropped)), nil) + statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil) + }() + group := newGroups(bd.metricsPerBatch) mm.Counters.Each(func(name, _ string, cm gostatsd.Counter) { @@ -268,26 +287,27 @@ func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, var errs error for _, b := range group.batches { - err := bd.postMetrics(ctx, b.values()) + atomic.AddUint64(&bd.batchesCreated, 1) + err := bd.postMetrics(ctx, b) if err != nil { bd.logger.WithError(err).WithFields(logrus.Fields{ "endpoint": bd.metricsEndpoint, }).Error("Issues trying to submit data") errs = multierr.Append(errs, err) + } else { + atomic.AddUint64(&bd.batchesSent, 1) + atomic.AddUint64(&bd.seriesSent, uint64(b.lenMetrics())) } } cb(multierr.Errors(errs)) } -func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.ResourceMetrics) error { - statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"}) - defer func() { - statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&c.droppedMetrics)), nil) - }() +func (c *Backend) postMetrics(ctx context.Context, batch group) error { + resourceMetrics := batch.values() req, err := data.NewMetricsRequest(ctx, c.metricsEndpoint, resourceMetrics) if err != nil { - atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics))) + atomic.AddUint64(&c.batchesDropped, 1) return err } @@ -295,11 +315,11 @@ func (c *Backend) postMetrics(ctx context.Context, resourceMetrics []data.Resour resp, err := c.client.Do(req) <-c.requestsBufferSem if err != nil { - atomic.AddUint64(&c.droppedMetrics, uint64(len(resourceMetrics))) + atomic.AddUint64(&c.batchesDropped, 1) return err } dropped, err := data.ProcessMetricResponse(resp) - atomic.AddUint64(&c.droppedMetrics, uint64(dropped)) + atomic.AddUint64(&c.seriesDropped, uint64(dropped)) return err } From 4da01327218582697003c19a7b5bf9fb6ccc3878 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Mon, 5 Aug 2024 13:17:07 +1000 Subject: [PATCH 2/2] move statsers into right method --- pkg/backends/otlp/backend.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/backends/otlp/backend.go b/pkg/backends/otlp/backend.go index 8c12fec0..7c803b50 100644 --- a/pkg/backends/otlp/backend.go +++ b/pkg/backends/otlp/backend.go @@ -92,12 +92,6 @@ func (*Backend) Name() string { func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error { statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"}) defer func() { - statser.Gauge("backend.created", float64(atomic.LoadUint64(&b.batchesCreated)), nil) - statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&b.batchesDropped)), nil) - statser.Gauge("backend.sent", float64(atomic.LoadUint64(&b.batchesSent)), nil) - statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&b.seriesSent)), nil) - statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&b.seriesDropped)), nil) - statser.Gauge("backend.dropped_events", float64(atomic.LoadUint64(&b.eventsDropped)), nil) statser.Gauge("backend.sent_events", float64(atomic.LoadUint64(&b.eventsSent)), nil) }() @@ -134,8 +128,11 @@ func (b *Backend) SendEvent(ctx context.Context, event *gostatsd.Event) error { func (bd *Backend) SendMetricsAsync(ctx context.Context, mm *gostatsd.MetricMap, cb gostatsd.SendCallback) { statser := stats.FromContext(ctx).WithTags(gostatsd.Tags{"backend:otlp"}) defer func() { + statser.Gauge("backend.created", float64(atomic.LoadUint64(&bd.batchesCreated)), nil) statser.Gauge("backend.dropped", float64(atomic.LoadUint64(&bd.batchesDropped)), nil) statser.Gauge("backend.sent", float64(atomic.LoadUint64(&bd.batchesSent)), nil) + statser.Gauge("backend.series.sent", float64(atomic.LoadUint64(&bd.seriesSent)), nil) + statser.Gauge("backend.series.dropped", float64(atomic.LoadUint64(&bd.seriesDropped)), nil) }() group := newGroups(bd.metricsPerBatch)