Skip to content

Commit

Permalink
replaced govaluate with expr
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Chubatiuk <andrew.chubatiuk@gmail.com>
Signed-off-by: Andrew Chubatiuk <andrew.chubatiuk@motional.com>
  • Loading branch information
Andrew Chubatiuk authored and AndrewChubatiuk committed Apr 1, 2024
1 parent 6b44292 commit fddf83f
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 209 deletions.
2 changes: 1 addition & 1 deletion common/expr/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"encoding/json"

"github.com/antonmedv/expr"
"github.com/doublerebel/bellows"
"github.com/expr-lang/expr"

sprig "github.com/Masterminds/sprig/v3"
exprpkg "github.com/argoproj/pkg/expr"
Expand Down
2 changes: 1 addition & 1 deletion docs/eventsources/filtering.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ spec:
jitter: 0.2
```
The `expression` string is evaluated with the [expr](https://github.com/antonmedv/expr) package which offers a wide set of basic operators and comparators.
The `expression` string is evaluated with the [expr](https://github.com/expr-lang/expr) package which offers a wide set of basic operators and comparators.

# Example

Expand Down
10 changes: 5 additions & 5 deletions docs/sensors/filters/expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ The `expr` field defines the expression to be evaluated. The `fields` stanza def

`name` is arbitrary and used in the `expr`, `path` defines how to find the value in the data payload then to be assigned to a parameter.

The expr filter evaluates the expression contained in `expr` using [govaluate](https://github.com/Knetic/govaluate). This library leverages an incredible flexibility and power.
The expr filter evaluates the expression contained in `expr` using [expr](https://github.com/expr-lang/expr). This library leverages an incredible flexibility and power.

With govaluate we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`-`), inversion (`!`), bitwise not (`~`), logical (`&&`, `||`), ternary conditional (`?`, `:`) operators,
With expr we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`not` or `!`), inversion (`!`), logical (`&&` or `and`, `||` or `or`), ternary conditional (`?`, `:`) operators,
together with comparators (`>`, `<`, `>=`, `<=`), comma-separated arrays and custom functions.

Here some examples:

- `action =~ "start"`
- `action matches "start"`
- `action == "end" && started == true`
- `action =~ "start" || (started == true && instances == 2)`
- `action matches "start" || (started == true && instances == 2)`

To discover all options offered by govaluate, take a look at its [manual](https://github.com/Knetic/govaluate/blob/master/MANUAL.md).
To discover all options offered by expr, take a look at its [manual](https://expr.medv.io/docs/Language-Definition).

## Practical example

Expand Down
26 changes: 16 additions & 10 deletions eventbus/jetstream/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"sync"
"time"

"github.com/Knetic/govaluate"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
nats "github.com/nats-io/nats.go"

eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
Expand All @@ -24,7 +25,7 @@ type JetstreamTriggerConn struct {
keyValueStore nats.KeyValue
dependencyExpression string
requiresANDLogic bool
evaluableExpression *govaluate.EvaluableExpression
evaluableExpression *vm.Program
deps []eventbuscommon.Dependency
sourceDepMap map[string][]string // maps EventSource and EventName to dependency name
recentMsgsByID map[string]*msg // prevent re-processing the same message as before (map of msg ID to time)
Expand All @@ -44,30 +45,35 @@ func NewJetstreamTriggerConn(conn *jetstreambase.JetstreamConnection,
var err error

sourceDepMap := make(map[string][]string)
for _, d := range deps {
sanitizedDepExpr := dependencyExpression

for i, d := range deps {
sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_")
key := d.EventSourceName + "__" + d.EventName
_, found := sourceDepMap[key]
if !found {
sourceDepMap[key] = make([]string, 0)
}
sourceDepMap[key] = append(sourceDepMap[key], d.Name)
sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName)
deps[i].Name = sanitizedDepName
sourceDepMap[key] = append(sourceDepMap[key], sanitizedDepName)
}

connection := &JetstreamTriggerConn{
JetstreamConnection: conn,
sensorName: sensorName,
triggerName: triggerName,
dependencyExpression: dependencyExpression,
requiresANDLogic: strings.Contains(dependencyExpression, "&"),
dependencyExpression: sanitizedDepExpr,
requiresANDLogic: strings.Contains(sanitizedDepExpr, "&"),
deps: deps,
sourceDepMap: sourceDepMap,
recentMsgsByID: make(map[string]*msg),
recentMsgsByTime: make([]*msg, 0)}
connection.Logger = connection.Logger.With("triggerName", connection.triggerName, "sensorName", connection.sensorName)

connection.evaluableExpression, err = govaluate.NewEvaluableExpression(strings.ReplaceAll(dependencyExpression, "-", "\\-"))
connection.evaluableExpression, err = expr.Compile(sanitizedDepExpr)
if err != nil {
errStr := fmt.Sprintf("failed to evaluate expression %s: %v", dependencyExpression, err)
errStr := fmt.Sprintf("failed to evaluate expression %s: %v", sanitizedDepExpr, err)
connection.Logger.Error(errStr)
return nil, fmt.Errorf(errStr)
}
Expand Down Expand Up @@ -161,7 +167,7 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,
subscriptionIndex++
}

// create a single goroutine which which handle receiving messages to ensure that all of the processing is occurring on that
// create a single goroutine which handles receiving messages to ensure that all of the processing is occurring on that
// one goroutine and we don't need to worry about race conditions
go conn.processMsgs(ch, processMsgsCloseCh, resetConditionsCh, transform, filter, action, &wg)
wg.Add(1)
Expand Down Expand Up @@ -379,7 +385,7 @@ func (conn *JetstreamTriggerConn) processDependency(
log.Infof("Current state of dependencies: %v", parameters)

// evaluate the filter expression
result, err := conn.evaluableExpression.Evaluate(parameters)
result, err := expr.Run(conn.evaluableExpression, parameters)
if err != nil {
errStr := fmt.Sprintf("failed to evaluate dependency expression: %v", err)
log.Error(errStr)
Expand Down
23 changes: 12 additions & 11 deletions eventbus/kafka/sensor/kafka_sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"time"

"github.com/IBM/sarama"
"github.com/Knetic/govaluate"
eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -172,7 +172,15 @@ func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpres
}

if _, ok := s.triggers[triggerName]; !ok {
expr, err := govaluate.NewEvaluableExpression(strings.ReplaceAll(depExpression, "-", "\\-"))
sanitizedDepExpr := depExpression

for i, d := range dependencies {
sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_")
sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName)
dependencies[i].Name = sanitizedDepName
}

expr, err := expr.Compile(sanitizedDepExpr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -285,15 +293,8 @@ func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMess
// can skip ahead to the action topic, otherwise produce to
// the trigger topic

var data any
var topic string
if trigger.OneAndDone() {
data = []*cloudevents.Event{event}
topic = s.topics.action
} else {
data = event
topic = s.topics.trigger
}
data := event
topic := s.topics.trigger

value, err := json.Marshal(data)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions eventbus/kafka/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"time"

"github.com/Knetic/govaluate"
"github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr/vm"
)

type KafkaTriggerConnection struct {
Expand All @@ -17,7 +17,7 @@ type KafkaTriggerConnection struct {

sensorName string
triggerName string
depExpression *govaluate.EvaluableExpression
depExpression *vm.Program
dependencies map[string]common.Dependency
atLeastOnce bool

Expand Down
17 changes: 3 additions & 14 deletions eventbus/kafka/sensor/trigger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package kafka
import (
"time"

"github.com/Knetic/govaluate"
"github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"go.uber.org/zap"
)

Expand All @@ -15,7 +15,6 @@ type KafkaTriggerHandler interface {
Name() string
Ready() bool
Reset()
OneAndDone() bool
DependsOn(*cloudevents.Event) (string, bool)
Transform(string, *cloudevents.Event) (*cloudevents.Event, error)
Filter(string, *cloudevents.Event) bool
Expand All @@ -42,16 +41,6 @@ func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bo
return "", false
}

func (c *KafkaTriggerConnection) OneAndDone() bool {
for _, token := range c.depExpression.Tokens() {
if token.Kind == govaluate.LOGICALOP && token.Value == "&&" {
return false
}
}

return true
}

func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error) {
return c.transform(depName, *event)
}
Expand Down Expand Up @@ -140,9 +129,9 @@ func (c *KafkaTriggerConnection) satisfied() (interface{}, error) {
}
}

c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.String()), zap.Any("parameters", parameters))
c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.Source().Content()), zap.Any("parameters", parameters))

return c.depExpression.Eval(parameters)
return expr.Run(c.depExpression, parameters)
}

func (c *KafkaTriggerConnection) Reset() {
Expand Down
44 changes: 21 additions & 23 deletions eventbus/stan/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"sync"
"time"

"github.com/Knetic/govaluate"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/gobwas/glob"
"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
Expand Down Expand Up @@ -255,7 +256,7 @@ func (n *STANTriggerConn) processEventSourceMsg(m *stan.Msg, msgHolder *eventSou
}
}

result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
result, err := expr.Run(msgHolder.expr, msgHolder.parameters)
if err != nil {
log.Errorf("failed to evaluate dependency expression: %v", err)
// TODO: how to handle this situation?
Expand Down Expand Up @@ -315,7 +316,7 @@ type eventSourceMessageHolder struct {
lastResetTime time.Time
// if we reach this time, we reset everything (occurs 60 seconds after lastResetTime)
resetTimeout int64
expr *govaluate.EvaluableExpression
expr *vm.Program
depNames []string
// Mapping of [eventSourceName + eventName]dependencyName
sourceDepMap map[string]string
Expand All @@ -330,14 +331,26 @@ type eventSourceMessageHolder struct {
}

func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr string, dependencies []eventbuscommon.Dependency, lastResetTime time.Time) (*eventSourceMessageHolder, error) {
dependencyExpr = strings.ReplaceAll(dependencyExpr, "-", "\\-")
expression, err := govaluate.NewEvaluableExpression(dependencyExpr)
sanitizedDepExpr := dependencyExpr

for i, d := range dependencies {
sanitizedDepName := strings.ReplaceAll(d.Name, "-", "_")
sanitizedDepExpr = strings.ReplaceAll(sanitizedDepExpr, d.Name, sanitizedDepName)
dependencies[i].Name = sanitizedDepName
}

expression, err := expr.Compile(sanitizedDepExpr)
if err != nil {
return nil, err
}
deps := unique(expression.Vars())
if len(dependencyExpr) == 0 {
return nil, fmt.Errorf("no dependencies found: %s", dependencyExpr)
deps := []string{}
for _, c := range expression.Constants {
if v, ok := c.(string); ok {
deps = append(deps, v)
}
}
if len(sanitizedDepExpr) == 0 {
return nil, fmt.Errorf("no dependencies found: %s", sanitizedDepExpr)
}

srcDepMap := make(map[string]string)
Expand Down Expand Up @@ -447,18 +460,3 @@ func (mh *eventSourceMessageHolder) isCleanedUp() bool {
}
return len(mh.msgs) == 0
}

func unique(stringSlice []string) []string {
if len(stringSlice) == 0 {
return stringSlice
}
keys := make(map[string]bool)
list := []string{}
for _, entry := range stringSlice {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}
2 changes: 1 addition & 1 deletion eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func filterEvent(data []byte, filter *v1alpha1.EventSourceFilter) (bool, error)

params := make(map[string]interface{})
for key, value := range dataMap {
params[strings.ReplaceAll(key, "-", "_")] = value
params[key] = value
}
env := expr.GetFuncMap(params)
return expr.EvalBool(filter.Expression, env)
Expand Down
Loading

0 comments on commit fddf83f

Please sign in to comment.