Skip to content

Commit

Permalink
replaced govaluate with expr
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Chubatiuk <andrew.chubatiuk@motional.com>
  • Loading branch information
Andrew Chubatiuk committed Dec 11, 2023
1 parent bf455ea commit 8215f19
Show file tree
Hide file tree
Showing 39 changed files with 174 additions and 277 deletions.
2 changes: 1 addition & 1 deletion common/expr/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"encoding/json"

"github.com/antonmedv/expr"
"github.com/doublerebel/bellows"
"github.com/expr-lang/expr"

sprig "github.com/Masterminds/sprig/v3"
exprpkg "github.com/argoproj/pkg/expr"
Expand Down
2 changes: 1 addition & 1 deletion docs/eventsources/filtering.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ spec:
jitter: 0.2
```
The `expression` string is evaluated with the [expr](https://github.com/antonmedv/expr) package which offers a wide set of basic operators and comparators.
The `expression` string is evaluated with the [expr](https://github.com/expr-lang/expr) package which offers a wide set of basic operators and comparators.

# Example

Expand Down
10 changes: 5 additions & 5 deletions docs/sensors/filters/expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ The `expr` field defines the expression to be evaluated. The `fields` stanza def

`name` is arbitrary and used in the `expr`, `path` defines how to find the value in the data payload then to be assigned to a parameter.

The expr filter evaluates the expression contained in `expr` using [govaluate](https://github.com/Knetic/govaluate). This library leverages an incredible flexibility and power.
The expr filter evaluates the expression contained in `expr` using [expr](https://github.com/expr-lang/expr). This library leverages an incredible flexibility and power.

With govaluate we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`-`), inversion (`!`), bitwise not (`~`), logical (`&&`, `||`), ternary conditional (`?`, `:`) operators,
With expr we are able to define complex combination of arithmetic (`-`, `*`, `/`, `**`, `%`), negation (`not` or `!`), inversion (`!`), logical (`&&` or `and`, `||` or `or`), ternary conditional (`?`, `:`) operators,
together with comparators (`>`, `<`, `>=`, `<=`), comma-separated arrays and custom functions.

Here some examples:

- `action =~ "start"`
- `action matches "start"`
- `action == "end" && started == true`
- `action =~ "start" || (started == true && instances == 2)`
- `action matches "start" || (started == true && instances == 2)`

To discover all options offered by govaluate, take a look at its [manual](https://github.com/Knetic/govaluate/blob/master/MANUAL.md).
To discover all options offered by expr, take a look at its [manual](https://expr.medv.io/docs/Language-Definition).

## Practical example

Expand Down
11 changes: 6 additions & 5 deletions eventbus/jetstream/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"sync"
"time"

"github.com/Knetic/govaluate"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
nats "github.com/nats-io/nats.go"

eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
Expand All @@ -24,7 +25,7 @@ type JetstreamTriggerConn struct {
keyValueStore nats.KeyValue
dependencyExpression string
requiresANDLogic bool
evaluableExpression *govaluate.EvaluableExpression
evaluableExpression *vm.Program
deps []eventbuscommon.Dependency
sourceDepMap map[string][]string // maps EventSource and EventName to dependency name
recentMsgsByID map[string]*msg // prevent re-processing the same message as before (map of msg ID to time)
Expand Down Expand Up @@ -65,7 +66,7 @@ func NewJetstreamTriggerConn(conn *jetstreambase.JetstreamConnection,
recentMsgsByTime: make([]*msg, 0)}
connection.Logger = connection.Logger.With("triggerName", connection.triggerName, "sensorName", connection.sensorName)

connection.evaluableExpression, err = govaluate.NewEvaluableExpression(strings.ReplaceAll(dependencyExpression, "-", "\\-"))
connection.evaluableExpression, err = expr.Compile(dependencyExpression)
if err != nil {
errStr := fmt.Sprintf("failed to evaluate expression %s: %v", dependencyExpression, err)
connection.Logger.Error(errStr)
Expand Down Expand Up @@ -161,7 +162,7 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,
subscriptionIndex++
}

// create a single goroutine which which handle receiving messages to ensure that all of the processing is occurring on that
// create a single goroutine which handles receiving messages to ensure that all of the processing is occurring on that
// one goroutine and we don't need to worry about race conditions
go conn.processMsgs(ch, processMsgsCloseCh, resetConditionsCh, transform, filter, action, &wg)
wg.Add(1)
Expand Down Expand Up @@ -379,7 +380,7 @@ func (conn *JetstreamTriggerConn) processDependency(
log.Infof("Current state of dependencies: %v", parameters)

// evaluate the filter expression
result, err := conn.evaluableExpression.Evaluate(parameters)
result, err := expr.Run(conn.evaluableExpression, parameters)
if err != nil {
errStr := fmt.Sprintf("failed to evaluate dependency expression: %v", err)
log.Error(errStr)
Expand Down
16 changes: 4 additions & 12 deletions eventbus/kafka/sensor/kafka_sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/Knetic/govaluate"
eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -165,7 +164,7 @@ func (s *KafkaSensor) Connect(ctx context.Context, triggerName string, depExpres
}

if _, ok := s.triggers[triggerName]; !ok {
expr, err := govaluate.NewEvaluableExpression(strings.ReplaceAll(depExpression, "-", "\\-"))
expr, err := expr.Compile(depExpression)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -278,15 +277,8 @@ func (s *KafkaSensor) Event(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMess
// can skip ahead to the action topic, otherwise produce to
// the trigger topic

var data any
var topic string
if trigger.OneAndDone() {
data = []*cloudevents.Event{event}
topic = s.topics.action
} else {
data = event
topic = s.topics.trigger
}
data := event
topic := s.topics.trigger

value, err := json.Marshal(data)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions eventbus/kafka/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"time"

"github.com/Knetic/govaluate"
"github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr/vm"
)

type KafkaTriggerConnection struct {
Expand All @@ -17,7 +17,7 @@ type KafkaTriggerConnection struct {

sensorName string
triggerName string
depExpression *govaluate.EvaluableExpression
depExpression *vm.Program
dependencies map[string]common.Dependency
atLeastOnce bool

Expand Down
17 changes: 3 additions & 14 deletions eventbus/kafka/sensor/trigger_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package kafka
import (
"time"

"github.com/Knetic/govaluate"
"github.com/argoproj/argo-events/eventbus/common"
"github.com/argoproj/argo-events/eventbus/kafka/base"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"go.uber.org/zap"
)

Expand All @@ -15,7 +15,6 @@ type KafkaTriggerHandler interface {
Name() string
Ready() bool
Reset()
OneAndDone() bool
DependsOn(*cloudevents.Event) (string, bool)
Transform(string, *cloudevents.Event) (*cloudevents.Event, error)
Filter(string, *cloudevents.Event) bool
Expand All @@ -42,16 +41,6 @@ func (c *KafkaTriggerConnection) DependsOn(event *cloudevents.Event) (string, bo
return "", false
}

func (c *KafkaTriggerConnection) OneAndDone() bool {
for _, token := range c.depExpression.Tokens() {
if token.Kind == govaluate.LOGICALOP && token.Value == "&&" {
return false
}
}

return true
}

func (c *KafkaTriggerConnection) Transform(depName string, event *cloudevents.Event) (*cloudevents.Event, error) {
return c.transform(depName, *event)
}
Expand Down Expand Up @@ -140,9 +129,9 @@ func (c *KafkaTriggerConnection) satisfied() (interface{}, error) {
}
}

c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.String()), zap.Any("parameters", parameters))
c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.Source().Content()), zap.Any("parameters", parameters))

return c.depExpression.Eval(parameters)
return expr.Run(c.depExpression, parameters)
}

func (c *KafkaTriggerConnection) Reset() {
Expand Down
32 changes: 11 additions & 21 deletions eventbus/stan/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"sync"
"time"

"github.com/Knetic/govaluate"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
"github.com/gobwas/glob"
"github.com/nats-io/stan.go"
"github.com/nats-io/stan.go/pb"
Expand Down Expand Up @@ -255,7 +256,7 @@ func (n *STANTriggerConn) processEventSourceMsg(m *stan.Msg, msgHolder *eventSou
}
}

result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
result, err := expr.Run(msgHolder.expr, msgHolder.parameters)
if err != nil {
log.Errorf("failed to evaluate dependency expression: %v", err)
// TODO: how to handle this situation?
Expand Down Expand Up @@ -315,7 +316,7 @@ type eventSourceMessageHolder struct {
lastResetTime time.Time
// if we reach this time, we reset everything (occurs 60 seconds after lastResetTime)
resetTimeout int64
expr *govaluate.EvaluableExpression
expr *vm.Program
depNames []string
// Mapping of [eventSourceName + eventName]dependencyName
sourceDepMap map[string]string
Expand All @@ -330,12 +331,16 @@ type eventSourceMessageHolder struct {
}

func newEventSourceMessageHolder(logger *zap.SugaredLogger, dependencyExpr string, dependencies []eventbuscommon.Dependency, lastResetTime time.Time) (*eventSourceMessageHolder, error) {
dependencyExpr = strings.ReplaceAll(dependencyExpr, "-", "\\-")
expression, err := govaluate.NewEvaluableExpression(dependencyExpr)
expression, err := expr.Compile(dependencyExpr)
if err != nil {
return nil, err
}
deps := unique(expression.Vars())
deps := []string{}
for _, c := range expression.Constants {
if v, ok := c.(string); ok {
deps = append(deps, v)
}
}
if len(dependencyExpr) == 0 {
return nil, fmt.Errorf("no dependencies found: %s", dependencyExpr)
}
Expand Down Expand Up @@ -447,18 +452,3 @@ func (mh *eventSourceMessageHolder) isCleanedUp() bool {
}
return len(mh.msgs) == 0
}

func unique(stringSlice []string) []string {
if len(stringSlice) == 0 {
return stringSlice
}
keys := make(map[string]bool)
list := []string{}
for _, entry := range stringSlice {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}
2 changes: 1 addition & 1 deletion eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func filterEvent(data []byte, filter *v1alpha1.EventSourceFilter) (bool, error)

params := make(map[string]interface{})
for key, value := range dataMap {
params[strings.ReplaceAll(key, "-", "_")] = value
params[key] = value
}
env := expr.GetFuncMap(params)
return expr.EvalBool(filter.Expression, env)
Expand Down
33 changes: 15 additions & 18 deletions eventsources/sources/hdfs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package hdfs
import (
"fmt"

"github.com/colinmarc/hdfs"
krb "gopkg.in/jcmturner/gokrb5.v5/client"
"gopkg.in/jcmturner/gokrb5.v5/config"
"gopkg.in/jcmturner/gokrb5.v5/credentials"
"gopkg.in/jcmturner/gokrb5.v5/keytab"
"github.com/colinmarc/hdfs/v2"
krb "github.com/jcmturner/gokrb5/v8/client"
"github.com/jcmturner/gokrb5/v8/config"
"github.com/jcmturner/gokrb5/v8/credentials"
"github.com/jcmturner/gokrb5/v8/keytab"
corev1 "k8s.io/api/core/v1"

"github.com/argoproj/argo-events/common"
Expand All @@ -31,12 +31,12 @@ type KrbOptions struct {

// CCacheOptions is options for ccache
type CCacheOptions struct {
CCache credentials.CCache
CCache *credentials.CCache
}

// KeytabOptions is options for keytab
type KeytabOptions struct {
Keytab keytab.Keytab
Keytab *keytab.Keytab
Username string
Realm string
}
Expand Down Expand Up @@ -74,7 +74,8 @@ func createHDFSConfig(hdfsEventSource *v1alpha1.HDFSEventSource) (*HDFSConfig, e
if err != nil {
return nil, err
}
ccache, err := credentials.ParseCCache(bytes)
ccache := new(credentials.CCache)
err = ccache.Unmarshal(bytes)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +92,8 @@ func createHDFSConfig(hdfsEventSource *v1alpha1.HDFSEventSource) (*HDFSConfig, e
if err != nil {
return nil, err
}
ktb, err := keytab.Parse(bytes)
ktb := new(keytab.Keytab)
err = ktb.Unmarshal(bytes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -134,25 +136,20 @@ func createHDFSClient(addresses []string, user string, krbOptions *KrbOptions) (
}

func createKrbClient(krbOptions *KrbOptions) (*krb.Client, error) {
krbConfig, err := config.NewConfigFromString(krbOptions.Config)
krbConfig, err := config.NewFromString(krbOptions.Config)
if err != nil {
return nil, err
}

if krbOptions.CCacheOptions != nil {
client, err := krb.NewClientFromCCache(krbOptions.CCacheOptions.CCache)
if err != nil {
return nil, err
}
return client.WithConfig(krbConfig), nil
return krb.NewFromCCache(krbOptions.CCacheOptions.CCache, krbConfig)
} else if krbOptions.KeytabOptions != nil {
client := krb.NewClientWithKeytab(krbOptions.KeytabOptions.Username, krbOptions.KeytabOptions.Realm, krbOptions.KeytabOptions.Keytab)
client = *client.WithConfig(krbConfig)
client := krb.NewWithKeytab(krbOptions.KeytabOptions.Username, krbOptions.KeytabOptions.Realm, krbOptions.KeytabOptions.Keytab, krbConfig)
err = client.Login()
if err != nil {
return nil, err
}
return &client, nil
return client, nil
}

return nil, fmt.Errorf("Failed to get a Kerberos client")
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"strings"
"time"

"github.com/colinmarc/hdfs"
"github.com/colinmarc/hdfs/v2"
"go.uber.org/zap"

"github.com/argoproj/argo-events/common/logging"
Expand Down
Loading

0 comments on commit 8215f19

Please sign in to comment.