Skip to content

Commit

Permalink
Merge branch 'master' into f/chrisma/queues/prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 authored Jul 3, 2024
2 parents 979e7e2 + c62ef88 commit 7929de7
Show file tree
Hide file tree
Showing 25 changed files with 938 additions and 839 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/rakyll/statik v0.1.7
github.com/renstrom/shortuuid v3.0.0+incompatible
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5 h1:kvl0LOTQD23VR1R7A9vDti9msfV6mOE2+j6ngYkFsfg=
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5/go.mod h1:VhyLk7MdSTKbJCx6+wXlj3/ebh49gTq3yBiXymYrG7w=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/event/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func FromInternalSubmit(owner string, groups []string, queue string, jobSet stri
JobSetId: jobSet,
Queue: queue,
Created: protoutil.ToTimestamp(time),
Job: *job,
Job: job,
}

queuedEvent := &api.JobQueuedEvent{
Expand Down
33 changes: 19 additions & 14 deletions internal/armada/event/conversion/conversions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestConvertSubmitted(t *testing.T) {
JobSetId: jobSetName,
Queue: queue,
Created: protoutil.ToTimestamp(baseTime),
Job: api.Job{
Job: &api.Job{
Id: jobIdString,
JobSetId: jobSetName,
Queue: queue,
Expand All @@ -99,7 +99,7 @@ func TestConvertSubmitted(t *testing.T) {
},
},
},
SchedulingResourceRequirements: v1.ResourceRequirements{
SchedulingResourceRequirements: &v1.ResourceRequirements{
Requests: make(v1.ResourceList),
Limits: make(v1.ResourceList),
},
Expand Down Expand Up @@ -795,13 +795,13 @@ func TestConvertResourceUtilisation(t *testing.T) {
},
},
},
MaxResourcesForPeriod: map[string]resource.Quantity{
"cpu": resource.MustParse("2.0"),
"mem": resource.MustParse("100Gi"),
MaxResourcesForPeriod: map[string]*resource.Quantity{
"cpu": resourcePointer("2.0"),
"mem": resourcePointer("100Gi"),
},
TotalCumulativeUsage: map[string]resource.Quantity{
"cpu": resource.MustParse("3.0"),
"mem": resource.MustParse("200Gi"),
TotalCumulativeUsage: map[string]*resource.Quantity{
"cpu": resourcePointer("3.0"),
"mem": resourcePointer("200Gi"),
},
},
},
Expand All @@ -817,17 +817,17 @@ func TestConvertResourceUtilisation(t *testing.T) {
Created: protoutil.ToTimestamp(baseTime),
ClusterId: executorId,
KubernetesId: runIdString,
MaxResourcesForPeriod: map[string]resource.Quantity{
"cpu": resource.MustParse("2.0"),
"mem": resource.MustParse("100Gi"),
MaxResourcesForPeriod: map[string]*resource.Quantity{
"cpu": resourcePointer("2.0"),
"mem": resourcePointer("100Gi"),
},
NodeName: nodeName,
PodNumber: podNumber,
PodName: podName,
PodNamespace: namespace,
TotalCumulativeUsage: map[string]resource.Quantity{
"cpu": resource.MustParse("3.0"),
"mem": resource.MustParse("200Gi"),
TotalCumulativeUsage: map[string]*resource.Quantity{
"cpu": resourcePointer("3.0"),
"mem": resourcePointer("200Gi"),
},
},
},
Expand Down Expand Up @@ -982,3 +982,8 @@ func toEventSeq(event ...*armadaevents.EventSequence_Event) *armadaevents.EventS
UserId: userId,
}
}

func resourcePointer(s string) *resource.Quantity {
r := resource.MustParse(s)
return &r
}
29 changes: 17 additions & 12 deletions internal/armada/queue/queue_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/database/lookout"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/client/queue"
)

var (
queueA = queue.Queue{
Name: "queueA",
PriorityFactor: 1000,
Permissions: []queue.Permissions{},
Name: "queueA",
PriorityFactor: 1000,
Permissions: []queue.Permissions{},
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{},
}
queueB = queue.Queue{
Name: "queueB",
PriorityFactor: 2000,
Permissions: []queue.Permissions{},
Name: "queueB",
PriorityFactor: 2000,
Permissions: []queue.Permissions{},
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{},
}
twoQueues = []queue.Queue{queueA, queueB}
)
Expand Down Expand Up @@ -111,17 +114,19 @@ func TestGetAndUpdateQueue(t *testing.T) {
"Queue Doesn't Exist": {
intialQueues: twoQueues,
queueToUpdate: queue.Queue{
Name: "queueC",
PriorityFactor: 1,
Permissions: []queue.Permissions{},
Name: "queueC",
Permissions: []queue.Permissions{},
PriorityFactor: 1,
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{},
},
},
"Queue Does Exist": {
intialQueues: twoQueues,
queueToUpdate: queue.Queue{
Name: "queueA",
PriorityFactor: queueA.PriorityFactor + 100,
Permissions: []queue.Permissions{},
Name: "queueA",
PriorityFactor: queueA.PriorityFactor + 100,
Permissions: []queue.Permissions{},
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{},
},
},
}
Expand Down
46 changes: 0 additions & 46 deletions internal/common/database/migrations.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package database

import (
"bytes"
"io/fs"
"path"
"sort"
"strconv"
"strings"

stakikfs "github.com/rakyll/statik/fs"

"github.com/armadaproject/armada/internal/common/armadacontext"
)

Expand Down Expand Up @@ -120,46 +117,3 @@ func ReadMigrations(fsys fs.FS, basePath string) ([]Migration, error) {
}
return migrations, nil
}

// TODO: remove this when we've migrated over to iofs
func ReadMigrationsFromStatik(namespace string) ([]Migration, error) {
vfs, err := stakikfs.NewWithNamespace(namespace)
if err != nil {
return nil, err
}

dir, err := vfs.Open("/")
if err != nil {
return nil, err
}

files, err := dir.Readdir(-1)
if err != nil {
return nil, err
}

sort.Slice(files, func(i, j int) bool { return files[i].Name() < files[j].Name() })

var migrations []Migration
for _, f := range files {
file, err := vfs.Open("/" + f.Name())
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(file)
if err != nil {
return nil, err
}
id, err := strconv.Atoi(strings.Split(f.Name(), "_")[0])
if err != nil {
return nil, err
}
migrations = append(migrations, Migration{
id: id,
name: f.Name(),
sql: buf.String(),
})
}
return migrations, nil
}
2 changes: 1 addition & 1 deletion internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j

PodSpec: podSpec,
PodSpecs: podSpecs,
SchedulingResourceRequirements: schedulingResourceRequirements,
SchedulingResourceRequirements: &schedulingResourceRequirements,

Created: protoutil.ToTimestamp(time),
Owner: ownerId,
Expand Down
19 changes: 19 additions & 0 deletions internal/common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,25 @@ func FromResourceList(list v1.ResourceList) ComputeResources {
return resources
}

func FromProtoMap(m map[string]*resource.Quantity) ComputeResources {
resources := make(ComputeResources, len(m))
for k, v := range m {
if v != nil {
resources[k] = *v
}
}
return resources
}

func (a ComputeResources) ToProtoMap() map[string]*resource.Quantity {
resources := make(map[string]*resource.Quantity, len(a))
for k, v := range a {
r := v.DeepCopy()
resources[k] = &r
}
return resources
}

// QuantityAsFloat64 returns a float64 representation of a quantity.
// We need our own function because q.AsApproximateFloat64 sometimes returns surprising results.
// For example, resource.MustParse("5188205838208Ki").AsApproximateFloat64() returns 0.004291583283300088,
Expand Down
33 changes: 33 additions & 0 deletions internal/common/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,39 @@ func TestTotalResourceRequest_ShouldCombineMaxInitContainerResourcesWithSummedCo
assert.Equal(t, result, FromResourceList(expectedResult))
}

func TestToProtoMap(t *testing.T) {
oneCpu := resource.MustParse("1")
oneGi := resource.MustParse("1Gi")

input := ComputeResources{
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("1Gi"),
}
expected := map[string]*resource.Quantity{
"cpu": &oneCpu,
"memory": &oneGi,
}
actual := input.ToProtoMap()
assert.Equal(t, actual, expected)
}

func TestFromProtoMap(t *testing.T) {
oneCpu := resource.MustParse("1")
oneGi := resource.MustParse("1Gi")

input := map[string]*resource.Quantity{
"cpu": &oneCpu,
"memory": &oneGi,
}
expected := ComputeResources{
"cpu": resource.MustParse("1"),
"memory": resource.MustParse("1Gi"),
}

actual := FromProtoMap(input)
assert.Equal(t, actual, expected)
}

func makeDefaultNodeResource() v1.ResourceList {
cpuResource := resource.NewQuantity(100, resource.DecimalSI)
memoryResource := resource.NewQuantity(50*1024*1024*1024, resource.DecimalSI)
Expand Down
5 changes: 0 additions & 5 deletions internal/executor/domain/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ package domain

import armadaresource "github.com/armadaproject/armada/internal/common/resource"

const (
AcceleratorDutyCycle = "armadaproject.io/accelerator-duty-cycle"
AcceleratorMemory = "armadaproject.io/accelerator-memory"
)

type UtilisationData struct {
CurrentUsage armadaresource.ComputeResources
CumulativeUsage armadaresource.ComputeResources
Expand Down
4 changes: 2 additions & 2 deletions internal/executor/reporter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ func CreateJobUtilisationEvent(pod *v1.Pod, utilisationData *domain.UtilisationD
},
},
},
MaxResourcesForPeriod: utilisationData.CurrentUsage,
TotalCumulativeUsage: utilisationData.CumulativeUsage,
MaxResourcesForPeriod: utilisationData.CurrentUsage.ToProtoMap(),
TotalCumulativeUsage: utilisationData.CumulativeUsage.ToProtoMap(),
},
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func TestUtilisationEventReporter_ReportUtilisationEvents(t *testing.T) {
_, ok = fakeEventReporter.ReceivedEvents[1].Event.Events[0].Event.(*armadaevents.EventSequence_Event_ResourceUtilisation)
assert.True(t, ok)

assert.Equal(t, testPodResources.CurrentUsage, armadaresource.ComputeResources(event1.ResourceUtilisation.MaxResourcesForPeriod))
assert.Equal(t, testPodResources.CumulativeUsage, armadaresource.ComputeResources(event1.ResourceUtilisation.TotalCumulativeUsage))
assert.Equal(t, testPodResources.CurrentUsage, armadaresource.FromProtoMap(event1.ResourceUtilisation.MaxResourcesForPeriod))
assert.Equal(t, testPodResources.CumulativeUsage, armadaresource.FromProtoMap(event1.ResourceUtilisation.TotalCumulativeUsage))

event1CreatedTime := fakeEventReporter.ReceivedEvents[0].Event.Events[0].Created
event2CreatedTime := fakeEventReporter.ReceivedEvents[1].Event.Events[0].Created
Expand Down
16 changes: 8 additions & 8 deletions internal/scheduler/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestConstraints(t *testing.T) {
makeResourceList("1000", "1000Gi"),
makeResourceList("0", "0"),
makeSchedulingConfig(),
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{}}},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{}}},
)),
"within-constraints": makeConstraintsTest(NewSchedulingConstraints(
"pool-1",
Expand All @@ -51,7 +51,7 @@ func TestConstraints(t *testing.T) {
MaxQueueLookback: 1000,
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.9, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
)),
"exceeds-queue-priority-class-constraint": func() *constraintTest {
t := makeConstraintsTest(NewSchedulingConstraints(
Expand All @@ -62,7 +62,7 @@ func TestConstraints(t *testing.T) {
[]*api.Queue{
{
Name: "queue-1",
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFraction: map[string]float64{"cpu": 0.000001, "memory": 0.9},
},
Expand All @@ -82,9 +82,9 @@ func TestConstraints(t *testing.T) {
[]*api.Queue{
{
Name: "queue-1",
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFractionByPool: map[string]api.PriorityClassPoolResourceLimits{
MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{
"pool-1": {
MaximumResourceFraction: map[string]float64{"cpu": 0.000001, "memory": 0.9},
},
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestConstraints(t *testing.T) {
MaxQueueLookback: 1000,
PriorityClasses: map[string]types.PriorityClass{"priority-class-1": {MaximumResourceFractionPerQueueByPool: map[string]map[string]float64{"pool-1": {"cpu": 0.00000001, "memory": 0.9}}}},
},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
[]*api.Queue{{Name: "queue-1", ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{"priority-class-1": {MaximumResourceFraction: map[string]float64{"cpu": 0.9, "memory": 0.9}}}}},
)),
"one-constraint-per-level-falls-back-as-expected--within-limits": makeMultiLevelConstraintsTest(
map[string]resource.Quantity{"a": resource.MustParse("99"), "b": resource.MustParse("19"), "c": resource.MustParse("2.9"), "d": resource.MustParse("0.39")},
Expand Down Expand Up @@ -251,10 +251,10 @@ func makeMultiLevelConstraints() SchedulingConstraints {
[]*api.Queue{
{
Name: "queue-1",
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
"priority-class-1": {
MaximumResourceFraction: map[string]float64{"a": 0.01, "b": 0.02},
MaximumResourceFractionByPool: map[string]api.PriorityClassPoolResourceLimits{
MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{
"pool-1": {
MaximumResourceFraction: map[string]float64{"a": 0.1},
},
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ func TestQueueScheduler(t *testing.T) {
{
Name: "A",
PriorityFactor: 1.0,
ResourceLimitsByPriorityClassName: map[string]api.PriorityClassResourceLimits{
ResourceLimitsByPriorityClassName: map[string]*api.PriorityClassResourceLimits{
testfixtures.PriorityClass0: {
MaximumResourceFractionByPool: map[string]api.PriorityClassPoolResourceLimits{
MaximumResourceFractionByPool: map[string]*api.PriorityClassPoolResourceLimits{
"pool": {
MaximumResourceFraction: map[string]float64{"cpu": 0.5, "memory": 1.0},
},
Expand Down
Loading

0 comments on commit 7929de7

Please sign in to comment.