Skip to content

Commit

Permalink
Add some more metrics to the ingesters (#3782)
Browse files Browse the repository at this point in the history
* wip

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* add metrics

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* add metrics

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix tests

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* fix tests

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* store average time to change row

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* better metric name

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* better buckets, add histo per operation

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

* lint

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>

---------

Signed-off-by: Chris Martin <chris@cmartinit.co.uk>
Co-authored-by: Chris Martin <chris@cmartinit.co.uk>
  • Loading branch information
d80tb7 and d80tb7 authored Jul 12, 2024
1 parent 6da3fbd commit 32c85fc
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 47 deletions.
10 changes: 8 additions & 2 deletions internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
preprocessedBatchEventSequences := make(chan *EventSequencesWithIds)
go func() {
for msg := range batchedEventSequences {
logSummaryOfEventSequences(msg)
i.recordSummaryOfEventSequences(msg)
preprocessedBatchEventSequences <- msg
}
close(preprocessedBatchEventSequences)
Expand Down Expand Up @@ -214,6 +214,7 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
time.Sleep(i.pulsarConfig.BackoffTime)
},
)
i.metrics.RecordPulsarMessageProcessed()
}
}
}
Expand Down Expand Up @@ -291,14 +292,19 @@ func combineEventSequences(sequences []*EventSequencesWithIds) *EventSequencesWi
}
}

func logSummaryOfEventSequences(sequence *EventSequencesWithIds) {
func (i *IngestionPipeline[T]) recordSummaryOfEventSequences(sequence *EventSequencesWithIds) {
numberOfEvents := 0
countOfEventsByType := map[string]int{}
for _, eventSequence := range sequence.EventSequences {
numberOfEvents += len(eventSequence.Events)
for _, e := range eventSequence.Events {
typeString := e.GetEventName()
countOfEventsByType[typeString] = countOfEventsByType[typeString] + 1

// Record the event sequence as processed here. This is technically not true as we haven't finished
// processing yet but it saves us having to pass all these details down the pipeline and as the
// pipeline is single threaded, the error here should be inconsequential.
i.metrics.RecordEventSequenceProcessed(eventSequence.Queue, typeString)
}
}
log.Infof("Batch being processed contains %d event messages and %d events of type %v", len(sequence.MessageIds), numberOfEvents, countOfEventsByType)
Expand Down
34 changes: 27 additions & 7 deletions internal/common/ingest/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ const (
)

// Prefixes for the Prometheus metric names exposed by the individual ingesters.
// NOTE(review): presumably one of these is passed as the prefix argument of
// NewMetrics, which prepends it to every metric name — confirm against callers.
const (
ArmadaLookoutIngesterMetricsPrefix = "armada_lookout_ingester_"
ArmadaLookoutIngesterV2MetricsPrefix = "armada_lookout_ingester_v2_"
ArmadaEventIngesterMetricsPrefix = "armada_event_ingester_"
)

type Metrics struct {
dbErrorsCounter *prometheus.CounterVec
pulsarConnectionError prometheus.Counter
pulsarMessageError *prometheus.CounterVec
dbErrorsCounter *prometheus.CounterVec
pulsarConnectionError prometheus.Counter
pulsarMessageError *prometheus.CounterVec
pulsarMessagesProcessed prometheus.Counter
eventsProcessed *prometheus.CounterVec
}

func NewMetrics(prefix string) *Metrics {
Expand All @@ -44,10 +45,21 @@ func NewMetrics(prefix string) *Metrics {
Name: prefix + "pulsar_connection_errors",
Help: "Number of Pulsar connection errors",
}
pulsarMessagesProcessedOpts := prometheus.CounterOpts{
Name: prefix + "pulsar_messages_processed",
Help: "Number of pulsar messages processed",
}
eventsProcessedOpts := prometheus.CounterOpts{
Name: prefix + "events_processed",
Help: "Number of events processed",
}

return &Metrics{
dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}),
pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}),
pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts),
dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}),
pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}),
pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts),
pulsarMessagesProcessed: promauto.NewCounter(pulsarMessagesProcessedOpts),
eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "msgType"}),
}
}

Expand All @@ -62,3 +74,11 @@ func (m *Metrics) RecordPulsarMessageError(error PulsarMessageError) {
// RecordPulsarConnectionError bumps the Pulsar connection error counter by one.
func (m *Metrics) RecordPulsarConnectionError() {
	// Add(1) is the explicit form of Inc().
	m.pulsarConnectionError.Add(1)
}

// RecordPulsarMessageProcessed bumps the processed Pulsar message counter by one.
func (m *Metrics) RecordPulsarMessageProcessed() {
	// Add(1) is the explicit form of Inc().
	m.pulsarMessagesProcessed.Add(1)
}

// RecordEventSequenceProcessed increments the events-processed counter for the
// given queue and message type by one.
func (m *Metrics) RecordEventSequenceProcessed(queue, msgTpe string) {
	// Label keys must match the label names the counter vector was created with.
	labels := map[string]string{
		"queue":   queue,
		"msgType": msgTpe,
	}
	counter := m.eventsProcessed.With(labels)
	counter.Inc()
}
4 changes: 2 additions & 2 deletions internal/lookoutingesterv2/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
}()
}

converter := instructions.NewInstructionConverter(m, config.UserAnnotationPrefix, compressor)
converter := instructions.NewInstructionConverter(m.Metrics, config.UserAnnotationPrefix, compressor)

ingester := ingest.NewIngestionPipeline[*model.InstructionSet](
config.Pulsar,
Expand All @@ -71,7 +71,7 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
converter,
lookoutDb,
config.MetricsPort,
m,
m.Metrics,
)

if err := ingester.Run(app.CreateContextWithShutdown()); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/lookoutingesterv2/instructions/instructions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func TestConvert(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
converter := NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{})
converter := NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{})
decompressor := &compress.NoOpDecompressor{}
instructionSet := converter.Convert(armadacontext.TODO(), tc.events)
require.Equal(t, len(tc.expected.JobsToCreate), len(instructionSet.JobsToCreate))
Expand Down Expand Up @@ -542,7 +542,7 @@ func TestTruncatesStringsThatAreTooLong(t *testing.T) {
MessageIds: []pulsar.MessageID{pulsarutils.NewMessageId(1)},
}

converter := NewInstructionConverter(metrics.Get(), userAnnotationPrefix, &compress.NoOpCompressor{})
converter := NewInstructionConverter(metrics.Get().Metrics, userAnnotationPrefix, &compress.NoOpCompressor{})
actual := converter.Convert(armadacontext.TODO(), events)

// String lengths obtained from database schema
Expand Down
69 changes: 47 additions & 22 deletions internal/lookoutingesterv2/lookoutdb/insertion.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/internal/common/ingest/metrics"
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
"github.com/armadaproject/armada/internal/lookoutingesterv2/metrics"
"github.com/armadaproject/armada/internal/lookoutingesterv2/model"
)

Expand Down Expand Up @@ -46,6 +47,10 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru
jobsToUpdate := conflateJobUpdates(instructions.JobsToUpdate)
jobRunsToUpdate := conflateJobRunUpdates(instructions.JobRunsToUpdate)

numRowsToChange := len(instructions.JobsToCreate) + len(jobsToUpdate) + len(instructions.JobRunsToCreate) +
len(jobRunsToUpdate) + len(instructions.JobErrorsToCreate)

start := time.Now()
// Jobs need to be ingested first as other updates may reference these
l.CreateJobs(ctx, instructions.JobsToCreate)

Expand All @@ -69,6 +74,11 @@ func (l *LookoutDb) Store(ctx *armadacontext.Context, instructions *model.Instru

// Finally, we can update the job runs
l.UpdateJobRuns(ctx, jobRunsToUpdate)

taken := time.Since(start)
if numRowsToChange != 0 && taken != 0 {
l.metrics.RecordAvRowChangeTime(numRowsToChange, taken)
}
return nil
}

Expand All @@ -82,7 +92,10 @@ func (l *LookoutDb) CreateJobs(ctx *armadacontext.Context, instructions []*model
log.WithError(err).Warn("Creating jobs via batch failed, will attempt to insert serially (this might be slow).")
l.CreateJobsScalar(ctx, instructions)
}
log.Infof("Inserted %d jobs in %s", len(instructions), time.Since(start))
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job", commonmetrics.DBOperationInsert, len(instructions), taken)
l.metrics.RecordRowsChange("job", commonmetrics.DBOperationInsert, len(instructions))
log.Infof("Inserted %d jobs in %s", len(instructions), taken)
}

func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model.UpdateJobInstruction) {
Expand All @@ -96,7 +109,10 @@ func (l *LookoutDb) UpdateJobs(ctx *armadacontext.Context, instructions []*model
log.WithError(err).Warn("Updating jobs via batch failed, will attempt to insert serially (this might be slow).")
l.UpdateJobsScalar(ctx, instructions)
}
log.Infof("Updated %d jobs in %s", len(instructions), time.Since(start))
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job", commonmetrics.DBOperationUpdate, len(instructions), taken)
l.metrics.RecordRowsChange("job", commonmetrics.DBOperationUpdate, len(instructions))
log.Infof("Updated %d jobs in %s", len(instructions), taken)
}

func (l *LookoutDb) CreateJobRuns(ctx *armadacontext.Context, instructions []*model.CreateJobRunInstruction) {
Expand All @@ -109,7 +125,10 @@ func (l *LookoutDb) CreateJobRuns(ctx *armadacontext.Context, instructions []*mo
log.WithError(err).Warn("Creating job runs via batch failed, will attempt to insert serially (this might be slow).")
l.CreateJobRunsScalar(ctx, instructions)
}
log.Infof("Inserted %d job runs in %s", len(instructions), time.Since(start))
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job_run", commonmetrics.DBOperationInsert, len(instructions), taken)
l.metrics.RecordRowsChange("job_run", commonmetrics.DBOperationInsert, len(instructions))
log.Infof("Inserted %d job runs in %s", len(instructions), taken)
}

func (l *LookoutDb) UpdateJobRuns(ctx *armadacontext.Context, instructions []*model.UpdateJobRunInstruction) {
Expand All @@ -122,7 +141,10 @@ func (l *LookoutDb) UpdateJobRuns(ctx *armadacontext.Context, instructions []*mo
log.WithError(err).Warn("Updating job runs via batch failed, will attempt to insert serially (this might be slow).")
l.UpdateJobRunsScalar(ctx, instructions)
}
log.Infof("Updated %d job runs in %s", len(instructions), time.Since(start))
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job_run", commonmetrics.DBOperationUpdate, len(instructions), taken)
l.metrics.RecordRowsChange("job_run", commonmetrics.DBOperationUpdate, len(instructions))
log.Infof("Updated %d job runs in %s", len(instructions), taken)
}

func (l *LookoutDb) CreateJobErrors(ctx *armadacontext.Context, instructions []*model.CreateJobErrorInstruction) {
Expand All @@ -135,7 +157,10 @@ func (l *LookoutDb) CreateJobErrors(ctx *armadacontext.Context, instructions []*
log.WithError(err).Warn("Creating job errors via batch failed, will attempt to insert serially (this might be slow).")
l.CreateJobErrorsScalar(ctx, instructions)
}
log.Infof("Inserted %d job errors in %s", len(instructions), time.Since(start))
taken := time.Since(start)
l.metrics.RecordAvRowChangeTimeByOperation("job_error", commonmetrics.DBOperationInsert, len(instructions), taken)
l.metrics.RecordRowsChange("job_error", commonmetrics.DBOperationInsert, len(instructions))
log.Infof("Inserted %d job errors in %s", len(instructions), taken)
}

func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*model.CreateJobInstruction) error {
Expand Down Expand Up @@ -165,7 +190,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
annotations jsonb
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationCreateTempTable)
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}
Expand Down Expand Up @@ -243,7 +268,7 @@ func (l *LookoutDb) CreateJobsBatch(ctx *armadacontext.Context, instructions []*
ON CONFLICT DO NOTHING`, tmpTable),
)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
}
Expand Down Expand Up @@ -297,7 +322,7 @@ func (l *LookoutDb) CreateJobsScalar(ctx *armadacontext.Context, instructions []
i.Annotations,
)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
})
Expand Down Expand Up @@ -325,7 +350,7 @@ func (l *LookoutDb) UpdateJobsBatch(ctx *armadacontext.Context, instructions []*
cancel_reason varchar(512)
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationCreateTempTable)
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}
Expand Down Expand Up @@ -377,7 +402,7 @@ func (l *LookoutDb) UpdateJobsBatch(ctx *armadacontext.Context, instructions []*
FROM %s as tmp WHERE tmp.job_id = job.job_id`, tmpTable),
)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationUpdate)
l.metrics.RecordDBError(commonmetrics.DBOperationUpdate)
}
return err
}
Expand Down Expand Up @@ -411,7 +436,7 @@ func (l *LookoutDb) UpdateJobsScalar(ctx *armadacontext.Context, instructions []
i.LatestRunId,
i.CancelReason)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationUpdate)
l.metrics.RecordDBError(commonmetrics.DBOperationUpdate)
}
return err
})
Expand All @@ -437,7 +462,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
job_run_state smallint
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationCreateTempTable)
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}
Expand Down Expand Up @@ -484,7 +509,7 @@ func (l *LookoutDb) CreateJobRunsBatch(ctx *armadacontext.Context, instructions
) SELECT * from %s
ON CONFLICT DO NOTHING`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
}
Expand Down Expand Up @@ -514,7 +539,7 @@ func (l *LookoutDb) CreateJobRunsScalar(ctx *armadacontext.Context, instructions
i.Pending,
i.JobRunState)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
})
Expand Down Expand Up @@ -542,7 +567,7 @@ func (l *LookoutDb) UpdateJobRunsBatch(ctx *armadacontext.Context, instructions
exit_code int
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationCreateTempTable)
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}
Expand Down Expand Up @@ -594,7 +619,7 @@ func (l *LookoutDb) UpdateJobRunsBatch(ctx *armadacontext.Context, instructions
FROM %s as tmp where tmp.run_id = job_run.run_id`, tmpTable),
)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationUpdate)
l.metrics.RecordDBError(commonmetrics.DBOperationUpdate)
}
return err
}
Expand Down Expand Up @@ -628,7 +653,7 @@ func (l *LookoutDb) UpdateJobRunsScalar(ctx *armadacontext.Context, instructions
i.Pending,
i.Debug)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationUpdate)
l.metrics.RecordDBError(commonmetrics.DBOperationUpdate)
}
return err
})
Expand All @@ -648,7 +673,7 @@ func (l *LookoutDb) CreateJobErrorsBatch(ctx *armadacontext.Context, instruction
error bytea
) ON COMMIT DROP;`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationCreateTempTable)
l.metrics.RecordDBError(commonmetrics.DBOperationCreateTempTable)
}
return err
}
Expand Down Expand Up @@ -680,7 +705,7 @@ func (l *LookoutDb) CreateJobErrorsBatch(ctx *armadacontext.Context, instruction
) SELECT * from %s
ON CONFLICT DO NOTHING`, tmpTable))
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
}
Expand All @@ -698,7 +723,7 @@ func (l *LookoutDb) CreateJobErrorsScalar(ctx *armadacontext.Context, instructio
i.JobId,
i.Error)
if err != nil {
l.metrics.RecordDBError(metrics.DBOperationInsert)
l.metrics.RecordDBError(commonmetrics.DBOperationInsert)
}
return err
})
Expand Down Expand Up @@ -871,7 +896,7 @@ func (l *LookoutDb) filterEventsForTerminalJobs(
return db.Query(ctx, "SELECT DISTINCT job_id, state FROM JOB where state = any($1) AND job_id = any($2)", terminalStates, jobIds)
})
if err != nil {
m.RecordDBError(metrics.DBOperationRead)
m.RecordDBError(commonmetrics.DBOperationRead)
log.WithError(err).Warnf("Cannot retrieve job state from the database- Cancelled jobs may not be filtered out")
return instructions
}
Expand Down
Loading

0 comments on commit 32c85fc

Please sign in to comment.