Skip to content

Commit

Permalink
Merge branch 'master' into unit_test_limits
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesMurkin committed Jul 8, 2024
2 parents 787f4b5 + d45b7c2 commit 992bb98
Show file tree
Hide file tree
Showing 141 changed files with 4,908 additions and 6,748 deletions.
1 change: 1 addition & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ kubernetes:
Burst: 10000
nodeIdLabel: kubernetes.io/hostname
nodeTypeLabel: armadaproject.io/node-type
nodePoolLabel: armadaproject.io/pool
minimumPodAge: 3m
failedPodExpiry: 10m
maxTerminatedPods: 1000 # Should be lower than kube-controller-managed terminated-pod-gc-threshold (default 12500)
Expand Down
13 changes: 1 addition & 12 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,4 @@ scheduling:
maxUnacknowledgedJobsPerExecutor: 2500
alwaysAttemptScheduling: false
executorUpdateFrequency: "60s"
failureProbabilityEstimation:
# Optimised default parameters.
numInnerIterations: 10
innerOptimiserStepSize: 0.05
outerOptimiserStepSize: 0.05
outerOptimiserNesterovAcceleration: 0.2
nodeQuarantining:
failureProbabilityQuarantineThreshold: 0.95
failureProbabilityEstimateTimeout: "10m"
queueQuarantining:
quarantineFactorMultiplier: 0.5 # At most halve the scheduling rate of misbehaving queues.
failureProbabilityEstimateTimeout: "10m"

1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/rakyll/statik v0.1.7
github.com/renstrom/shortuuid v3.0.0+incompatible
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5 h1:kvl0LOTQD23VR1R7A9vDti9msfV6mOE2+j6ngYkFsfg=
github.com/redis/go-redis/extra/redisprometheus/v9 v9.0.5/go.mod h1:VhyLk7MdSTKbJCx6+wXlj3/ebh49gTq3yBiXymYrG7w=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
Expand Down
82 changes: 42 additions & 40 deletions internal/armada/event/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/armadaproject/armada/internal/common/eventutil"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
)
Expand All @@ -20,37 +21,38 @@ func FromEventSequence(es *armadaevents.EventSequence) ([]*api.EventMessage, err

for _, event := range es.Events {
var convertedEvents []*api.EventMessage = nil
eventTs := protoutil.ToStdTime(event.Created)
switch esEvent := event.GetEvent().(type) {
case *armadaevents.EventSequence_Event_SubmitJob:
convertedEvents, err = FromInternalSubmit(es.UserId, es.Groups, es.Queue, es.JobSetName, *event.Created, esEvent.SubmitJob)
convertedEvents, err = FromInternalSubmit(es.UserId, es.Groups, es.Queue, es.JobSetName, eventTs, esEvent.SubmitJob)
case *armadaevents.EventSequence_Event_CancelledJob:
convertedEvents, err = FromInternalCancelled(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.CancelledJob)
convertedEvents, err = FromInternalCancelled(es.UserId, es.Queue, es.JobSetName, eventTs, esEvent.CancelledJob)
case *armadaevents.EventSequence_Event_CancelJob:
convertedEvents, err = FromInternalCancel(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.CancelJob)
convertedEvents, err = FromInternalCancel(es.UserId, es.Queue, es.JobSetName, eventTs, esEvent.CancelJob)
case *armadaevents.EventSequence_Event_JobPreemptionRequested:
convertedEvents, err = FromInternalPreemptionRequested(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.JobPreemptionRequested)
convertedEvents, err = FromInternalPreemptionRequested(es.UserId, es.Queue, es.JobSetName, eventTs, esEvent.JobPreemptionRequested)
case *armadaevents.EventSequence_Event_ReprioritiseJob:
convertedEvents, err = FromInternalReprioritiseJob(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.ReprioritiseJob)
convertedEvents, err = FromInternalReprioritiseJob(es.UserId, es.Queue, es.JobSetName, eventTs, esEvent.ReprioritiseJob)
case *armadaevents.EventSequence_Event_ReprioritisedJob:
convertedEvents, err = FromInternalReprioritisedJob(es.UserId, es.Queue, es.JobSetName, *event.Created, esEvent.ReprioritisedJob)
convertedEvents, err = FromInternalReprioritisedJob(es.UserId, es.Queue, es.JobSetName, eventTs, esEvent.ReprioritisedJob)
case *armadaevents.EventSequence_Event_JobRunLeased:
convertedEvents, err = FromInternalLogJobRunLeased(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunLeased)
convertedEvents, err = FromInternalLogJobRunLeased(es.Queue, es.JobSetName, eventTs, esEvent.JobRunLeased)
case *armadaevents.EventSequence_Event_JobRunErrors:
convertedEvents, err = FromInternalJobRunErrors(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunErrors)
convertedEvents, err = FromInternalJobRunErrors(es.Queue, es.JobSetName, eventTs, esEvent.JobRunErrors)
case *armadaevents.EventSequence_Event_JobSucceeded:
convertedEvents, err = FromInternalJobSucceeded(es.Queue, es.JobSetName, *event.Created, esEvent.JobSucceeded)
convertedEvents, err = FromInternalJobSucceeded(es.Queue, es.JobSetName, eventTs, esEvent.JobSucceeded)
case *armadaevents.EventSequence_Event_JobErrors:
convertedEvents, err = FromInternalJobErrors(es.Queue, es.JobSetName, *event.Created, esEvent.JobErrors)
convertedEvents, err = FromInternalJobErrors(es.Queue, es.JobSetName, eventTs, esEvent.JobErrors)
case *armadaevents.EventSequence_Event_JobRunRunning:
convertedEvents, err = FromInternalJobRunRunning(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunRunning)
convertedEvents, err = FromInternalJobRunRunning(es.Queue, es.JobSetName, eventTs, esEvent.JobRunRunning)
case *armadaevents.EventSequence_Event_JobRunAssigned:
convertedEvents, err = FromInternalJobRunAssigned(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunAssigned)
convertedEvents, err = FromInternalJobRunAssigned(es.Queue, es.JobSetName, eventTs, esEvent.JobRunAssigned)
case *armadaevents.EventSequence_Event_ResourceUtilisation:
convertedEvents, err = FromInternalResourceUtilisation(es.Queue, es.JobSetName, *event.Created, esEvent.ResourceUtilisation)
convertedEvents, err = FromInternalResourceUtilisation(es.Queue, es.JobSetName, eventTs, esEvent.ResourceUtilisation)
case *armadaevents.EventSequence_Event_StandaloneIngressInfo:
convertedEvents, err = FromInternalStandaloneIngressInfo(es.Queue, es.JobSetName, *event.Created, esEvent.StandaloneIngressInfo)
convertedEvents, err = FromInternalStandaloneIngressInfo(es.Queue, es.JobSetName, eventTs, esEvent.StandaloneIngressInfo)
case *armadaevents.EventSequence_Event_JobRunPreempted:
convertedEvents, err = FromInternalJobRunPreempted(es.Queue, es.JobSetName, *event.Created, esEvent.JobRunPreempted)
convertedEvents, err = FromInternalJobRunPreempted(es.Queue, es.JobSetName, eventTs, esEvent.JobRunPreempted)
case *armadaevents.EventSequence_Event_ReprioritiseJobSet,
*armadaevents.EventSequence_Event_CancelJobSet,
*armadaevents.EventSequence_Event_JobRunSucceeded,
Expand Down Expand Up @@ -87,15 +89,15 @@ func FromInternalSubmit(owner string, groups []string, queue string, jobSet stri
JobId: jobId,
JobSetId: jobSet,
Queue: queue,
Created: time,
Job: *job,
Created: protoutil.ToTimestamp(time),
Job: job,
}

queuedEvent := &api.JobQueuedEvent{
JobId: jobId,
JobSetId: jobSet,
Queue: queue,
Created: time,
Created: protoutil.ToTimestamp(time),
}

return []*api.EventMessage{
Expand Down Expand Up @@ -125,7 +127,7 @@ func FromInternalPreemptionRequested(userId string, queueName string, jobSetName
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Requestor: userId,
},
},
Expand All @@ -146,7 +148,7 @@ func FromInternalCancel(userId string, queueName string, jobSetName string, time
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Requestor: userId,
},
},
Expand All @@ -167,7 +169,7 @@ func FromInternalCancelled(userId string, queueName string, jobSetName string, t
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Requestor: userId,
},
},
Expand All @@ -187,7 +189,7 @@ func FromInternalReprioritiseJob(userId string, queueName string, jobSetName str
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
NewPriority: float64(e.Priority),
Requestor: userId,
},
Expand All @@ -208,7 +210,7 @@ func FromInternalReprioritisedJob(userId string, queueName string, jobSetName st
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
NewPriority: float64(e.Priority),
Requestor: userId,
},
Expand All @@ -229,7 +231,7 @@ func FromInternalLogJobRunLeased(queueName string, jobSetName string, time time.
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: e.ExecutorId,
},
},
Expand All @@ -247,7 +249,7 @@ func FromInternalJobSucceeded(queueName string, jobSetName string, time time.Tim
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
}

if len(e.ResourceInfos) > 0 {
Expand Down Expand Up @@ -285,7 +287,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
},
},
}
Expand All @@ -305,7 +307,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim
PodNumber: reason.PodUnschedulable.GetPodNumber(),
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
},
},
}
Expand All @@ -318,7 +320,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: objectMeta.GetExecutorId(),
Reason: reason.PodLeaseReturned.GetMessage(),
KubernetesId: objectMeta.GetKubernetesId(),
Expand All @@ -338,7 +340,7 @@ func FromInternalJobRunErrors(queueName string, jobSetName string, time time.Tim
PodNamespace: objectMeta.GetNamespace(),
PodName: objectMeta.GetName(),
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: objectMeta.GetExecutorId(),
Reason: reason.PodTerminated.GetMessage(),
KubernetesId: objectMeta.GetKubernetesId(),
Expand Down Expand Up @@ -379,7 +381,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time,
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Reason: "preempted",
},
},
Expand All @@ -392,7 +394,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time,
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Reason: reason.MaxRunsExceeded.Message,
},
},
Expand All @@ -405,7 +407,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time,
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Reason: reason.GangJobUnschedulable.Message,
},
},
Expand All @@ -418,7 +420,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time,
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
Reason: reason.JobRejected.Message,
Cause: api.Cause_Rejected,
},
Expand All @@ -433,7 +435,7 @@ func FromInternalJobErrors(queueName string, jobSetName string, time time.Time,
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
},
},
}
Expand All @@ -453,7 +455,7 @@ func FromInternalJobRunRunning(queueName string, jobSetName string, time time.Ti
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
}

if len(e.ResourceInfos) > 0 {
Expand Down Expand Up @@ -485,7 +487,7 @@ func FromInternalJobRunAssigned(queueName string, jobSetName string, time time.T
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
}

if len(e.ResourceInfos) > 0 {
Expand Down Expand Up @@ -540,7 +542,7 @@ func FromInternalJobRunPreempted(queueName string, jobSetName string, time time.
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
RunId: runId,
PreemptiveJobId: preemptiveJobId,
PreemptiveRunId: preemptiveRunId,
Expand All @@ -565,7 +567,7 @@ func FromInternalResourceUtilisation(queueName string, jobSetName string, time t
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: e.GetResourceInfo().GetObjectMeta().GetExecutorId(),
KubernetesId: e.GetResourceInfo().GetObjectMeta().GetKubernetesId(),
MaxResourcesForPeriod: e.MaxResourcesForPeriod,
Expand Down Expand Up @@ -595,7 +597,7 @@ func FromInternalStandaloneIngressInfo(queueName string, jobSetName string, time
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: e.GetObjectMeta().GetExecutorId(),
KubernetesId: e.GetObjectMeta().GetKubernetesId(),
NodeName: e.GetNodeName(),
Expand All @@ -620,7 +622,7 @@ func makeJobFailed(jobId string, queueName string, jobSetName string, time time.
JobId: jobId,
JobSetId: jobSetName,
Queue: queueName,
Created: time,
Created: protoutil.ToTimestamp(time),
ClusterId: podError.GetObjectMeta().GetExecutorId(),
PodNamespace: podError.GetObjectMeta().GetNamespace(),
KubernetesId: podError.GetObjectMeta().GetKubernetesId(),
Expand Down
Loading

0 comments on commit 992bb98

Please sign in to comment.