Skip to content

Commit

Permalink
feat(eventbus): add exotic jetstream support (#2878)
Browse files Browse the repository at this point in the history
Signed-off-by: gokulav137 <gokulav999@gmail.com>
Signed-off-by: gokulav137 <52277763+gokulav137@users.noreply.github.com>
  • Loading branch information
gokulav137 authored Oct 25, 2023
1 parent 5b7a0f1 commit 79ea8a6
Show file tree
Hide file tree
Showing 22 changed files with 460 additions and 132 deletions.
31 changes: 30 additions & 1 deletion api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@
"jetstream": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamBus"
},
"jetstreamExotic": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamConfig",
"description": "Exotic JetStream"
},
"kafka": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaBus",
"description": "Kafka eventbus"
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig,
case eventBusConfig.NATS != nil:
return newEventBusElector(ctx, eventBusConfig.NATS.Auth, clusterName, clusterSize, eventBusConfig.NATS.URL)
case eventBusConfig.JetStream != nil:
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
if eventBusConfig.JetStream.AccessSecret != nil {
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
} else {
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyNone, clusterName, clusterSize, eventBusConfig.JetStream.URL)
}
default:
return nil, fmt.Errorf("invalid event bus")
}
Expand Down
2 changes: 2 additions & 0 deletions common/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)

var (
configs = []eventbusv1alpha1.BusConfig{
{NATS: &eventbusv1alpha1.NATSConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{AccessSecret: &v1.SecretKeySelector{}}},
}
)

Expand Down
43 changes: 43 additions & 0 deletions controllers/eventbus/installer/exotic_jetstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package installer

import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

// exoticJetStreamInstaller is an inalleration implementation of exotic jetstream config.
type exoticJetStreamInstaller struct {
eventBus *v1alpha1.EventBus

logger *zap.SugaredLogger
}

// NewExoticJetStreamInstaller return a new exoticJetStreamInstaller
func NewExoticJetStreamInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogger) Installer {
return &exoticJetStreamInstaller{
eventBus: eventBus,
logger: logger.Named("exotic-jetstream"),
}
}

func (i *exoticJetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
JetStreamObj := i.eventBus.Spec.JetStreamExotic
if JetStreamObj == nil {
return nil, fmt.Errorf("invalid request")
}
i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
busConfig := &v1alpha1.BusConfig{
JetStream: JetStreamObj,
}
return busConfig, nil
}

func (i *exoticJetStreamInstaller) Uninstall(ctx context.Context) error {
i.logger.Info("nothing to uninstall")
return nil
}
50 changes: 50 additions & 0 deletions controllers/eventbus/installer/exotic_jetstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

var (
testJSExoticURL = "nats://nats:4222"

testJSExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: testJSExoticURL,
},
},
}
)

func TestInstallationJSExotic(t *testing.T) {
t.Run("installation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.JetStream)
assert.Equal(t, conf.JetStream.URL, testJSExoticURL)
})
}

func TestUninstallationJSExotic(t *testing.T) {
t.Run("uninstallation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
2 changes: 2 additions & 0 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, kubeClient
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), kubeClient, logger), nil
} else if kafka := eventBus.Spec.Kafka; kafka != nil {
return NewExoticKafkaInstaller(eventBus, logger), nil
} else if js := eventBus.Spec.JetStreamExotic; js != nil {
return NewExoticJetStreamInstaller(eventBus, logger), nil
}
return nil, fmt.Errorf("invalid eventbus spec")
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func TestGetInstaller(t *testing.T) {
assert.NotNil(t, installer)
_, ok := installer.(*jetStreamInstaller)
assert.True(t, ok)

installer, err = getInstaller(testJetStreamExoticBus, nil, nil, fakeConfig, zaptest.NewLogger(t).Sugar())
assert.NoError(t, err)
assert.NotNil(t, installer)
_, ok = installer.(*exoticJetStreamInstaller)
assert.True(t, ok)
})
}

Expand Down
16 changes: 16 additions & 0 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ var (
},
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}
)

func TestJetStreamBadInstallation(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// ValidateEventBus accepts an EventBus and performs validation against it
func ValidateEventBus(eb *v1alpha1.EventBus) error {
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil {
return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", or \"kafka\" needs to be specified")
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil && eb.Spec.JetStreamExotic == nil {
return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", \"jetstreamExotic\", or \"kafka\" needs to be specified")
}
if x := eb.Spec.NATS; x != nil {
if x.Native != nil && x.Exotic != nil {
Expand Down Expand Up @@ -41,5 +41,10 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
return fmt.Errorf("\"spec.kafka.url\" is missing")
}
}
if x := eb.Spec.JetStreamExotic; x != nil {
if x.URL == "" {
return fmt.Errorf("\"spec.jetstreamExotic.url\" is missing")
}
}
return nil
}
25 changes: 25 additions & 0 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: common.DefaultEventBusName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}

testKafkaEventBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Expand Down Expand Up @@ -68,6 +80,11 @@ func TestValidate(t *testing.T) {
assert.NoError(t, err)
})

t.Run("test good js exotic eventbus", func(t *testing.T) {
err := ValidateEventBus(testJetStreamExoticBus)
assert.NoError(t, err)
})

t.Run("test bad eventbus", func(t *testing.T) {
eb := testNatsEventBus.DeepCopy()
eb.Spec.NATS = nil
Expand Down Expand Up @@ -130,4 +147,12 @@ func TestValidate(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.url\" is missing")
})

t.Run("test exotic js eventbus empty URL", func(t *testing.T) {
eb := testJetStreamExoticBus.DeepCopy()
eb.Spec.JetStreamExotic.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "\"spec.jetstreamExotic.url\" is missing"))
})
}
18 changes: 18 additions & 0 deletions docs/eventbus/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ For Jetstream, TLS is turned on for all client-server communication as well as b
## How it works under the hood

Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: “Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are.” For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default.<eventsourcename>.<eventname>`. Sensors subscribe to the subjects they need using durable consumers.

### Exotic

To use an existing JetStream service, follow the example below.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
jetstreamExotic:
url: nats://xxxxx:xxx
accessSecret:
name: my-secret-name
key: secret-key
streamConfig: ""
```
6 changes: 5 additions & 1 deletion eventbus/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func GetAuth(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig) (*e
case eventBusConfig.NATS != nil:
eventBusAuth = eventBusConfig.NATS.Auth
case eventBusConfig.JetStream != nil:
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
if eventBusConfig.JetStream.AccessSecret != nil {
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
} else {
eventBusAuth = nil
}
case eventBusConfig.Kafka != nil:
eventBusAuth = nil
default:
Expand Down
Loading

0 comments on commit 79ea8a6

Please sign in to comment.