Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Dec 14, 2024
1 parent 8c7bc1c commit aa80fe1
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 47 deletions.
100 changes: 55 additions & 45 deletions core/taskengine/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Step struct {
Next []string
}

type CommonProcessor struct {
vm *VM
}

// The VM is the core component that load the node information and execute them, yield finaly result
type VM struct {
// Input raw task data
Expand Down Expand Up @@ -101,59 +105,65 @@ func NewVMWithData(taskID string, triggerMetadata *avsproto.TriggerMetadata, nod
v.vars = macros.GetEnvs(map[string]any{})

// popular trigger data for trigger variable
if triggerMetadata != nil && triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}
if triggerMetadata != nil {
if triggerMetadata.LogIndex > 0 && triggerMetadata.TxHash != "" {
// if it contains event, we need to fetch and pop
receipt, err := rpcConn.TransactionReceipt(context.Background(), common.HexToHash(triggerMetadata.TxHash))
if err != nil {
return nil, err
}

var event *types.Log
//event := receipt.Logs[triggerMetadata.LogIndex]
var event *types.Log
//event := receipt.Logs[triggerMetadata.LogIndex]

for _, l := range receipt.Logs {
if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
for _, l := range receipt.Logs {
if uint64(l.Index) == triggerMetadata.LogIndex {
event = l
}
}
}

if event == nil {
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}
if event == nil {
return nil, fmt.Errorf("tx %s doesn't content event %d", triggerMetadata.TxHash, triggerMetadata.LogIndex)
}

tokenMetadata, err := GetMetadataForTransfer(event)
ef, err := erc20.NewErc20(event.Address, nil)
tokenMetadata, err := GetMetadataForTransfer(event)
ef, err := erc20.NewErc20(event.Address, nil)

blockHeader, err := GetBlock(event.BlockNumber)
if err != nil {
return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
}
blockHeader, err := GetBlock(event.BlockNumber)
if err != nil {
return nil, fmt.Errorf("RPC error getting block header. Retry: %w", err)
}

parseTransfer, err := ef.ParseTransfer(*event)
formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()

v.vars["trigger1"] = map[string]interface{}{
"data": map[string]interface{}{
"topics": godash.Map(event.Topics, func(topic common.Hash) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),

"token_name": tokenMetadata.Name,
"token_symbol": tokenMetadata.Symbol,
"token_decimals": tokenMetadata.Decimals,
"transaction_hash": event.TxHash,
"address": strings.ToLower(event.Address.Hex()),
"block_number": event.BlockNumber,
"block_timestamp": blockHeader.Time,
"from_address": parseTransfer.From.String(),
"to_address": parseTransfer.To.String(),
"value": parseTransfer.Value.String(),
"value_formatted": formattedValue,
"transaction_index": event.TxIndex,
},
parseTransfer, err := ef.ParseTransfer(*event)
formattedValue := ToDecimal(parseTransfer.Value, int(tokenMetadata.Decimals)).String()

// TODO: Implement a decoder to help standarize common event
v.vars["trigger1"] = map[string]interface{}{
"data": map[string]interface{}{
"topics": godash.Map(event.Topics, func(topic common.Hash) string {
return "0x" + strings.ToLower(strings.TrimLeft(topic.String(), "0x0"))
}),
"data": "0x" + common.Bytes2Hex(event.Data),

"token_name": tokenMetadata.Name,
"token_symbol": tokenMetadata.Symbol,
"token_decimals": tokenMetadata.Decimals,
"transaction_hash": event.TxHash,
"address": strings.ToLower(event.Address.Hex()),
"block_number": event.BlockNumber,
"block_timestamp": blockHeader.Time,
"from_address": parseTransfer.From.String(),
"to_address": parseTransfer.To.String(),
"value": parseTransfer.Value.String(),
"value_formatted": formattedValue,
"transaction_index": event.TxIndex,
},
}
}

if triggerMetadata.Epoch > 0 {
v.vars["trigger1"]["epoch"] = triggerMetadata.Epoch
}
}

return v, nil
Expand Down Expand Up @@ -312,7 +322,7 @@ func (v *VM) executeNode(node *avsproto.TaskNode) (*avsproto.Execution_Step, err
}

func (v *VM) runGraphQL(stepID string, node *avsproto.GraphQLQueryNode) (*avsproto.Execution_Step, error) {
g, err := NewGraphqlQueryProcessor(node.Url)
g, err := NewGraphqlQueryProcessor(v, node.Url)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions core/taskengine/vm_runner_contract_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ import (
)

type ContractReadProcessor struct {
*CommonProcessor
client *ethclient.Client
}

func NewContractReadProcessor(client *ethclient.Client) *ContractReadProcessor {
func NewContractReadProcessor(vm *VM, client *ethclient.Client) *ContractReadProcessor {
return &ContractReadProcessor{
client: client,
vm: vm,
}
}

func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractReadNode) (*avsproto.Execution_Step, error) {
func (r *ContractReadProcessor) Execute(stepID string, node *avsproto.ContractReadNode) (*avsproto.Execution_Step, anyerror) {
ctx := context.Background()
t0 := time.Now().Unix()
s := &avsproto.Execution_Step{
Expand Down
2 changes: 2 additions & 0 deletions core/taskengine/vm_runner_graphql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
)

type GraphqlQueryProcessor struct {
*CommonProcessor

client *graphql.Client
sb *strings.Builder
url *url.URL
Expand Down
1 change: 1 addition & 0 deletions core/taskengine/vm_runner_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (r *RestProcessor) Execute(stepID string, node *avsproto.RestAPINode) (*avs
s.Log = log.String()

s.OutputData = string(resp.Body())

if err != nil {
s.Success = false
s.Error = err.Error()
Expand Down

0 comments on commit aa80fe1

Please sign in to comment.