Skip to content

Commit

Permalink
feat: add status code label to docs metric
Browse files Browse the repository at this point in the history
If a request fails record the status code with the elasticsearch.events.processed
metric.
Improve debugging and analysis of failed requests
  • Loading branch information
kruskall committed Dec 30, 2024
1 parent 3d1ed6e commit d287b40
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
3 changes: 2 additions & 1 deletion appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -397,7 +398,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}
if status != "" {
a.addCount(int64(n), legacy, a.metrics.docsIndexed,
metric.WithAttributes(attribute.String("status", status)),
metric.WithAttributes(attribute.String("status", status), semconv.HTTPResponseStatusCode(errFailed.statusCode)),
)
}
}
Expand Down
31 changes: 30 additions & 1 deletion appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
Expand Down Expand Up @@ -693,7 +694,17 @@ func TestAppenderFlushRequestError(t *testing.T) {
w.WriteHeader(sc)
w.Write([]byte(`{"error": {"root_cause": [{"type": "x_content_parse_exception","reason": "reason"}],"type": "x_content_parse_exception","reason": "reason","caused_by": {"type": "json_parse_exception","reason": "reason"}},"status": 400}`))
})
indexer, err := docappender.New(client, docappender.Config{FlushInterval: time.Minute})

rdr := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(
func(ik sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
},
))

indexer, err := docappender.New(client, docappender.Config{
FlushInterval: time.Minute,
MeterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)),
})
require.NoError(t, err)
defer indexer.Close(context.Background())

Expand All @@ -716,15 +727,33 @@ func TestAppenderFlushRequestError(t *testing.T) {
BytesTotal: bytesTotal,
BytesUncompressedTotal: bytesUncompressedTotal,
}
var status string
switch {
case sc == 429:
status = "TooMany"
wantStats.TooManyRequests = int64(docs)
case sc >= 500:
status = "FailedServer"
wantStats.FailedServer = int64(docs)
case sc >= 400 && sc != 429:
status = "FailedClient"
wantStats.FailedClient = int64(docs)
}
assert.Equal(t, wantStats, stats)

var rm metricdata.ResourceMetrics
assert.NoError(t, rdr.Collect(context.Background(), &rm))

var asserted atomic.Int64
assertCounter := docappendertest.NewAssertCounter(t, &asserted)
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
switch m.Name {
case "elasticsearch.events.processed":
assertCounter(m, 3, attribute.NewSet(attribute.String("status", status), semconv.HTTPResponseStatusCode(sc)))
}
})
assert.Equal(t, int64(1), asserted.Load())

}
t.Run("400", func(t *testing.T) {
test(t, http.StatusBadRequest, "flush failed (400): {\"error\":{\"type\":\"x_content_parse_exception\",\"caused_by\":{\"type\":\"json_parse_exception\"}}}")
Expand Down

0 comments on commit d287b40

Please sign in to comment.