Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
d80tb7 committed Nov 29, 2024
1 parent 688d7ae commit ac33b8c
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 316 deletions.
21 changes: 0 additions & 21 deletions .run/Server.run.xml

This file was deleted.

12 changes: 6 additions & 6 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
}

var priceInfo *api.ExperimentalPriceInfo
if e.PriceInfo != nil {
if e.ExperimentalPriceInfo != nil {
priceInfo = &api.ExperimentalPriceInfo{
BidPrice: e.PriceInfo.BidPrice,
BidPrice: e.ExperimentalPriceInfo.BidPrice,
}
}

Expand All @@ -177,10 +177,10 @@ func ApiJobFromLogSubmitJob(ownerId string, groups []string, queueName string, j
Labels: e.ObjectMeta.Labels,
Annotations: e.ObjectMeta.Annotations,

K8SIngress: k8sIngresses,
K8SService: k8sServices,
PriceInfo: priceInfo,
Priority: float64(e.Priority),
K8SIngress: k8sIngresses,
K8SService: k8sServices,
ExperimentalPriceInfo: priceInfo,
Priority: float64(e.Priority),

PodSpec: podSpec,
PodSpecs: podSpecs,
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/jobdb/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func MarketSchedulingOrderCompare(job, other *Job) int {
return 1
}

// Next we sort on price
if job.price < other.price {
// Next we sort on bidPrice
if job.bidPrice < other.bidPrice {
return -1
} else if job.priority > other.priority {
return 1
Expand Down
16 changes: 8 additions & 8 deletions internal/scheduler/jobdb/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type Job struct {
jobSet string
// Per-queue priority of this job.
priority uint32
// Price the user us willing to pay to have this job scheduled
price float64
// BidPrice the user is willing to pay to have this job scheduled
bidPrice float64
// Requested per queue priority of this job.
// This is used when syncing the postgres database with the scheduler-internal database.
requestedPriority uint32
Expand Down Expand Up @@ -302,7 +302,7 @@ func (job *Job) Equal(other *Job) bool {
if job.priority != other.priority {
return false
}
if job.price != other.price {
if job.bidPrice != other.bidPrice {
return false
}
if job.requestedPriority != other.requestedPriority {
Expand Down Expand Up @@ -383,9 +383,9 @@ func (job *Job) Priority() uint32 {
return job.priority
}

// Price returns the price of the job.
func (job *Job) Price() float64 {
return job.price
// BidPrice returns the bidPrice of the job.
func (job *Job) BidPrice() float64 {
return job.bidPrice
}

// PriorityClass returns the priority class of the job.
Expand Down Expand Up @@ -423,10 +423,10 @@ func (job *Job) WithPriority(priority uint32) *Job {
return j
}

// WithPrice returns a copy of the job with the price updated.
// WithPrice returns a copy of the job with the bidPrice updated.
func (job *Job) WithPrice(price float64) *Job {
j := copyJob(*job)
j.price = price
j.bidPrice = price
return j
}

Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/jobdb/jobdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (jq jobQueue) delete(j *Job) jobQueue {
marketQueue: jq.marketQueue.Delete(j),
}
}

func (jq jobQueue) has(j *Job) bool {
return jq.fairShareQueue.Has(j)
}
Expand Down Expand Up @@ -199,7 +200,7 @@ func (jobDb *JobDb) NewJob(
queue: jobDb.stringInterner.Intern(queue),
jobSet: jobDb.stringInterner.Intern(jobSet),
priority: priority,
price: price,
bidPrice: price,
queued: queued,
queuedVersion: queuedVersion,
requestedPriority: priority,
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,7 @@ func jobDbJobFromDbJob(resourceListFactory *internaltypes.ResourceListFactory, j
job.JobSet,
job.Queue,
uint32(job.Priority),
job.Price,
job.BidPrice,
&schedulingInfo,
job.Queued,
job.QueuedVersion,
Expand Down
8 changes: 4 additions & 4 deletions internal/scheduler/scheduling/marketPriorityQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package scheduling

import (
"container/heap"
"time"

"github.com/armadaproject/armada/internal/scheduler/internaltypes"
schedulercontext "github.com/armadaproject/armada/internal/scheduler/scheduling/context"
"github.com/armadaproject/armada/internal/scheduler/scheduling/fairness"
"time"
)

type MarketIteratorPQ struct {
Expand Down Expand Up @@ -54,7 +55,7 @@ func (it *MarketBasedCandidateGangIterator) newPQItem(queue string, queueIt *Que
}

func (it *MarketBasedCandidateGangIterator) GetAllocationForQueue(queue string) (internaltypes.ResourceList, bool) {
//TODO implement me
// TODO implement me
panic("implement me")
}

Expand Down Expand Up @@ -130,7 +131,7 @@ func (it *MarketBasedCandidateGangIterator) updatePQItem(item *MarketIteratorPQI

job := gctx.JobSchedulingContexts[0].Job
item.gctx = gctx
item.price = job.Price()
item.price = job.BidPrice()
if !job.Queued() && job.LatestRun() != nil {
item.runtime = time.Now().UnixNano() - job.LatestRun().Created()
} else {
Expand Down Expand Up @@ -167,7 +168,6 @@ type MarketIteratorPQItem struct {
func (pq *MarketIteratorPQ) Len() int { return len(pq.items) }

func (pq *MarketIteratorPQ) Less(i, j int) bool {

// First by price
if pq.items[i].price != pq.items[j].price {
return pq.items[i].price < pq.items[j].price
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduleringester/instructions.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (c *JobSetEventsInstructionConverter) handleSubmitJob(job *armadaevents.Sub
}

bidPrice := 0.0
if job.PriceInfo != nil {
bidPrice = job.PriceInfo.BidPrice
if job.ExperimentalPriceInfo != nil {
bidPrice = job.ExperimentalPriceInfo.BidPrice
}

return []DbOperation{InsertJobs{jobId: &schedulerdb.Job{
Expand Down
8 changes: 7 additions & 1 deletion internal/server/submit/conversion/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func SubmitJobFromApiRequest(
JobId: jobId,
DeduplicationId: jobReq.GetClientId(),
Priority: priority,
Price: jobReq.GetPrice(),
ObjectMeta: &armadaevents.ObjectMeta{
Namespace: jobReq.GetNamespace(),
Annotations: jobReq.GetAnnotations(),
Expand All @@ -48,6 +47,13 @@ func SubmitJobFromApiRequest(
Objects: ingressesAndServices,
Scheduler: jobReq.Scheduler,
}

if jobReq.ExperimentalPriceInfo != nil {
msg.ExperimentalPriceInfo = &armadaevents.ExperimentalPriceInfo{
BidPrice: jobReq.ExperimentalPriceInfo.BidPrice,
}
}

postProcess(msg, config)
return msg
}
Expand Down
9 changes: 6 additions & 3 deletions internal/server/submit/validation/submit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,13 @@ func validatePriorityClasses(j *api.JobSubmitRequestItem, config configuration.S
return nil
}

// Ensures that if a request specifies a Price, that the price is noon-negative
// Ensures that if a request specifies a BidPrice, that the price is non-negative
func validatePrice(j *api.JobSubmitRequestItem, _ configuration.SubmissionConfig) error {
if j.Price < 0 {
return fmt.Errorf("price %.2f is invalid Prices must be greater than zero", j.Price)
if j.ExperimentalPriceInfo == nil {
return nil
}
if j.ExperimentalPriceInfo.BidPrice < 0 {
return fmt.Errorf("price %.2f is invalid Prices must be greater than zero", j.ExperimentalPriceInfo.BidPrice)
}
return nil
}
Expand Down
20 changes: 17 additions & 3 deletions internal/server/submit/validation/submit_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,16 +841,30 @@ func TestValidatePrice(t *testing.T) {
item *api.JobSubmitRequestItem
expectSuccess bool
}{
"zero price is ok": {
"nil ExperimentalPriceInfo is ok": {
item: &api.JobSubmitRequestItem{},
expectSuccess: true,
},
"zero price is ok": {
item: &api.JobSubmitRequestItem{
ExperimentalPriceInfo: &api.ExperimentalPriceInfo{},
},
expectSuccess: true,
},
"positive price is ok": {
item: &api.JobSubmitRequestItem{Price: 1.0},
item: &api.JobSubmitRequestItem{
ExperimentalPriceInfo: &api.ExperimentalPriceInfo{
BidPrice: 1.0,
},
},
expectSuccess: true,
},
"negative price is rejected": {
item: &api.JobSubmitRequestItem{Price: -1.0},
item: &api.JobSubmitRequestItem{
ExperimentalPriceInfo: &api.ExperimentalPriceInfo{
BidPrice: -1.0,
},
},
expectSuccess: false,
},
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,9 @@ func SwaggerJsonTemplate() string {
" \"clientId\": {\n" +
" \"type\": \"string\"\n" +
" },\n" +
" \"experimentalPriceInfo\": {\n" +
" \"$ref\": \"#/definitions/apiExperimentalPriceInfo\"\n" +
" },\n" +
" \"ingress\": {\n" +
" \"type\": \"array\",\n" +
" \"items\": {\n" +
Expand All @@ -1765,10 +1768,6 @@ func SwaggerJsonTemplate() string {
" \"$ref\": \"#/definitions/v1PodSpec\"\n" +
" }\n" +
" },\n" +
" \"price\": {\n" +
" \"type\": \"number\",\n" +
" \"format\": \"double\"\n" +
" },\n" +
" \"priority\": {\n" +
" \"type\": \"number\",\n" +
" \"format\": \"double\"\n" +
Expand Down
7 changes: 3 additions & 4 deletions pkg/api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,9 @@
"clientId": {
"type": "string"
},
"experimentalPriceInfo": {
"$ref": "#/definitions/apiExperimentalPriceInfo"
},
"ingress": {
"type": "array",
"items": {
Expand All @@ -1754,10 +1757,6 @@
"$ref": "#/definitions/v1PodSpec"
}
},
"price": {
"type": "number",
"format": "double"
},
"priority": {
"type": "number",
"format": "double"
Expand Down
Loading

0 comments on commit ac33b8c

Please sign in to comment.