Skip to content

Commit

Permalink
Merge branch 'master' into sendToGitHub/new-airflow-operator
Browse files Browse the repository at this point in the history
  • Loading branch information
MustafaI authored Jun 21, 2024
2 parents 532e131 + 5daee4a commit 0d6638d
Showing 1 changed file with 52 additions and 31 deletions.
83 changes: 52 additions & 31 deletions internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,17 @@ func (nodeDb *NodeDb) selectNodeForPodWithItAtPriority(
// It does this by considering all evicted jobs in the reverse order they would be scheduled in and preventing
// from being re-scheduled the jobs that would be scheduled last.
func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) (*internaltypes.Node, error) {
type consideredNode struct {
node *internaltypes.Node
availableResource internaltypes.ResourceList
evictedJobs []*EvictedJobSchedulingContext
staticRequirementsNotMet bool
}

pctx := jctx.PodSchedulingContext

var selectedNode *internaltypes.Node
nodesById := make(map[string]*internaltypes.Node)
evictedJobSchedulingContextsByNodeId := make(map[string][]*EvictedJobSchedulingContext)
nodesById := make(map[string]*consideredNode)
it, err := txn.ReverseLowerBound("evictedJobs", "index", math.MaxInt)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -827,53 +833,68 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s
}
node, ok := nodesById[nodeId]
if !ok {
node, err = nodeDb.GetNodeWithTxn(txn, nodeId)
nodeFromDb, err := nodeDb.GetNodeWithTxn(txn, nodeId)
if err != nil {
return nil, errors.WithStack(err)
}
node = node.UnsafeCopy()
node = &consideredNode{
node: nodeFromDb,
availableResource: nodeFromDb.AllocatableByPriority[evictedPriority],
staticRequirementsNotMet: false,
evictedJobs: []*EvictedJobSchedulingContext{},
}

nodesById[nodeId] = node
}

err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, evictedJctx.Job, node)
if err != nil {
return nil, err
if node.staticRequirementsNotMet {
continue
}
evictedJobSchedulingContextsByNodeId[nodeId] = append(evictedJobSchedulingContextsByNodeId[nodeId], evictedJobSchedulingContext)

priority, ok := nodeDb.GetScheduledAtPriority(evictedJctx.JobId)
if !ok {
priority = evictedJctx.PodRequirements.Priority
}
if priority > maxPriority {
maxPriority = priority
// Evict job, update available resource
node.availableResource = node.availableResource.Add(evictedJctx.ResourceRequirements)
node.evictedJobs = append(node.evictedJobs, evictedJobSchedulingContext)

dynamicRequirementsMet, _ := DynamicJobRequirementsMet(node.availableResource, jctx)
if !dynamicRequirementsMet {
continue
}
matches, reason, err := JobRequirementsMet(
node,
// At this point, we've unbound the jobs running on the node.
// Hence, we should check if the job is schedulable at evictedPriority,
// since that indicates the job can be scheduled without causing further preemptions.
evictedPriority,
jctx,
)

staticRequirementsMet, reason, err := StaticJobRequirementsMet(node.node, jctx)
if err != nil {
return nil, err
}
if matches {
selectedNode = node
} else {
if !staticRequirementsMet {
node.staticRequirementsNotMet = true
s := nodeDb.stringFromPodRequirementsNotMetReason(reason)
pctx.NumExcludedNodesByReason[s] += 1
continue
}
}
if selectedNode != nil {
pctx.NodeId = selectedNode.GetId()
pctx.PreemptedAtPriority = maxPriority
for _, evictedJobSchedulingContext := range evictedJobSchedulingContextsByNodeId[selectedNode.GetId()] {
if err := txn.Delete("evictedJobs", evictedJobSchedulingContext); err != nil {

nodeCopy := node.node.UnsafeCopy()
for _, job := range node.evictedJobs {
// Remove preempted job from node
err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, job.JobSchedulingContext.Job, nodeCopy)
if err != nil {
return nil, err
}
// Remove preempted job from list of evicted jobs
if err := txn.Delete("evictedJobs", job); err != nil {
return nil, errors.WithStack(err)
}

priority, ok := nodeDb.GetScheduledAtPriority(evictedJctx.JobId)
if !ok {
priority = evictedJctx.PodRequirements.Priority
}
if priority > maxPriority {
maxPriority = priority
}
}

selectedNode = nodeCopy
pctx.NodeId = selectedNode.GetId()
pctx.PreemptedAtPriority = maxPriority
}
return selectedNode, nil
}
Expand Down

0 comments on commit 0d6638d

Please sign in to comment.