diff --git a/core/taskengine/executor.go b/core/taskengine/executor.go
index f183e60..9fd69d1 100644
--- a/core/taskengine/executor.go
+++ b/core/taskengine/executor.go
@@ -73,6 +73,11 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {
 func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
 	vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)
+	if err != nil {
+		return nil, err
+	}
+
+	vm.WithLogger(x.logger)
 
 	initialTaskStatus := task.Status
 
 	if err != nil {
diff --git a/core/taskengine/vm.go b/core/taskengine/vm.go
index 07068e9..539a0a9 100644
--- a/core/taskengine/vm.go
+++ b/core/taskengine/vm.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/dop251/goja"
 	"github.com/ginkgoch/godash/v2"
 
@@ -36,6 +37,19 @@ type Step struct {
 	Next []string
 }
 
+type CommonProcessor struct {
+	vm *VM
+}
+
+func (c *CommonProcessor) SetVar(name string, data any) {
+	c.vm.AddVar(name, data)
+}
+
+// Set the variable for a step's output so it can be referenced and used in subsequent steps
+func (c *CommonProcessor) SetOutputVarForStep(stepID string, data any) {
+	c.vm.AddVar(c.vm.GetNodeNameAsVar(stepID), data)
+}
+
 // The VM is the core component that load the node information and execute them, yield finaly result
 type VM struct {
 	// Input raw task data
@@ -55,16 +69,18 @@ type VM struct {
 	plans            map[string]*Step
 	entrypoint       string
 	instructionCount int64
+
+	logger sdklogging.Logger
 }
 
-func NewVM() (*VM, error) {
+func NewVM() *VM {
 	v := &VM{
 		Status:           VMStateInitialize,
 		mu:               &sync.Mutex{},
 		instructionCount: 0,
 	}
 
-	return v, nil
+	return v
 }
 
 func (v *VM) Reset() {
@@ -75,6 +91,17 @@ func (v *VM) Reset() {
 	v.instructionCount = 0
 }
 
+func (v *VM) WithLogger(logger sdklogging.Logger) *VM {
+	v.logger = logger
+
+	return v
+}
+
+func (v *VM) GetNodeNameAsVar(nodeID string) string {
+	name := v.TaskNodes[nodeID].Name
+	return name
+}
+
 func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
 	v := &VM{
 		Status:           VMStateInitialize,
@@ -92,59 +119,67 @@ func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nod
 	v.vars = macros.GetEnvs(map[string]any{})
 
 	// popular trigger data for trigger variable
-	if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
-		// if it contains event, we need to fetch and pop
-		receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
-		if err != nil {
-			return nil, err
-		}
-
-		var event *types.Log
-		//event := receipt.Logs[triggerMetadata.LogIndex]
-
-		for _, l := range receipt.Logs {
-			if uint64(l.Index) == triggerMetadata.LogIndex {
-				event = l
-			}
-		}
-
-		if event == nil {
-			return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
-		}
-
-		tokenMetadata, err := GetMetadataForTransfer(event)
-		ef, err := erc20.NewErc20(event.Address, nil)
-
-		blockHeader, err := GetBlock(event.BlockNumber)
-		if err != nil {
-			return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
-		}
-
-		parseTransfer, err := ef.ParseTransfer(*event)
-		formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()
-
-		v.vars["trigger1"] = map[string]interface{}{
-			"data": map[string]interface{}{
-				"topics": godash.Map(event.Topics, func(topic common.Hash) string {
-					return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
-				}),
-				"data": "0x" + common.Bytes2Hex(event.Data),
-
-				"token_name":        tokenMetadata.Name,
-				"token_symbol":      tokenMetadata.Symbol,
-				"token_decimals":    tokenMetadata.Decimals,
-				"transaction_hash":  event.TxHash,
-				"address":           strings.ToLower(event.Address.Hex()),
-				"block_number":      event.BlockNumber,
-				"block_timestamp":   blockHeader.Time,
-				"from_address":      parseTransfer.From.String(),
-				"to_address":        parseTransfer.To.String(),
-				"value":             parseTransfer.Value.String(),
-				"value_formatted":   formattedValue,
-				"transaction_index": event.TxIndex,
-			},
-		}
-	}
+	if triggerMetadata != nil {
+		if triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
+			// if it contains event, we need to fetch and pop
+			receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
+			if err != nil {
+				return nil, err
+			}
+
+			var event *types.Log
+			//event := receipt.Logs[triggerMetadata.LogIndex]
+
+			for _, l := range receipt.Logs {
+				if uint64(l.Index) == triggerMetadata.LogIndex {
+					event = l
+				}
+			}
+
+			if event == nil {
+				return nil, fmt.Errorf("tx %s doesn't contain event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
+			}
+
+			tokenMetadata, err := GetMetadataForTransfer(event)
+			ef, err := erc20.NewErc20(event.Address, nil)
+
+			blockHeader, err := GetBlock(event.BlockNumber)
+			if err != nil {
+				return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
+			}
+
+			parseTransfer, err := ef.ParseTransfer(*event)
+			formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()
+
+			// TODO: Implement a decoder to help standardize common events
+			v.vars["trigger1"] = map[string]interface{}{
+				"data": map[string]interface{}{
+					"topics": godash.Map(event.Topics, func(topic common.Hash) string {
+						return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
+					}),
+					"data": "0x" + common.Bytes2Hex(event.Data),
+
+					"token_name":        tokenMetadata.Name,
+					"token_symbol":      tokenMetadata.Symbol,
+					"token_decimals":    tokenMetadata.Decimals,
+					"transaction_hash":  event.TxHash,
+					"address":           strings.ToLower(event.Address.Hex()),
+					"block_number":      event.BlockNumber,
+					"block_timestamp":   blockHeader.Time,
+					"from_address":      parseTransfer.From.String(),
+					"to_address":        parseTransfer.To.String(),
+					"value":             parseTransfer.Value.String(),
+					"value_formatted":   formattedValue,
+					"transaction_index": event.TxIndex,
+				},
+			}
+		}
+
+		if triggerMetadata.Epoch > 0 {
+			v.vars["trigger1"] = map[string]any{
+				"epoch": triggerMetadata.Epoch,
+			}
+		}
+	}
 
 	return v, nil
@@ -244,24 +279,29 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
 
 	if nodeValue := node.GetRestApi(); nodeValue != nil {
 		// TODO: refactor into function
-		p := NewRestProrcessor()
+		p := NewRestProrcessor(v)
 
 		// only evaluate string when there is string interpolation
-		if nodeValue.Body != "" && strings.Contains(nodeValue.Body, "$") {
+		if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) {
 			nodeValue2 := &avsproto.RestAPINode{
 				Url:     macros.RenderString(nodeValue.Url, macroEnvs),
 				Headers: nodeValue.Headers,
 				Method:  nodeValue.Method,
 				Body:    strings.Clone(nodeValue.Body),
 			}
-			vm := goja.New()
-			// TODO: dynamically set var instead of hardcode the name
-			// client would need to send this over
-			vm.Set("trigger1", v.vars["trigger1"])
+			jsvm := goja.New()
 
-			renderBody, err := vm.RunString(nodeValue.Body)
+			for key, value := range v.vars {
+				jsvm.Set(key, map[string]any{
+					"data": value,
+				})
+			}
+
+			renderBody, err := jsvm.RunString(nodeValue.Body)
 			if err == nil {
 				nodeValue2.Body = renderBody.Export().(string)
+			} else {
+				fmt.Println("error rendering string with goja", err)
 			}
 			executionLog, err = p.Execute(node.Id, nodeValue2)
 		} else {
@@ -289,11 +329,27 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
 			}
 		}
 	}
+	} else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil {
+		executionLog, err = v.runGraphQL(node.Id, nodeValue)
 	}
 
 	return executionLog, err
 }
 
+func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
+	g, err := NewGraphqlQueryProcessor(v, node.Url)
+	if err != nil {
+		return nil, err
+	}
+	executionLog, _, err := g.Execute(stepID, node)
+	if err != nil {
+		v.logger.Error("error executing graphql node", "task_id", v.TaskID, "step", stepID, "url", node.Url, "error", err)
+	}
+	v.ExecutionLogs = append(v.ExecutionLogs, executionLog)
+
+	return executionLog, nil
+}
+
 func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) {
 	t0 := time.Now()
 	s := &avsproto.Execution_Step{
diff --git a/core/taskengine/vm_runner_contract_read.go b/core/taskengine/vm_runner_contract_read.go
index 9b2f691..0b2e5bc 100644
--- a/core/taskengine/vm_runner_contract_read.go
+++ b/core/taskengine/vm_runner_contract_read.go
@@ -16,12 +16,16 @@ import (
 )
 
 type ContractReadProcessor struct {
+	*CommonProcessor
 	client *ethclient.Client
 }
 
-func NewContractReadProcessor(client *ethclient.Client) *ContractReadProcessor {
+func NewContractReadProcessor(vm *VM, client *ethclient.Client) *ContractReadProcessor {
 	return &ContractReadProcessor{
 		client: client,
+		CommonProcessor: &CommonProcessor{
+			vm: vm,
+		},
 	}
 }
 
@@ -81,6 +85,7 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe
 	s.Log = log.String()
 	outputData, err := json.Marshal(result)
 	s.OutputData = string(outputData)
+	r.SetOutputVarForStep(stepID, outputData)
 	if err != nil {
 		s.Success = false
 		s.Error = err.Error()
diff --git a/core/taskengine/vm_runner_graphql_query.go b/core/taskengine/vm_runner_graphql_query.go
index e4a8b5d..186b88d 100644
--- a/core/taskengine/vm_runner_graphql_query.go
+++ b/core/taskengine/vm_runner_graphql_query.go
@@ -13,12 +13,14 @@ import (
 )
 
 type GraphqlQueryProcessor struct {
+	*CommonProcessor
+
 	client *graphql.Client
 	sb     *strings.Builder
 	url    *url.URL
 }
 
-func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) {
+func NewGraphqlQueryProcessor(vm *VM, endpoint string) (*GraphqlQueryProcessor, error) {
 	sb := &strings.Builder{}
 	log := func(s string) {
 		fmt.Println("LOGLOG", s)
 	}
@@ -39,10 +41,12 @@ func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) {
 		client: client,
 		sb:     sb,
 		url:    u,
+
+		CommonProcessor: &CommonProcessor{vm},
 	}, nil
 }
 
-func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
+func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, any, error) {
 	ctx := context.Background()
 	t0 := time.Now().Unix()
 	step := &avsproto.Execution_Step{
@@ -68,11 +72,12 @@ func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQue
 	query := graphql.NewRequest(node.Query)
 	err = r.client.Run(ctx, query, &resp)
 	if err != nil {
-		return step, err
+		return step, nil, err
 	}
 
 	step.Log = r.sb.String()
 	data, err := json.Marshal(resp)
 	step.OutputData = string(data)
-	return step, err
+	r.SetOutputVarForStep(stepID, resp)
+	return step, resp, err
 }
diff --git a/core/taskengine/vm_runner_rest.go b/core/taskengine/vm_runner_rest.go
index 47c098a..5b87370 100644
--- a/core/taskengine/vm_runner_rest.go
+++ b/core/taskengine/vm_runner_rest.go
@@ -1,6 +1,7 @@
 package taskengine
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/url"
 	"strings"
@@ -12,10 +13,11 @@ import (
 )
 
 type RestProcessor struct {
+	*CommonProcessor
 	client *resty.Client
 }
 
-func NewRestProrcessor() *RestProcessor {
+func NewRestProrcessor(vm *VM) *RestProcessor {
 	client := resty.New()
 
 	// Unique settings at Client level
@@ -28,6 +30,9 @@ func NewRestProrcessor() *RestProcessor {
 
 	r := RestProcessor{
 		client: client,
+		CommonProcessor: &CommonProcessor{
+			vm: vm,
+		},
 	}
 
 	return &r
@@ -83,6 +88,18 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs
 	s.Log = log.String()
 	s.OutputData = string(resp.Body())
 
+	// Attempt to detect JSON so subsequent steps get structured data instead of a raw string
+	if len(s.OutputData) > 0 && (s.OutputData[0] == '{' || s.OutputData[0] == '[') {
+		var parseData map[string]any
+		if err := json.Unmarshal([]byte(s.OutputData), &parseData); err == nil {
+			r.SetOutputVarForStep(stepID, parseData)
+		} else {
+			r.SetOutputVarForStep(stepID, s.OutputData)
+		}
+	} else {
+		r.SetOutputVarForStep(stepID, s.OutputData)
+	}
+
 	if err != nil {
 		s.Success = false
 		s.Error = err.Error()
diff --git a/examples/example.js b/examples/example.js
index 606f995..1edda88 100644
--- a/examples/example.js
+++ b/examples/example.js
@@ -301,6 +301,9 @@ const main = async (cmd) => {
     case "schedule-monitor":
       scheduleMonitor(owner, token, process.argv[3]);
       break;
+    case "schedule-aave":
+      scheduleAaveMonitor(owner, token);
+      break;
     case "schedule":
     case "schedule-cron":
     case "schedule-event":
@@ -384,7 +387,6 @@ const main = async (cmd) => {
     case "delete":
       await deleteTask(owner, token, process.argv[3]);
      break;
-
     case "wallet":
       await getWallets(owner, token);
       break;
@@ -413,6 +415,7 @@ const main = async (cmd) => {
      schedule-cron : to schedule a task that run on cron
      schedule-event : to schedule a task that run on occurenct of an event
      schedule-generic: to schedule a task with an arbitrary contract query
+     schedule-aave : to monitor and report Aave liquidity rate every block
      monitor-address : to monitor erc20 in/out for an address
      trigger : manually trigger a task.
                Example: trigger abcdef '{"block_number":1234}' for blog trigger
@@ -674,6 +677,116 @@ async function scheduleMonitor(owner, token, target) {
   return result;
 }
 
+// Set up a task that reports the Aave USDC reserve liquidity rate (and the node's IP) to Telegram on every block
+async function scheduleAaveMonitor(owner, token) {
+  const wallets = await getWallets(owner, token);
+  const smartWalletAddress = wallets[0].address;
+
+  const metadata = new grpc.Metadata();
+  metadata.add("authkey", token);
+
+  let trigger = {
+    name: "trigger1",
+    block: {
+      interval: 1,
+    },
+  };
+
+  const getReserveId = UlidMonotonic.generate().toCanonical();
+  const sendSummaryId = UlidMonotonic.generate().toCanonical();
+  const getIpId = UlidMonotonic.generate().toCanonical();
+
+  const result = await asyncRPC(
+    client,
+    "CreateTask",
+    {
+      smart_wallet_address: smartWalletAddress,
+      nodes: [
+        {
+          id: getReserveId,
+          name: 'getReserveUSDC',
+          graphql_query: {
+            url: 'https://gateway.thegraph.com/api/10186dcf11921c7d1bc140721c69da38/subgraphs/id/Cd2gEDVeqnjBn1hSeqFMitw8Q1iiyV9FYUZkLNRcL87g',
+            query: `
+              {
+                reserves(where: {underlyingAsset: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"}) {
+                  id
+                  underlyingAsset
+                  name
+                  decimals
+                  liquidityRate
+                  aToken {
+                    id
+                  }
+                  sToken {
+                    id
+                  }
+                }
+              }
+            `
+          }
+        },
+        {
+          id: getIpId,
+          name: 'getIpAddress',
+          rest_api: {
+            url: 'https://ipinfo.io/json',
+          }
+        },
+
+        {
+          id: sendSummaryId,
+          name: 'notification',
+          rest_api: {
+            url: "https://api.telegram.org/bot{{notify_bot_token}}/sendMessage?",
+            //url: `https://webhook.site/ca416047-5ba0-4485-8f98-76790b63add7`,
+            method: "POST",
+            // body is evaluated by the task engine; getIpAddress and getReserveUSDC reference the outputs of the steps above by node name
+            body: `JSON.stringify({
+              chat_id:-4609037622,
+              text: \`Node IP is: \${getIpAddress.data.ip}.\nCurrent USDC liquidity rate in RAY unit is \${getReserveUSDC.data.reserves[0].liquidityRate} \`
+            })`,
+            headers: {
+              "content-type": "application/json"
+            }
+          }
+        },
+      ],
+
+      edges: [
+        {
+          id: UlidMonotonic.generate().toCanonical(),
+          // __TRIGGER__ is a special node. It doesn't appear directly in the task data, but it should be drawn on the UI as the entrypoint
+          source: "__TRIGGER__",
+          target: getIpId,
+        },
+        {
+          id: UlidMonotonic.generate().toCanonical(),
+          source: getIpId,
+          target: getReserveId,
+        },
+        {
+          id: UlidMonotonic.generate().toCanonical(),
+          source: getReserveId,
+          target: sendSummaryId,
+        },
+      ],
+
+      trigger,
+      start_at: Math.floor(Date.now() / 1000) + 30,
+      expired_at: Math.floor(Date.now() / 1000 + 3600 * 24 * 30),
+      memo: `Monitoring USDC Aave on Ethereum`,
+    },
+    metadata
+  );
+
+  console.log("create task", result);
+
+  return result;
+}
+
+
 (async () => {
   try {
     main(process.argv[2]);