diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 3cb42156b00..792f5bc303f 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -169,7 +169,7 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { preprocessedBatchEventSequences := make(chan *EventSequencesWithIds) go func() { for msg := range batchedEventSequences { - logSummaryOfEventSequences(msg) + i.recordSummaryOfEventSequences(msg) preprocessedBatchEventSequences <- msg } close(preprocessedBatchEventSequences) @@ -214,6 +214,7 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { time.Sleep(i.pulsarConfig.BackoffTime) }, ) + i.metrics.RecordPulsarMessageProcessed() } } } @@ -291,7 +292,7 @@ func combineEventSequences(sequences []*EventSequencesWithIds) *EventSequencesWi } } -func logSummaryOfEventSequences(sequence *EventSequencesWithIds) { +func (i *IngestionPipeline[T]) recordSummaryOfEventSequences(sequence *EventSequencesWithIds) { numberOfEvents := 0 countOfEventsByType := map[string]int{} for _, eventSequence := range sequence.EventSequences { @@ -299,6 +300,11 @@ func logSummaryOfEventSequences(sequence *EventSequencesWithIds) { for _, e := range eventSequence.Events { typeString := e.GetEventName() countOfEventsByType[typeString] = countOfEventsByType[typeString] + 1 + + // Record the event sequence as processed here. This is technically not true as we haven't finished + // processing yet but it saves us having to pass all these details down the pipeline and as the + // pipeline is single threaded, the error here should be inconsequential. + i.metrics.RecordEventSequenceProcessed(eventSequence.Queue, typeString) } } log.Infof("Batch being processed contains %d event messages and %d events of type %v", len(sequence.MessageIds), numberOfEvents, countOfEventsByType) diff --git a/internal/common/ingest/metrics/metrics.go b/internal/common/ingest/metrics/metrics.go index a4e87dd6332..a3ccc4c7ac5 100644 --- a/internal/common/ingest/metrics/metrics.go +++ b/internal/common/ingest/metrics/metrics.go @@ -20,15 +20,16 @@ const ( ) const ( - ArmadaLookoutIngesterMetricsPrefix = "armada_lookout_ingester_" ArmadaLookoutIngesterV2MetricsPrefix = "armada_lookout_ingester_v2_" ArmadaEventIngesterMetricsPrefix = "armada_event_ingester_" ) type Metrics struct { - dbErrorsCounter *prometheus.CounterVec - pulsarConnectionError prometheus.Counter - pulsarMessageError *prometheus.CounterVec + dbErrorsCounter *prometheus.CounterVec + pulsarConnectionError prometheus.Counter + pulsarMessageError *prometheus.CounterVec + pulsarMessagesProcessed prometheus.Counter + eventsProcessed *prometheus.CounterVec } func NewMetrics(prefix string) *Metrics { @@ -44,10 +45,21 @@ func NewMetrics(prefix string) *Metrics { Name: prefix + "pulsar_connection_errors", Help: "Number of Pulsar connection errors", } + pulsarMessagesProcessedOpts := prometheus.CounterOpts{ + Name: prefix + "pulsar_messages_processed", + Help: "Number of pulsar messages processed", + } + eventsProcessedOpts := prometheus.CounterOpts{ + Name: prefix + "events_processed", + Help: "Number of events processed", + } + return &Metrics{ - dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}), - pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}), - pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts), + dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}), + pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}), + pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts), + pulsarMessagesProcessed: promauto.NewCounter(pulsarMessagesProcessedOpts), + eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "msgType"}), } } @@ -62,3 +74,11 @@ func (m *Metrics) RecordPulsarMessageError(error PulsarMessageError) { func (m *Metrics) RecordPulsarConnectionError() { m.pulsarConnectionError.Inc() } + +func (m *Metrics) RecordPulsarMessageProcessed() { + m.pulsarMessagesProcessed.Inc() +} + +func (m *Metrics) RecordEventSequenceProcessed(queue, msgTpe string) { + m.eventsProcessed.With(map[string]string{"queue": queue, "msgType": msgTpe}).Inc() +} diff --git a/internal/lookoutingesterv2/ingester.go b/internal/lookoutingesterv2/ingester.go index a2c3d041411..8e133be859c 100644 --- a/internal/lookoutingesterv2/ingester.go +++ b/internal/lookoutingesterv2/ingester.go @@ -60,7 +60,7 @@ func Run(config *configuration.LookoutIngesterV2Configuration) { }() } - converter := instructions.NewInstructionConverter(m, config.UserAnnotationPrefix, compressor) + converter := instructions.NewInstructionConverter(m.Metrics, config.UserAnnotationPrefix, compressor) ingester := ingest.NewIngestionPipeline[*model.InstructionSet]( config.Pulsar, @@ -71,7 +71,7 @@ func Run(config *configuration.LookoutIngesterV2Configuration) { converter, lookoutDb, config.MetricsPort, - m, + m.Metrics, ) if err := ingester.Run(app.CreateContextWithShutdown()); err != nil { diff --git a/internal/lookoutingesterv2/instructions/instructions_test.go b/internal/lookoutingesterv2/instructions/instructions_test.go index 4e2844b08ec..099e53e2ff0 100644 --- a/internal/lookoutingesterv2/instructions/instructions_test.go +++ b/internal/lookoutingesterv2/instructions/instructions_test.go @@ -476,7 +476,7 @@ func TestConvert(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - converter := NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) decompressor := &compress.NoOpDecompressor{} instructionSet := converter.Convert(armadacontext.TODO(), tc.events) require.Equal(t, len(tc.expected.JobsToCreate), len(instructionSet.JobsToCreate)) @@ -542,7 +542,7 @@ func TestTruncatesStringsThatAreTooLong(t *testing.T) { MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)}, } - converter := NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) actual := converter.Convert(armadacontext.TODO(), events) // String lengths obtained from database schema diff --git a/internal/lookoutingesterv2/lookoutdb/insertion.go b/internal/lookoutingesterv2/lookoutdb/insertion.go index 985ab99b2f6..481c888bf45 100644 --- a/internal/lookoutingesterv2/lookoutdb/insertion.go +++ b/internal/lookoutingesterv2/lookoutdb/insertion.go @@ -13,7 +13,8 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/armadaerrors" "github.com/armadaproject/armada/internal/common/database/lookout" - "github.com/armadaproject/armada/internal/common/ingest/metrics" + commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics" + "github.com/armadaproject/armada/internal/lookoutingesterv2/metrics" "github.com/armadaproject/armada/internal/lookoutingesterv2/model" ) @@ -46,6 +47,10 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru jobsToUpdate := conflateJobUpdates(instructions.JobsToUpdate) jobRunsToUpdate := conflateJobRunUpdates(instructions.JobRunsToUpdate) + numRowsToChange := len(instructions.JobsToCreate) + len(jobsToUpdate) + len(instructions.JobRunsToCreate) + + len(jobRunsToUpdate) + len(instructions.JobErrorsToCreate) + + start := time.Now() // Jobs need to be ingested first as other updates may reference these l.CreateJobs(ctx, instructions.JobsToCreate) @@ -69,6 +74,11 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru // Finally, we can update the job runs l.UpdateJobRuns(ctx, jobRunsToUpdate) + + taken := time.Since(start) + if numRowsToChange != 0 && taken != 0 { + l.metrics.RecordAvRowChangeTime(numRowsToChange, taken) + } return nil } @@ -82,7 +92,10 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model log.WithError(err).Warn("Creating jobs via batch failed, will attempt to insert serially (this might be slow).") l.CreateJobsScalar(ctx, instructions) } - log.Infof("Inserted %d jobs in %s", len(instructions), time.Since(start)) + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job", commonmetrics.DBOperationInsert, len(instructions), taken) + l.metrics.RecordRowsChange("job", commonmetrics.DBOperationInsert, len(instructions)) + log.Infof("Inserted %d jobs in %s", len(instructions), taken) } func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) { @@ -96,7 +109,10 @@ func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model log.WithError(err).Warn("Updating jobs via batch failed, will attempt to insert serially (this might be slow).") l.UpdateJobsScalar(ctx, instructions) } - log.Infof("Updated %d jobs in %s", len(instructions), time.Since(start)) + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job", commonmetrics.DBOperationUpdate, len(instructions), taken) + l.metrics.RecordRowsChange("job", commonmetrics.DBOperationUpdate, len(instructions)) + log.Infof("Updated %d jobs in %s", len(instructions), taken) } func (l *LookoutDb) CreateJobRuns(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) { @@ -109,7 +125,10 @@ func (l *LookoutDb) CreateJobRuns(ctx *armadacontext.Context, instructions []*mo log.WithError(err).Warn("Creating job runs via batch failed, will attempt to insert serially (this might be slow).") l.CreateJobRunsScalar(ctx, instructions) } - log.Infof("Inserted %d job runs in %s", len(instructions), time.Since(start)) + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job_run", commonmetrics.DBOperationInsert, len(instructions), taken) + l.metrics.RecordRowsChange("job_run", commonmetrics.DBOperationInsert, len(instructions)) + log.Infof("Inserted %d job runs in %s", len(instructions), taken) } func (l *LookoutDb) UpdateJobRuns(ctx *armadacontext.Context, instructions []*model.UpdateJobRunInstruction) { @@ -122,7 +141,10 @@ func (l *LookoutDb) UpdateJobRuns(ctx *armadacontext.Context, instructions []*mo log.WithError(err).Warn("Updating job runs via batch failed, will attempt to insert serially (this might be slow).") l.UpdateJobRunsScalar(ctx, instructions) } - log.Infof("Updated %d job runs in %s", len(instructions), time.Since(start)) + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job_run", commonmetrics.DBOperationUpdate, len(instructions), taken) + l.metrics.RecordRowsChange("job_run", commonmetrics.DBOperationUpdate, len(instructions)) + log.Infof("Updated %d job runs in %s", len(instructions), taken) } func (l *LookoutDb) CreateJobErrors(ctx *armadacontext.Context, instructions []*model.CreateJobErrorInstruction) { @@ -135,7 +157,10 @@ func (l *LookoutDb) CreateJobErrors(ctx *armadacontext.Context, instructions []* log.WithError(err).Warn("Creating job errors via batch failed, will attempt to insert serially (this might be slow).") l.CreateJobErrorsScalar(ctx, instructions) } - log.Infof("Inserted %d job errors in %s", len(instructions), time.Since(start)) + taken := time.Since(start) + l.metrics.RecordAvRowChangeTimeByOperation("job_error", commonmetrics.DBOperationInsert, len(instructions), taken) + l.metrics.RecordRowsChange("job_error", commonmetrics.DBOperationInsert, len(instructions)) + log.Infof("Inserted %d job errors in %s", len(instructions), taken) } func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error { @@ -165,7 +190,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* annotations jsonb ) ON COMMIT DROP;`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationCreateTempTable) + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) } return err } @@ -243,7 +268,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []* ON CONFLICT DO NOTHING`, tmpTable), ) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err } @@ -297,7 +322,7 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions [] i.Annotations, ) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err }) @@ -325,7 +350,7 @@ func (l *LookoutDb) UpdateJobsBatch(ctx *armadacontext.Context, instructions []* cancel_reason varchar(512) ) ON COMMIT DROP;`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationCreateTempTable) + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) } return err } @@ -377,7 +402,7 @@ func (l *LookoutDb) UpdateJobsBatch(ctx *armadacontext.Context, instructions []* FROM %s as tmp WHERE tmp.job_id = job.job_id`, tmpTable), ) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationUpdate) + l.metrics.RecordDBError(commonmetrics.DBOperationUpdate) } return err } @@ -411,7 +436,7 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions [] i.LatestRunId, i.CancelReason) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationUpdate) + l.metrics.RecordDBError(commonmetrics.DBOperationUpdate) } return err }) @@ -437,7 +462,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions job_run_state smallint ) ON COMMIT DROP;`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationCreateTempTable) + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) } return err } @@ -484,7 +509,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions ) SELECT * from %s ON CONFLICT DO NOTHING`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err } @@ -514,7 +539,7 @@ func (l *LookoutDb) CreateJobRunsScalar(ctx *armadacontext.Context, instructions i.Pending, i.JobRunState) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err }) @@ -542,7 +567,7 @@ func (l *LookoutDb) UpdateJobRunsBatch(ctx *armadacontext.Context, instructions exit_code int ) ON COMMIT DROP;`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationCreateTempTable) + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) } return err } @@ -594,7 +619,7 @@ func (l *LookoutDb) UpdateJobRunsBatch(ctx *armadacontext.Context, instructions FROM %s as tmp where tmp.run_id = job_run.run_id`, tmpTable), ) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationUpdate) + l.metrics.RecordDBError(commonmetrics.DBOperationUpdate) } return err } @@ -628,7 +653,7 @@ func (l *LookoutDb) UpdateJobRunsScalar(ctx *armadacontext.Context, instructions i.Pending, i.Debug) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationUpdate) + l.metrics.RecordDBError(commonmetrics.DBOperationUpdate) } return err }) @@ -648,7 +673,7 @@ func (l *LookoutDb) CreateJobErrorsBatch(ctx *armadacontext.Context, instruction error bytea ) ON COMMIT DROP;`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationCreateTempTable) + l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable) } return err } @@ -680,7 +705,7 @@ func (l *LookoutDb) CreateJobErrorsBatch(ctx *armadacontext.Context, instruction ) SELECT * from %s ON CONFLICT DO NOTHING`, tmpTable)) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err } @@ -698,7 +723,7 @@ func (l *LookoutDb) CreateJobErrorsScalar(ctx *armadacontext.Context, instructio i.JobId, i.Error) if err != nil { - l.metrics.RecordDBError(metrics.DBOperationInsert) + l.metrics.RecordDBError(commonmetrics.DBOperationInsert) } return err }) @@ -871,7 +896,7 @@ func (l *LookoutDb) filterEventsForTerminalJobs( return db.Query(ctx, "SELECT DISTINCT job_id, state FROM JOB where state = any($1) AND job_id = any($2)", terminalStates, jobIds) }) if err != nil { - m.RecordDBError(metrics.DBOperationRead) + m.RecordDBError(commonmetrics.DBOperationRead) log.WithError(err).Warnf("Cannot retrieve job state from the database- Cancelled jobs may not be filtered out") return instructions } diff --git a/internal/lookoutingesterv2/metrics/metrics.go b/internal/lookoutingesterv2/metrics/metrics.go index c4ee06f0c38..159661bfa52 100644 --- a/internal/lookoutingesterv2/metrics/metrics.go +++ b/internal/lookoutingesterv2/metrics/metrics.go @@ -1,16 +1,64 @@ package metrics import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/armadaproject/armada/internal/common/ingest/metrics" ) -type ( - DBOperation string - PulsarMessageError string +// Lookout ingester specific metrics +var avRowChangeTimeHist = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: metrics.ArmadaLookoutIngesterV2MetricsPrefix + "average_row_change_time", + Help: "Average time take in milliseconds to change one database row", + Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 3, 5, 7, 10, 15, 25, 50, 100, 1000}, + }, +) + +var avRowChangeTimeByOperationHist = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: metrics.ArmadaLookoutIngesterV2MetricsPrefix + "average_row_change_time_by_operation", + Help: "Average time take in milliseconds to change one database row", + Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 3, 5, 7, 10, 15, 25, 50, 100, 1000}, + }, + []string{"table", "operation"}, ) -var m = metrics.NewMetrics(metrics.ArmadaLookoutIngesterV2MetricsPrefix) +var rowsChangedCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: metrics.ArmadaLookoutIngesterV2MetricsPrefix + "rows_changed", + Help: "Number of rows changed in the database", + }, + []string{"table", "operation"}, +) + +type Metrics struct { + *metrics.Metrics +} -func Get() *metrics.Metrics { +var m = &Metrics{ + metrics.NewMetrics(metrics.ArmadaLookoutIngesterV2MetricsPrefix), +} + +func Get() *Metrics { return m } + +func (m *Metrics) RecordAvRowChangeTime(numRows int, duration time.Duration) { + avRowChangeTimeHist.Observe(float64(numRows) / float64(duration.Microseconds()*1000)) +} + +func (m *Metrics) RecordAvRowChangeTimeByOperation(table string, operation metrics.DBOperation, numRows int, duration time.Duration) { + avRowChangeTimeByOperationHist. + With(map[string]string{"table": table, "operation": string(operation)}). + Observe(float64(numRows) / float64(duration.Microseconds()*1000)) +} + +func (m *Metrics) RecordRowsChange(table string, operation metrics.DBOperation, numRows int) { + rowsChangedCounter. + With(map[string]string{"table": table, "operation": string(operation)}). + Add(float64(numRows)) +} diff --git a/internal/lookoutv2/pruner/pruner_test.go b/internal/lookoutv2/pruner/pruner_test.go index 2e86dd7d88d..3fae15ac9b2 100644 --- a/internal/lookoutv2/pruner/pruner_test.go +++ b/internal/lookoutv2/pruner/pruner_test.go @@ -129,7 +129,7 @@ func TestPruneDb(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), "armadaproject.io/", &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, "armadaproject.io/", &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Minute) diff --git a/internal/lookoutv2/repository/getjoberror_test.go b/internal/lookoutv2/repository/getjoberror_test.go index 50042c4b4b1..62599071bd8 100644 --- a/internal/lookoutv2/repository/getjoberror_test.go +++ b/internal/lookoutv2/repository/getjoberror_test.go @@ -16,7 +16,7 @@ import ( func TestGetJobError(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) errMsg := "some bad error happened!" _ = NewJobSimulator(converter, store). diff --git a/internal/lookoutv2/repository/getjobrundebugmessage_test.go b/internal/lookoutv2/repository/getjobrundebugmessage_test.go index 6c54671a716..fb18af46b09 100644 --- a/internal/lookoutv2/repository/getjobrundebugmessage_test.go +++ b/internal/lookoutv2/repository/getjobrundebugmessage_test.go @@ -16,7 +16,7 @@ import ( func TestGetJobRunDebugMessage(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) debugMessageStrings := []string{ diff --git a/internal/lookoutv2/repository/getjobrunerror_test.go b/internal/lookoutv2/repository/getjobrunerror_test.go index f2eeb99f13f..31320616bcd 100644 --- a/internal/lookoutv2/repository/getjobrunerror_test.go +++ b/internal/lookoutv2/repository/getjobrunerror_test.go @@ -16,7 +16,7 @@ import ( func TestGetJobRunError(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) errorStrings := []string{ diff --git a/internal/lookoutv2/repository/getjobs_test.go b/internal/lookoutv2/repository/getjobs_test.go index 620340850a0..2e177a20106 100644 --- a/internal/lookoutv2/repository/getjobs_test.go +++ b/internal/lookoutv2/repository/getjobs_test.go @@ -57,7 +57,7 @@ var ( func withGetJobsSetup(f func(*instructions.InstructionConverter, *lookoutdb.LookoutDb, *SqlGetJobsRepository, *clock.FakeClock) error) error { testClock := clock.NewFakeClock(time.Now()) return lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) repo := NewSqlGetJobsRepository(db) repo.clock = testClock diff --git a/internal/lookoutv2/repository/getjobspec_test.go b/internal/lookoutv2/repository/getjobspec_test.go index 81728bb95db..62e619dbf41 100644 --- a/internal/lookoutv2/repository/getjobspec_test.go +++ b/internal/lookoutv2/repository/getjobspec_test.go @@ -17,7 +17,7 @@ import ( func TestGetJobSpec(t *testing.T) { err := lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) job := NewJobSimulator(converter, store). diff --git a/internal/lookoutv2/repository/groupjobs_test.go b/internal/lookoutv2/repository/groupjobs_test.go index 2a3798677d7..392c9ea61f3 100644 --- a/internal/lookoutv2/repository/groupjobs_test.go +++ b/internal/lookoutv2/repository/groupjobs_test.go @@ -22,7 +22,7 @@ import ( func withGroupJobsSetup(f func(*instructions.InstructionConverter, *lookoutdb.LookoutDb, *SqlGroupJobsRepository) error) error { return lookout.WithLookoutDb(func(db *pgxpool.Pool) error { - converter := instructions.NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{}) + converter := instructions.NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{}) store := lookoutdb.NewLookoutDb(db, nil, metrics.Get(), 10) repo := NewSqlGroupJobsRepository(db) return f(converter, store, repo)