Skip to content

Commit

Permalink
Record Metrics for Reminder
Browse files Browse the repository at this point in the history
Signed-off-by: Vyom Yadav <jackhammervyom@gmail.com>
  • Loading branch information
Vyom-Yadav committed Jan 4, 2025
1 parent a6d9b22 commit ccde7be
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 20 deletions.
105 changes: 105 additions & 0 deletions internal/reminder/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package metrics provides metrics for the reminder service
package metrics

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/metric"
)

// delayBuckets holds the explicit histogram bucket boundaries, in seconds,
// that are shared by the send_delay and new_send_delay histograms.
var delayBuckets = []float64{
60, // 1 minute
300, // 5 minutes
600, // 10 minutes
1800, // 30 minutes
3600, // 1 hour
7200, // 2 hours
10800, // 3 hours
18000, // 5 hours
25200, // 7 hours
36000, // 10 hours
}

// Metrics contains all the metrics for the reminder service
type Metrics struct {
// Time (in seconds) between when a reminder became eligible and when it was sent
SendDelay metric.Float64Histogram

// Time (in seconds) between when a reminder became eligible and when it was
// sent, recorded for first time reminders
NewSendDelay metric.Float64Histogram

// Distribution of the number of reminders per batch
BatchSize metric.Int64Histogram

// Total number of batches processed
TotalBatches metric.Int64Counter

// Total number of reminders processed (total entities reconciled)
TotalReminders metric.Int64Counter
}

// NewMetrics creates every instrument used by the reminder service on the
// given meter and returns them bundled in a Metrics value.
//
// Both delay histograms are registered with unit "s" and the shared
// delayBuckets boundaries. If any instrument cannot be created, a nil
// *Metrics is returned together with an error that names the failing
// instrument and wraps the underlying cause.
func NewMetrics(meter metric.Meter) (*Metrics, error) {
	sendDelay, err := meter.Float64Histogram(
		"send_delay",
		metric.WithDescription("Time between reminder becoming eligible and actual send (seconds)"),
		metric.WithUnit("s"),
		metric.WithExplicitBucketBoundaries(delayBuckets...),
	)
	if err != nil {
		return nil, fmt.Errorf("creating send_delay histogram: %w", err)
	}

	newSendDelay, err := meter.Float64Histogram(
		"new_send_delay",
		metric.WithDescription("Time between reminder becoming eligible and actual send (seconds) for first time reminders"),
		metric.WithUnit("s"),
		metric.WithExplicitBucketBoundaries(delayBuckets...),
	)
	if err != nil {
		return nil, fmt.Errorf("creating new_send_delay histogram: %w", err)
	}

	batchSize, err := meter.Int64Histogram(
		"batch_size",
		metric.WithDescription("Current number of reminders in the batch"),
	)
	if err != nil {
		return nil, fmt.Errorf("creating batch_size histogram: %w", err)
	}

	totalBatches, err := meter.Int64Counter(
		"total_batches",
		metric.WithDescription("Total number of batches processed"),
	)
	if err != nil {
		return nil, fmt.Errorf("creating total_batches counter: %w", err)
	}

	totalReminders, err := meter.Int64Counter(
		"total_reminders",
		metric.WithDescription("Total number of reminders processed"),
	)
	if err != nil {
		return nil, fmt.Errorf("creating total_reminders counter: %w", err)
	}

	return &Metrics{
		SendDelay:      sendDelay,
		NewSendDelay:   newSendDelay,
		BatchSize:      batchSize,
		TotalBatches:   totalBatches,
		TotalReminders: totalReminders,
	}, nil
}

// RecordBatch accounts for one processed batch containing size reminders:
// the size is observed on the BatchSize histogram, the batch counter is
// bumped by one, and the reminder counter is bumped by size.
func (m *Metrics) RecordBatch(ctx context.Context, size int64) {
	// Each call represents exactly one batch.
	const batchIncrement = 1

	m.BatchSize.Record(ctx, size)
	m.TotalBatches.Add(ctx, batchIncrement)
	m.TotalReminders.Add(ctx, size)
}
84 changes: 84 additions & 0 deletions internal/reminder/metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

package reminder

import (
"context"
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

const (
// metricsPath is the HTTP path on which the metrics handler is mounted.
metricsPath = "/metrics"
// readHeaderTimeout is the ReadHeaderTimeout applied to the metrics HTTP server.
readHeaderTimeout = 2 * time.Second
)

// startMetricServer creates the OpenTelemetry Prometheus exporter, installs a
// meter provider backed by it as the global provider, and serves the metrics
// handler on metricsPath at the address taken from r.cfg.MetricServer.
//
// The call blocks until one of the following happens:
//   - the HTTP server stops on its own, in which case its error is returned;
//   - ctx is done or r.stop is signalled, in which case the meter provider and
//     the HTTP server are shut down with a shared 5 second deadline and the
//     result of the HTTP server shutdown is returned.
func (r *reminder) startMetricServer(ctx context.Context) error {
	logger := zerolog.Ctx(ctx)

	prometheusExporter, err := prometheus.New(
		prometheus.WithNamespace("reminder"),
	)
	if err != nil {
		return fmt.Errorf("failed to create Prometheus exporter: %w", err)
	}

	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceName("reminder"),
		// TODO: Make this auto-generated
		semconv.ServiceVersion("v0.1.0"),
	)

	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(prometheusExporter),
		sdkmetric.WithResource(res),
	)

	otel.SetMeterProvider(mp)

	mux := http.NewServeMux()
	mux.Handle(metricsPath, promhttp.Handler())

	server := &http.Server{
		Addr:              r.cfg.MetricServer.GetAddress(),
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	logger.Info().Msgf("starting metrics server on %s", server.Addr)

	// Buffered with capacity 1 so the serving goroutine can always deliver its
	// result and exit. Once the select below has left through ctx.Done() or
	// r.stop nobody receives from errCh anymore, yet ListenAndServe still
	// returns after Shutdown is called; an unbuffered send would block forever
	// and leak the goroutine.
	errCh := make(chan error, 1)
	go func() {
		errCh <- server.ListenAndServe()
	}()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
	case <-r.stop:
	}

	// shutdown the metrics server when either the context is done or when reminder is stopped
	shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownRelease()

	logger.Info().Msg("shutting down metrics server")

	// A failure to shut down the meter provider is only logged so that the
	// HTTP server is still shut down afterwards.
	if err := mp.Shutdown(shutdownCtx); err != nil {
		logger.Err(err).Msg("error shutting down metrics provider")
	}

	return server.Shutdown(shutdownCtx)
}
61 changes: 46 additions & 15 deletions internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel"

"github.com/mindersec/minder/internal/db"
remindermessages "github.com/mindersec/minder/internal/reminder/messages"
"github.com/mindersec/minder/internal/reminder/metrics"
reminderconfig "github.com/mindersec/minder/pkg/config/reminder"
"github.com/mindersec/minder/pkg/eventer/constants"
)
Expand All @@ -42,6 +44,8 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher

metrics *metrics.Metrics
}

// NewReminder creates a new reminder instance
Expand Down Expand Up @@ -80,6 +84,20 @@ func (r *reminder) Start(ctx context.Context) error {
return fmt.Errorf("invalid interval: %s", r.cfg.RecurrenceConfig.Interval)
}

if r.cfg.MetricsConfig.Enabled {
go func() {
if err := r.startMetricServer(ctx); err != nil {
logger.Error().Err(err).Msg("error starting metrics server")
}
}()

var err error
r.metrics, err = metrics.NewMetrics(otel.Meter("reminder"))
if err != nil {
return err
}
}

r.ticker = time.NewTicker(interval)
defer r.Stop()

Expand Down Expand Up @@ -126,7 +144,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
logger := zerolog.Ctx(ctx)

// Fetch a batch of repositories
repos, err := r.getRepositoryBatch(ctx)
repos, repoToLastUpdated, err := r.getRepositoryBatch(ctx)
if err != nil {
return fmt.Errorf("error fetching repository batch: %w", err)
}
Expand All @@ -143,6 +161,10 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return fmt.Errorf("error creating reminder messages: %w", err)
}

if r.metrics != nil {
r.metrics.RecordBatch(ctx, int64(len(repos)))
}

err = r.eventPublisher.Publish(constants.TopicQueueRepoReminder, messages...)
if err != nil {
return fmt.Errorf("error publishing messages: %w", err)
Expand All @@ -151,14 +173,19 @@ func (r *reminder) sendReminders(ctx context.Context) error {
repoIds := make([]uuid.UUID, len(repos))
for _, repo := range repos {
repoIds = append(repoIds, repo.ID)
if r.metrics != nil {
// sendDelay = Now() - ReminderLastSent - MinElapsed
reminderLastSent := repo.ReminderLastSent
if reminderLastSent.Valid {
r.metrics.SendDelay.Record(ctx, (time.Since(reminderLastSent.Time) - r.cfg.RecurrenceConfig.MinElapsed).Seconds())
} else {
// Recording metric for first time reminders (after how long was the initial reminder sent)
newSendDelay := time.Since(repoToLastUpdated[repo.ID]) - r.cfg.RecurrenceConfig.MinElapsed
r.metrics.NewSendDelay.Record(ctx, newSendDelay.Seconds())
}
}
}

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution

err = r.store.UpdateReminderLastSentForRepositories(ctx, repoIds)
if err != nil {
return fmt.Errorf("reminders published but error updating last sent time: %w", err)
Expand All @@ -167,7 +194,7 @@ func (r *reminder) sendReminders(ctx context.Context) error {
return nil
}

func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, map[uuid.UUID]time.Time, error) {
logger := zerolog.Ctx(ctx)

logger.Debug().Msgf("fetching repositories after cursor: %s", r.repositoryCursor)
Expand All @@ -176,22 +203,25 @@ func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, err
Limit: int64(r.cfg.RecurrenceConfig.BatchSize),
})
if err != nil {
return nil, err
return nil, nil, err
}

eligibleRepos, err := r.getEligibleRepositories(ctx, repos)
eligibleRepos, eligibleReposLastUpdated, err := r.getEligibleRepositories(ctx, repos)
if err != nil {
return nil, err
return nil, nil, err
}
logger.Debug().Msgf("%d/%d repositories are eligible for reminders", len(eligibleRepos), len(repos))

r.updateRepositoryCursor(ctx, repos)

return eligibleRepos, nil
return eligibleRepos, eligibleReposLastUpdated, nil
}

func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) ([]db.Repository, error) {
func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repository) (
[]db.Repository, map[uuid.UUID]time.Time, error,
) {
eligibleRepos := make([]db.Repository, 0, len(repos))
eligibleReposLastUpdated := make(map[uuid.UUID]time.Time, len(repos))

// We have a slice of repositories, but the sqlc-generated code wants a slice of UUIDs,
// and similarly returns slices of ID -> date (in possibly different order), so we need
Expand All @@ -202,7 +232,7 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
}
oldestRuleEvals, err := r.store.ListOldestRuleEvaluationsByRepositoryId(ctx, repoIds)
if err != nil {
return nil, err
return nil, nil, err
}
idToLastUpdate := make(map[uuid.UUID]time.Time, len(oldestRuleEvals))
for _, times := range oldestRuleEvals {
Expand All @@ -213,10 +243,11 @@ func (r *reminder) getEligibleRepositories(ctx context.Context, repos []db.Repos
for _, repo := range repos {
if t, ok := idToLastUpdate[repo.ID]; ok && t.Before(cutoff) {
eligibleRepos = append(eligibleRepos, repo)
eligibleReposLastUpdated[repo.ID] = t
}
}

return eligibleRepos, nil
return eligibleRepos, eligibleReposLastUpdated, nil
}

func (r *reminder) updateRepositoryCursor(ctx context.Context, repos []db.Repository) {
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func Test_getRepositoryBatch(t *testing.T) {

r := createTestReminder(t, store, cfg)

got, err := r.getRepositoryBatch(context.Background())
got, _, err := r.getRepositoryBatch(context.Background())
if test.err != "" {
require.ErrorContains(t, err, test.err)
return
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/reminder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (

// Config contains the configuration for the reminder service
type Config struct {
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
Database config.DatabaseConfig `mapstructure:"database"`
RecurrenceConfig RecurrenceConfig `mapstructure:"recurrence"`
EventConfig serverconfig.EventConfig `mapstructure:"events"`
LoggingConfig LoggingConfig `mapstructure:"logging"`
MetricsConfig serverconfig.MetricsConfig `mapstructure:"metrics"`
MetricServer serverconfig.MetricServerConfig `mapstructure:"metric_server" default:"{\"port\":\"9091\"}"`
}

// Validate validates the configuration
Expand Down

0 comments on commit ccde7be

Please sign in to comment.