diff --git a/core/taskengine/engine.go b/core/taskengine/engine.go index 2b1bf01..8ec840e 100644 --- a/core/taskengine/engine.go +++ b/core/taskengine/engine.go @@ -340,6 +340,10 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av continue } + if !n.CanStreamCheck(address) { + continue + } + for _, task := range n.tasks { if _, ok := n.trackSyncedTasks[address].TaskID[task.Id]; ok { continue @@ -797,3 +801,8 @@ func (n *Engine) NewSeqID() (string, error) { } return strconv.FormatInt(int64(num), 10), nil } + +func (n *Engine) CanStreamCheck(address string) bool { + // Only enable for our own operator first, once it's stable we will roll out to all + return address == "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d" || address == "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3" +} diff --git a/core/taskengine/macros/exp.go b/core/taskengine/macros/exp.go index 55e32ea..a367ed7 100644 --- a/core/taskengine/macros/exp.go +++ b/core/taskengine/macros/exp.go @@ -19,6 +19,9 @@ var ( rpcConn *ethclient.Client ) +type Builtin struct { +} + func SetRpc(rpcURL string) { if conn, err := ethclient.Dial(rpcURL); err == nil { rpcConn = conn @@ -125,6 +128,33 @@ func ToBigInt(val string) *big.Int { return b } +func (bi *Builtin) ToBigInt(val string) *big.Int { + return ToBigInt(val) +} + +func (bi *Builtin) ChainlinkLatestRoundData(tokenPair string) *big.Int { + return chainlinkLatestRoundData(tokenPair) +} +func (bi *Builtin) ChainlinkLatestAnswer(tokenPair string) *big.Int { + return chainlinkLatestAnswer(tokenPair) +} + +func (bi *Builtin) BigCmp(a *big.Int, b *big.Int) (r int) { + return BigCmp(a, b) +} + +func (bi *Builtin) BigGt(a *big.Int, b *big.Int) bool { + return BigGt(a, b) +} + +func (bi *Builtin) BigLt(a *big.Int, b *big.Int) bool { + return BigLt(a, b) +} + +func (bi *Builtin) ParseUnit(val string, decimal uint) *big.Int { + return ParseUnit(val, decimal) +} + var ( exprEnv = map[string]any{ "readContractData": readContractData, diff --git a/core/taskengine/vm.go b/core/taskengine/vm.go index 539a0a9..cb1c07d 100644 --- a/core/taskengine/vm.go +++ b/core/taskengine/vm.go @@ -278,37 +278,7 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err } if nodeValue := node.GetRestApi(); nodeValue != nil { - // TODO: refactor into function - p := NewRestProrcessor(v) - - // only evaluate string when there is string interpolation - if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) { - nodeValue2 := &avsproto.RestAPINode{ - Url: macros.RenderString(nodeValue.Url, macroEnvs), - Headers: nodeValue.Headers, - Method: nodeValue.Method, - Body: strings.Clone(nodeValue.Body), - } - jsvm := goja.New() - - for key, value := range v.vars { - jsvm.Set(key, map[string]any{ - "data": value, - }) - } - - renderBody, err := jsvm.RunString(nodeValue.Body) - if err == nil { - nodeValue2.Body = renderBody.Export().(string) - } else { - fmt.Println("error render string with goja", err) - } - executionLog, err = p.Execute(node.Id, nodeValue2) - } else { - executionLog, err = p.Execute(node.Id, nodeValue) - } - - v.ExecutionLogs = append(v.ExecutionLogs, executionLog) + executionLog, err = v.runRestApi(node.Id, nodeValue) } else if nodeValue := node.GetBranch(); nodeValue != nil { outcomeID := "" executionLog, outcomeID, err = v.runBranch(node.Id, nodeValue) @@ -331,8 +301,49 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err } } else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil { executionLog, err = v.runGraphQL(node.Id, nodeValue) + } else if nodeValue := node.GetCustomCode(); nodeValue != nil { + executionLog, err = v.runCustomCode(node.Id, nodeValue) + } + + return executionLog, err +} + +func (v *VM) runRestApi(stepID string, nodeValue *avsproto.RestAPINode) (*avsproto.Execution_Step, error) { + p := NewRestProrcessor(v) + + var err error + executionLog := &avsproto.Execution_Step{ + NodeId: stepID, + } + + // only evaluate string when there is string interpolation + if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) { + nodeValue2 := &avsproto.RestAPINode{ + Url: macros.RenderString(nodeValue.Url, macroEnvs), + Headers: nodeValue.Headers, + Method: nodeValue.Method, + Body: strings.Clone(nodeValue.Body), + } + jsvm := goja.New() + + for key, value := range v.vars { + jsvm.Set(key, map[string]any{ + "data": value, + }) + } + + renderBody, err := jsvm.RunString(nodeValue.Body) + if err == nil { + nodeValue2.Body = renderBody.Export().(string) + } else { + v.logger.Error("error render string with goja", "error", err) + } + executionLog, err = p.Execute(stepID, nodeValue2) + } else { + executionLog, err = p.Execute(stepID, nodeValue) } + v.ExecutionLogs = append(v.ExecutionLogs, executionLog) return executionLog, err } @@ -350,6 +361,17 @@ func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avspro return executionLog, nil } +func (v *VM) runCustomCode(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) { + r := NewJSProcessor(v) + executionLog, err := r.Execute(stepID, node) + if err != nil { + v.logger.Error("error execute JavaScript code", "task_id", v.TaskID, "step", stepID, "error", err) + } + v.ExecutionLogs = append(v.ExecutionLogs, executionLog) + + return executionLog, nil +} + func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) { t0 := time.Now() s := &avsproto.Execution_Step{ diff --git a/core/taskengine/vm_runner_contract_read_test.go b/core/taskengine/vm_runner_contract_read_test.go index 1a989ca..d57ce63 100644 --- a/core/taskengine/vm_runner_contract_read_test.go +++ b/core/taskengine/vm_runner_contract_read_test.go @@ -10,15 +10,35 @@ import ( ) func TestContractReadSimpleReturn(t *testing.T) { - n := NewContractReadProcessor(testutil.GetRpcClient()) - node := &avsproto.ContractReadNode{ ContractAddress: "0x1c7d4b196cb0c7b01d743fbc6116a902379c7238", CallData: "0x70a08231000000000000000000000000ce289bb9fb0a9591317981223cbe33d5dc42268d", ContractAbi: `[{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]`, Method: "balanceOf", } - step, err := n.Execute("check balance", node) + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123", + Name: "contractQuery", + TaskType: &avsproto.TaskNode_ContractRead{ + ContractRead: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123", + }, + } + + vm, err := NewVMWithData("123", nil, nodes, edges) + + n := NewContractReadProcessor(vm, testutil.GetRpcClient()) + + step, err := n.Execute("123", node) if err != nil { t.Errorf("expected contract read node run succesfull but got error: %v", err) @@ -41,15 +61,34 @@ func TestContractReadSimpleReturn(t *testing.T) { } func TestContractReadComplexReturn(t *testing.T) { - n := NewContractReadProcessor(testutil.GetRpcClient()) - node := &avsproto.ContractReadNode{ ContractAddress: "0xc59E3633BAAC79493d908e63626716e204A45EdF", CallData: "0x9a6fc8f500000000000000000000000000000000000000000000000100000000000052e7", ContractAbi: `[{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"}]`, Method: "getRoundData", } - step, err := n.Execute("latest round data", node) + + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "contractQuery", + TaskType: &avsproto.TaskNode_ContractRead{ + ContractRead: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewContractReadProcessor(vm, testutil.GetRpcClient()) + step, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected contract read node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_runner_customcode.go b/core/taskengine/vm_runner_customcode.go new file mode 100644 index 0000000..7adb547 --- /dev/null +++ b/core/taskengine/vm_runner_customcode.go @@ -0,0 +1,86 @@ +package taskengine + +import ( + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/dop251/goja" + + "github.com/AvaProtocol/ap-avs/core/taskengine/macros" + avsproto "github.com/AvaProtocol/ap-avs/protobuf" +) + +type JSProcessor struct { + *CommonProcessor + jsvm *goja.Runtime +} + +func NewJSProcessor(vm *VM) *JSProcessor { + r := JSProcessor{ + CommonProcessor: &CommonProcessor{ + vm: vm, + }, + jsvm: goja.New(), + } + + for key, value := range macros.GetEnvs(nil) { + r.jsvm.Set(key, value) + } + for key, value := range vm.vars { + r.jsvm.Set(key, map[string]any{ + "data": value, + }) + } + + return &r +} + +func (r *JSProcessor) Execute(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) { + t0 := time.Now().Unix() + + s := &avsproto.Execution_Step{ + NodeId: stepID, + Log: "", + OutputData: "", + Success: true, + Error: "", + StartAt: t0, + } + + var err error + defer func() { + s.EndAt = time.Now().Unix() + s.Success = err == nil + if err != nil { + s.Error = err.Error() + } + }() + + var log strings.Builder + + log.WriteString(fmt.Sprintf("Start execute user-input JS code at %s", time.Now())) + //result, err := r.jsvm.RunString("(function() {" + node.Source + "})()") + result, err := r.jsvm.RunString(node.Source) + log.WriteString(fmt.Sprintf("Complete Execute user-input JS code at %s", time.Now())) + if err != nil { + s.Success = false + s.Error = err.Error() + log.WriteString("\nerror running JavaScript code:") + log.WriteString(err.Error()) + } + s.Log = log.String() + + resultValue := result.Export() + // TODO: capsize + if outputData, serilizeError := json.Marshal(resultValue); serilizeError == nil { + s.OutputData = string(outputData) + } else { + log.WriteString("cannot serilize output data to log") + } + + r.SetOutputVarForStep(stepID, resultValue) + + return s, err +} diff --git a/core/taskengine/vm_runner_customcode_test.go b/core/taskengine/vm_runner_customcode_test.go new file mode 100644 index 0000000..fb85478 --- /dev/null +++ b/core/taskengine/vm_runner_customcode_test.go @@ -0,0 +1,89 @@ +package taskengine + +import ( + "strings" + "testing" + + avsproto "github.com/AvaProtocol/ap-avs/protobuf" +) + +func TestRunJavaScript(t *testing.T) { + node := &avsproto.CustomCodeNode{ + Source: "3>2", + } + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "customJs", + TaskType: &avsproto.TaskNode_CustomCode{ + CustomCode: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewJSProcessor(vm) + + step, err := n.Execute("123abc", node) + + if err != nil { + t.Errorf("expected JavaScript node run succesfull but got error: %v", err) + } + + if !step.Success { + t.Errorf("expected JavaScript node run succesfully but failed") + } + + if !strings.Contains(step.Log, "Start execute user-input JS code at") { + t.Errorf("expected log contains trace data but found no") + } + + if step.Error != "" { + t.Errorf("expected log contains request trace data but found no") + } + + if step.OutputData != "true" { + t.Errorf("wrong result, expect true got %s", step.OutputData) + } + +} + +func TestRunJavaScriptComplex(t *testing.T) { + node := &avsproto.CustomCodeNode{ + Source: "const a=[1,2,3]; a.filter((i) => i >= 2);", + } + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "customJs", + TaskType: &avsproto.TaskNode_CustomCode{ + CustomCode: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, _ := NewVMWithData("123abc", nil, nodes, edges) + n := NewJSProcessor(vm) + + step, _ := n.Execute("123abc", node) + + if step.OutputData != "[2,3]" { + t.Errorf("wrong JS code evaluation result, expect [2,3] got %s", step.OutputData) + } +} diff --git a/core/taskengine/vm_runner_graphql_query_test.go b/core/taskengine/vm_runner_graphql_query_test.go index 57d979a..cb87d9f 100644 --- a/core/taskengine/vm_runner_graphql_query_test.go +++ b/core/taskengine/vm_runner_graphql_query_test.go @@ -22,9 +22,28 @@ func TestGraphlQlNodeSimpleQuery(t *testing.T) { }`, } - n, _ := NewGraphqlQueryProcessor(node.Url) + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "graphqlQuery", + TaskType: &avsproto.TaskNode_GraphqlQuery{ + GraphqlQuery: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n, _ := NewGraphqlQueryProcessor(vm, node.Url) - step, err := n.Execute("lido approval", node) + step, _, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected rest node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_runner_rest_test.go b/core/taskengine/vm_runner_rest_test.go index af4ff97..1b3f938 100644 --- a/core/taskengine/vm_runner_rest_test.go +++ b/core/taskengine/vm_runner_rest_test.go @@ -8,8 +8,6 @@ import ( ) func TestRestRequest(t *testing.T) { - n := NewRestProrcessor() - node := &avsproto.RestAPINode{ Url: "https://httpbin.org/post", Headers: map[string]string{ @@ -18,7 +16,28 @@ func TestRestRequest(t *testing.T) { Body: "chat_id=123&disable_notification=true&text=%2AThis+is+a+test+format%2A", Method: "POST", } - step, err := n.Execute("foo123", node) + + nodes := []*avsproto.TaskNode{ + &avsproto.TaskNode{ + Id: "123abc", + Name: "restApi", + TaskType: &avsproto.TaskNode_RestApi{ + RestApi: node, + }, + }, + } + + edges := []*avsproto.TaskEdge{ + &avsproto.TaskEdge{ + Id: "e1", + Source: "__TRIGGER__", + Target: "123abc", + }, + } + + vm, err := NewVMWithData("123abc", nil, nodes, edges) + n := NewRestProrcessor(vm) + step, err := n.Execute("123abc", node) if err != nil { t.Errorf("expected rest node run succesfull but got error: %v", err) diff --git a/core/taskengine/vm_test.go b/core/taskengine/vm_test.go index 66aace6..cdaaf9e 100644 --- a/core/taskengine/vm_test.go +++ b/core/taskengine/vm_test.go @@ -90,6 +90,11 @@ func TestRunSimpleTasks(t *testing.T) { if !strings.Contains(vm.ExecutionLogs[0].Log, "Execute") { t.Errorf("error generating log for executing. expect a log line displaying the request attempt, got nothing") } + + data := vm.vars["httpnode"].(map[string]any) + if data["data"].(string) != "a=123" { + t.Errorf("step result isn't store properly, expect 123 got %s", data["data"]) + } } func TestRunSequentialTasks(t *testing.T) {