Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Jan 2, 2025
1 parent 395e147 commit e7acfef
Show file tree
Hide file tree
Showing 7 changed files with 433 additions and 355 deletions.
16 changes: 15 additions & 1 deletion aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (r *RpcServer) ListExecutions(ctx context.Context, payload *avsproto.ListEx
return r.engine.ListExecutions(user, payload)
}

func (r *RpcServer) GetExecution(ctx context.Context, payload *avsproto.GetExecutionReq) (*avsproto.GetExecutionResp, error) {
func (r *RpcServer) GetExecution(ctx context.Context, payload *avsproto.ExecutionReq) (*avsproto.Execution, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
Expand All @@ -191,6 +191,20 @@ func (r *RpcServer) GetExecution(ctx context.Context, payload *avsproto.GetExecu
return r.engine.GetExecution(user, payload)
}

func (r *RpcServer) GetExecutionStatus(ctx context.Context, payload *avsproto.ExecutionReq) (*avsproto.ExecutionStatusResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.AuthenticationError, err.Error())
}

r.config.Logger.Info("process get execution",
"user", user.Address.String(),
"task_id", payload.TaskId,
"execution_id", payload.ExecutionId,
)
return r.engine.GetExecutionStatus(user, payload)
}

func (r *RpcServer) GetTask(ctx context.Context, payload *avsproto.IdReq) (*avsproto.Task, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
Expand Down
47 changes: 33 additions & 14 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,11 @@ func (n *Engine) ListExecutions(user *model.User, payload *avsproto.ListExecutio
}

func (n *Engine) setExecutionStatusQueue(task *model.Task, executionID string) error {
status := strconv.Itoa(int(avsproto.GetExecutionResp_Queue))
status := strconv.Itoa(int(avsproto.ExecutionStatus_Queued))
return n.db.Set(TaskTriggerKey(task, executionID), []byte(status))
}

func (n *Engine) getExecutonStatusFromQueue(task *model.Task, executionID string) (*avsproto.GetExecutionResp_ExecutionStatus, error) {
func (n *Engine) getExecutonStatusFromQueue(task *model.Task, executionID string) (*avsproto.ExecutionStatus, error) {
status, err := n.db.GetKey(TaskTriggerKey(task, executionID))
if err != nil {
return nil, err
Expand All @@ -720,12 +720,12 @@ func (n *Engine) getExecutonStatusFromQueue(task *model.Task, executionID string
if err != nil {
return nil, err
}
statusValue := avsproto.GetExecutionResp_ExecutionStatus(value)
statusValue := avsproto.ExecutionStatus(value)
return &statusValue, nil
}

// Get xecution for a given task id and execution id
func (n *Engine) GetExecution(user *model.User, payload *avsproto.GetExecutionReq) (*avsproto.GetExecutionResp, error) {
func (n *Engine) GetExecution(user *model.User, payload *avsproto.ExecutionReq) (*avsproto.Execution, error) {
// Validate all tasks own by the caller, if there are any tasks won't be owned by caller, we return permission error
task, err := n.GetTaskByID(payload.TaskId)

Expand All @@ -739,12 +739,6 @@ func (n *Engine) GetExecution(user *model.User, payload *avsproto.GetExecutionRe

executionValue, err := n.db.GetKey(TaskExecutionKey(task, payload.ExecutionId))
if err != nil {
// When execution not found, it could be in pending status, we will check that storage
if status, err := n.getExecutonStatusFromQueue(task, payload.ExecutionId); err == nil {
return &avsproto.GetExecutionResp{
Status: *status,
}, nil
}
return nil, grpcstatus.Errorf(codes.NotFound, ExecutionNotFoundError)
}
exec := avsproto.Execution{}
Expand All @@ -763,11 +757,36 @@ func (n *Engine) GetExecution(user *model.User, payload *avsproto.GetExecutionRe
}
}

result := &avsproto.GetExecutionResp{
Status: avsproto.GetExecutionResp_Completed,
Data: &exec,
return &exec, nil
}

func (n *Engine) GetExecutionStatus(user *model.User, payload *avsproto.ExecutionReq) (*avsproto.ExecutionStatusResp, error) {
task, err := n.GetTaskByID(payload.TaskId)

if err != nil {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}
return result, nil

if !task.OwnedBy(user.Address) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

// First look into execution first
if _, err = n.db.GetKey(TaskExecutionKey(task, payload.ExecutionId)); err != nil {
// When execution not found, it could be in pending status, we will check that storage
if status, err := n.getExecutonStatusFromQueue(task, payload.ExecutionId); err == nil {
return &avsproto.ExecutionStatusResp{
Status: *status,
}, nil
}
return nil, fmt.Errorf("invalid ")
}

// if the key existed, the execution has finished, no need to decode the whole storage, we just return the status in this call
return &avsproto.ExecutionStatusResp{
Status: avsproto.ExecutionStatus_Finished,
}, nil

}

func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error) {
Expand Down
63 changes: 31 additions & 32 deletions core/taskengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,22 +208,22 @@ func TestGetExecution(t *testing.T) {
IsBlocking: true,
})

// Now get back that exectuon id
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.GetExecutionReq{
// Now get back that execution data using the log
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})

if execution.Data.Id != resultTrigger.ExecutionId {
t.Errorf("invalid execution id. expect %s got %s", resultTrigger.ExecutionId, execution.Data.Id)
if execution.Id != resultTrigger.ExecutionId {
t.Errorf("invalid execution id. expect %s got %s", resultTrigger.ExecutionId, execution.Id)
}

if execution.Data.TriggerMetadata.BlockNumber != 101 {
t.Errorf("invalid triggered block. expect 101 got %d", execution.Data.TriggerMetadata.BlockNumber)
if execution.TriggerMetadata.BlockNumber != 101 {
t.Errorf("invalid triggered block. expect 101 got %d", execution.TriggerMetadata.BlockNumber)
}

// Another user cannot get this executin id
execution, err = n.GetExecution(testutil.TestUser2(), &avsproto.GetExecutionReq{
execution, err = n.GetExecution(testutil.TestUser2(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})
Expand Down Expand Up @@ -318,21 +318,17 @@ func TestTriggerSync(t *testing.T) {
}

// Now get back that execution id
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.GetExecutionReq{
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})

if execution.Status != avsproto.GetExecutionResp_Completed {
t.Errorf("invalid execution status, expected conpleted but got %s", avsproto.GetExecutionResp_ExecutionStatus_name[int32(execution.Status)])
if execution.Id != resultTrigger.ExecutionId {
t.Errorf("invalid execution id. expect %s got %s", resultTrigger.ExecutionId, execution.Id)
}

if execution.Data.Id != resultTrigger.ExecutionId {
t.Errorf("invalid execution id. expect %s got %s", resultTrigger.ExecutionId, execution.Data.Id)
}

if execution.Data.TriggerMetadata.BlockNumber != 101 {
t.Errorf("invalid triggered block. expect 101 got %d", execution.Data.TriggerMetadata.BlockNumber)
if execution.TriggerMetadata.BlockNumber != 101 {
t.Errorf("invalid triggered block. expect 101 got %d", execution.TriggerMetadata.BlockNumber)
}
}

Expand Down Expand Up @@ -374,43 +370,46 @@ func TestTriggerAsync(t *testing.T) {

// Now get back that execution id, because the task is run async we won't have any data yet,
// just the status for now
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.GetExecutionReq{
executionStatus, err := n.GetExecutionStatus(testutil.TestUser1(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})

if execution.Data != nil {
t.Errorf("malform execution result. expect no data but got %s", execution.Data)
}

if execution.Status != avsproto.GetExecutionResp_Queue {
t.Errorf("invalid execution status, expected queue but got %s", avsproto.GetExecutionResp_ExecutionStatus_name[int32(execution.Status)])
if executionStatus.Status != avsproto.ExecutionStatus_Queued {
t.Errorf("invalid execution status, expected queue but got %s", avsproto.TaskStatus_name[int32(executionStatus.Status)])
}

// Now let the queue start and process job
// In our end to end system the worker will process the job eventually
worker.ProcessSignal(1)

execution, err = n.GetExecution(testutil.TestUser1(), &avsproto.GetExecutionReq{
execution, err := n.GetExecution(testutil.TestUser1(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})
if execution.Status != avsproto.GetExecutionResp_Completed {
t.Errorf("invalid execution status, expected completed but got %s", avsproto.GetExecutionResp_ExecutionStatus_name[int32(execution.Status)])
}

if execution.Data.Id != resultTrigger.ExecutionId {
t.Errorf("wring execution id, expected %s got %s", resultTrigger.ExecutionId, execution.Data.Id)
if execution.Id != resultTrigger.ExecutionId {
t.Errorf("wring execution id, expected %s got %s", resultTrigger.ExecutionId, execution.Id)
}

if !execution.Data.Success {
if !execution.Success {
t.Errorf("wrong success result, expected true got false")
}

if execution.Data.Steps[0].NodeId != "ping1" {
if execution.Steps[0].NodeId != "ping1" {
t.Errorf("wrong node id in execution log")
}
if !strings.Contains(execution.Data.Steps[0].OutputData, "httpbin.org") {
if !strings.Contains(execution.Steps[0].OutputData, "httpbin.org") {
t.Error("Invalid output data")
}

// If we get the status back it also reflected
executionStatus, err = n.GetExecutionStatus(testutil.TestUser1(), &avsproto.ExecutionReq{
TaskId: result.Id,
ExecutionId: resultTrigger.ExecutionId,
})

if executionStatus.Status != avsproto.ExecutionStatus_Finished {
t.Errorf("invalid execution status, expected completed but got %s", avsproto.TaskStatus_name[int32(executionStatus.Status)])
}
}
5 changes: 5 additions & 0 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
}

func (x *TaskExecutor) RunTask(task *model.Task, queueData *QueueExecutionData) (*avsproto.Execution, error) {
defer func() {
// Delete the task trigger queue when we're done, the execution log is available in main task storage at this point
x.db.GetKey(TaskTriggerKey(task, queueData.ExecutionID))
}()

if queueData == nil || queueData.ExecutionID == "" {
return nil, fmt.Errorf("internal error: invalid execution id")
}
Expand Down
Loading

0 comments on commit e7acfef

Please sign in to comment.