From b1d9168538a0f48330389ffb988eb53bb3c7be15 Mon Sep 17 00:00:00 2001 From: Mustafa Ilyas Date: Tue, 22 Oct 2024 11:19:58 +0100 Subject: [PATCH] Adding scheduler metrics to expose cluster cordoned status (#261) (#4019) Co-authored-by: Mustafa Ilyas --- internal/common/metrics/scheduler_metrics.go | 11 +++++ internal/scheduler/metrics.go | 33 +++++++++++++++ internal/scheduler/metrics_test.go | 43 ++++++++++++++++++-- 3 files changed, 83 insertions(+), 4 deletions(-) diff --git a/internal/common/metrics/scheduler_metrics.go b/internal/common/metrics/scheduler_metrics.go index 85a686ee6e9..063a80b5738 100644 --- a/internal/common/metrics/scheduler_metrics.go +++ b/internal/common/metrics/scheduler_metrics.go @@ -171,6 +171,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc( nil, ) +var ClusterCordonedStatusDesc = prometheus.NewDesc( + MetricPrefix+"cluster_cordoned_status", + "Cluster cordoned status", + []string{"cluster", "reason"}, + nil, +) + var QueuePriorityDesc = prometheus.NewDesc( MetricPrefix+"queue_priority", "Queue priority factor", @@ -375,6 +382,10 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc return prometheus.MustNewConstMetric(ClusterCapacityDesc, prometheus.GaugeValue, value, cluster, pool, resource, nodeType) } +func NewClusterCordonedStatus(value float64, cluster string, reason string) prometheus.Metric { + return prometheus.MustNewConstMetric(ClusterCordonedStatusDesc, prometheus.GaugeValue, value, cluster, reason) +} + func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric { return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index d356b147d85..78e22da11a2 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -255,12 +255,22 @@ type clusterMetricKey struct { nodeType string } +type clusterCordonedStatus struct { + status float64 + reason string +} + func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]prometheus.Metric, error) { executors, err := c.executorRepository.GetExecutors(ctx) if err != nil { return nil, err } + executorSettings, err := c.executorRepository.GetExecutorSettings(ctx) + if err != nil { + return nil, err + } + cordonedStatusByCluster := map[string]*clusterCordonedStatus{} phaseCountByQueue := map[queuePhaseMetricKey]int{} allocatedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{} usedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{} @@ -278,6 +288,11 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p txn := c.jobDb.ReadTxn() for _, executor := range executors { + // We may not have executorSettings for all known executors, but we still want a cordon status metric for them. + cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{ + status: 0.0, + reason: "", + } for _, node := range executor.Nodes { nodePool := node.GetPool() awayPools := poolToAwayPools[nodePool] @@ -367,6 +382,21 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } } + for _, executorSetting := range executorSettings { + if executorSetting.Cordoned { + if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok { + cordonedValue.status = 1.0 + cordonedValue.reason = executorSetting.CordonReason + } else { + // We may have settings for executors that don't exist in the repository. + cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{ + status: 1.0, + reason: executorSetting.CordonReason, + } + } + } + } + for _, pool := range c.floatingResourceTypes.AllPools() { totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) clusterKey := clusterMetricKey{ @@ -408,6 +438,9 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p for k, v := range totalNodeCountByCluster { clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterTotalCapacity(float64(v), k.cluster, k.pool, "nodes", k.nodeType)) } + for cluster, v := range cordonedStatusByCluster { + clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterCordonedStatus(v.status, cluster, v.reason)) + } return clusterMetrics, nil } diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index f21c5ceec25..50b0627fbd0 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -144,6 +144,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return([]*schedulerobjects.Executor{}, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil) collector := NewMetricsCollector( jobDb, @@ -203,10 +204,11 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { executorWithJobs := createExecutor("cluster-1", nodeWithJobs) tests := map[string]struct { - jobDbJobs []*jobdb.Job - floatingResourceTypes *floatingresources.FloatingResourceTypes - executors []*schedulerobjects.Executor - expected []prometheus.Metric + jobDbJobs []*jobdb.Job + floatingResourceTypes *floatingresources.FloatingResourceTypes + executors []*schedulerobjects.Executor + expected []prometheus.Metric + expectedExecutorSettings []*schedulerobjects.ExecutorSettings }{ "empty cluster single node type": { jobDbJobs: []*jobdb.Job{}, @@ -218,7 +220,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, }, "empty cluster multi node type": { jobDbJobs: []*jobdb.Job{}, @@ -236,7 +240,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-2"), commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-2"), commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-2"), + commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, }, "empty cluster with unschedulable node": { jobDbJobs: []*jobdb.Job{}, @@ -248,7 +254,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, }, "cluster with jobs": { jobDbJobs: []*jobdb.Job{job1, job2}, @@ -266,7 +274,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, }, "jobs missing from jobDb": { jobDbJobs: []*jobdb.Job{}, @@ -280,7 +290,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, }, "floating resources": { jobDbJobs: []*jobdb.Job{}, @@ -290,6 +302,27 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterAvailableCapacity(10, "floating", "pool", "test-floating-resource", ""), commonmetrics.NewClusterTotalCapacity(10, "floating", "pool", "test-floating-resource", ""), }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{}, + }, + "cordoned cluster single node type": { + jobDbJobs: []*jobdb.Job{}, + executors: []*schedulerobjects.Executor{executor}, + expected: []prometheus.Metric{ + commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), + commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"), + commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"), + commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), + commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor"), + }, + expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{ + { + ExecutorId: "cluster-1", + Cordoned: true, + CordonReason: "bad executor", + }, + }, }, } for name, tc := range tests { @@ -311,6 +344,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(tc.executors, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.expectedExecutorSettings, nil) if tc.floatingResourceTypes == nil { tc.floatingResourceTypes = testfixtures.TestEmptyFloatingResources @@ -412,6 +446,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil) + executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil) collector := NewMetricsCollector( jobDb,