Skip to content

Commit

Permalink
Adding scheduler metrics to expose cluster cordoned status (#261) (#4019
Browse files Browse the repository at this point in the history
)

Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
  • Loading branch information
MustafaI and mustafai-gr authored Oct 22, 2024
1 parent be8f657 commit b1d9168
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
11 changes: 11 additions & 0 deletions internal/common/metrics/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc(
nil,
)

var ClusterCordonedStatusDesc = prometheus.NewDesc(
MetricPrefix+"cluster_cordoned_status",
"Cluster cordoned status",
[]string{"cluster", "reason"},
nil,
)

var QueuePriorityDesc = prometheus.NewDesc(
MetricPrefix+"queue_priority",
"Queue priority factor",
Expand Down Expand Up @@ -375,6 +382,10 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc
return prometheus.MustNewConstMetric(ClusterCapacityDesc, prometheus.GaugeValue, value, cluster, pool, resource, nodeType)
}

func NewClusterCordonedStatus(value float64, cluster string, reason string) prometheus.Metric {
return prometheus.MustNewConstMetric(ClusterCordonedStatusDesc, prometheus.GaugeValue, value, cluster, reason)
}

func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric {
return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType)
}
Expand Down
33 changes: 33 additions & 0 deletions internal/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,22 @@ type clusterMetricKey struct {
nodeType string
}

type clusterCordonedStatus struct {
status float64
reason string
}

func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]prometheus.Metric, error) {
executors, err := c.executorRepository.GetExecutors(ctx)
if err != nil {
return nil, err
}
executorSettings, err := c.executorRepository.GetExecutorSettings(ctx)
if err != nil {
return nil, err
}

cordonedStatusByCluster := map[string]*clusterCordonedStatus{}
phaseCountByQueue := map[queuePhaseMetricKey]int{}
allocatedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
usedResourceByQueue := map[queueMetricKey]schedulerobjects.ResourceList{}
Expand All @@ -278,6 +288,11 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p

txn := c.jobDb.ReadTxn()
for _, executor := range executors {
// We may not have executorSettings for all known executors, but we still want a cordon status metric for them.
cordonedStatusByCluster[executor.Id] = &clusterCordonedStatus{
status: 0.0,
reason: "",
}
for _, node := range executor.Nodes {
nodePool := node.GetPool()
awayPools := poolToAwayPools[nodePool]
Expand Down Expand Up @@ -367,6 +382,21 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
}
}

for _, executorSetting := range executorSettings {
if executorSetting.Cordoned {
if cordonedValue, ok := cordonedStatusByCluster[executorSetting.ExecutorId]; ok {
cordonedValue.status = 1.0
cordonedValue.reason = executorSetting.CordonReason
} else {
// We may have settings for executors that don't exist in the repository.
cordonedStatusByCluster[executorSetting.ExecutorId] = &clusterCordonedStatus{
status: 1.0,
reason: executorSetting.CordonReason,
}
}
}
}

for _, pool := range c.floatingResourceTypes.AllPools() {
totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool)
clusterKey := clusterMetricKey{
Expand Down Expand Up @@ -408,6 +438,9 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
for k, v := range totalNodeCountByCluster {
clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterTotalCapacity(float64(v), k.cluster, k.pool, "nodes", k.nodeType))
}
for cluster, v := range cordonedStatusByCluster {
clusterMetrics = append(clusterMetrics, commonmetrics.NewClusterCordonedStatus(v.status, cluster, v.reason))
}
return clusterMetrics, nil
}

Expand Down
43 changes: 39 additions & 4 deletions internal/scheduler/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return([]*schedulerobjects.Executor{}, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil)

collector := NewMetricsCollector(
jobDb,
Expand Down Expand Up @@ -203,10 +204,11 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
executorWithJobs := createExecutor("cluster-1", nodeWithJobs)

tests := map[string]struct {
jobDbJobs []*jobdb.Job
floatingResourceTypes *floatingresources.FloatingResourceTypes
executors []*schedulerobjects.Executor
expected []prometheus.Metric
jobDbJobs []*jobdb.Job
floatingResourceTypes *floatingresources.FloatingResourceTypes
executors []*schedulerobjects.Executor
expected []prometheus.Metric
expectedExecutorSettings []*schedulerobjects.ExecutorSettings
}{
"empty cluster single node type": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -218,7 +220,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"empty cluster multi node type": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -236,7 +240,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-2"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-2"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-2"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"empty cluster with unschedulable node": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -248,7 +254,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"cluster with jobs": {
jobDbJobs: []*jobdb.Job{job1, job2},
Expand All @@ -266,7 +274,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"jobs missing from jobDb": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -280,7 +290,9 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterTotalCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"floating resources": {
jobDbJobs: []*jobdb.Job{},
Expand All @@ -290,6 +302,27 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
commonmetrics.NewClusterAvailableCapacity(10, "floating", "pool", "test-floating-resource", ""),
commonmetrics.NewClusterTotalCapacity(10, "floating", "pool", "test-floating-resource", ""),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
},
"cordoned cluster single node type": {
jobDbJobs: []*jobdb.Job{},
executors: []*schedulerobjects.Executor{executor},
expected: []prometheus.Metric{
commonmetrics.NewClusterAvailableCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterAvailableCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterAvailableCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterTotalCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
commonmetrics.NewClusterTotalCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
commonmetrics.NewClusterTotalCapacity(2, "cluster-1", testfixtures.TestPool, "nodes", "type-1"),
commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor"),
},
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{
{
ExecutorId: "cluster-1",
Cordoned: true,
CordonReason: "bad executor",
},
},
},
}
for name, tc := range tests {
Expand All @@ -311,6 +344,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return(tc.executors, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return(tc.expectedExecutorSettings, nil)

if tc.floatingResourceTypes == nil {
tc.floatingResourceTypes = testfixtures.TestEmptyFloatingResources
Expand Down Expand Up @@ -412,6 +446,7 @@ func TestMetricsCollector_TestCollect_ClusterMetricsAvailableCapacity(t *testing

executorRepository := schedulermocks.NewMockExecutorRepository(ctrl)
executorRepository.EXPECT().GetExecutors(ctx).Return(executors, nil)
executorRepository.EXPECT().GetExecutorSettings(ctx).Return([]*schedulerobjects.ExecutorSettings{}, nil)

collector := NewMetricsCollector(
jobDb,
Expand Down

0 comments on commit b1d9168

Please sign in to comment.