Skip to content

Commit

Permalink
implement accessing to previous node data
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 27, 2024
1 parent d21ec36 commit ad81242
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 60 deletions.
5 changes: 5 additions & 0 deletions core/taskengine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (x *TaskExecutor) Perform(job *apqueue.Job) error {

func (x *TaskExecutor) RunTask(task *model.Task, triggerMetadata *avsproto.TriggerMetadata) (*avsproto.Execution, error) {
vm, err := NewVMWithData(task.Id, triggerMetadata, task.Nodes, task.Edges)
if err != nil {
return nil, err
}

vm.WithLogger(x.logger)
initialTaskStatus := task.Status

if err != nil {
Expand Down
162 changes: 109 additions & 53 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

sdklogging "github.com/Layr-Labs/eigensdk-go/logging"
"github.com/dop251/goja"
"github.com/ginkgoch/godash/v2"

Expand Down Expand Up @@ -36,6 +37,19 @@ type Step struct {
Next []string
}

type CommonProcessor struct {
vm *VM
}

func (c *CommonProcessor) SetVar(name string, data any) {
c.vm.AddVar(name, data)
}

// Set the variable for step output so it can be refer and use in subsequent steps
func (c *CommonProcessor) SetOutputVarForStep(stepID string, data any) {
c.vm.AddVar(c.vm.GetNodeNameAsVar(stepID), data)
}

// The VM is the core component that load the node information and execute them, yield finaly result
type VM struct {
// Input raw task data
Expand All @@ -55,16 +69,18 @@ type VM struct {
plans map[string]*Step
entrypoint string
instructionCount int64

logger sdklogging.Logger
}

func NewVM() (*VM, error) {
func NewVM() *VM {
v := &VM{
Status: VMStateInitialize,
mu: &sync.Mutex{},
instructionCount: 0,
}

return v, nil
return v
}

func (v *VM) Reset() {
Expand All @@ -75,6 +91,17 @@ func (v *VM) Reset() {
v.instructionCount = 0
}

func (v *VM) WithLogger(logger sdklogging.Logger) *VM {
v.logger = logger

return v
}

func (v *VM) GetNodeNameAsVar(nodeID string) string {
name := v.TaskNodes[nodeID].Name
return name
}

func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nodes []*avsproto.TaskNode, edges []*avsproto.TaskEdge) (*VM, error) {
v := &VM{
Status: VMStateInitialize,
Expand All @@ -92,59 +119,67 @@ func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nod
v.vars = macros.GetEnvs(map[string]any{})

// popular trigger data for trigger variable
if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}
if triggerMetadata != nil {
if triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}

var event *types.Log
//event := receipt.Logs[triggerMetadata.LogIndex]

var event *types.Log
//event := receipt.Logs[triggerMetadata.LogIndex]
for _, l := range receipt.Logs {
if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
}
}

for _, l := range receipt.Logs {
if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
if event == nil {
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}
}

if event == nil {
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}
tokenMetadata, err := GetMetadataForTransfer(event)
ef, err := erc20.NewErc20(event.Address, nil)

tokenMetadata, err := GetMetadataForTransfer(event)
ef, err := erc20.NewErc20(event.Address, nil)
blockHeader, err := GetBlock(event.BlockNumber)
if err != nil {
return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
}

blockHeader, err := GetBlock(event.BlockNumber)
if err != nil {
return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
parseTransfer, err := ef.ParseTransfer(*event)
formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()

// TODO: Implement a decoder to help standarize common event
v.vars["trigger1"] = map[string]interface{}{
"data": map[string]interface{}{
"topics": godash.Map(event.Topics, func(topic common.Hash) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),

"token_name": tokenMetadata.Name,
"token_symbol": tokenMetadata.Symbol,
"token_decimals": tokenMetadata.Decimals,
"transaction_hash": event.TxHash,
"address": strings.ToLower(event.Address.Hex()),
"block_number": event.BlockNumber,
"block_timestamp": blockHeader.Time,
"from_address": parseTransfer.From.String(),
"to_address": parseTransfer.To.String(),
"value": parseTransfer.Value.String(),
"value_formatted": formattedValue,
"transaction_index": event.TxIndex,
},
}
}

parseTransfer, err := ef.ParseTransfer(*event)
formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()

v.vars["trigger1"] = map[string]interface{}{
"data": map[string]interface{}{
"topics": godash.Map(event.Topics, func(topic common.Hash) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),

"token_name": tokenMetadata.Name,
"token_symbol": tokenMetadata.Symbol,
"token_decimals": tokenMetadata.Decimals,
"transaction_hash": event.TxHash,
"address": strings.ToLower(event.Address.Hex()),
"block_number": event.BlockNumber,
"block_timestamp": blockHeader.Time,
"from_address": parseTransfer.From.String(),
"to_address": parseTransfer.To.String(),
"value": parseTransfer.Value.String(),
"value_formatted": formattedValue,
"transaction_index": event.TxIndex,
},
if triggerMetadata.Epoch > 0 {
v.vars["trigger1"] = map[string]any{
"epoch": triggerMetadata.Epoch,
}
}

}

return v, nil
Expand Down Expand Up @@ -244,24 +279,29 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err

if nodeValue := node.GetRestApi(); nodeValue != nil {
// TODO: refactor into function
p := NewRestProrcessor()
p := NewRestProrcessor(v)

// only evaluate string when there is string interpolation
if nodeValue.Body != "" && strings.Contains(nodeValue.Body, "$") {
if nodeValue.Body != "" && (strings.Contains(nodeValue.Body, "$") || strings.Contains(nodeValue.Body, "`")) {
nodeValue2 := &avsproto.RestAPINode{
Url: macros.RenderString(nodeValue.Url, macroEnvs),
Headers: nodeValue.Headers,
Method: nodeValue.Method,
Body: strings.Clone(nodeValue.Body),
}
vm := goja.New()
// TODO: dynamically set var instead of hardcode the name
// client would need to send this over
vm.Set("trigger1", v.vars["trigger1"])
jsvm := goja.New()

renderBody, err := vm.RunString(nodeValue.Body)
for key, value := range v.vars {
jsvm.Set(key, map[string]any{
"data": value,
})
}

renderBody, err := jsvm.RunString(nodeValue.Body)
if err == nil {
nodeValue2.Body = renderBody.Export().(string)
} else {
fmt.Println("error render string with goja", err)
}
executionLog, err = p.Execute(node.Id, nodeValue2)
} else {
Expand Down Expand Up @@ -289,11 +329,27 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
}
}
}
} else if nodeValue := node.GetGraphqlQuery(); nodeValue != nil {
executionLog, err = v.runGraphQL(node.Id, nodeValue)
}

return executionLog, err
}

func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
g, err := NewGraphqlQueryProcessor(v, node.Url)
if err != nil {
return nil, err
}
executionLog, _, err := g.Execute(stepID, node)
if err != nil {
v.logger.Error("error execute graphql node", "task_id", v.TaskID, "step", stepID, "url", node.Url, "error", err)
}
v.ExecutionLogs = append(v.ExecutionLogs, executionLog)

return executionLog, nil
}

func (v *VM) runBranch(stepID string, node *avsproto.BranchNode) (*avsproto.Execution_Step, string, error) {
t0 := time.Now()
s := &avsproto.Execution_Step{
Expand Down
7 changes: 6 additions & 1 deletion core/taskengine/vm_runner_contract_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@ import (
)

type ContractReadProcessor struct {
*CommonProcessor
client *ethclient.Client
}

func NewContractReadProcessor(client *ethclient.Client) *ContractReadProcessor {
func NewContractReadProcessor(vm *VM, client *ethclient.Client) *ContractReadProcessor {
return &ContractReadProcessor{
client: client,
CommonProcessor: &CommonProcessor{
vm: vm,
},
}
}

Expand Down Expand Up @@ -81,6 +85,7 @@ func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractRe
s.Log = log.String()
outputData, err := json.Marshal(result)
s.OutputData = string(outputData)
r.SetOutputVarForStep(stepID, outputData)
if err != nil {
s.Success = false
s.Error = err.Error()
Expand Down
13 changes: 9 additions & 4 deletions core/taskengine/vm_runner_graphql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
)

type GraphqlQueryProcessor struct {
*CommonProcessor

client *graphql.Client
sb *strings.Builder
url *url.URL
}

func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) {
func NewGraphqlQueryProcessor(vm *VM, endpoint string) (*GraphqlQueryProcessor, error) {
sb := &strings.Builder{}
log := func(s string) {
fmt.Println("LOGLOG", s)
Expand All @@ -39,10 +41,12 @@ func NewGraphqlQueryProcessor(endpoint string) (*GraphqlQueryProcessor, error) {
client: client,
sb: sb,
url: u,

CommonProcessor: &CommonProcessor{vm},
}, nil
}

func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, any, error) {
ctx := context.Background()
t0 := time.Now().Unix()
step := &avsproto.Execution_Step{
Expand All @@ -68,11 +72,12 @@ func (r *GraphqlQueryProcessor) Execute(stepID string, node *avsproto.GraphQLQue
query := graphql.NewRequest(node.Query)
err = r.client.Run(ctx, query, &resp)
if err != nil {
return step, err
return step, nil, err
}

step.Log = r.sb.String()
data, err := json.Marshal(resp)
step.OutputData = string(data)
return step, err
r.SetOutputVarForStep(stepID, resp)
return step, resp, err
}
19 changes: 18 additions & 1 deletion core/taskengine/vm_runner_rest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskengine

import (
"encoding/json"
"fmt"
"net/url"
"strings"
Expand All @@ -12,10 +13,11 @@ import (
)

type RestProcessor struct {
*CommonProcessor
client *resty.Client
}

func NewRestProrcessor() *RestProcessor {
func NewRestProrcessor(vm *VM) *RestProcessor {
client := resty.New()

// Unique settings at Client level
Expand All @@ -28,6 +30,9 @@ func NewRestProrcessor() *RestProcessor {

r := RestProcessor{
client: client,
CommonProcessor: &CommonProcessor{
vm: vm,
},
}

return &r
Expand Down Expand Up @@ -83,6 +88,18 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs
s.Log = log.String()

s.OutputData = string(resp.Body())
// Attempt to detect json
if s.OutputData[0] == '{' || s.OutputData[0] == '[' {
var parseData map[string]any
if err := json.Unmarshal([]byte(s.OutputData), &parseData); err == nil {
r.SetOutputVarForStep(stepID, parseData)
} else {
r.SetOutputVarForStep(stepID, s.OutputData)
}
} else {
r.SetOutputVarForStep(stepID, s.OutputData)
}

if err != nil {
s.Success = false
s.Error = err.Error()
Expand Down
Loading

0 comments on commit ad81242

Please sign in to comment.