From ccde7bee8326eeca8a56b99bcb7653afd877cf16 Mon Sep 17 00:00:00 2001 From: Vyom Yadav Date: Sat, 4 Jan 2025 22:51:25 +0530 Subject: [PATCH] Record Metrics for Reminder Signed-off-by: Vyom Yadav --- internal/reminder/metrics/metrics.go | 105 +++++++++++++++++++++++++++ internal/reminder/metrics_server.go | 84 +++++++++++++++++++++ internal/reminder/reminder.go | 61 ++++++++++++---- internal/reminder/reminder_test.go | 2 +- pkg/config/reminder/config.go | 10 ++- 5 files changed, 242 insertions(+), 20 deletions(-) create mode 100644 internal/reminder/metrics/metrics.go create mode 100644 internal/reminder/metrics_server.go diff --git a/internal/reminder/metrics/metrics.go b/internal/reminder/metrics/metrics.go new file mode 100644 index 0000000000..657ec64e62 --- /dev/null +++ b/internal/reminder/metrics/metrics.go @@ -0,0 +1,105 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package metrics provides metrics for the reminder service +package metrics + +import ( + "context" + + "go.opentelemetry.io/otel/metric" +) + +// Default bucket boundaries in seconds for the delay histograms +var delayBuckets = []float64{ + 60, // 1 minute + 300, // 5 minutes + 600, // 10 minutes + 1800, // 30 minutes + 3600, // 1 hour + 7200, // 2 hours + 10800, // 3 hours + 18000, // 5 hours + 25200, // 7 hours + 36000, // 10 hours +} + +// Metrics contains all the metrics for the reminder service +type Metrics struct { + // Time between when a reminder became eligible and when it was sent + SendDelay metric.Float64Histogram + + // Time between when a reminder became eligible and when it was sent + NewSendDelay metric.Float64Histogram + + // Current number of reminders in the batch + BatchSize metric.Int64Histogram + + // Total number of batches processed + TotalBatches metric.Int64Counter + + // Total number of reminders processed (total entities reconciled) + TotalReminders metric.Int64Counter +} + +// NewMetrics creates a new metrics instance +func NewMetrics(meter metric.Meter) (*Metrics, error) { + sendDelay, err := meter.Float64Histogram( + "send_delay", + metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(delayBuckets...), + ) + if err != nil { + return nil, err + } + + newSendDelay, err := meter.Float64Histogram( + "new_send_delay", + metric.WithDescription("Time between reminder becoming eligible and actual send (seconds) for first time reminders"), + metric.WithUnit("s"), + metric.WithExplicitBucketBoundaries(delayBuckets...), + ) + if err != nil { + return nil, err + } + + batchSize, err := meter.Int64Histogram( + "batch_size", + metric.WithDescription("Current number of reminders in the batch"), + ) + if err != nil { + return nil, err + } + + totalBatches, err := meter.Int64Counter( + "total_batches", + metric.WithDescription("Total number of batches processed"), + ) + if err != nil { + return nil, err + } + + totalReminders, err := meter.Int64Counter( + "total_reminders", + metric.WithDescription("Total number of reminders processed"), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + SendDelay: sendDelay, + NewSendDelay: newSendDelay, + BatchSize: batchSize, + TotalBatches: totalBatches, + TotalReminders: totalReminders, + }, nil +} + +// RecordBatch records the metrics for a batch of reminders +func (m *Metrics) RecordBatch(ctx context.Context, size int64) { + m.BatchSize.Record(ctx, size) + m.TotalBatches.Add(ctx, 1) + m.TotalReminders.Add(ctx, size) +} diff --git a/internal/reminder/metrics_server.go b/internal/reminder/metrics_server.go new file mode 100644 index 0000000000..34489abfdd --- /dev/null +++ b/internal/reminder/metrics_server.go @@ -0,0 +1,84 @@ +// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors +// SPDX-License-Identifier: Apache-2.0 + +package reminder + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/zerolog" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" +) + +const ( + metricsPath = "/metrics" + readHeaderTimeout = 2 * time.Second +) + +func (r *reminder) startMetricServer(ctx context.Context) error { + logger := zerolog.Ctx(ctx) + + prometheusExporter, err := prometheus.New( + prometheus.WithNamespace("reminder"), + ) + if err != nil { + return fmt.Errorf("failed to create Prometheus exporter: %w", err) + } + + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("reminder"), + // TODO: Make this auto-generated + semconv.ServiceVersion("v0.1.0"), + ) + + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader(prometheusExporter), + sdkmetric.WithResource(res), + ) + + otel.SetMeterProvider(mp) + + mux := http.NewServeMux() + mux.Handle(metricsPath, promhttp.Handler()) + + server := &http.Server{ + Addr: r.cfg.MetricServer.GetAddress(), + Handler: mux, + ReadHeaderTimeout: readHeaderTimeout, + } + + logger.Info().Msgf("starting metrics server on %s", server.Addr) + + errCh := make(chan error) + go func() { + errCh <- server.ListenAndServe() + }() + + select { + case err := <-errCh: + return err + case <-ctx.Done(): + case <-r.stop: + } + + // shutdown the metrics server when either the context is done or when reminder is stopped + shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownRelease() + + logger.Info().Msg("shutting down metrics server") + + if err := mp.Shutdown(shutdownCtx); err != nil { + logger.Err(err).Msg("error shutting down metrics provider") + } + + return server.Shutdown(shutdownCtx) +} diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 22cb041e83..3fd4dfb3a2 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -14,9 +14,11 @@ import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/google/uuid" "github.com/rs/zerolog" + "go.opentelemetry.io/otel" "github.com/mindersec/minder/internal/db" remindermessages "github.com/mindersec/minder/internal/reminder/messages" + "github.com/mindersec/minder/internal/reminder/metrics" reminderconfig "github.com/mindersec/minder/pkg/config/reminder" "github.com/mindersec/minder/pkg/eventer/constants" ) @@ -42,6 +44,8 @@ type reminder struct { ticker *time.Ticker eventPublisher message.Publisher + + metrics *metrics.Metrics } // NewReminder creates a new reminder instance @@ -80,6 +84,20 @@ func (r *reminder) Start(ctx context.Context) error { return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval) } + if r.cfg.MetricsConfig.Enabled { + go func() { + if err := r.startMetricServer(ctx); err != nil { + logger.Error().Err(err).Msg("error starting metrics server") + } + }() + + var err error + r.metrics, err = metrics.NewMetrics(otel.Meter("reminder")) + if err != nil { + return err + } + } + r.ticker = time.NewTicker(interval) defer r.Stop() @@ -126,7 +144,7 @@ func (r *reminder) sendReminders(ctx context.Context) error { logger := zerolog.Ctx(ctx) // Fetch a batch of repositories - repos, err := r.getRepositoryBatch(ctx) + repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx) if err != nil { return fmt.Errorf("error fetching repository batch: %w", err) } @@ -143,6 +161,10 @@ func (r *reminder) sendReminders(ctx context.Context) error { return fmt.Errorf("error creating reminder messages: %w", err) } + if r.metrics != nil { + r.metrics.RecordBatch(ctx, int64(len(repos))) + } + err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...) if err != nil { return fmt.Errorf("error publishing messages: %w", err) @@ -151,14 +173,19 @@ func (r *reminder) sendReminders(ctx context.Context) error { repoIds := make([]uuid.UUID, len(repos)) for _, repo := range repos { repoIds = append(repoIds, repo.ID) + if r.metrics != nil { + // sendDelay = Now() - ReminderLastSent - MinElapsed + reminderLastSent := repo.ReminderLastSent + if reminderLastSent.Valid { + r.metrics.SendDelay.Record(ctx, (time.Since(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds()) + } else { + // Recording metric for first time reminders (after how long was the initial reminder sent) + newSendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed + r.metrics.NewSendDelay.Record(ctx, newSendDelay.Seconds()) + } + } } - // TODO: Collect Metrics - // Potential metrics: - // - Gauge: Number of reminders in the current batch - // - UpDownCounter: Average reminders sent per batch - // - Histogram: reminder_last_sent time distribution - err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds) if err != nil { return fmt.Errorf("reminders published but error updating last sent time: %w", err) @@ -167,7 +194,7 @@ func (r *reminder) sendReminders(ctx context.Context) error { return nil } -func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) { +func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) { logger := zerolog.Ctx(ctx) logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor) @@ -176,22 +203,25 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err Limit: int64(r.cfg.RecurrenceConfig.BatchSize), }) if err != nil { - return nil, err + return nil, nil, err } - eligibleRepos, err := r.getEligibleRepositories(ctx, repos) + eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos) if err != nil { - return nil, err + return nil, nil, err } logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos)) r.updateRepositoryCursor(ctx, repos) - return eligibleRepos, nil + return eligibleRepos, eligibleReposLastUpdated, nil } -func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) { +func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ( + []db.Repository, map[uuid.UUID]time.Time, error, +) { eligibleRepos := make([]db.Repository, 0, len(repos)) + eligibleReposLastUpdated := make(map[uuid.UUID]time.Time, len(repos)) // We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs, // and similarly returns slices of ID -> date (in possibly different order), so we need @@ -202,7 +232,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos } oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds) if err != nil { - return nil, err + return nil, nil, err } idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals)) for _, times := range oldestRuleEvals { @@ -213,10 +243,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos for _, repo := range repos { if t, ok := idToLastUpdate[repo.ID]; ok && t.Before(cutoff) { eligibleRepos = append(eligibleRepos, repo) + eligibleReposLastUpdated[repo.ID] = t } } - return eligibleRepos, nil + return eligibleRepos, eligibleReposLastUpdated, nil } func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) { diff --git a/internal/reminder/reminder_test.go b/internal/reminder/reminder_test.go index c47e025d21..aea5d32f80 100644 --- a/internal/reminder/reminder_test.go +++ b/internal/reminder/reminder_test.go @@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) { r := createTestReminder(t, store, cfg) - got, err := r.getRepositoryBatch(context.Background()) + got, _, err := r.getRepositoryBatch(context.Background()) if test.err != "" { require.ErrorContains(t, err, test.err) return diff --git a/pkg/config/reminder/config.go b/pkg/config/reminder/config.go index da16082e2a..a55a218f35 100644 --- a/pkg/config/reminder/config.go +++ b/pkg/config/reminder/config.go @@ -18,10 +18,12 @@ import ( // Config contains the configuration for the reminder service type Config struct { - Database config.DatabaseConfig `mapstructure:"database"` - RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` - EventConfig serverconfig.EventConfig `mapstructure:"events"` - LoggingConfig LoggingConfig `mapstructure:"logging"` + Database config.DatabaseConfig `mapstructure:"database"` + RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"` + EventConfig serverconfig.EventConfig `mapstructure:"events"` + LoggingConfig LoggingConfig `mapstructure:"logging"` + MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"` + MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"` } // Validate validates the configuration