Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Dec 2, 2024
1 parent 4728f78 commit 427b0b9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
21 changes: 14 additions & 7 deletions internal/scheduler/jobdb/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,30 @@ func MarketSchedulingOrderCompare(job, other *Job) int {
return 1
}

// Next put a job with an active run ahead of queued jobs
jobIsActive := job.activeRun != nil && !job.activeRun.InTerminalState()
otherIsActive := other.activeRun != nil && !other.activeRun.InTerminalState()
if jobIsActive && !otherIsActive {
// PriorityClassPriority indicates urgency.
// Hence, jobs of higher priorityClassPriority come first.
if job.priorityClass.Priority > other.priorityClass.Priority {
return -1
} else if job.priorityClass.Priority < other.priorityClass.Priority {
return 1
}
if !jobIsActive && otherIsActive {

// Jobs higher in queue-priority come first.
if job.priority < other.priority {
return -1
} else if job.priority > other.priority {
return 1
}

// If both jobs are active, order by time since the job was scheduled.
// This ensures jobs that have been running for longer are rescheduled first,
// which reduces wasted compute time when preempting.
jobIsActive := job.activeRun != nil && !job.activeRun.InTerminalState()
otherIsActive := other.activeRun != nil && !other.activeRun.InTerminalState()
if jobIsActive && otherIsActive {
if job.activeRunTimestamp > other.activeRunTimestamp {
if job.activeRunTimestamp < other.activeRunTimestamp {
return -1
} else if job.activeRunTimestamp < other.activeRunTimestamp {
} else if job.activeRunTimestamp > other.activeRunTimestamp {
return 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,9 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) {
},
"gang preemption": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
// Fill half of node 1 and half of node 2.
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.N1Cpu4GiJobsWithPrice("A", 100.0, 16),
"B": testfixtures.N1Cpu4GiJobsWithPrice("B", 100.0, 16),
Expand All @@ -276,12 +275,20 @@ func TestMarketDrivenPreemptingQueueScheduler(t *testing.T) {
ExpectedScheduledIndices: map[string][]int{
"C": testfixtures.IntRange(0, 31),
},
ExpectedPreemptedIndices: map[string]map[int][]int{
"A": {
0: testfixtures.IntRange(0, 15),
},
"B": {
0: testfixtures.IntRange(0, 15),
},
},
},
{
// Schedule jobs that requires preempting one job in the gang,
// and assert that all jobs in the gang are preempted.
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.N1Cpu4GiJobsWithPrice("A", 101.0, 17),
"A": testfixtures.N1Cpu4GiJobsWithPrice("A", 102.0, 17),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 16),
Expand Down

0 comments on commit 427b0b9

Please sign in to comment.