Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventbus): add exotic jetstream support #2878

Merged
merged 13 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion api/event-bus.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 28 additions & 1 deletion api/event-bus.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/jsonschema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@
"jetstream": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamBus"
},
"jetstreamExotic": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.JetStreamConfig",
"description": "Exotic JetStream"
},
"kafka": {
"$ref": "#/definitions/io.argoproj.eventbus.v1alpha1.KafkaBus",
"description": "Kafka eventbus"
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion common/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig,
case eventBusConfig.NATS != nil:
return newEventBusElector(ctx, eventBusConfig.NATS.Auth, clusterName, clusterSize, eventBusConfig.NATS.URL)
case eventBusConfig.JetStream != nil:
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
if eventBusConfig.JetStream.AccessSecret != nil {
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
} else {
return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyNone, clusterName, clusterSize, eventBusConfig.JetStream.URL)
}
default:
return nil, fmt.Errorf("invalid event bus")
}
Expand Down
2 changes: 2 additions & 0 deletions common/leaderelection/leaderelection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"github.com/argoproj/argo-events/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
)

var (
configs = []eventbusv1alpha1.BusConfig{
{NATS: &eventbusv1alpha1.NATSConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{}},
{JetStream: &eventbusv1alpha1.JetStreamConfig{AccessSecret: &v1.SecretKeySelector{}}},
}
)

Expand Down
43 changes: 43 additions & 0 deletions controllers/eventbus/installer/exotic_jetstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package installer

import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

// exoticJetStreamInstaller is an inalleration implementation of exotic jetstream config.
type exoticJetStreamInstaller struct {
eventBus *v1alpha1.EventBus

logger *zap.SugaredLogger
}

// NewExoticJetStreamInstaller return a new exoticJetStreamInstaller
func NewExoticJetStreamInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogger) Installer {
return &exoticJetStreamInstaller{
eventBus: eventBus,
logger: logger.Named("exotic-jetstream"),
}
}

func (i *exoticJetStreamInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
JetStreamObj := i.eventBus.Spec.JetStreamExotic
if JetStreamObj == nil {
return nil, fmt.Errorf("invalid request")
}
i.eventBus.Status.MarkDeployed("Skipped", "Skip deployment because of using exotic config.")
i.logger.Info("use exotic config")
busConfig := &v1alpha1.BusConfig{
JetStream: JetStreamObj,
}
return busConfig, nil
}

func (i *exoticJetStreamInstaller) Uninstall(ctx context.Context) error {
i.logger.Info("nothing to uninstall")
return nil
}
50 changes: 50 additions & 0 deletions controllers/eventbus/installer/exotic_jetstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
)

var (
testJSExoticURL = "nats://nats:4222"

testJSExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testExoticName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: testJSExoticURL,
},
},
}
)

func TestInstallationJSExotic(t *testing.T) {
t.Run("installation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.JetStream)
assert.Equal(t, conf.JetStream.URL, testJSExoticURL)
})
}

func TestUninstallationJSExotic(t *testing.T) {
t.Run("uninstallation with exotic jetstream config", func(t *testing.T) {
installer := NewExoticJetStreamInstaller(testJSExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
2 changes: 2 additions & 0 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func getInstaller(eventBus *v1alpha1.EventBus, client client.Client, kubeClient
return NewJetStreamInstaller(client, eventBus, config, getLabels(eventBus), kubeClient, logger), nil
} else if kafka := eventBus.Spec.Kafka; kafka != nil {
return NewExoticKafkaInstaller(eventBus, logger), nil
} else if js := eventBus.Spec.JetStreamExotic; js != nil {
return NewExoticJetStreamInstaller(eventBus, logger), nil
}
return nil, fmt.Errorf("invalid eventbus spec")
}
Expand Down
6 changes: 6 additions & 0 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ func TestGetInstaller(t *testing.T) {
assert.NotNil(t, installer)
_, ok := installer.(*jetStreamInstaller)
assert.True(t, ok)

installer, err = getInstaller(testJetStreamExoticBus, nil, nil, fakeConfig, zaptest.NewLogger(t).Sugar())
assert.NoError(t, err)
assert.NotNil(t, installer)
_, ok = installer.(*exoticJetStreamInstaller)
assert.True(t, ok)
})
}

Expand Down
16 changes: 16 additions & 0 deletions controllers/eventbus/installer/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ var (
},
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: "EventBus",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}
)

func TestJetStreamBadInstallation(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions controllers/eventbus/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

// ValidateEventBus accepts an EventBus and performs validation against it
func ValidateEventBus(eb *v1alpha1.EventBus) error {
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil {
return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", or \"kafka\" needs to be specified")
if eb.Spec.NATS == nil && eb.Spec.JetStream == nil && eb.Spec.Kafka == nil && eb.Spec.JetStreamExotic == nil {
return fmt.Errorf("invalid spec: either \"nats\", \"jetstream\", \"jetstreamExotic\", or \"kafka\" needs to be specified")
}
if x := eb.Spec.NATS; x != nil {
if x.Native != nil && x.Exotic != nil {
Expand Down Expand Up @@ -41,5 +41,10 @@ func ValidateEventBus(eb *v1alpha1.EventBus) error {
return fmt.Errorf("\"spec.kafka.url\" is missing")
}
}
if x := eb.Spec.JetStreamExotic; x != nil {
if x.URL == "" {
return fmt.Errorf("\"spec.jetstreamExotic.url\" is missing")
}
}
return nil
}
25 changes: 25 additions & 0 deletions controllers/eventbus/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
},
}

testJetStreamExoticBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Name: common.DefaultEventBusName,
},
Spec: v1alpha1.EventBusSpec{
JetStreamExotic: &v1alpha1.JetStreamConfig{
URL: "nats://nats:4222",
},
},
}

testKafkaEventBus = &v1alpha1.EventBus{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test-ns",
Expand Down Expand Up @@ -68,6 +80,11 @@ func TestValidate(t *testing.T) {
assert.NoError(t, err)
})

t.Run("test good js exotic eventbus", func(t *testing.T) {
err := ValidateEventBus(testJetStreamExoticBus)
assert.NoError(t, err)
})

t.Run("test bad eventbus", func(t *testing.T) {
eb := testNatsEventBus.DeepCopy()
eb.Spec.NATS = nil
Expand Down Expand Up @@ -130,4 +147,12 @@ func TestValidate(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "\"spec.kafka.url\" is missing")
})

t.Run("test exotic js eventbus empty URL", func(t *testing.T) {
eb := testJetStreamExoticBus.DeepCopy()
eb.Spec.JetStreamExotic.URL = ""
err := ValidateEventBus(eb)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "\"spec.jetstreamExotic.url\" is missing"))
})
}
18 changes: 18 additions & 0 deletions docs/eventbus/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ For Jetstream, TLS is turned on for all client-server communication as well as b
## How it works under the hood

Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: “Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are.” For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default.<eventsourcename>.<eventname>`. Sensors subscribe to the subjects they need using durable consumers.

### Exotic

To use an existing JetStream service, follow the example below.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
jetstreamExotic:
url: nats://xxxxx:xxx
accessSecret:
name: my-secret-name
key: secret-key
streamConfig: ""
```
6 changes: 5 additions & 1 deletion eventbus/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func GetAuth(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig) (*e
case eventBusConfig.NATS != nil:
eventBusAuth = eventBusConfig.NATS.Auth
case eventBusConfig.JetStream != nil:
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
if eventBusConfig.JetStream.AccessSecret != nil {
eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
} else {
eventBusAuth = nil
}
case eventBusConfig.Kafka != nil:
eventBusAuth = nil
default:
Expand Down
Loading
Loading