Skip to content

Commit

Permalink
impl pagination, split exec log out of task list
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 9, 2024
1 parent e4c5ab1 commit dfb40e4
Show file tree
Hide file tree
Showing 22 changed files with 6,601 additions and 6,762 deletions.
34 changes: 22 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
ROOT_DIR := $(shell pwd)

MAIN_PACKAGE_PATH ?= ./
BINARY_NAME ?= ap-avs
BINARY_NAME ?= ap

# ==================================================================================== #
# HELPERS
Expand Down Expand Up @@ -56,15 +58,6 @@ build:
run: build
/tmp/bin/${BINARY_NAME}

## run/live: run the application with reloading on file changes
.PHONY: run/live
run/live:
go run github.com/cosmtrek/air@v1.43.0 \
--build.cmd "make build" --build.bin "/tmp/bin/${BINARY_NAME}" --build.delay "100" \
--build.exclude_dir "" \
--build.include_ext "go, tpl, tmpl, html, css, scss, js, ts, sql, jpeg, jpg, gif, png, bmp, svg, webp, ico" \
--misc.clean_on_exit "true"


## push: push changes to the remote Git repository
.PHONY: push
Expand All @@ -77,14 +70,22 @@ production/deploy: confirm tidy audit no-dirty
GOOS=linux GOARCH=amd64 go build -ldflags='-s' -o=/tmp/bin/linux_amd64/${BINARY_NAME} ${MAIN_PACKAGE_PATH}
upx -5 /tmp/bin/linux_amd64/${BINARY_NAME}

## dev: generate protoc
## protoc-gen: generate protoc buf Go binding
.PHONY: protoc-gen
protoc-gen:
protoc \
--go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
protobuf/avs.proto
protobuf/avs.proto

protoc \
--go_out=. \
--go_opt=paths=source_relative \
--go-grpc_out=. \
--go-grpc_opt=paths=source_relative \
protobuf/node.proto

## up: bring up docker compose stack
up:
Expand All @@ -96,6 +97,15 @@ unstable-build:
docker push avaprotocol/avs-dev:unstable


## dev/live: run the application with reloading on file changes
.PHONY: dev-live
dev-live:
go run github.com/air-verse/air@v1.61.1 \
--build.cmd "make dev-build" --build.bin "./out/${BINARY_NAME}" --build.args_bin "aggregator" --build.delay "100" \
--build.exclude_dir "certs,client-sdk,contracts,examples,out,docs,tmp" \
--build.include_ext "go, tpl, tmpl, html, css, scss, js, ts, sql, jpeg, jpg, gif, png, bmp, svg, webp, ico" \
--misc.clean_on_exit "true"

## dev-build: build a dev version for local development
dev-build:
mkdir out || true
Expand Down
15 changes: 10 additions & 5 deletions aggregator/rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,20 @@ func (r *RpcServer) ListTasks(ctx context.Context, payload *avsproto.ListTasksRe
"user", user.Address.String(),
"smart_wallet_address", payload.SmartWalletAddress,
)
tasks, err := r.engine.ListTasksByUser(user, payload)
return r.engine.ListTasksByUser(user, payload)
}

func (r *RpcServer) ListExecutions(ctx context.Context, payload *avsproto.ListExecutionsReq) (*avsproto.ListExecutionsResp, error) {
user, err := r.verifyAuth(ctx)
if err != nil {
return nil, err
return nil, status.Errorf(codes.Unauthenticated, "%s: %s", auth.InvalidAuthenticationKey, err.Error())
}

return &avsproto.ListTasksResp{
Tasks: tasks,
}, nil
r.config.Logger.Info("process list execution",
"user", user.Address.String(),
"task_id", payload.Id,
)
return r.engine.ListExecutions(user, payload)
}

func (r *RpcServer) GetTask(ctx context.Context, payload *avsproto.IdReq) (*avsproto.Task, error) {
Expand Down
97 changes: 97 additions & 0 deletions core/taskengine/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package taskengine

import (
"encoding/base64"
"encoding/json"
"strconv"

"github.com/oklog/ulid/v2"
)

type CursorDirection string

const (
CursorDirectionNext = CursorDirection("next")
CursorDirectionPrevious = CursorDirection("prev")
)

type Cursor struct {
Direction CursorDirection `json:"d"`
Position string `json:"p"`

parsePos bool `json:"-"`
int64Pos int64 `json:"-"`
ulidPos ulid.ULID `json:"-"`
}

func CursorFromString(data string) (*Cursor, error) {
c := &Cursor{
Direction: CursorDirectionNext,
Position: "0",
parsePos: false,
int64Pos: 0,
ulidPos: ulid.Zero,
}

decoded, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return c, err
}

if err = json.Unmarshal(decoded, &c); err == nil {
return c, nil
} else {
return c, err
}
}

func NewCursor(direction CursorDirection, position string) *Cursor {
return &Cursor{
Direction: direction,
Position: position,

parsePos: false,
int64Pos: 0,
}
}
func (c *Cursor) String() string {
var d []byte
d, err := json.Marshal(c)

if err != nil {
return ""
}

encoded := base64.StdEncoding.EncodeToString(d)

return encoded
}

// Given a value, return true if the value is after the cursor
func (c *Cursor) AfterInt64(value int64) bool {
if !c.parsePos {
c.int64Pos, _ = strconv.ParseInt(c.Position, 10, 64)
c.parsePos = true
}
if c.Direction == CursorDirectionNext {
return c.int64Pos <= value
}

return c.int64Pos >= value
}

// Given a value, return true if the value is after the cursor
func (c *Cursor) AfterUlid(value ulid.ULID) bool {
if !c.parsePos {
var err error
c.ulidPos, err = ulid.Parse(c.Position)
if err != nil {
c.ulidPos = ulid.Zero
}
c.parsePos = true
}
if c.Direction == CursorDirectionNext {
return c.ulidPos.Compare(value) < 0
}
return c.ulidPos.Compare(value) > 0
}
108 changes: 100 additions & 8 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -20,6 +19,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/oklog/ulid/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcstatus "google.golang.org/grpc/status"
Expand All @@ -29,7 +29,8 @@ import (
)

const (
ExecuteTask = "execute_task"
ExecuteTask = "execute_task"
DefaultItemPerPage = 50
)

var (
Expand Down Expand Up @@ -388,7 +389,7 @@ func (n *Engine) AggregateChecksResult(address string, payload *avsproto.NotifyT
return nil
}

func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksReq) ([]*avsproto.Task, error) {
func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksReq) (*avsproto.ListTasksResp, error) {
// by default show the task from the default smart wallet, if proving we look into that wallet specifically
owner := user.SmartAccountAddress
if payload.SmartWalletAddress == "" {
Expand All @@ -412,10 +413,26 @@ func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksRe
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageUnavailableError)
}

tasks := make([]*avsproto.Task, len(taskIDs))
for i, kv := range taskIDs {
taskResp := &avsproto.ListTasksResp{
Tasks: []*avsproto.ListTasksResp_Item{},
Cursor: "",
}

total := 0
cursor, err := CursorFromString(payload.Cursor)
itemPerPage := int(payload.ItemPerPage)
if itemPerPage == 0 {
itemPerPage = DefaultItemPerPage
}
for _, kv := range taskIDs {
status, _ := strconv.Atoi(string(kv.Value))
taskID := string(model.TaskKeyToId(kv.Key[2:]))

taskIDUlid := model.UlidFromTaskId(taskID)
if !cursor.AfterUlid(taskIDUlid) {
continue
}

taskRawByte, err := n.db.GetKey(TaskStorageKey(taskID, avsproto.TaskStatus(status)))
if err != nil {
continue
Expand All @@ -427,10 +444,34 @@ func (n *Engine) ListTasksByUser(user *model.User, payload *avsproto.ListTasksRe
}
task.Id = taskID

tasks[i], _ = task.ToProtoBuf()
if t, err := task.ToProtoBuf(); err == nil {
taskResp.Tasks = append(taskResp.Tasks, &avsproto.ListTasksResp_Item{
Id: t.Id,
Owner: t.Owner,
SmartWalletAddress: t.SmartWalletAddress,
StartAt: t.StartAt,
ExpiredAt: t.ExpiredAt,
Memo: t.Memo,
CompletedAt: t.CompletedAt,
MaxExecution: t.MaxExecution,
TotalExecution: t.TotalExecution,
LastRanAt: t.LastRanAt,
Status: t.Status,
Trigger: t.Trigger,
})
total += 1
}

if total >= itemPerPage {
break
}
}

return tasks, nil
if total >= itemPerPage {
taskResp.Cursor = NewCursor(CursorDirectionNext, taskResp.Tasks[total-1].Id).String()
}

return taskResp, nil
}

func (n *Engine) GetTaskByID(taskID string) (*model.Task, error) {
Expand All @@ -456,13 +497,64 @@ func (n *Engine) GetTask(user *model.User, taskID string) (*model.Task, error) {
return nil, err
}

if strings.ToLower(task.Owner) != strings.ToLower(user.Address.Hex()) {
if !task.OwnedBy(user.Address) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

return task, nil
}

// List Execution for a given task id
func (n *Engine) ListExecutions(user *model.User, payload *avsproto.ListExecutionsReq) (*avsproto.ListExecutionsResp, error) {
task, err := n.GetTaskByID(payload.Id)
if err != nil {
return nil, err
}

if !task.OwnedBy(user.Address) {
return nil, grpcstatus.Errorf(codes.NotFound, TaskNotFoundError)
}

executionKVs, err := n.db.GetByPrefix(TaskExecutionPrefix(task.Id))

if err != nil {
return nil, grpcstatus.Errorf(codes.Code(avsproto.Error_StorageUnavailable), StorageUnavailableError)
}

executioResp := &avsproto.ListExecutionsResp{
Executions: []*avsproto.Execution{},
Cursor: "",
}

total := 0
cursor, err := CursorFromString(payload.Cursor)

itemPerPage := int(payload.ItemPerPage)
if itemPerPage == 0 {
itemPerPage = DefaultItemPerPage
}
for _, kv := range executionKVs {
executionUlid := ulid.MustParse(string(ExecutionIdFromStorageKey(kv.Key)))
if !cursor.AfterUlid(executionUlid) {
continue
}

exec := avsproto.Execution{}
if err := protojson.Unmarshal(kv.Value, &exec); err == nil {
executioResp.Executions = append(executioResp.Executions, &exec)
total += 1
}
if total >= itemPerPage {
break
}
}

if total >= itemPerPage {
executioResp.Cursor = NewCursor(CursorDirectionNext, executioResp.Executions[total-1].Id).String()
}
return executioResp, nil
}

func (n *Engine) DeleteTaskByUser(user *model.User, taskID string) (bool, error) {
task, err := n.GetTask(user, taskID)

Expand Down
Loading

0 comments on commit dfb40e4

Please sign in to comment.