Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: Add floating resource support #3767

Merged
merged 7 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions internal/common/maps/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package maps

import "github.com/armadaproject/armada/internal/common/interfaces"

// FromSlice maps element e of slice into map entry keyFunc(e): valueFunc(e)
func FromSlice[S []E, E any, K comparable, V any](slice S, keyFunc func(E) K, valueFunc func(E) V) map[K]V {
rv := make(map[K]V, len(slice))
for _, elem := range slice {
rv[keyFunc(elem)] = valueFunc(elem)
}
return rv
}

// MapValues maps the values of m into valueFunc(v).
func MapValues[M ~map[K]VA, K comparable, VA any, VB any](m M, valueFunc func(VA) VB) map[K]VB {
rv := make(map[K]VB, len(m))
Expand Down Expand Up @@ -75,3 +84,22 @@ func Filter[M ~map[K]V, K comparable, V any](m M, predicate func(K, V) bool) M {
}
return rv
}

// RemoveInPlace removes elements that match keySelector
func RemoveInPlace[K comparable, V any](m map[K]V, keySelector func(K) bool) {
for k := range m {
if keySelector(k) {
delete(m, k)
}
}
}

func Keys[K comparable, V any](m map[K]V) []K {
i := 0
result := make([]K, len(m))
for k := range m {
result[i] = k
i++
}
return result
}
42 changes: 42 additions & 0 deletions internal/common/maps/maps_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
package maps

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"
)

func TestFromSlice(t *testing.T) {
actual := FromSlice(
[]int{1, 2},
func(elem int) string { return strconv.Itoa(elem) },
func(elem int) float64 { return float64(elem) * 0.1 },
)
expected := map[string]float64{
"1": 0.1,
"2": 0.2,
}
assert.Equal(t, expected, actual)
}

func TestMapKeys(t *testing.T) {
m := map[string][]int{
"foo": {1, 2, 3},
Expand Down Expand Up @@ -176,3 +190,31 @@ func TestFilterKeys(t *testing.T) {
}
assert.Equal(t, expected, actual)
}

func TestRemoveInPlace(t *testing.T) {
m := map[int]string{
1: "one",
2: "two",
3: "three",
4: "four",
}
RemoveInPlace(m, func(i int) bool {
return i > 2
})
expected := map[int]string{
1: "one",
2: "two",
}
assert.Equal(t, expected, m)
}

func TestKeys(t *testing.T) {
m := map[int]string{
1: "one",
2: "two",
}
result := Keys(m)
slices.Sort(result)
expected := []int{1, 2}
assert.Equal(t, expected, result)
}
12 changes: 12 additions & 0 deletions internal/scheduler/adapters/adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
k8sResource "k8s.io/apimachinery/pkg/api/resource"

"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/types"
Expand Down Expand Up @@ -62,3 +63,14 @@ func PriorityFromPodSpec(podSpec *v1.PodSpec, priorityClasses map[string]types.P
// Couldn't find anything
return 0, false
}

func K8sResourceListToMap(resources v1.ResourceList) map[string]k8sResource.Quantity {
if resources == nil {
return nil
}
result := make(map[string]k8sResource.Quantity, len(resources))
for k, v := range resources {
result[string(k)] = v
}
return result
}
17 changes: 17 additions & 0 deletions internal/scheduler/adapters/adapters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,20 @@ func TestPriorityFromPodSpec(t *testing.T) {
})
}
}

func TestK8sResourceListToMap(t *testing.T) {
result := K8sResourceListToMap(v1.ResourceList{
"one": resource.MustParse("1"),
"two": resource.MustParse("2"),
})
expected := map[string]resource.Quantity{
"one": resource.MustParse("1"),
"two": resource.MustParse("2"),
}

assert.Equal(t, expected, result)
}

func TestK8sResourceListToMap_PreservesNil(t *testing.T) {
assert.Nil(t, K8sResourceListToMap(nil))
}
33 changes: 32 additions & 1 deletion internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/slices"
priorityTypes "github.com/armadaproject/armada/internal/common/types"
Expand All @@ -40,6 +41,9 @@ type ExecutorApi struct {
allowedPriorities []int32
// Known priority classes
priorityClasses map[string]priorityTypes.PriorityClass
// Allowed resource names - resource requests/limits not on this list are dropped.
// This is needed to ensure floating resources are not passed to k8s.
allowedResources map[string]bool
// Max number of events in published Pulsar messages
maxEventsPerPulsarMessage int
// Max size of Pulsar messages produced.
Expand All @@ -55,6 +59,7 @@ func NewExecutorApi(producer pulsar.Producer,
jobRepository database.JobRepository,
executorRepository database.ExecutorRepository,
allowedPriorities []int32,
allowedResources []string,
nodeIdLabel string,
priorityClassNameOverride *string,
priorityClasses map[string]priorityTypes.PriorityClass,
Expand All @@ -69,6 +74,7 @@ func NewExecutorApi(producer pulsar.Producer,
jobRepository: jobRepository,
executorRepository: executorRepository,
allowedPriorities: allowedPriorities,
allowedResources: maps.FromSlice(allowedResources, func(name string) string { return name }, func(name string) bool { return true }),
maxEventsPerPulsarMessage: maxEventsPerPulsarMessage,
maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes,
nodeIdLabel: nodeIdLabel,
Expand Down Expand Up @@ -109,7 +115,7 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
return err
}
ctx.Infof(
"executor currently has %d job runs; sending %d cancellations and %d new runs",
"Executor currently has %d job runs; sending %d cancellations and %d new runs",
len(requestRuns), len(runsToCancel), len(newRuns),
)

Expand Down Expand Up @@ -149,6 +155,8 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns

srv.addPreemptibleLabel(submitMsg)

srv.dropDisallowedResources(submitMsg.MainObject.GetPodSpec().PodSpec)

// This must happen after anything that relies on the priorityClassName
if srv.priorityClassNameOverride != nil {
srv.setPriorityClassName(submitMsg, *srv.priorityClassNameOverride)
Expand Down Expand Up @@ -216,6 +224,29 @@ func (srv *ExecutorApi) addPreemptibleLabel(job *armadaevents.SubmitJob) {
addLabels(job, labels)
}

// Drop non-supported resources. This is needed to ensure floating resources
// are not passed to k8s.
func (srv *ExecutorApi) dropDisallowedResources(pod *v1.PodSpec) {
if pod == nil {
return
}
srv.dropDisallowedResourcesFromContainers(pod.InitContainers)
srv.dropDisallowedResourcesFromContainers(pod.Containers)
}

func (srv *ExecutorApi) dropDisallowedResourcesFromContainers(containers []v1.Container) {
for _, container := range containers {
removeDisallowedKeys(container.Resources.Limits, srv.allowedResources)
removeDisallowedKeys(container.Resources.Requests, srv.allowedResources)
}
}

func removeDisallowedKeys(rl v1.ResourceList, allowedKeys map[string]bool) {
maps.RemoveInPlace(rl, func(name v1.ResourceName) bool {
return !allowedKeys[string(name)]
})
}

func (srv *ExecutorApi) isPreemptible(job *armadaevents.SubmitJob) bool {
priorityClassName := ""

Expand Down
9 changes: 9 additions & 0 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"github.com/armadaproject/armada/internal/common/mocks"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/pulsarutils"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/common/types"
schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/database"
schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/executorapi"
Expand Down Expand Up @@ -321,6 +324,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
mockJobRepository,
mockExecutorRepository,
[]int32{1000, 2000},
testResourceNames(),
"kubernetes.io/hostname",
nil,
priorityClasses,
Expand Down Expand Up @@ -448,6 +452,7 @@ func TestExecutorApi_Publish(t *testing.T) {
mockJobRepository,
mockExecutorRepository,
[]int32{1000, 2000},
testResourceNames(),
"kubernetes.io/hostname",
nil,
priorityClasses,
Expand Down Expand Up @@ -495,3 +500,7 @@ func groups(t *testing.T) ([]string, []byte) {
require.NoError(t, err)
return groups, compressed
}

func testResourceNames() []string {
return slices.Map(testfixtures.GetTestSupportedResourceTypes(), func(rt schedulerconfig.ResourceType) string { return rt.Name })
}
20 changes: 20 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ type LeaderConfig struct {
LeaderConnection client.ApiConnectionDetails
}

type FloatingResourceConfig struct {
// Resource name, e.g. "s3-connections"
Name string
// Per-pool config.
Pools []FloatingResourcePoolConfig
}

type FloatingResourcePoolConfig struct {
// Name of the pool.
Name string
// Amount of this resource that can be allocated across all jobs in this pool.
Quantity resource.Quantity
}
type HttpConfig struct {
Port int `validate:"required"`
}
Expand Down Expand Up @@ -230,6 +243,13 @@ type SchedulingConfig struct {
//
// If not set, all taints are indexed.
IndexedTaints []string
// Experimental - subject to change
// Resources that are outside of k8s, and not tied to a given k8s node or cluster.
// For example connections to an S3 server that sits outside of k8s could be rationed to limit load on the server.
// These can be requested like a normal k8s resource. Note there is no mechanism in armada
// to enforce actual usage, it relies on honesty. For example, there is nothing to stop a badly-behaved job
// requesting 2 S3 server connections and then opening 10.
ExperimentalFloatingResources []FloatingResourceConfig
// WellKnownNodeTypes defines a set of well-known node types used to define "home" and "away" nodes for a given priority class.
WellKnownNodeTypes []WellKnownNodeType `validate:"dive"`
// Executor that haven't heartbeated in this time period are considered stale.
Expand Down
Loading