diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 5fa628255d7..160cf677268 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/armadaproject/armada/internal/common" "github.com/armadaproject/armada/internal/common/armadacontext" commonconfig "github.com/armadaproject/armada/internal/common/config" commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics" @@ -63,7 +62,6 @@ type Sink[T HasPulsarMessageIds] interface { // exhausted and a Sink capable of exhausting these objects type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct { pulsarConfig commonconfig.PulsarConfig - metricsPort uint16 metrics *commonmetrics.Metrics pulsarTopic string pulsarSubscriptionName string @@ -93,13 +91,11 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent]( metricPublisher BatchMetricPublisher[U], converter InstructionConverter[T, U], sink Sink[T], - metricsPort uint16, metrics *commonmetrics.Metrics, ) *IngestionPipeline[T, U] { return &IngestionPipeline[T, U]{ pulsarConfig: pulsarConfig, pulsarTopic: pulsarTopic, - metricsPort: metricsPort, metrics: metrics, pulsarSubscriptionName: pulsarSubscriptionName, pulsarBatchSize: pulsarBatchSize, @@ -116,9 +112,6 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent]( // Run will run the ingestion pipeline until the supplied context is shut down func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error { - shutdownMetricServer := common.ServeMetrics(i.metricsPort) - defer shutdownMetricServer() - // Waitgroup that wil fire when the pipeline has been torn down wg := &sync.WaitGroup{} wg.Add(1) diff --git a/internal/common/ingest/ingestion_pipeline_test.go b/internal/common/ingest/ingestion_pipeline_test.go index 4fb4a3fc16c..fc3c502c89a 100644 --- a/internal/common/ingest/ingestion_pipeline_test.go +++ b/internal/common/ingest/ingestion_pipeline_test.go @@ -1,6 +1,7 @@ package ingest import ( + "sync" "testing" "time" @@ -263,6 +264,7 @@ func TestRun_ControlPlaneEvents_HappyPath_SingleMessage(t *testing.T) { mockConsumer.assertDidAck(messages) sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) } func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) { @@ -286,6 +288,7 @@ func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) { mockConsumer.assertDidAck(messages) sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) } func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) { @@ -337,6 +340,7 @@ func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) { assert.True(t, eventCount < tc.batchSize+1) } sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) }) } } @@ -356,7 +360,6 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct metricPublisher: controlplaneevents_ingest_utils.BatchMetricPublisher, converter: converter, sink: sink, - metricsPort: 8080, metrics: testMetrics, consumer: consumer, } @@ -365,25 +368,30 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct type simpleSink struct { simpleMessages map[pulsar.MessageID]*simpleMessage t *testing.T + mutex sync.Mutex } func newSimpleSink(t *testing.T) *simpleSink { return &simpleSink{ simpleMessages: make(map[pulsar.MessageID]*simpleMessage), t: t, + mutex: sync.Mutex{}, } } func (s *simpleSink) Store(_ *armadacontext.Context, msg *simpleMessages) error { for _, simpleMessage := range msg.msgs { - s.simpleMessages[simpleMessage.id] = simpleMessage + s.mutex.Lock() + if simpleMessage != nil { + s.simpleMessages[simpleMessage.id] = simpleMessage + } + s.mutex.Unlock() } return nil } func (s *simpleSink) assertDidProcess(messages []pulsar.Message) { s.t.Helper() - assert.Len(s.t, s.simpleMessages, len(messages)) for _, msg := range messages { simpleMessage, ok := s.simpleMessages[msg.ID()] assert.True(s.t, ok) @@ -391,6 +399,11 @@ func (s *simpleSink) assertDidProcess(messages []pulsar.Message) { } } +func (s *simpleSink) assertProcessedMessageCount(count int) { + s.t.Helper() + assert.Len(s.t, s.simpleMessages, count) +} + func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) { ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second)) messages := []pulsar.Message{ @@ -410,6 +423,7 @@ func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) { mockConsumer.assertDidAck(messages) sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) } func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) { @@ -433,6 +447,7 @@ func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) { mockConsumer.assertDidAck(messages) sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) } func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) { @@ -490,10 +505,67 @@ func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) { assert.True(t, eventCount < tc.batchSize+tc.numberOfEventsPerMessage) } sink.assertDidProcess(messages) + sink.assertProcessedMessageCount(len(messages)) }) } } +// This will become a more common use case - multiple ingesters ingesting into the same sink +func TestRun_MultipleSimultaneousIngesters(t *testing.T) { + jsCtx, jsCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second)) + cpCtx, cpCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second)) + jobSetMessages := []pulsar.Message{ + pulsarutils.NewPulsarMessage(1, baseTime, marshal(t, succeeded)), + pulsarutils.NewPulsarMessage(2, baseTime.Add(1*time.Second), marshal(t, pendingAndRunning)), + pulsarutils.NewPulsarMessage(3, baseTime.Add(2*time.Second), marshal(t, failed)), + } + controlPlaneMessages := []pulsar.Message{ + pulsarutils.NewPulsarMessage(4, baseTime, marshal(t, f.UpsertExecutorSettingsCordon)), + pulsarutils.NewPulsarMessage(5, baseTime.Add(1*time.Second), marshal(t, f.UpsertExecutorSettingsUncordon)), + pulsarutils.NewPulsarMessage(6, baseTime.Add(2*time.Second), marshal(t, f.DeleteExecutorSettings)), + } + mockJobSetEventsConsumer := newMockPulsarConsumer(t, jobSetMessages, jsCancel) + mockControlPlaneEventsConsumer := newMockPulsarConsumer(t, controlPlaneMessages, cpCancel) + + jobSetEventsConverter := newSimpleEventSequenceConverter(t) + controlPlaneEventsConverter := newSimpleControlPlaneEventConverter(t) + + sink := newSimpleSink(t) + + jobSetEventsPipeline := testJobSetEventsPipeline(mockJobSetEventsConsumer, jobSetEventsConverter, sink) + controlPlaneEventsPipeline := testControlPlaneEventsPipeline(mockControlPlaneEventsConsumer, controlPlaneEventsConverter, sink) + + var jsErr error + var cpErr error + wg := sync.WaitGroup{} + start := time.Now() + + wg.Add(1) + go func() { + defer wg.Done() + jsErr = jobSetEventsPipeline.Run(jsCtx) + }() + + wg.Add(1) + go func() { + defer wg.Done() + cpErr = controlPlaneEventsPipeline.Run(cpCtx) + }() + + wg.Wait() + elapsed := time.Since(start) + + assert.NoError(t, jsErr) + assert.NoError(t, cpErr) + assert.LessOrEqual(t, elapsed, batchDuration*2) + + mockJobSetEventsConsumer.assertDidAck(jobSetMessages) + mockControlPlaneEventsConsumer.assertDidAck(controlPlaneMessages) + sink.assertDidProcess(jobSetMessages) + sink.assertDidProcess(controlPlaneMessages) + sink.assertProcessedMessageCount(len(controlPlaneMessages) + len(jobSetMessages)) +} + func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages, *armadaevents.EventSequence], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages, *armadaevents.EventSequence] { return &IngestionPipeline[*simpleMessages, *armadaevents.EventSequence]{ pulsarConfig: commonconfig.PulsarConfig{ @@ -509,7 +581,6 @@ func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionCon metricPublisher: jobsetevents.BatchMetricPublisher, converter: converter, sink: sink, - metricsPort: 8080, metrics: testMetrics, consumer: consumer, } diff --git a/internal/common/metrics/scheduler_metrics.go b/internal/common/metrics/scheduler_metrics.go index b6ce130f500..85a686ee6e9 100644 --- a/internal/common/metrics/scheduler_metrics.go +++ b/internal/common/metrics/scheduler_metrics.go @@ -1,6 +1,8 @@ package metrics import ( + "regexp" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/maps" "golang.org/x/exp/slices" @@ -394,8 +396,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met values = append(values, queue) for key, value := range labels { - metricLabels = append(metricLabels, key) - values = append(values, value) + if isValidMetricLabelName(key) { + metricLabels = append(metricLabels, key) + values = append(values, value) + } } queueLabelsDesc := prometheus.NewDesc( @@ -407,3 +411,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met return prometheus.MustNewConstMetric(queueLabelsDesc, prometheus.GaugeValue, 1, values...) } + +func isValidMetricLabelName(labelName string) bool { + // Prometheus metric label names must match the following regex: [a-zA-Z_][a-zA-Z0-9_]* + // See: https://prometheus.io/docs/concepts/data_model/ + match, _ := regexp.MatchString("^[a-zA-Z_][a-zA-Z0-9_]*$", labelName) + return match +} diff --git a/internal/common/metrics/scheduler_metrics_test.go b/internal/common/metrics/scheduler_metrics_test.go new file mode 100644 index 00000000000..7478c4d1505 --- /dev/null +++ b/internal/common/metrics/scheduler_metrics_test.go @@ -0,0 +1,54 @@ +package metrics + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueLabelValidation(t *testing.T) { + tests := map[string]struct { + labelName string + isValid bool + }{ + "Empty label name": { + labelName: "", + isValid: false, + }, + "Priority label": { + labelName: "priority", + isValid: true, + }, + "Label name with underscores": { + labelName: "priority__cpu_pool", + isValid: true, + }, + "Label name with spaces": { + labelName: "priority cpu pool", + isValid: false, + }, + "Alphanumeric label name": { + labelName: "cluster_12_user", + isValid: true, + }, + "Invalid Kubernetes-style label name 1": { + labelName: "armadaproject.io/category", + isValid: false, + }, + "Invalid Kubernetes-style label name 2": { + labelName: "armadaproject.io/ttl", + isValid: false, + }, + "Invalid Kubernetes-style label name 3": { + labelName: "kubernetes.io/metadata.name", + isValid: false, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + isValid := isValidMetricLabelName(tc.labelName) + assert.Equal(t, tc.isValid, isValid) + }) + } +} diff --git a/internal/eventingester/ingester.go b/internal/eventingester/ingester.go index eaef96223d1..b6b7c207cbc 100644 --- a/internal/eventingester/ingester.go +++ b/internal/eventingester/ingester.go @@ -9,6 +9,7 @@ import ( "github.com/redis/go-redis/v9" log "github.com/sirupsen/logrus" + "github.com/armadaproject/armada/internal/common" "github.com/armadaproject/armada/internal/common/app" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/compress" @@ -62,6 +63,10 @@ func Run(config *configuration.EventIngesterConfiguration) { } converter := convert.NewEventConverter(compressor, uint(config.MaxOutputMessageSizeBytes), metrics) + // Start metric server + shutdownMetricServer := common.ServeMetrics(config.MetricsPort) + defer shutdownMetricServer() + ingester := ingest.NewIngestionPipeline[*model.BatchUpdate, *armadaevents.EventSequence]( config.Pulsar, config.Pulsar.JobsetEventsTopic, @@ -75,7 +80,6 @@ func Run(config *configuration.EventIngesterConfiguration) { jobsetevents.BatchMetricPublisher, converter, eventDb, - config.MetricsPort, metrics, ) if err := ingester.Run(app.CreateContextWithShutdown()); err != nil { diff --git a/internal/lookoutingesterv2/ingester.go b/internal/lookoutingesterv2/ingester.go index b3610e6805d..38eceaf7665 100644 --- a/internal/lookoutingesterv2/ingester.go +++ b/internal/lookoutingesterv2/ingester.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/armadaproject/armada/internal/common" "github.com/armadaproject/armada/internal/common/app" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/compress" @@ -55,6 +56,10 @@ func Run(config *configuration.LookoutIngesterV2Configuration) { log.Fatalf("Pprof setup failed, exiting, %v", err) } + // Start metric server + shutdownMetricServer := common.ServeMetrics(config.MetricsPort) + defer shutdownMetricServer() + converter := instructions.NewInstructionConverter(m.Metrics, config.UserAnnotationPrefix, compressor) ingester := ingest.NewIngestionPipeline[*model.InstructionSet, *armadaevents.EventSequence]( @@ -70,7 +75,6 @@ func Run(config *configuration.LookoutIngesterV2Configuration) { jobsetevents.BatchMetricPublisher, converter, lookoutDb, - config.MetricsPort, m.Metrics, ) diff --git a/internal/scheduleringester/ingester.go b/internal/scheduleringester/ingester.go index 7c536dbc9ad..92e13647b18 100644 --- a/internal/scheduleringester/ingester.go +++ b/internal/scheduleringester/ingester.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/armadaproject/armada/internal/common" "github.com/armadaproject/armada/internal/common/app" "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/database" @@ -43,6 +44,10 @@ func Run(config Configuration) error { log.Fatalf("Pprof setup failed, exiting, %v", err) } + // Start metric server + shutdownMetricServer := common.ServeMetrics(config.MetricsPort) + defer shutdownMetricServer() + jobSetEventsIngester := ingest.NewIngestionPipeline[*DbOperationsWithMessageIds, *armadaevents.EventSequence]( config.Pulsar, config.Pulsar.JobsetEventsTopic, @@ -56,7 +61,6 @@ func Run(config Configuration) error { jobsetevents.BatchMetricPublisher, jobSetEventsConverter, schedulerDb, - config.MetricsPort, svcMetrics, ) @@ -78,7 +82,6 @@ func Run(config Configuration) error { controlplaneevents_ingest_utils.BatchMetricPublisher, controlPlaneEventsConverter, schedulerDb, - config.MetricsPort, svcMetrics, )