diff --git a/internal/common/maps/maps.go b/internal/common/maps/maps.go index ce9f7ec170b..28ba6e3540b 100644 --- a/internal/common/maps/maps.go +++ b/internal/common/maps/maps.go @@ -2,6 +2,15 @@ package maps import "github.com/armadaproject/armada/internal/common/interfaces" +// FromSlice maps element e of slice into map entry keyFunc(e): valueFunc(e) +func FromSlice[S []E, E any, K comparable, V any](slice S, keyFunc func(E) K, valueFunc func(E) V) map[K]V { + rv := make(map[K]V, len(slice)) + for _, elem := range slice { + rv[keyFunc(elem)] = valueFunc(elem) + } + return rv +} + // MapValues maps the values of m into valueFunc(v). func MapValues[M ~map[K]VA, K comparable, VA any, VB any](m M, valueFunc func(VA) VB) map[K]VB { rv := make(map[K]VB, len(m)) @@ -75,3 +84,22 @@ func Filter[M ~map[K]V, K comparable, V any](m M, predicate func(K, V) bool) M { } return rv } + +// RemoveInPlace removes elements that match keySelector +func RemoveInPlace[K comparable, V any](m map[K]V, keySelector func(K) bool) { + for k := range m { + if keySelector(k) { + delete(m, k) + } + } +} + +func Keys[K comparable, V any](m map[K]V) []K { + i := 0 + result := make([]K, len(m)) + for k := range m { + result[i] = k + i++ + } + return result +} diff --git a/internal/common/maps/maps_test.go b/internal/common/maps/maps_test.go index e829d88bb2c..f7d6838dae5 100644 --- a/internal/common/maps/maps_test.go +++ b/internal/common/maps/maps_test.go @@ -1,12 +1,26 @@ package maps import ( + "strconv" "testing" "github.com/stretchr/testify/assert" "golang.org/x/exp/slices" ) +func TestFromSlice(t *testing.T) { + actual := FromSlice( + []int{1, 2}, + func(elem int) string { return strconv.Itoa(elem) }, + func(elem int) float64 { return float64(elem) * 0.1 }, + ) + expected := map[string]float64{ + "1": 0.1, + "2": 0.2, + } + assert.Equal(t, expected, actual) +} + func TestMapKeys(t *testing.T) { m := map[string][]int{ "foo": {1, 2, 3}, @@ -176,3 +190,31 @@ func TestFilterKeys(t *testing.T) { } assert.Equal(t, expected, actual) } + +func TestRemoveInPlace(t *testing.T) { + m := map[int]string{ + 1: "one", + 2: "two", + 3: "three", + 4: "four", + } + RemoveInPlace(m, func(i int) bool { + return i > 2 + }) + expected := map[int]string{ + 1: "one", + 2: "two", + } + assert.Equal(t, expected, m) +} + +func TestKeys(t *testing.T) { + m := map[int]string{ + 1: "one", + 2: "two", + } + result := Keys(m) + slices.Sort(result) + expected := []int{1, 2} + assert.Equal(t, expected, result) +} diff --git a/internal/common/metrics/domain.go b/internal/common/metrics/domain.go index ba6eb2eaf34..599f1fdc8b6 100644 --- a/internal/common/metrics/domain.go +++ b/internal/common/metrics/domain.go @@ -10,6 +10,7 @@ import ( type QueueMetricProvider interface { GetQueuedJobMetrics(queueName string) []*QueueMetrics GetRunningJobMetrics(queueName string) []*QueueMetrics + GetQueuePriorites() map[string]float64 } type QueueMetrics struct { diff --git a/internal/common/metrics/scheduler_metrics.go b/internal/common/metrics/scheduler_metrics.go index 64bf0ab5ff1..a99af7e7509 100644 --- a/internal/common/metrics/scheduler_metrics.go +++ b/internal/common/metrics/scheduler_metrics.go @@ -11,154 +11,147 @@ const MetricPrefix = "armada_" var QueueSizeDesc = prometheus.NewDesc( MetricPrefix+"queue_size", "Number of jobs in a queue", - []string{"queueName"}, + []string{"queueName", "queue"}, nil, ) var QueueDistinctSchedulingKeysDesc = prometheus.NewDesc( MetricPrefix+"queue_distinct_scheduling_keys", "Number of distinct scheduling keys requested by a queue", - []string{"queueName"}, - nil, -) - -var QueuePriorityDesc = prometheus.NewDesc( - MetricPrefix+"queue_priority", - "Priority of a queue", - []string{"pool", "queueName"}, + []string{"queueName", "queue"}, nil, ) var QueueResourcesDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_queued", "Resource required by queued jobs", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MinQueueResourcesDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_queued_min", "Min resource required by queued job", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MaxQueueResourcesDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_queued_max", "Max resource required by queued job", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MedianQueueResourcesDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_queued_median", "Median resource required by queued jobs", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var CountQueueResourcesDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_queued_count", "Count of queued jobs requiring resource", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MinQueueDurationDesc = prometheus.NewDesc( MetricPrefix+"job_queued_seconds_min", "Min queue time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var MaxQueueDurationDesc = prometheus.NewDesc( MetricPrefix+"job_queued_seconds_max", "Max queue time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var MedianQueueDurationDesc = prometheus.NewDesc( MetricPrefix+"job_queued_seconds_median", "Median queue time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var QueueDurationDesc = prometheus.NewDesc( MetricPrefix+"job_queued_seconds", "Queued time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var MinJobRunDurationDesc = prometheus.NewDesc( MetricPrefix+"job_run_time_seconds_min", "Min run time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var MaxJobRunDurationDesc = prometheus.NewDesc( MetricPrefix+"job_run_time_seconds_max", "Max run time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var MedianJobRunDurationDesc = prometheus.NewDesc( MetricPrefix+"job_run_time_seconds_median", "Median run time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var JobRunDurationDesc = prometheus.NewDesc( MetricPrefix+"job_run_time_seconds", "Run time for Armada jobs", - []string{"pool", "priorityClass", "queueName"}, + []string{"pool", "priorityClass", "queueName", "queue"}, nil, ) var QueueAllocatedDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_allocated", "Resource allocated to running jobs of a queue", - []string{"cluster", "pool", "priorityClass", "queueName", "resourceType", "nodeType"}, + []string{"cluster", "pool", "priorityClass", "queueName", "queue", "resourceType", "nodeType"}, nil, ) var MinQueueAllocatedDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_allocated_min", "Min resource allocated by a running job", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MaxQueueAllocatedDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_allocated_max", "Max resource allocated by a running job", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var MedianQueueAllocatedDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_allocated_median", "Median resource allocated by a running job", - []string{"pool", "priorityClass", "queueName", "resourceType"}, + []string{"pool", "priorityClass", "queueName", "queue", "resourceType"}, nil, ) var QueueUsedDesc = prometheus.NewDesc( MetricPrefix+"queue_resource_used", "Resource actually being used by running jobs of a queue", - []string{"cluster", "pool", "queueName", "resourceType", "nodeType"}, + []string{"cluster", "pool", "queueName", "queue", "resourceType", "nodeType"}, nil, ) var QueueLeasedPodCountDesc = prometheus.NewDesc( MetricPrefix+"queue_leased_pod_count", "Number of leased pods", - []string{"cluster", "pool", "queueName", "phase", "nodeType"}, + []string{"cluster", "pool", "queueName", "queue", "phase", "nodeType"}, nil, ) @@ -176,6 +169,13 @@ var ClusterAvailableCapacityDesc = prometheus.NewDesc( nil, ) +var QueuePriorityDesc = prometheus.NewDesc( + MetricPrefix+"queue_priority", + "Queue priority factor", + []string{"queueName", "queue"}, + nil, +) + var AllDescs = []*prometheus.Desc{ QueueSizeDesc, QueuePriorityDesc, @@ -201,6 +201,7 @@ var AllDescs = []*prometheus.Desc{ QueueLeasedPodCountDesc, ClusterCapacityDesc, ClusterAvailableCapacityDesc, + QueuePriorityDesc, } func Describe(out chan<- *prometheus.Desc) { @@ -264,83 +265,86 @@ func CollectQueueMetrics(queueCounts map[string]int, queueDistinctSchedulingKeyC } } } + for q, priority := range metricsProvider.GetQueuePriorites() { + metrics = append(metrics, NewQueuePriorityMetric(priority, q)) + } return metrics } func NewQueueSizeMetric(value int, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue) + return prometheus.MustNewConstMetric(QueueSizeDesc, prometheus.GaugeValue, float64(value), queue, queue) } func NewQueueDistinctSchedulingKeyMetric(value int, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue) + return prometheus.MustNewConstMetric(QueueDistinctSchedulingKeysDesc, prometheus.GaugeValue, float64(value), queue, queue) } func NewQueueDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue) + return prometheus.MustNewConstHistogram(QueueDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue) } func NewQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(QueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewMaxQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MaxQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewMinQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MinQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewMedianQueueResources(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MedianQueueResourcesDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewCountQueueResources(value uint64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(CountQueueResourcesDesc, prometheus.GaugeValue, float64(value), pool, priorityClass, queue, queue, resource) } func NewMinQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MinQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewMaxQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MaxQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewMedianQueueDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MedianQueueDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewMinJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MinJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewMaxJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MaxJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewMedianJobRunDuration(value float64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue) + return prometheus.MustNewConstMetric(MedianJobRunDurationDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue) } func NewJobRunRunDuration(count uint64, sum float64, buckets map[float64]uint64, pool string, priorityClass string, queue string) prometheus.Metric { - return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue) + return prometheus.MustNewConstHistogram(JobRunDurationDesc, count, sum, buckets, pool, priorityClass, queue, queue) } func NewMinQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MinQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewMaxQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MaxQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewMedianQueueAllocated(value float64, pool string, priorityClass string, queue string, resource string) prometheus.Metric { - return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, resource) + return prometheus.MustNewConstMetric(MedianQueueAllocatedDesc, prometheus.GaugeValue, value, pool, priorityClass, queue, queue, resource) } func NewQueueLeasedPodCount(value float64, cluster string, pool string, queue string, phase string, nodeType string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, phase, nodeType) + return prometheus.MustNewConstMetric(QueueLeasedPodCountDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, phase, nodeType) } func NewClusterAvailableCapacity(value float64, cluster string, pool string, resource string, nodeType string) prometheus.Metric { @@ -352,9 +356,13 @@ func NewClusterTotalCapacity(value float64, cluster string, pool string, resourc } func NewQueueAllocated(value float64, queue string, cluster string, pool string, priorityClass string, resource string, nodeType string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, resource, nodeType) + return prometheus.MustNewConstMetric(QueueAllocatedDesc, prometheus.GaugeValue, value, cluster, pool, priorityClass, queue, queue, resource, nodeType) } func NewQueueUsed(value float64, queue string, cluster string, pool string, resource string, nodeType string) prometheus.Metric { - return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, resource, nodeType) + return prometheus.MustNewConstMetric(QueueUsedDesc, prometheus.GaugeValue, value, cluster, pool, queue, queue, resource, nodeType) +} + +func NewQueuePriorityMetric(value float64, queue string) prometheus.Metric { + return prometheus.MustNewConstMetric(QueuePriorityDesc, prometheus.GaugeValue, value, queue, queue) } diff --git a/internal/scheduler/adapters/adapters.go b/internal/scheduler/adapters/adapters.go index 1115d88869d..19b314c1413 100644 --- a/internal/scheduler/adapters/adapters.go +++ b/internal/scheduler/adapters/adapters.go @@ -4,6 +4,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" + k8sResource "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/common/logging" "github.com/armadaproject/armada/internal/common/types" @@ -62,3 +63,14 @@ func PriorityFromPodSpec(podSpec *v1.PodSpec, priorityClasses map[string]types.P // Couldn't find anything return 0, false } + +func K8sResourceListToMap(resources v1.ResourceList) map[string]k8sResource.Quantity { + if resources == nil { + return nil + } + result := make(map[string]k8sResource.Quantity, len(resources)) + for k, v := range resources { + result[string(k)] = v + } + return result +} diff --git a/internal/scheduler/adapters/adapters_test.go b/internal/scheduler/adapters/adapters_test.go index 6ab18152df8..520892a50d2 100644 --- a/internal/scheduler/adapters/adapters_test.go +++ b/internal/scheduler/adapters/adapters_test.go @@ -205,3 +205,20 @@ func TestPriorityFromPodSpec(t *testing.T) { }) } } + +func TestK8sResourceListToMap(t *testing.T) { + result := K8sResourceListToMap(v1.ResourceList{ + "one": resource.MustParse("1"), + "two": resource.MustParse("2"), + }) + expected := map[string]resource.Quantity{ + "one": resource.MustParse("1"), + "two": resource.MustParse("2"), + } + + assert.Equal(t, expected, result) +} + +func TestK8sResourceListToMap_PreservesNil(t *testing.T) { + assert.Nil(t, K8sResourceListToMap(nil)) +} diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index b493873908b..8094c6a5ce3 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -17,6 +17,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/compress" "github.com/armadaproject/armada/internal/common/logging" + "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/common/slices" priorityTypes "github.com/armadaproject/armada/internal/common/types" @@ -40,6 +41,9 @@ type ExecutorApi struct { allowedPriorities []int32 // Known priority classes priorityClasses map[string]priorityTypes.PriorityClass + // Allowed resource names - resource requests/limits not on this list are dropped. + // This is needed to ensure floating resources are not passed to k8s. + allowedResources map[string]bool // Max number of events in published Pulsar messages maxEventsPerPulsarMessage int // Max size of Pulsar messages produced. @@ -55,6 +59,7 @@ func NewExecutorApi(producer pulsar.Producer, jobRepository database.JobRepository, executorRepository database.ExecutorRepository, allowedPriorities []int32, + allowedResources []string, nodeIdLabel string, priorityClassNameOverride *string, priorityClasses map[string]priorityTypes.PriorityClass, @@ -69,6 +74,7 @@ func NewExecutorApi(producer pulsar.Producer, jobRepository: jobRepository, executorRepository: executorRepository, allowedPriorities: allowedPriorities, + allowedResources: maps.FromSlice(allowedResources, func(name string) string { return name }, func(name string) bool { return true }), maxEventsPerPulsarMessage: maxEventsPerPulsarMessage, maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes, nodeIdLabel: nodeIdLabel, @@ -109,7 +115,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns return err } ctx.Infof( - "executor currently has %d job runs; sending %d cancellations and %d new runs", + "Executor currently has %d job runs; sending %d cancellations and %d new runs", len(requestRuns), len(runsToCancel), len(newRuns), ) @@ -149,6 +155,8 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns srv.addPreemptibleLabel(submitMsg) + srv.dropDisallowedResources(submitMsg.MainObject.GetPodSpec().PodSpec) + // This must happen after anything that relies on the priorityClassName if srv.priorityClassNameOverride != nil { srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride) @@ -216,6 +224,29 @@ func (srv *ExecutorApi) addPreemptibleLabel(job *armadaevents.SubmitJob) { addLabels(job, labels) } +// Drop non-supported resources. This is needed to ensure floating resources +// are not passed to k8s. +func (srv *ExecutorApi) dropDisallowedResources(pod *v1.PodSpec) { + if pod == nil { + return + } + srv.dropDisallowedResourcesFromContainers(pod.InitContainers) + srv.dropDisallowedResourcesFromContainers(pod.Containers) +} + +func (srv *ExecutorApi) dropDisallowedResourcesFromContainers(containers []v1.Container) { + for _, container := range containers { + removeDisallowedKeys(container.Resources.Limits, srv.allowedResources) + removeDisallowedKeys(container.Resources.Requests, srv.allowedResources) + } +} + +func removeDisallowedKeys(rl v1.ResourceList, allowedKeys map[string]bool) { + maps.RemoveInPlace(rl, func(name v1.ResourceName) bool { + return !allowedKeys[string(name)] + }) +} + func (srv *ExecutorApi) isPreemptible(job *armadaevents.SubmitJob) bool { priorityClassName := "" diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 4b080235a52..27f48c0d925 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -19,10 +19,13 @@ import ( "github.com/armadaproject/armada/internal/common/mocks" protoutil "github.com/armadaproject/armada/internal/common/proto" "github.com/armadaproject/armada/internal/common/pulsarutils" + "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/types" + schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/database" schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" + "github.com/armadaproject/armada/internal/scheduler/testfixtures" "github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/armadaevents" "github.com/armadaproject/armada/pkg/executorapi" @@ -321,6 +324,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { mockJobRepository, mockExecutorRepository, []int32{1000, 2000}, + testResourceNames(), "kubernetes.io/hostname", nil, priorityClasses, @@ -448,6 +452,7 @@ func TestExecutorApi_Publish(t *testing.T) { mockJobRepository, mockExecutorRepository, []int32{1000, 2000}, + testResourceNames(), "kubernetes.io/hostname", nil, priorityClasses, @@ -495,3 +500,7 @@ func groups(t *testing.T) ([]string, []byte) { require.NoError(t, err) return groups, compressed } + +func testResourceNames() []string { + return slices.Map(testfixtures.GetTestSupportedResourceTypes(), func(rt schedulerconfig.ResourceType) string { return rt.Name }) +} diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 54800d0e7d9..8ec05a725fb 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -112,6 +112,19 @@ type LeaderConfig struct { LeaderConnection client.ApiConnectionDetails } +type FloatingResourceConfig struct { + // Resource name, e.g. "s3-connections" + Name string + // Per-pool config. + Pools []FloatingResourcePoolConfig +} + +type FloatingResourcePoolConfig struct { + // Name of the pool. + Name string + // Amount of this resource that can be allocated across all jobs in this pool. + Quantity resource.Quantity +} type HttpConfig struct { Port int `validate:"required"` } @@ -230,6 +243,13 @@ type SchedulingConfig struct { // // If not set, all taints are indexed. IndexedTaints []string + // Experimental - subject to change + // Resources that are outside of k8s, and not tied to a given k8s node or cluster. + // For example connections to an S3 server that sits outside of k8s could be rationed to limit load on the server. + // These can be requested like a normal k8s resource. Note there is no mechanism in armada + // to enforce actual usage, it relies on honesty. For example, there is nothing to stop a badly-behaved job + // requesting 2 S3 server connections and then opening 10. + ExperimentalFloatingResources []FloatingResourceConfig // WellKnownNodeTypes defines a set of well-known node types used to define "home" and "away" nodes for a given priority class. WellKnownNodeTypes []WellKnownNodeType `validate:"dive"` // Executor that haven't heartbeated in this time period are considered stale. diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 9cf10b02fdf..b4d4f88603b 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -51,8 +51,10 @@ type SchedulingContext struct { WeightSum float64 // Per-queue scheduling contexts. QueueSchedulingContexts map[string]*QueueSchedulingContext - // Total resources across all clusters available at the start of the scheduling cycle. + // Total resources across all clusters in this pool available at the start of the scheduling cycle. TotalResources schedulerobjects.ResourceList + // Allocated resources across all clusters in this pool + Allocated schedulerobjects.ResourceList // Resources assigned across all queues during this scheduling cycle. ScheduledResources schedulerobjects.ResourceList ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] @@ -130,6 +132,8 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext( allocated.Add(rl) } sctx.WeightSum += weight + sctx.Allocated.Add(allocated) + qctx := &QueueSchedulingContext{ SchedulingContext: sctx, Created: time.Now(), @@ -307,7 +311,7 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex if !ok { return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, jctx.Job.Queue()) } - evictedInThisRound, err := qctx.AddJobSchedulingContext(jctx) + evictedInThisRound, err := qctx.addJobSchedulingContext(jctx) if err != nil { return false, err } @@ -321,6 +325,7 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ } + sctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) } return evictedInThisRound, nil } @@ -345,7 +350,7 @@ func (sctx *SchedulingContext) EvictJob(job *jobdb.Job) (bool, error) { if !ok { return false, errors.Errorf("failed evicting job %s from scheduling context: no context for queue %s", job.Id(), job.Queue()) } - scheduledInThisRound, err := qctx.EvictJob(job) + scheduledInThisRound, err := qctx.evictJob(job) if err != nil { return false, err } @@ -359,6 +364,7 @@ func (sctx *SchedulingContext) EvictJob(job *jobdb.Job) (bool, error) { sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl) sctx.NumEvictedJobs++ } + sctx.Allocated.SubV1ResourceList(rl) return scheduledInThisRound, nil } @@ -526,9 +532,9 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string { return sb.String() } -// AddJobSchedulingContext adds a job scheduling context. +// addJobSchedulingContext adds a job scheduling context. // Automatically updates scheduled resources. -func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContext) (bool, error) { +func (qctx *QueueSchedulingContext) addJobSchedulingContext(jctx *JobSchedulingContext) (bool, error) { if _, ok := qctx.SuccessfulJobSchedulingContexts[jctx.JobId]; ok { return false, errors.Errorf("failed adding job %s to queue: job already marked successful", jctx.JobId) } @@ -561,7 +567,7 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC return evictedInThisRound, nil } -func (qctx *QueueSchedulingContext) EvictJob(job *jobdb.Job) (bool, error) { +func (qctx *QueueSchedulingContext) evictJob(job *jobdb.Job) (bool, error) { jobId := job.Id() if _, ok := qctx.UnsuccessfulJobSchedulingContexts[jobId]; ok { return false, errors.Errorf("failed evicting job %s from queue: job already marked unsuccessful", jobId) @@ -597,28 +603,34 @@ type GangSchedulingContext struct { Created time.Time Queue string GangInfo - JobSchedulingContexts []*JobSchedulingContext - TotalResourceRequests schedulerobjects.ResourceList - AllJobsEvicted bool + JobSchedulingContexts []*JobSchedulingContext + TotalResourceRequests schedulerobjects.ResourceList + AllJobsEvicted bool + RequestsFloatingResources bool } func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingContext { allJobsEvicted := true totalResourceRequests := schedulerobjects.NewResourceList(4) + requestsFloatingResources := false for _, jctx := range jctxs { allJobsEvicted = allJobsEvicted && jctx.IsEvicted totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + if jctx.Job.RequestsFloatingResources() { + requestsFloatingResources = true + } } // Uniformity of the values that we pick off the first job in the gang was // checked when the jobs were submitted (e.g., in ValidateApiJobs). representative := jctxs[0] return &GangSchedulingContext{ - Created: time.Now(), - Queue: representative.Job.Queue(), - GangInfo: representative.GangInfo, - JobSchedulingContexts: jctxs, - TotalResourceRequests: totalResourceRequests, - AllJobsEvicted: allJobsEvicted, + Created: time.Now(), + Queue: representative.Job.Queue(), + GangInfo: representative.GangInfo, + JobSchedulingContexts: jctxs, + TotalResourceRequests: totalResourceRequests, + AllJobsEvicted: allJobsEvicted, + RequestsFloatingResources: requestsFloatingResources, } } diff --git a/internal/scheduler/floatingresources/floating_resource_types.go b/internal/scheduler/floatingresources/floating_resource_types.go new file mode 100644 index 00000000000..a2d15e00e01 --- /dev/null +++ b/internal/scheduler/floatingresources/floating_resource_types.go @@ -0,0 +1,118 @@ +package floatingresources + +import ( + "fmt" + "slices" + "strings" + + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/armadaproject/armada/internal/common/maps" + "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" +) + +type FloatingResourceTypes struct { + zeroFloatingResources schedulerobjects.ResourceList + pools map[string]*floatingResourcePool +} + +type floatingResourcePool struct { + totalResources schedulerobjects.ResourceList +} + +func NewFloatingResourceTypes(config []configuration.FloatingResourceConfig) (*FloatingResourceTypes, error) { + zeroFloatingResources := schedulerobjects.ResourceList{Resources: make(map[string]resource.Quantity, len(config))} + for _, c := range config { + if _, exists := zeroFloatingResources.Resources[c.Name]; exists { + return nil, fmt.Errorf("duplicate floating resource %s", c.Name) + } + zeroFloatingResources.Resources[c.Name] = resource.Quantity{} + } + + pools := map[string]*floatingResourcePool{} + for _, fr := range config { + for _, poolConfig := range fr.Pools { + pool, exists := pools[poolConfig.Name] + if !exists { + pool = &floatingResourcePool{ + totalResources: zeroFloatingResources.DeepCopy(), + } + pools[poolConfig.Name] = pool + } + existing := pool.totalResources.Resources[fr.Name] + if existing.Cmp(resource.Quantity{}) != 0 { + return nil, fmt.Errorf("duplicate floating resource %s for pool %s", fr.Name, poolConfig.Name) + } + pool.totalResources.Resources[fr.Name] = poolConfig.Quantity.DeepCopy() + } + } + + return &FloatingResourceTypes{ + zeroFloatingResources: zeroFloatingResources, + pools: pools, + }, nil +} + +func (frt *FloatingResourceTypes) WithinLimits(poolName string, allocated schedulerobjects.ResourceList) (bool, string) { + pool, exists := frt.pools[poolName] + if !exists { + return false, fmt.Sprintf("floating resources not connfigured for pool %s", poolName) + } + rl := pool.totalResources.DeepCopy() + rl.Sub(allocated) + for resourceName, quantity := range rl.Resources { + if !frt.isFloatingResource(resourceName) { + continue + } + if quantity.Cmp(resource.Quantity{}) == -1 { + return false, fmt.Sprintf("not enough floating resource %s in pool %s", resourceName, poolName) + } + } + return true, "" +} + +func (frt *FloatingResourceTypes) RemoveFloatingResources(allResources map[string]resource.Quantity) map[string]resource.Quantity { + result := make(map[string]resource.Quantity) + for k, v := range allResources { + if !frt.isFloatingResource(k) { + result[k] = v + } + } + return result +} + +func (frt *FloatingResourceTypes) HasFloatingResources(resources map[string]resource.Quantity) bool { + for resourceName, quantity := range resources { + if frt.isFloatingResource(resourceName) && quantity.Cmp(resource.Quantity{}) == 1 { + return true + } + } + return false +} + +func (frt *FloatingResourceTypes) AllPools() []string { + result := maps.Keys(frt.pools) + slices.Sort(result) + return result +} + +func (frt *FloatingResourceTypes) GetTotalAvailableForPool(poolName string) schedulerobjects.ResourceList { + pool, exists := frt.pools[poolName] + if !exists { + return frt.zeroFloatingResources.DeepCopy() + } + return pool.totalResources.DeepCopy() +} + +func (frt *FloatingResourceTypes) SummaryString() string { + if len(frt.zeroFloatingResources.Resources) == 0 { + return "none" + } + return strings.Join(maps.Keys(frt.zeroFloatingResources.Resources), " ") +} + +func (frt *FloatingResourceTypes) isFloatingResource(resourceName string) bool { + _, exists := frt.zeroFloatingResources.Resources[resourceName] + return exists +} diff --git a/internal/scheduler/floatingresources/floating_resource_types_test.go b/internal/scheduler/floatingresources/floating_resource_types_test.go new file mode 100644 index 00000000000..938d4daf775 --- /dev/null +++ b/internal/scheduler/floatingresources/floating_resource_types_test.go @@ -0,0 +1,130 @@ +package floatingresources + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/resource" + + "github.com/armadaproject/armada/internal/common/maps" + "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" +) + +func TestHasFloatingResources(t *testing.T) { + sut := makeSut(t) + assert.False(t, sut.HasFloatingResources(map[string]resource.Quantity{})) + assert.False(t, sut.HasFloatingResources(map[string]resource.Quantity{"some-other-resource": resource.MustParse("10")})) + assert.False(t, sut.HasFloatingResources(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("0")})) + assert.True(t, sut.HasFloatingResources(map[string]resource.Quantity{"floating-resource-1": resource.MustParse("10")})) + assert.True(t, sut.HasFloatingResources(map[string]resource.Quantity{"some-other-resource": resource.MustParse("10"), "floating-resource-1": resource.MustParse("10")})) +} + +func TestAllPools(t *testing.T) { + sut := makeSut(t) + assert.Equal(t, []string{"cpu", "gpu"}, sut.AllPools()) +} + +func TestRemoveFloatingResources(t *testing.T) { + sut := makeSut(t) + input := map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "some-other-resource": resource.MustParse("300")} + inputBefore := maps.DeepCopy(input) + result := sut.RemoveFloatingResources(input) + assert.Equal(t, map[string]resource.Quantity{"some-other-resource": resource.MustParse("300")}, result) + assert.Equal(t, inputBefore, input) +} + +func TestGetTotalAvailableForPool(t *testing.T) { + sut := makeSut(t) + zero := resource.Quantity{} + assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200"), "floating-resource-2": resource.MustParse("300")}, sut.GetTotalAvailableForPool("cpu").Resources) + assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": resource.MustParse("100"), "floating-resource-2": zero}, sut.GetTotalAvailableForPool("gpu").Resources) + assert.Equal(t, map[string]resource.Quantity{"floating-resource-1": zero, "floating-resource-2": zero}, sut.GetTotalAvailableForPool("some-other-pool").Resources) +} + +func TestWithinLimits_WhenWithinLimits_ReturnsTrue(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("cpu", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("199")}}, + ) + assert.True(t, withinLimits) + assert.Empty(t, errorMessage) +} + +func TestWithinLimits_WhenAtLimit_ReturnsTrue(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("cpu", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("200")}}, + ) + assert.True(t, withinLimits) + assert.Empty(t, errorMessage) +} + +func TestWithinLimits_WhenExceedsLimit_ReturnsFalse(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("cpu", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("201")}}, + ) + assert.False(t, withinLimits) + assert.NotEmpty(t, errorMessage) +} + +func TestWithinLimits_IgnoresNonFloatingResources(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("cpu", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"some-other-resource": resource.MustParse("1000")}}, + ) + assert.True(t, withinLimits) + assert.Empty(t, errorMessage) +} + +func TestWithinLimits_WhenResourceNotSpecifiedForAPool_ReturnsFalse(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("gpu", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-2": resource.MustParse("1")}}, + ) + assert.False(t, withinLimits) + assert.NotEmpty(t, errorMessage) +} + +func TestWithinLimits_WhenPoolDoesNotExist_ReturnsFalse(t *testing.T) { + sut := makeSut(t) + withinLimits, errorMessage := sut.WithinLimits("some-other-pool", + schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"floating-resource-1": resource.MustParse("1")}}, + ) + assert.False(t, withinLimits) + assert.NotEmpty(t, errorMessage) +} + +func testConfig() []configuration.FloatingResourceConfig { + return []configuration.FloatingResourceConfig{ + { + Name: "floating-resource-1", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("200"), + }, + { + Name: "gpu", + Quantity: resource.MustParse("100"), + }, + }, + }, + { + Name: "floating-resource-2", + Pools: []configuration.FloatingResourcePoolConfig{ + { + Name: "cpu", + Quantity: resource.MustParse("300"), + }, + }, + }, + } +} + +func makeSut(t *testing.T) *FloatingResourceTypes { + sut, err := NewFloatingResourceTypes(testConfig()) + assert.Nil(t, err) + return sut +} diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 5fb28d49fbe..6b611704b24 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -10,6 +10,7 @@ import ( "github.com/armadaproject/armada/internal/common/slices" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -17,9 +18,10 @@ import ( // GangScheduler schedules one gang at a time. GangScheduler is not aware of queues. type GangScheduler struct { - constraints schedulerconstraints.SchedulingConstraints - schedulingContext *schedulercontext.SchedulingContext - nodeDb *nodedb.NodeDb + constraints schedulerconstraints.SchedulingConstraints + floatingResourceTypes *floatingresources.FloatingResourceTypes + schedulingContext *schedulercontext.SchedulingContext + nodeDb *nodedb.NodeDb // If true, the unsuccessfulSchedulingKeys check is omitted. skipUnsuccessfulSchedulingKeyCheck bool } @@ -27,12 +29,14 @@ type GangScheduler struct { func NewGangScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, + floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, ) (*GangScheduler, error) { return &GangScheduler{ - constraints: constraints, - schedulingContext: sctx, - nodeDb: nodeDb, + constraints: constraints, + floatingResourceTypes: floatingResourceTypes, + schedulingContext: sctx, + nodeDb: nodeDb, }, nil } @@ -138,6 +142,13 @@ func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulerco return } } + + if gctx.RequestsFloatingResources { + if ok, unschedulableReason = sch.floatingResourceTypes.WithinLimits(sch.schedulingContext.Pool, sch.schedulingContext.Allocated); !ok { + return + } + } + return sch.trySchedule(ctx, gctx) } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index 09ce9fc04dd..fdb8cd8ae39 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -20,6 +20,7 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -90,6 +91,22 @@ func TestGangScheduler(t *testing.T) { ExpectedCumulativeScheduledJobs: []int{64}, ExpectedRuntimeGangCardinality: []int{64}, }, + "floating resources": { + SchedulingConfig: func() configuration.SchedulingConfig { + cfg := testfixtures.TestSchedulingConfig() + cfg.ExperimentalFloatingResources = testfixtures.TestFloatingResourceConfig + return cfg + }(), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Gangs: [][]*jobdb.Job{ + // we have 10 of test-floating-resource so only the first of these two jobs should fit + addFloatingResourceRequest("6", testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1))), + addFloatingResourceRequest("6", testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1))), + }, + ExpectedScheduledIndices: testfixtures.IntRange(0, 0), + ExpectedCumulativeScheduledJobs: []int{1, 1}, + ExpectedRuntimeGangCardinality: []int{1, 0}, + }, "MaximumResourceFractionToSchedule": { SchedulingConfig: testfixtures.WithRoundLimitsConfig( map[string]float64{"cpu": 0.5}, @@ -571,7 +588,9 @@ func TestGangScheduler(t *testing.T) { tc.SchedulingConfig, nil, ) - sch, err := NewGangScheduler(sctx, constraints, nodeDb) + floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(tc.SchedulingConfig.ExperimentalFloatingResources) + require.NoError(t, err) + sch, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb) require.NoError(t, err) var actualScheduledIndices []int @@ -638,3 +657,13 @@ func TestGangScheduler(t *testing.T) { }) } } + +func addFloatingResourceRequest(request string, jobs []*jobdb.Job) []*jobdb.Job { + return testfixtures.WithRequestsJobs( + schedulerobjects.ResourceList{ + Resources: map[string]resource.Quantity{ + "test-floating-resource": resource.MustParse(request), + }, + }, + jobs) +} diff --git a/internal/scheduler/internaltypes/resource_list_factory.go b/internal/scheduler/internaltypes/resource_list_factory.go index c610b8040dc..771e4b59669 100644 --- a/internal/scheduler/internaltypes/resource_list_factory.go +++ b/internal/scheduler/internaltypes/resource_list_factory.go @@ -6,7 +6,6 @@ import ( "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" k8sResource "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/scheduler/configuration" @@ -82,13 +81,10 @@ func (factory *ResourceListFactory) FromJobResourceListIgnoreUnknown(resources m } // Fail on unknown resources, round up. -func (factory *ResourceListFactory) FromJobResourceListFailOnUnknown(resources v1.ResourceList) (ResourceList, error) { - if resources == nil { - return ResourceList{}, nil - } +func (factory *ResourceListFactory) FromJobResourceListFailOnUnknown(resources map[string]k8sResource.Quantity) (ResourceList, error) { result := make([]int64, len(factory.indexToName)) for k, v := range resources { - index, ok := factory.nameToIndex[string(k)] + index, ok := factory.nameToIndex[k] if ok { result[index] = QuantityToInt64RoundUp(v, factory.scales[index]) } else { diff --git a/internal/scheduler/internaltypes/resource_list_factory_test.go b/internal/scheduler/internaltypes/resource_list_factory_test.go index 5224a553e7b..1010c9e8cdb 100644 --- a/internal/scheduler/internaltypes/resource_list_factory_test.go +++ b/internal/scheduler/internaltypes/resource_list_factory_test.go @@ -4,8 +4,6 @@ import ( "math" "testing" - v1 "k8s.io/api/core/v1" - "github.com/stretchr/testify/assert" k8sResource "k8s.io/apimachinery/pkg/api/resource" @@ -48,7 +46,7 @@ func TestFromNodeProto(t *testing.T) { func TestFromJobResourceListFailOnUnknown(t *testing.T) { factory := testFactory() - result, err := factory.FromJobResourceListFailOnUnknown(map[v1.ResourceName]k8sResource.Quantity{ + result, err := factory.FromJobResourceListFailOnUnknown(map[string]k8sResource.Quantity{ "memory": k8sResource.MustParse("100Mi"), "cpu": k8sResource.MustParse("9999999n"), }) @@ -60,7 +58,7 @@ func TestFromJobResourceListFailOnUnknown(t *testing.T) { func TestFromJobResourceListFailOnUnknownErrorsIfMissing(t *testing.T) { factory := testFactory() - _, err := factory.FromJobResourceListFailOnUnknown(map[v1.ResourceName]k8sResource.Quantity{ + _, err := factory.FromJobResourceListFailOnUnknown(map[string]k8sResource.Quantity{ "memory": k8sResource.MustParse("100Mi"), "missing": k8sResource.MustParse("1"), }) diff --git a/internal/scheduler/jobdb/comparison_test.go b/internal/scheduler/jobdb/comparison_test.go index 875c139ab39..502f7ccf613 100644 --- a/internal/scheduler/jobdb/comparison_test.go +++ b/internal/scheduler/jobdb/comparison_test.go @@ -7,9 +7,11 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" ) func TestJobPriorityComparer(t *testing.T) { + emptyFloatingResourceTypes, _ := floatingresources.NewFloatingResourceTypes(nil) tests := map[string]struct { a *Job b *Job @@ -52,7 +54,7 @@ func TestJobPriorityComparer(t *testing.T) { }, "Running jobs come before queued jobs": { a: &Job{id: "a", priority: 1}, - b: (&Job{id: "b", priority: 2, jobDb: NewJobDb(map[string]types.PriorityClass{"foo": {}}, "foo", stringinterner.New(1), TestResourceListFactory)}).WithNewRun("", "", "", 0), + b: (&Job{id: "b", priority: 2, jobDb: NewJobDb(map[string]types.PriorityClass{"foo": {}}, "foo", stringinterner.New(1), TestResourceListFactory, emptyFloatingResourceTypes)}).WithNewRun("", "", "", 0), expected: 1, }, "Running jobs are ordered third by runtime": { diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 8f9e8bdb7d9..c2ca8a45acc 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -14,6 +14,7 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/adapters" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -644,7 +645,8 @@ func (job *Job) ValidateResourceRequests() error { return nil } - _, err := job.jobDb.resourceListFactory.FromJobResourceListFailOnUnknown(req) + resourcesExclFloating := job.jobDb.floatingResourceTypes.RemoveFloatingResources(adapters.K8sResourceListToMap(req)) + _, err := job.jobDb.resourceListFactory.FromJobResourceListFailOnUnknown(resourcesExclFloating) return err } @@ -765,6 +767,11 @@ func (job *Job) Validated() bool { return job.validated } +// Does this job request any floating resources? +func (job *Job) RequestsFloatingResources() bool { + return job.jobDb.floatingResourceTypes.HasFloatingResources(safeGetRequirements(job.jobSchedulingInfo)) +} + // WithJobSchedulingInfo returns a copy of the job with the job scheduling info updated. func (job *Job) WithJobSchedulingInfo(jobSchedulingInfo *schedulerobjects.JobSchedulingInfo) (*Job, error) { j := copyJob(*job) diff --git a/internal/scheduler/jobdb/job_run_test.go b/internal/scheduler/jobdb/job_run_test.go index f00ee4b0782..91a5b7e3f01 100644 --- a/internal/scheduler/jobdb/job_run_test.go +++ b/internal/scheduler/jobdb/job_run_test.go @@ -8,6 +8,7 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -35,6 +36,7 @@ var ( SchedulingKeyGenerator, stringinterner.New(1024), MakeTestResourceListFactory(), + makeTestEmptyFloatingResources(), ) scheduledAtPriority = int32(5) ) @@ -168,3 +170,8 @@ func TestDeepCopy(t *testing.T) { run.executor = "new executor" assert.Equal(t, expected, actual) } + +func makeTestEmptyFloatingResources() *floatingresources.FloatingResourceTypes { + result, _ := floatingresources.NewFloatingResourceTypes(nil) + return result +} diff --git a/internal/scheduler/jobdb/jobdb.go b/internal/scheduler/jobdb/jobdb.go index 8541d599bed..dead632849d 100644 --- a/internal/scheduler/jobdb/jobdb.go +++ b/internal/scheduler/jobdb/jobdb.go @@ -9,10 +9,13 @@ import ( "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "golang.org/x/exp/maps" + k8sResource "k8s.io/apimachinery/pkg/api/resource" "k8s.io/utils/clock" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/adapters" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -41,6 +44,8 @@ type JobDb struct { uuidProvider UUIDProvider // Used to make efficient ResourceList types. resourceListFactory *internaltypes.ResourceListFactory + // Info about floating resources + floatingResourceTypes *floatingresources.FloatingResourceTypes } // UUIDProvider is an interface used to mock UUID generation for tests. @@ -55,7 +60,11 @@ func (_ RealUUIDProvider) New() uuid.UUID { return uuid.New() } -func NewJobDb(priorityClasses map[string]types.PriorityClass, defaultPriorityClassName string, stringInterner *stringinterner.StringInterner, resourceListFactory *internaltypes.ResourceListFactory, +func NewJobDb(priorityClasses map[string]types.PriorityClass, + defaultPriorityClassName string, + stringInterner *stringinterner.StringInterner, + resourceListFactory *internaltypes.ResourceListFactory, + floatingResourceTypes *floatingresources.FloatingResourceTypes, ) *JobDb { return NewJobDbWithSchedulingKeyGenerator( priorityClasses, @@ -63,6 +72,7 @@ func NewJobDb(priorityClasses map[string]types.PriorityClass, defaultPriorityCla schedulerobjects.NewSchedulingKeyGenerator(), stringInterner, resourceListFactory, + floatingResourceTypes, ) } @@ -72,6 +82,7 @@ func NewJobDbWithSchedulingKeyGenerator( skg *schedulerobjects.SchedulingKeyGenerator, stringInterner *stringinterner.StringInterner, resourceListFactory *internaltypes.ResourceListFactory, + floatingResourceTypes *floatingresources.FloatingResourceTypes, ) *JobDb { defaultPriorityClass, ok := priorityClasses[defaultPriorityClassName] if !ok { @@ -91,6 +102,7 @@ func NewJobDbWithSchedulingKeyGenerator( clock: clock.RealClock{}, uuidProvider: RealUUIDProvider{}, resourceListFactory: resourceListFactory, + floatingResourceTypes: floatingResourceTypes, } } @@ -139,8 +151,6 @@ func (jobDb *JobDb) NewJob( priorityClass = jobDb.defaultPriorityClass } - rr := jobDb.getResourceRequirements(schedulingInfo) - job := &Job{ jobDb: jobDb, id: jobId, @@ -152,7 +162,7 @@ func (jobDb *JobDb) NewJob( requestedPriority: priority, submittedTime: created, jobSchedulingInfo: jobDb.internJobSchedulingInfoStrings(schedulingInfo), - resourceRequirements: rr, + resourceRequirements: jobDb.getResourceRequirements(schedulingInfo), priorityClass: priorityClass, cancelRequested: cancelRequested, cancelByJobSetRequested: cancelByJobSetRequested, @@ -167,17 +177,21 @@ func (jobDb *JobDb) NewJob( } func (jobDb *JobDb) getResourceRequirements(schedulingInfo *schedulerobjects.JobSchedulingInfo) internaltypes.ResourceList { + return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(safeGetRequirements(schedulingInfo)) +} + +func safeGetRequirements(schedulingInfo *schedulerobjects.JobSchedulingInfo) map[string]k8sResource.Quantity { pr := schedulingInfo.GetPodRequirements() if pr == nil { - return internaltypes.ResourceList{} + return map[string]k8sResource.Quantity{} } req := pr.ResourceRequirements.Requests if req == nil { - return internaltypes.ResourceList{} + return map[string]k8sResource.Quantity{} } - return jobDb.resourceListFactory.FromJobResourceListIgnoreUnknown(schedulerobjects.ResourceListFromV1ResourceList(req).Resources) + return adapters.K8sResourceListToMap(req) } func (jobDb *JobDb) internJobSchedulingInfoStrings(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.JobSchedulingInfo { diff --git a/internal/scheduler/jobdb/jobdb_test.go b/internal/scheduler/jobdb/jobdb_test.go index 4919456a36a..8183439bcd7 100644 --- a/internal/scheduler/jobdb/jobdb_test.go +++ b/internal/scheduler/jobdb/jobdb_test.go @@ -18,10 +18,12 @@ import ( "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) func NewTestJobDb() *JobDb { + emptyFloatingResourceTypes, _ := floatingresources.NewFloatingResourceTypes(nil) return NewJobDb( map[string]types.PriorityClass{ "foo": {}, @@ -30,6 +32,7 @@ func NewTestJobDb() *JobDb { "foo", stringinterner.New(1024), TestResourceListFactory, + emptyFloatingResourceTypes, ) } diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index a5eeeddc9a1..edda06c530c 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -15,6 +15,7 @@ import ( commonmetrics "github.com/armadaproject/armada/internal/common/metrics" "github.com/armadaproject/armada/internal/common/resource" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/queue" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -24,6 +25,7 @@ import ( type queueState struct { queuedJobRecorder *commonmetrics.JobMetricsRecorder runningJobRecorder *commonmetrics.JobMetricsRecorder + priority float64 } // metricProvider is a simple implementation of QueueMetricProvider @@ -31,6 +33,12 @@ type metricProvider struct { queueStates map[string]*queueState } +func (m metricProvider) GetQueuePriorites() map[string]float64 { + return armadamaps.MapValues(m.queueStates, func(v *queueState) float64 { + return v.priority + }) +} + func (m metricProvider) GetQueuedJobMetrics(queueName string) []*commonmetrics.QueueMetrics { state, ok := m.queueStates[queueName] if ok { @@ -50,13 +58,14 @@ func (m metricProvider) GetRunningJobMetrics(queueName string) []*commonmetrics. // MetricsCollector is a Prometheus Collector that handles scheduler metrics. // The metrics themselves are calculated asynchronously every refreshPeriod type MetricsCollector struct { - jobDb *jobdb.JobDb - queueCache queue.QueueCache - executorRepository database.ExecutorRepository - poolAssigner PoolAssigner - refreshPeriod time.Duration - clock clock.WithTicker - state atomic.Value + jobDb *jobdb.JobDb + queueCache queue.QueueCache + executorRepository database.ExecutorRepository + poolAssigner PoolAssigner + refreshPeriod time.Duration + clock clock.WithTicker + state atomic.Value + floatingResourceTypes *floatingresources.FloatingResourceTypes } func NewMetricsCollector( @@ -65,15 +74,17 @@ func NewMetricsCollector( executorRepository database.ExecutorRepository, poolAssigner PoolAssigner, refreshPeriod time.Duration, + floatingResourceTypes *floatingresources.FloatingResourceTypes, ) *MetricsCollector { return &MetricsCollector{ - jobDb: jobDb, - queueCache: queueCache, - executorRepository: executorRepository, - poolAssigner: poolAssigner, - refreshPeriod: refreshPeriod, - clock: clock.RealClock{}, - state: atomic.Value{}, + jobDb: jobDb, + queueCache: queueCache, + executorRepository: executorRepository, + poolAssigner: poolAssigner, + refreshPeriod: refreshPeriod, + clock: clock.RealClock{}, + state: atomic.Value{}, + floatingResourceTypes: floatingResourceTypes, } } @@ -143,6 +154,7 @@ func (c *MetricsCollector) updateQueueMetrics(ctx *armadacontext.Context) ([]pro provider.queueStates[queue.Name] = &queueState{ queuedJobRecorder: commonmetrics.NewJobMetricsRecorder(), runningJobRecorder: commonmetrics.NewJobMetricsRecorder(), + priority: queue.PriorityFactor, } queuedJobsCount[queue.Name] = 0 schedulingKeysByQueue[queue.Name] = map[schedulerobjects.SchedulingKey]bool{} @@ -310,6 +322,17 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p } } + for _, pool := range c.floatingResourceTypes.AllPools() { + totalFloatingResources := c.floatingResourceTypes.GetTotalAvailableForPool(pool) + clusterKey := clusterMetricKey{ + cluster: "floating", + pool: pool, + nodeType: "", + } + addToResourceListMap(availableResourceByCluster, clusterKey, totalFloatingResources) + addToResourceListMap(totalResourceByCluster, clusterKey, totalFloatingResources) + } + clusterMetrics := make([]prometheus.Metric, 0, len(phaseCountByQueue)) for k, v := range phaseCountByQueue { clusterMetrics = append(clusterMetrics, commonmetrics.NewQueueLeasedPodCount(float64(v), k.cluster, k.pool, k.queueName, k.phase, k.nodeType)) diff --git a/internal/scheduler/metrics_test.go b/internal/scheduler/metrics_test.go index 53ad50e9c68..11a8ee455a3 100644 --- a/internal/scheduler/metrics_test.go +++ b/internal/scheduler/metrics_test.go @@ -13,6 +13,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" commonmetrics "github.com/armadaproject/armada/internal/common/metrics" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/jobdb" schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -111,6 +112,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) { executorRepository, poolAssigner, 2*time.Second, + testfixtures.TestEmptyFloatingResources, ) collector.clock = testClock err = collector.refresh(ctx) @@ -155,9 +157,10 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { executorWithJobs := createExecutor("cluster-1", nodeWithJobs) tests := map[string]struct { - jobDbJobs []*jobdb.Job - executors []*schedulerobjects.Executor - expected []prometheus.Metric + jobDbJobs []*jobdb.Job + floatingResourceTypes *floatingresources.FloatingResourceTypes + executors []*schedulerobjects.Executor + expected []prometheus.Metric }{ "empty cluster single node type": { jobDbJobs: []*jobdb.Job{}, @@ -233,6 +236,15 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { commonmetrics.NewClusterTotalCapacity(1, "cluster-1", testfixtures.TestPool, "nodes", "type-1"), }, }, + "floating resources": { + jobDbJobs: []*jobdb.Job{}, + floatingResourceTypes: testfixtures.TestFloatingResources, + executors: []*schedulerobjects.Executor{}, + expected: []prometheus.Metric{ + commonmetrics.NewClusterAvailableCapacity(10, "floating", "pool", "test-floating-resource", ""), + commonmetrics.NewClusterTotalCapacity(10, "floating", "pool", "test-floating-resource", ""), + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -255,12 +267,17 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) { executorRepository := schedulermocks.NewMockExecutorRepository(ctrl) executorRepository.EXPECT().GetExecutors(ctx).Return(tc.executors, nil) + if tc.floatingResourceTypes == nil { + tc.floatingResourceTypes = testfixtures.TestEmptyFloatingResources + } + collector := NewMetricsCollector( jobDb, queueCache, executorRepository, poolAssigner, 2*time.Second, + tc.floatingResourceTypes, ) collector.clock = testClock err = collector.refresh(ctx) diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 5c98c3bc770..dc5e577e7c1 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -4,6 +4,8 @@ import ( "fmt" "testing" + "github.com/armadaproject/armada/internal/scheduler/adapters" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/exp/maps" @@ -123,7 +125,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { jobFilter := func(job *jobdb.Job) bool { return true } job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) request := job.EfficientResourceRequirements() - requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(job.ResourceRequirements().Requests) + requestInternalRl, err := nodeDb.resourceListFactory.FromJobResourceListFailOnUnknown(adapters.K8sResourceListToMap(job.ResourceRequirements().Requests)) assert.Nil(t, err) jobId := job.Id() diff --git a/internal/scheduler/nodedb/nodeiteration_test.go b/internal/scheduler/nodedb/nodeiteration_test.go index 9a61a6b1e92..af39f364e4a 100644 --- a/internal/scheduler/nodedb/nodeiteration_test.go +++ b/internal/scheduler/nodedb/nodeiteration_test.go @@ -437,7 +437,7 @@ func TestNodeTypeIterator(t *testing.T) { require.NoError(t, nodeDb.UpsertMany(entries)) indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(schedulerobjects.V1ResourceListFromResourceList(tc.resourceRequests)) + rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) assert.Nil(t, err) for i, resourceName := range nodeDb.indexedResources { indexedResourceRequests[i], err = rr.GetByName(resourceName) @@ -830,7 +830,7 @@ func TestNodeTypesIterator(t *testing.T) { } require.NoError(t, nodeDb.UpsertMany(entries)) - rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(schedulerobjects.V1ResourceListFromResourceList(tc.resourceRequests)) + rr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(tc.resourceRequests.Resources) assert.Nil(t, err) indexedResourceRequests := make([]int64, len(testfixtures.TestResources)) diff --git a/internal/scheduler/nodedb/nodematching_test.go b/internal/scheduler/nodedb/nodematching_test.go index 5ae54c65f8a..f84ff20699d 100644 --- a/internal/scheduler/nodedb/nodematching_test.go +++ b/internal/scheduler/nodedb/nodematching_test.go @@ -665,12 +665,12 @@ func makeTestNodeTaintsLabels(taints []v1.Taint, labels map[string]string) *inte } func makeTestNodeResources(t *testing.T, allocatableByPriority schedulerobjects.AllocatableByPriorityAndResourceType, totalResources schedulerobjects.ResourceList) *internaltypes.Node { - tr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(schedulerobjects.V1ResourceListFromResourceList(totalResources)) + tr, err := testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(totalResources.Resources) assert.Nil(t, err) abp := map[int32]internaltypes.ResourceList{} for pri, rl := range allocatableByPriority { - abp[pri], err = testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(schedulerobjects.V1ResourceListFromResourceList(rl)) + abp[pri], err = testfixtures.TestResourceListFactory.FromJobResourceListFailOnUnknown(rl.Resources) assert.Nil(t, err) } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 8732836cbc0..46d11625e0c 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -18,6 +18,7 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" @@ -29,6 +30,7 @@ import ( type PreemptingQueueScheduler struct { schedulingContext *schedulercontext.SchedulingContext constraints schedulerconstraints.SchedulingConstraints + floatingResourceTypes *floatingresources.FloatingResourceTypes protectedFractionOfFairShare float64 useAdjustedFairShareProtection bool jobRepo JobRepository @@ -50,6 +52,7 @@ type PreemptingQueueScheduler struct { func NewPreemptingQueueScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, + floatingResourceTypes *floatingresources.FloatingResourceTypes, protectedFractionOfFairShare float64, useAdjustedFairShareProtection bool, jobRepo JobRepository, @@ -74,6 +77,7 @@ func NewPreemptingQueueScheduler( return &PreemptingQueueScheduler{ schedulingContext: sctx, constraints: constraints, + floatingResourceTypes: floatingResourceTypes, protectedFractionOfFairShare: protectedFractionOfFairShare, useAdjustedFairShareProtection: useAdjustedFairShareProtection, jobRepo: jobRepo, @@ -155,7 +159,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche maps.Copy(sch.nodeIdByJobId, evictorResult.NodeIdByJobId) // Re-schedule evicted jobs/schedule new jobs. - ctx.WithField("stage", "scheduling-algo").Info("Performing initial scheduling jobs onto nodes") + ctx.WithField("stage", "scheduling-algo").Info("Performing initial scheduling of jobs onto nodes") schedulerResult, err := sch.schedule( armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"), inMemoryJobRepo, @@ -552,6 +556,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo sched, err := NewQueueScheduler( sch.schedulingContext, sch.constraints, + sch.floatingResourceTypes, sch.nodeDb, jobIteratorByQueue, ) diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 5e31e10553a..cfdb75d837b 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -51,7 +51,7 @@ func TestEvictOversubscribed(t *testing.T) { err = nodeDb.CreateAndInsertWithJobDbJobsWithTxn(nodeDbTxn, jobs, node) require.NoError(t, err) - jobDb := jobdb.NewJobDb(config.PriorityClasses, config.DefaultPriorityClassName, stringInterner, testfixtures.TestResourceListFactory) + jobDb := jobdb.NewJobDb(config.PriorityClasses, config.DefaultPriorityClassName, stringInterner, testfixtures.TestResourceListFactory, testfixtures.TestEmptyFloatingResources) jobDbTxn := jobDb.WriteTxn() err = jobDbTxn.Upsert(jobs) require.NoError(t, err) @@ -1737,7 +1737,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { priorities := types.AllowedPriorities(tc.SchedulingConfig.PriorityClasses) - jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory) + jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory, testfixtures.TestEmptyFloatingResources) jobDbTxn := jobDb.WriteTxn() // Accounting across scheduling rounds. @@ -1871,6 +1871,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { sch := NewPreemptingQueueScheduler( sctx, constraints, + testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), @@ -2175,7 +2176,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { } txn.Commit() - jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory) + jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory, testfixtures.TestEmptyFloatingResources) jobDbTxn := jobDb.WriteTxn() var queuedJobs []*jobdb.Job for _, jobs := range jobsByQueue { @@ -2227,6 +2228,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { sch := NewPreemptingQueueScheduler( sctx, constraints, + testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), @@ -2288,6 +2290,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { sch := NewPreemptingQueueScheduler( sctx, constraints, + testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index de7cb68dd5c..1251e3ea1da 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -13,6 +13,7 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -28,6 +29,7 @@ type QueueScheduler struct { func NewQueueScheduler( sctx *schedulercontext.SchedulingContext, constraints schedulerconstraints.SchedulingConstraints, + floatingResourceTypes *floatingresources.FloatingResourceTypes, nodeDb *nodedb.NodeDb, jobIteratorByQueue map[string]JobIterator, ) (*QueueScheduler, error) { @@ -36,7 +38,7 @@ func NewQueueScheduler( return nil, errors.Errorf("no scheduling context for queue %s", queue) } } - gangScheduler, err := NewGangScheduler(sctx, constraints, nodeDb) + gangScheduler, err := NewGangScheduler(sctx, constraints, floatingResourceTypes, nodeDb) if err != nil { return nil, err } diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 7ac1bb43b64..582e0543bd4 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -589,7 +589,7 @@ func TestQueueScheduler(t *testing.T) { it := jobRepo.GetJobIterator(q.Name) jobIteratorByQueue[q.Name] = it } - sch, err := NewQueueScheduler(sctx, constraints, nodeDb, jobIteratorByQueue) + sch, err := NewQueueScheduler(sctx, constraints, testfixtures.TestEmptyFloatingResources, nodeDb, jobIteratorByQueue) require.NoError(t, err) result, err := sch.Schedule(armadacontext.Background()) diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 130a36d9af4..4750475a458 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -32,11 +32,13 @@ import ( "github.com/armadaproject/armada/internal/common/profiling" "github.com/armadaproject/armada/internal/common/pulsarutils" "github.com/armadaproject/armada/internal/common/serve" + "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/common/stringinterner" "github.com/armadaproject/armada/internal/common/types" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/failureestimator" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/leader" @@ -84,6 +86,12 @@ func Run(config schedulerconfig.Configuration) error { } ctx.Infof("Supported resource types: %s", resourceListFactory.SummaryString()) + floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(config.Scheduling.ExperimentalFloatingResources) + if err != nil { + return err + } + ctx.Infof("Floating resource types: %s", floatingResourceTypes.SummaryString()) + // List of services to run concurrently. // Because we want to start services only once all input validation has been completed, // we add all services to a slice and start them together at the end of this function. @@ -179,6 +187,7 @@ func Run(config schedulerconfig.Configuration) error { jobRepository, executorRepository, types.AllowedPriorities(config.Scheduling.PriorityClasses), + slices.Map(config.Scheduling.SupportedResourceTypes, func(rt schedulerconfig.ResourceType) string { return rt.Name }), config.Scheduling.NodeIdLabel, config.Scheduling.PriorityClassNameOverride, config.Scheduling.PriorityClasses, @@ -281,6 +290,7 @@ func Run(config schedulerconfig.Configuration) error { queueQuarantiner, stringInterner, resourceListFactory, + floatingResourceTypes, ) if err != nil { return errors.WithMessage(err, "error creating scheduling algo") @@ -290,6 +300,7 @@ func Run(config schedulerconfig.Configuration) error { config.Scheduling.DefaultPriorityClassName, stringInterner, resourceListFactory, + floatingResourceTypes, ) schedulingRoundMetrics := NewSchedulerMetrics(config.Metrics.Metrics) if err := prometheus.Register(schedulingRoundMetrics); err != nil { @@ -338,6 +349,7 @@ func Run(config schedulerconfig.Configuration) error { executorRepository, poolAssigner, config.Metrics.RefreshInterval, + floatingResourceTypes, ) if err := prometheus.Register(metricsCollector); err != nil { return errors.WithStack(err) diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 0af6e58c435..f9e1e01f8e1 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -23,6 +23,7 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" @@ -61,10 +62,11 @@ type FairSchedulingAlgo struct { // Used to reduce the rate at which jobs are scheduled from misbehaving queues. queueQuarantiner *quarantine.QueueQuarantiner // Function that is called every time an executor is scheduled. Useful for testing. - onExecutorScheduled func(executor *schedulerobjects.Executor) - clock clock.Clock - stringInterner *stringinterner.StringInterner - resourceListFactory *internaltypes.ResourceListFactory + onExecutorScheduled func(executor *schedulerobjects.Executor) + clock clock.Clock + stringInterner *stringinterner.StringInterner + resourceListFactory *internaltypes.ResourceListFactory + floatingResourceTypes *floatingresources.FloatingResourceTypes } func NewFairSchedulingAlgo( @@ -77,6 +79,7 @@ func NewFairSchedulingAlgo( queueQuarantiner *quarantine.QueueQuarantiner, stringInterner *stringinterner.StringInterner, resourceListFactory *internaltypes.ResourceListFactory, + floatingResourceTypes *floatingresources.FloatingResourceTypes, ) (*FairSchedulingAlgo, error) { if _, ok := config.PriorityClasses[config.DefaultPriorityClassName]; !ok { return nil, errors.Errorf( @@ -98,6 +101,7 @@ func NewFairSchedulingAlgo( clock: clock.RealClock{}, stringInterner: stringInterner, resourceListFactory: resourceListFactory, + floatingResourceTypes: floatingResourceTypes, }, nil } @@ -291,6 +295,10 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con } } + for pool, poolCapacity := range totalCapacityByPool { + poolCapacity.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool)) + } + // Create a map of jobs associated with each executor. jobsByExecutorId := make(map[string][]*jobdb.Job) nodeIdByJobId := make(map[string]string) @@ -495,6 +503,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( scheduler := NewPreemptingQueueScheduler( sctx, constraints, + l.floatingResourceTypes, l.schedulingConfig.ProtectedFractionOfFairShare, l.schedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(fsctx.txn), diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index 94825c2a287..558022a8782 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -380,6 +380,7 @@ func TestSchedule(t *testing.T) { nil, stringinterner.New(1024), testfixtures.TestResourceListFactory, + testfixtures.TestEmptyFloatingResources, ) require.NoError(t, err) @@ -532,6 +533,7 @@ func BenchmarkNodeDbConstruction(b *testing.B) { nil, stringInterner, testfixtures.TestResourceListFactory, + testfixtures.TestEmptyFloatingResources, ) require.NoError(b, err) b.StartTimer() diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 059e4ee0deb..c901dc0388a 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -26,6 +26,7 @@ import ( schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/fairness" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" @@ -102,6 +103,11 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli if err != nil { return nil, errors.WithMessage(err, "Error with the .scheduling.supportedResourceTypes field in config") } + floatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(schedulingConfig.ExperimentalFloatingResources) + if err != nil { + return nil, err + } + clusterSpec = proto.Clone(clusterSpec).(*ClusterSpec) workloadSpec = proto.Clone(workloadSpec).(*WorkloadSpec) initialiseClusterSpec(clusterSpec) @@ -117,6 +123,7 @@ func NewSimulator(clusterSpec *ClusterSpec, workloadSpec *WorkloadSpec, scheduli schedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), resourceListFactory, + floatingResourceTypes, ) randomSeed := workloadSpec.RandomSeed if randomSeed == 0 { @@ -503,9 +510,16 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { s.schedulingConfig, nil, ) + + nloatingResourceTypes, err := floatingresources.NewFloatingResourceTypes(s.schedulingConfig.ExperimentalFloatingResources) + if err != nil { + return err + } + sch := scheduler.NewPreemptingQueueScheduler( sctx, constraints, + nloatingResourceTypes, s.schedulingConfig.ProtectedFractionOfFairShare, s.schedulingConfig.UseAdjustedFairShareProtection, scheduler.NewSchedulerJobRepositoryAdapter(txn), diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 36bac3dfb58..fffd7d7a082 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -21,6 +21,7 @@ import ( "github.com/armadaproject/armada/internal/common/types" "github.com/armadaproject/armada/internal/common/util" schedulerconfiguration "github.com/armadaproject/armada/internal/scheduler/configuration" + "github.com/armadaproject/armada/internal/scheduler/floatingresources" "github.com/armadaproject/armada/internal/scheduler/internaltypes" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -41,9 +42,22 @@ const ( ) var ( - TestResourceListFactory = MakeTestResourceListFactory() - BaseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") - TestPriorityClasses = map[string]types.PriorityClass{ + TestResourceListFactory = MakeTestResourceListFactory() + TestEmptyFloatingResources = MakeTestFloatingResourceTypes(nil) + TestFloatingResources = MakeTestFloatingResourceTypes(TestFloatingResourceConfig) + TestFloatingResourceConfig = []schedulerconfiguration.FloatingResourceConfig{ + { + Name: "test-floating-resource", + Pools: []schedulerconfiguration.FloatingResourcePoolConfig{ + { + Name: "pool", + Quantity: resource.MustParse("10"), + }, + }, + }, + } + BaseTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2022-03-01T15:04:05.000Z") + TestPriorityClasses = map[string]types.PriorityClass{ PriorityClass0: {Priority: 0, Preemptible: true}, PriorityClass1: {Priority: 1, Preemptible: true}, PriorityClass2: {Priority: 2, Preemptible: true}, @@ -100,6 +114,7 @@ func NewJobDb(resourceListFactory *internaltypes.ResourceListFactory) *jobdb.Job SchedulingKeyGenerator, stringinterner.New(1024), resourceListFactory, + TestFloatingResources, ) // Mock out the clock and uuid provider to ensure consistent ids and timestamps are generated. jobDb.SetClock(NewMockPassiveClock()) @@ -954,6 +969,11 @@ func MakeTestResourceListFactory() *internaltypes.ResourceListFactory { return result } +func MakeTestFloatingResourceTypes(config []schedulerconfiguration.FloatingResourceConfig) *floatingresources.FloatingResourceTypes { + result, _ := floatingresources.NewFloatingResourceTypes(config) + return result +} + func GetTestSupportedResourceTypes() []schedulerconfiguration.ResourceType { return []schedulerconfiguration.ResourceType{ {Name: "memory", Resolution: resource.MustParse("1")}, diff --git a/pkg/executorapi/util.go b/pkg/executorapi/util.go index c7751511a7b..e6180575a16 100644 --- a/pkg/executorapi/util.go +++ b/pkg/executorapi/util.go @@ -3,14 +3,13 @@ package executorapi import ( "time" + "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - armadaslices "github.com/armadaproject/armada/internal/common/slices" - - "github.com/pkg/errors" - "github.com/armadaproject/armada/internal/common/armadaerrors" + armadamaps "github.com/armadaproject/armada/internal/common/maps" + armadaslices "github.com/armadaproject/armada/internal/common/slices" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/pkg/api" ) @@ -75,14 +74,14 @@ func NewNodeFromNodeInfo(nodeInfo *NodeInfo, executor string, allowedPriorities } func ResourceListFromProtoResources(r map[string]*resource.Quantity) schedulerobjects.ResourceList { - resources := make(map[string]resource.Quantity, len(r)) - for k, v := range r { - if v != nil { - r := v.DeepCopy() - resources[k] = r - } + return schedulerobjects.ResourceList{ + Resources: armadamaps.MapValues(r, func(v *resource.Quantity) resource.Quantity { + if v != nil { + return *v + } + return resource.Quantity{} + }), } - return schedulerobjects.ResourceList{Resources: resources} } func ComputeResourceFromProtoResources(r map[string]resource.Quantity) *ComputeResource {