Skip to content

Commit

Permalink
Bug fixes (#259) (#4013)
Browse files Browse the repository at this point in the history
* Starting metric server before running ingestion pipeline

Fixes a bug where mutliple ingestion pipelines attempt to start the same metrics server, resulting in a panic

* Validating queue label metric names

Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
  • Loading branch information
MustafaI and mustafai-gr authored Oct 21, 2024
1 parent 421dc6d commit 7115d29
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 17 deletions.
7 changes: 0 additions & 7 deletions internal/common/ingest/ingestion_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/armadacontext"
commonconfig "github.com/armadaproject/armada/internal/common/config"
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
Expand Down Expand Up @@ -63,7 +62,6 @@ type Sink[T HasPulsarMessageIds] interface {
// exhausted and a Sink capable of exhausting these objects
type IngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent] struct {
pulsarConfig commonconfig.PulsarConfig
metricsPort uint16
metrics *commonmetrics.Metrics
pulsarTopic string
pulsarSubscriptionName string
Expand Down Expand Up @@ -93,13 +91,11 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](
metricPublisher BatchMetricPublisher[U],
converter InstructionConverter[T, U],
sink Sink[T],
metricsPort uint16,
metrics *commonmetrics.Metrics,
) *IngestionPipeline[T, U] {
return &IngestionPipeline[T, U]{
pulsarConfig: pulsarConfig,
pulsarTopic: pulsarTopic,
metricsPort: metricsPort,
metrics: metrics,
pulsarSubscriptionName: pulsarSubscriptionName,
pulsarBatchSize: pulsarBatchSize,
Expand All @@ -116,9 +112,6 @@ func NewIngestionPipeline[T HasPulsarMessageIds, U utils.ArmadaEvent](

// Run will run the ingestion pipeline until the supplied context is shut down
func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error {
shutdownMetricServer := common.ServeMetrics(i.metricsPort)
defer shutdownMetricServer()

// Waitgroup that wil fire when the pipeline has been torn down
wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down
79 changes: 75 additions & 4 deletions internal/common/ingest/ingestion_pipeline_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingest

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -263,6 +264,7 @@ func TestRun_ControlPlaneEvents_HappyPath_SingleMessage(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) {
Expand All @@ -286,6 +288,7 @@ func TestRun_ControlPlaneEvents_HappyPath_MultipleMessages(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) {
Expand Down Expand Up @@ -337,6 +340,7 @@ func TestRun_ControlPlaneEvents_LimitsProcessingBatchSize(t *testing.T) {
assert.True(t, eventCount < tc.batchSize+1)
}
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
})
}
}
Expand All @@ -356,7 +360,6 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct
metricPublisher: controlplaneevents_ingest_utils.BatchMetricPublisher,
converter: converter,
sink: sink,
metricsPort: 8080,
metrics: testMetrics,
consumer: consumer,
}
Expand All @@ -365,32 +368,42 @@ func testControlPlaneEventsPipeline(consumer pulsar.Consumer, converter Instruct
type simpleSink struct {
simpleMessages map[pulsar.MessageID]*simpleMessage
t *testing.T
mutex sync.Mutex
}

func newSimpleSink(t *testing.T) *simpleSink {
return &simpleSink{
simpleMessages: make(map[pulsar.MessageID]*simpleMessage),
t: t,
mutex: sync.Mutex{},
}
}

func (s *simpleSink) Store(_ *armadacontext.Context, msg *simpleMessages) error {
for _, simpleMessage := range msg.msgs {
s.simpleMessages[simpleMessage.id] = simpleMessage
s.mutex.Lock()
if simpleMessage != nil {
s.simpleMessages[simpleMessage.id] = simpleMessage
}
s.mutex.Unlock()
}
return nil
}

func (s *simpleSink) assertDidProcess(messages []pulsar.Message) {
s.t.Helper()
assert.Len(s.t, s.simpleMessages, len(messages))
for _, msg := range messages {
simpleMessage, ok := s.simpleMessages[msg.ID()]
assert.True(s.t, ok)
assert.Greater(s.t, simpleMessage.size, 0)
}
}

func (s *simpleSink) assertProcessedMessageCount(count int) {
s.t.Helper()
assert.Len(s.t, s.simpleMessages, count)
}

func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) {
ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
messages := []pulsar.Message{
Expand All @@ -410,6 +423,7 @@ func TestRun_JobSetEvents_HappyPath_SingleMessage(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) {
Expand All @@ -433,6 +447,7 @@ func TestRun_JobSetEvents_HappyPath_MultipleMessages(t *testing.T) {

mockConsumer.assertDidAck(messages)
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
}

func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) {
Expand Down Expand Up @@ -490,10 +505,67 @@ func TestRun_JobSetEvents_LimitsProcessingBatchSize(t *testing.T) {
assert.True(t, eventCount < tc.batchSize+tc.numberOfEventsPerMessage)
}
sink.assertDidProcess(messages)
sink.assertProcessedMessageCount(len(messages))
})
}
}

// This will become a more common use case - multiple ingesters ingesting into the same sink
func TestRun_MultipleSimultaneousIngesters(t *testing.T) {
jsCtx, jsCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
cpCtx, cpCancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second))
jobSetMessages := []pulsar.Message{
pulsarutils.NewPulsarMessage(1, baseTime, marshal(t, succeeded)),
pulsarutils.NewPulsarMessage(2, baseTime.Add(1*time.Second), marshal(t, pendingAndRunning)),
pulsarutils.NewPulsarMessage(3, baseTime.Add(2*time.Second), marshal(t, failed)),
}
controlPlaneMessages := []pulsar.Message{
pulsarutils.NewPulsarMessage(4, baseTime, marshal(t, f.UpsertExecutorSettingsCordon)),
pulsarutils.NewPulsarMessage(5, baseTime.Add(1*time.Second), marshal(t, f.UpsertExecutorSettingsUncordon)),
pulsarutils.NewPulsarMessage(6, baseTime.Add(2*time.Second), marshal(t, f.DeleteExecutorSettings)),
}
mockJobSetEventsConsumer := newMockPulsarConsumer(t, jobSetMessages, jsCancel)
mockControlPlaneEventsConsumer := newMockPulsarConsumer(t, controlPlaneMessages, cpCancel)

jobSetEventsConverter := newSimpleEventSequenceConverter(t)
controlPlaneEventsConverter := newSimpleControlPlaneEventConverter(t)

sink := newSimpleSink(t)

jobSetEventsPipeline := testJobSetEventsPipeline(mockJobSetEventsConsumer, jobSetEventsConverter, sink)
controlPlaneEventsPipeline := testControlPlaneEventsPipeline(mockControlPlaneEventsConsumer, controlPlaneEventsConverter, sink)

var jsErr error
var cpErr error
wg := sync.WaitGroup{}
start := time.Now()

wg.Add(1)
go func() {
defer wg.Done()
jsErr = jobSetEventsPipeline.Run(jsCtx)
}()

wg.Add(1)
go func() {
defer wg.Done()
cpErr = controlPlaneEventsPipeline.Run(cpCtx)
}()

wg.Wait()
elapsed := time.Since(start)

assert.NoError(t, jsErr)
assert.NoError(t, cpErr)
assert.LessOrEqual(t, elapsed, batchDuration*2)

mockJobSetEventsConsumer.assertDidAck(jobSetMessages)
mockControlPlaneEventsConsumer.assertDidAck(controlPlaneMessages)
sink.assertDidProcess(jobSetMessages)
sink.assertDidProcess(controlPlaneMessages)
sink.assertProcessedMessageCount(len(controlPlaneMessages) + len(jobSetMessages))
}

func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages, *armadaevents.EventSequence], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages, *armadaevents.EventSequence] {
return &IngestionPipeline[*simpleMessages, *armadaevents.EventSequence]{
pulsarConfig: commonconfig.PulsarConfig{
Expand All @@ -509,7 +581,6 @@ func testJobSetEventsPipeline(consumer pulsar.Consumer, converter InstructionCon
metricPublisher: jobsetevents.BatchMetricPublisher,
converter: converter,
sink: sink,
metricsPort: 8080,
metrics: testMetrics,
consumer: consumer,
}
Expand Down
15 changes: 13 additions & 2 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"regexp"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -394,8 +396,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met
values = append(values, queue)

for key, value := range labels {
metricLabels = append(metricLabels, key)
values = append(values, value)
if isValidMetricLabelName(key) {
metricLabels = append(metricLabels, key)
values = append(values, value)
}
}

queueLabelsDesc := prometheus.NewDesc(
Expand All @@ -407,3 +411,10 @@ func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Met

return prometheus.MustNewConstMetric(queueLabelsDesc, prometheus.GaugeValue, 1, values...)
}

func isValidMetricLabelName(labelName string) bool {
// Prometheus metric label names must match the following regex: [a-zA-Z_][a-zA-Z0-9_]*
// See: https://prometheus.io/docs/concepts/data_model/
match, _ := regexp.MatchString("^[a-zA-Z_][a-zA-Z0-9_]*$", labelName)
return match
}
54 changes: 54 additions & 0 deletions internal/common/metrics/scheduler_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package metrics

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestQueueLabelValidation(t *testing.T) {
tests := map[string]struct {
labelName string
isValid bool
}{
"Empty label name": {
labelName: "",
isValid: false,
},
"Priority label": {
labelName: "priority",
isValid: true,
},
"Label name with underscores": {
labelName: "priority__cpu_pool",
isValid: true,
},
"Label name with spaces": {
labelName: "priority cpu pool",
isValid: false,
},
"Alphanumeric label name": {
labelName: "cluster_12_user",
isValid: true,
},
"Invalid Kubernetes-style label name 1": {
labelName: "armadaproject.io/category",
isValid: false,
},
"Invalid Kubernetes-style label name 2": {
labelName: "armadaproject.io/ttl",
isValid: false,
},
"Invalid Kubernetes-style label name 3": {
labelName: "kubernetes.io/metadata.name",
isValid: false,
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
isValid := isValidMetricLabelName(tc.labelName)
assert.Equal(t, tc.isValid, isValid)
})
}
}
6 changes: 5 additions & 1 deletion internal/eventingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
Expand Down Expand Up @@ -62,6 +63,10 @@ func Run(config *configuration.EventIngesterConfiguration) {
}
converter := convert.NewEventConverter(compressor, uint(config.MaxOutputMessageSizeBytes), metrics)

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

ingester := ingest.NewIngestionPipeline[*model.BatchUpdate, *armadaevents.EventSequence](
config.Pulsar,
config.Pulsar.JobsetEventsTopic,
Expand All @@ -75,7 +80,6 @@ func Run(config *configuration.EventIngesterConfiguration) {
jobsetevents.BatchMetricPublisher,
converter,
eventDb,
config.MetricsPort,
metrics,
)
if err := ingester.Run(app.CreateContextWithShutdown()); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion internal/lookoutingesterv2/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
Expand Down Expand Up @@ -55,6 +56,10 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
log.Fatalf("Pprof setup failed, exiting, %v", err)
}

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

converter := instructions.NewInstructionConverter(m.Metrics, config.UserAnnotationPrefix, compressor)

ingester := ingest.NewIngestionPipeline[*model.InstructionSet, *armadaevents.EventSequence](
Expand All @@ -70,7 +75,6 @@ func Run(config *configuration.LookoutIngesterV2Configuration) {
jobsetevents.BatchMetricPublisher,
converter,
lookoutDb,
config.MetricsPort,
m.Metrics,
)

Expand Down
7 changes: 5 additions & 2 deletions internal/scheduleringester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common"
"github.com/armadaproject/armada/internal/common/app"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database"
Expand Down Expand Up @@ -43,6 +44,10 @@ func Run(config Configuration) error {
log.Fatalf("Pprof setup failed, exiting, %v", err)
}

// Start metric server
shutdownMetricServer := common.ServeMetrics(config.MetricsPort)
defer shutdownMetricServer()

jobSetEventsIngester := ingest.NewIngestionPipeline[*DbOperationsWithMessageIds, *armadaevents.EventSequence](
config.Pulsar,
config.Pulsar.JobsetEventsTopic,
Expand All @@ -56,7 +61,6 @@ func Run(config Configuration) error {
jobsetevents.BatchMetricPublisher,
jobSetEventsConverter,
schedulerDb,
config.MetricsPort,
svcMetrics,
)

Expand All @@ -78,7 +82,6 @@ func Run(config Configuration) error {
controlplaneevents_ingest_utils.BatchMetricPublisher,
controlPlaneEventsConverter,
schedulerDb,
config.MetricsPort,
svcMetrics,
)

Expand Down

0 comments on commit 7115d29

Please sign in to comment.