Skip to content

Commit

Permalink
implement javascript runner with goja
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 27, 2024
1 parent 53dd37d commit 0a2ab8a
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 42 deletions.
9 changes: 9 additions & 0 deletions core/taskengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ func (n *Engine) StreamCheckToOperator(payload *avsproto.SyncMessagesReq, srv av
continue
}

if !n.CanStreamCheck(address) {
continue
}

for _, task := range n.tasks {
if _, ok := n.trackSyncedTasks[address].TaskID[task.Id]; ok {
continue
Expand Down Expand Up @@ -797,3 +801,8 @@ func (n *Engine) NewSeqID() (string, error) {
}
return strconv.FormatInt(int64(num), 10), nil
}

func (n *Engine) CanStreamCheck(address string) bool {
// Only enable for our own operator first, once it's stable we will roll out to all
return address == "0x997e5d40a32c44a3d93e59fc55c4fd20b7d2d49d" || address == "0xc6b87cc9e85b07365b6abefff061f237f7cf7dc3"
}
30 changes: 30 additions & 0 deletions core/taskengine/macros/exp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ var (
rpcConn *ethclient.Client
)

type Builtin struct {
}

func SetRpc(rpcURL string) {
if conn, err := ethclient.Dial(rpcURL); err == nil {
rpcConn = conn
Expand Down Expand Up @@ -125,6 +128,33 @@ func ToBigInt(val string) *big.Int {
return b
}

func (bi *Builtin) ToBigInt(val string) *big.Int {
return ToBigInt(val)
}

func (bi *Builtin) ChainlinkLatestRoundData(tokenPair string) *big.Int {
return chainlinkLatestRoundData(tokenPair)
}
func (bi *Builtin) ChainlinkLatestAnswer(tokenPair string) *big.Int {
return chainlinkLatestAnswer(tokenPair)
}

func (bi *Builtin) BigCmp(a *big.Int, b *big.Int) (r int) {
return BigCmp(a, b)
}

func (bi *Builtin) BigGt(a *big.Int, b *big.Int) bool {
return BigGt(a, b)
}

func (bi *Builtin) BigLt(a *big.Int, b *big.Int) bool {
return BigLt(a, b)
}

func (bi *Builtin) ParseUnit(val string, decimal uint) *big.Int {
return ParseUnit(val, decimal)
}

var (
exprEnv = map[string]any{
"readContractData": readContractData,
Expand Down
84 changes: 53 additions & 31 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,37 +278,7 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
}

if nodeValue := node.GetRestApi(); nodeValue != nil {
// TODO: refactor into function
p := NewRestProrcessor(v)

// only evaluate string when there is string interpolation
if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) {
nodeValue2 := &avsproto.RestAPINode{
Url: macros.RenderString(nodeValue.Url, macroEnvs),
Headers: nodeValue.Headers,
Method: nodeValue.Method,
Body: strings.Clone(nodeValue.Body),
}
jsvm := goja.New()

for key, value := range v.vars {
jsvm.Set(key, map[string]any{
"data": value,
})
}

renderBody, err := jsvm.RunString(nodeValue.Body)
if err == nil {
nodeValue2.Body = renderBody.Export().(string)
} else {
fmt.Println("error render string with goja", err)
}
executionLog, err = p.Execute(node.Id, nodeValue2)
} else {
executionLog, err = p.Execute(node.Id, nodeValue)
}

v.ExecutionLogs = append(v.ExecutionLogs, executionLog)
executionLog, err = v.runRestApi(node.Id, nodeValue)
} else if nodeValue := node.GetBranch(); nodeValue != nil {
outcomeID := ""
executionLog, outcomeID, err = v.runBranch(node.Id, nodeValue)
Expand All @@ -331,8 +301,49 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
}
} else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil {
executionLog, err = v.runGraphQL(node.Id, nodeValue)
} else if nodeValue := node.GetCustomCode(); nodeValue != nil {
executionLog, err = v.runCustomCode(node.Id, nodeValue)
}

return executionLog, err
}

func (v *VM) runRestApi(stepID string, nodeValue *avsproto.RestAPINode) (*avsproto.Execution_Step, error) {
p := NewRestProrcessor(v)

var err error
executionLog := &avsproto.Execution_Step{
NodeId: stepID,
}

// only evaluate string when there is string interpolation
if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) {
nodeValue2 := &avsproto.RestAPINode{
Url: macros.RenderString(nodeValue.Url, macroEnvs),
Headers: nodeValue.Headers,
Method: nodeValue.Method,
Body: strings.Clone(nodeValue.Body),
}
jsvm := goja.New()

for key, value := range v.vars {
jsvm.Set(key, map[string]any{
"data": value,
})
}

renderBody, err := jsvm.RunString(nodeValue.Body)
if err == nil {
nodeValue2.Body = renderBody.Export().(string)
} else {
v.logger.Error("error render string with goja", "error", err)
}
executionLog, err = p.Execute(stepID, nodeValue2)
} else {
executionLog, err = p.Execute(stepID, nodeValue)
}

v.ExecutionLogs = append(v.ExecutionLogs, executionLog)
return executionLog, err
}

Expand All @@ -350,6 +361,17 @@ func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avspro
return executionLog, nil
}

func (v *VM) runCustomCode(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) {
r := NewJSProcessor(v)
executionLog, err := r.Execute(stepID, node)
if err != nil {
v.logger.Error("error execute JavaScript code", "task_id", v.TaskID, "step", stepID, "error", err)
}
v.ExecutionLogs = append(v.ExecutionLogs, executionLog)

return executionLog, nil
}

func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) {
t0 := time.Now()
s := &avsproto.Execution_Step{
Expand Down
51 changes: 45 additions & 6 deletions core/taskengine/vm_runner_contract_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,35 @@ import (
)

func TestContractReadSimpleReturn(t *testing.T) {
n := NewContractReadProcessor(testutil.GetRpcClient())

node := &avsproto.ContractReadNode{
ContractAddress: "0x1c7d4b196cb0c7b01d743fbc6116a902379c7238",
CallData: "0x70a08231000000000000000000000000ce289bb9fb0a9591317981223cbe33d5dc42268d",
ContractAbi: `[{"inputs":[{"internalType":"address","name":"account","type":"address"}],"name":"balanceOf","outputs":[{"internalType":"uint256","name":"","type":"uint256"}],"stateMutability":"view","type":"function"}]`,
Method: "balanceOf",
}
step, err := n.Execute("check balance", node)
nodes := []*avsproto.TaskNode{
&avsproto.TaskNode{
Id: "123",
Name: "contractQuery",
TaskType: &avsproto.TaskNode_ContractRead{
ContractRead: node,
},
},
}

edges := []*avsproto.TaskEdge{
&avsproto.TaskEdge{
Id: "e1",
Source: "__TRIGGER__",
Target: "123",
},
}

vm, err := NewVMWithData("123", nil, nodes, edges)

n := NewContractReadProcessor(vm, testutil.GetRpcClient())

step, err := n.Execute("123", node)

if err != nil {
t.Errorf("expected contract read node run succesfull but got error: %v", err)
Expand All @@ -41,15 +61,34 @@ func TestContractReadSimpleReturn(t *testing.T) {
}

func TestContractReadComplexReturn(t *testing.T) {
n := NewContractReadProcessor(testutil.GetRpcClient())

node := &avsproto.ContractReadNode{
ContractAddress: "0xc59E3633BAAC79493d908e63626716e204A45EdF",
CallData: "0x9a6fc8f500000000000000000000000000000000000000000000000100000000000052e7",
ContractAbi: `[{"inputs":[{"internalType":"uint80","name":"_roundId","type":"uint80"}],"name":"getRoundData","outputs":[{"internalType":"uint80","name":"roundId","type":"uint80"},{"internalType":"int256","name":"answer","type":"int256"},{"internalType":"uint256","name":"startedAt","type":"uint256"},{"internalType":"uint256","name":"updatedAt","type":"uint256"},{"internalType":"uint80","name":"answeredInRound","type":"uint80"}],"stateMutability":"view","type":"function"}]`,
Method: "getRoundData",
}
step, err := n.Execute("latest round data", node)

nodes := []*avsproto.TaskNode{
&avsproto.TaskNode{
Id: "123abc",
Name: "contractQuery",
TaskType: &avsproto.TaskNode_ContractRead{
ContractRead: node,
},
},
}

edges := []*avsproto.TaskEdge{
&avsproto.TaskEdge{
Id: "e1",
Source: "__TRIGGER__",
Target: "123abc",
},
}

vm, err := NewVMWithData("123abc", nil, nodes, edges)
n := NewContractReadProcessor(vm, testutil.GetRpcClient())
step, err := n.Execute("123abc", node)

if err != nil {
t.Errorf("expected contract read node run succesfull but got error: %v", err)
Expand Down
86 changes: 86 additions & 0 deletions core/taskengine/vm_runner_customcode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package taskengine

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/dop251/goja"

"github.com/AvaProtocol/ap-avs/core/taskengine/macros"
avsproto "github.com/AvaProtocol/ap-avs/protobuf"
)

type JSProcessor struct {
*CommonProcessor
jsvm *goja.Runtime
}

func NewJSProcessor(vm *VM) *JSProcessor {
r := JSProcessor{
CommonProcessor: &CommonProcessor{
vm: vm,
},
jsvm: goja.New(),
}

for key, value := range macros.GetEnvs(nil) {
r.jsvm.Set(key, value)
}
for key, value := range vm.vars {
r.jsvm.Set(key, map[string]any{
"data": value,
})
}

return &r
}

func (r *JSProcessor) Execute(stepID string, node *avsproto.CustomCodeNode) (*avsproto.Execution_Step, error) {
t0 := time.Now().Unix()

s := &avsproto.Execution_Step{
NodeId: stepID,
Log: "",
OutputData: "",
Success: true,
Error: "",
StartAt: t0,
}

var err error
defer func() {
s.EndAt = time.Now().Unix()
s.Success = err == nil
if err != nil {
s.Error = err.Error()
}
}()

var log strings.Builder

log.WriteString(fmt.Sprintf("Start execute user-input JS code at %s", time.Now()))
//result, err := r.jsvm.RunString("(function() {" + node.Source + "})()")
result, err := r.jsvm.RunString(node.Source)
log.WriteString(fmt.Sprintf("Complete Execute user-input JS code at %s", time.Now()))
if err != nil {
s.Success = false
s.Error = err.Error()
log.WriteString("\nerror running JavaScript code:")
log.WriteString(err.Error())
}
s.Log = log.String()

resultValue := result.Export()
// TODO: capsize
if outputData, serilizeError := json.Marshal(resultValue); serilizeError == nil {
s.OutputData = string(outputData)
} else {
log.WriteString("cannot serilize output data to log")
}

r.SetOutputVarForStep(stepID, resultValue)

return s, err
}
Loading

0 comments on commit 0a2ab8a

Please sign in to comment.