Skip to content

Commit

Permalink
add zmq (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
jordan-bonecutter authored Oct 30, 2023
1 parent 6349f27 commit d3cd146
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ jobs:
make format
git diff --exit-code
- name: Install ZMQ
run: sudo apt update -y && sudo apt upgrade -y && sudo apt install libsodium-dev libzmq3-dev

- name: Test
run: make test

Expand Down
21 changes: 20 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
# Start by building the application.
FROM golang:1.20.5-bullseye as build

# build libsodium (dep of libzmq)
WORKDIR /build
RUN wget https://github.com/jedisct1/libsodium/releases/download/1.0.19-RELEASE/libsodium-1.0.19.tar.gz
RUN tar -xzvf libsodium-1.0.19.tar.gz
WORKDIR /build/libsodium-stable
RUN ./configure --disable-shared --enable-static
RUN make -j`nproc`
RUN make install

# build libzmq (dep of zmq datastore)
WORKDIR /build
RUN wget https://github.com/zeromq/libzmq/releases/download/v4.3.4/zeromq-4.3.4.tar.gz
RUN tar -xvf zeromq-4.3.4.tar.gz
WORKDIR /build/zeromq-4.3.4
RUN ./configure --enable-static --disable-shared --disable-Werror
RUN make -j`nproc`
RUN make install

WORKDIR /go/src/fleet-telemetry

COPY . .
ENV CGO_ENABLED=1
ENV CGO_LDFLAGS="-lstdc++"

RUN make

# hadolint ignore=DL3006
FROM gcr.io/distroless/base-debian11
FROM gcr.io/distroless/cc-debian11
WORKDIR /
COPY --from=build /go/bin/fleet-telemetry /

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.integration
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# start the fleet-telemetry server
# hadolint ignore=DL3006
FROM gcr.io/distroless/base-debian11
FROM gcr.io/distroless/cc-debian11
WORKDIR /

COPY --from=fleet-telemetry-integration-tests /go/bin/fleet-telemetry /
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ ALPHA_IMAGE_NAME=fleet-telemetry-server-aplha:v0.0.1
ALPHA_IMAGE_COMPRESSED_FILENAME := $(subst :,-, $(ALPHA_IMAGE_NAME))

GO_FLAGS ?=
GO_FLAGS += --ldflags 'extldflags="-static"'

ifneq (,$(findstring darwin/arm,$(VERSION)))
GO_FLAGS += -tags dynamic
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Logger: This is a simple STDOUT logger that serializes the protos to json.

>NOTE: To add a new dispatcher, please provide integration tests and updated documentation.
Expand Down Expand Up @@ -197,6 +198,15 @@ make: *** [install] Error 1
```
A reference to libcrypto is not set properly. To resolve find the reference to libcrypto by pkgconfig and set et the PKG_CONFIG_PATH accordingly.

libzmq is missing. Install with:
```sh
sudo apt install -y libsodium-dev libzmq3-dev
```
Or for macOS:
```sh
brew install libsodium zmq
```

## Integration Tests
(Optional): If you want to recreate fake certs for your test: `make generate-certs`

Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/teslamotors/fleet-telemetry/datastore/kafka"
"github.com/teslamotors/fleet-telemetry/datastore/kinesis"
"github.com/teslamotors/fleet-telemetry/datastore/simple"
"github.com/teslamotors/fleet-telemetry/datastore/zmq"
"github.com/teslamotors/fleet-telemetry/metrics"
"github.com/teslamotors/fleet-telemetry/telemetry"
)
Expand Down Expand Up @@ -61,6 +62,9 @@ type Config struct {
// Pubsub is a configuration for the Google Pubsub
Pubsub *Pubsub `json:"pubsub,omitempty"`

// ZMQ configures a zeromq socket
ZMQ *zmq.Config `json:"zmq,omitempty"`

// Namespace defines a prefix for the kafka/pubsub topic
Namespace string `json:"namespace,omitempty"`

Expand Down Expand Up @@ -236,6 +240,17 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet
producers[telemetry.Kinesis] = kinesis
}

if _, ok := requiredDispatchers[telemetry.ZMQ]; ok {
if c.ZMQ == nil {
return nil, errors.New("Expected ZMQ to be configured")
}
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, logger)
if err != nil {
return nil, err
}
producers[telemetry.ZMQ] = zmqProducer
}

dispatchProducerRules := make(map[string][]telemetry.Producer)
for recordName, dispatchRules := range c.Records {
var dispatchFuncs []telemetry.Producer
Expand Down
59 changes: 55 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (

var _ = Describe("Test full application config", func() {

var config *Config
var (
config *Config
producers map[string][]telemetry.Producer
)

BeforeEach(func() {
config = &Config{
Expand All @@ -43,6 +46,21 @@ var _ = Describe("Test full application config", func() {
}
})

AfterEach(func() {
os.Clearenv()
type Closer interface {
Close() error
}
for _, typeProducers := range producers {
for _, producer := range typeProducers {
if closer, ok := producer.(Closer); ok {
err := closer.Close()
Expect(err).NotTo(HaveOccurred())
}
}
}
})

Context("ExtractServiceTLSConfig", func() {
It("fails when TLS is nil ", func() {
config = &Config{}
Expand Down Expand Up @@ -104,7 +122,7 @@ var _ = Describe("Test full application config", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())

producers, err := config.ConfigureProducers(log)
producers, err = config.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).To(HaveLen(1))

Expand All @@ -119,7 +137,8 @@ var _ = Describe("Test full application config", func() {
log, _ := test.NewNullLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"kinesis"}}

producers, err := config.ConfigureProducers(log)
var err error
producers, err = config.ConfigureProducers(log)
Expect(err).To(MatchError("Expected Kinesis to be configured"))
Expect(producers).To(BeNil())
})
Expand Down Expand Up @@ -165,7 +184,39 @@ var _ = Describe("Test full application config", func() {
It("pubsub config works", func() {
log, _ := test.NewNullLogger()
_ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url")
producers, err := pubsubConfig.ConfigureProducers(log)
var err error
producers, err = pubsubConfig.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
})

Context("configure zmq", func() {
var zmqConfig *Config

BeforeEach(func() {
var err error
zmqConfig, err = loadTestApplicationConfig(TestZMQConfig)
Expect(err).NotTo(HaveOccurred())
})

It("returns an error if zmq isn't included", func() {
log, _ := test.NewNullLogger()
config.Records = map[string][]telemetry.Dispatcher{"FS": {"zmq"}}
var err error
producers, err = config.ConfigureProducers(log)
Expect(err).To(MatchError("Expected ZMQ to be configured"))
Expect(producers).To(BeNil())
producers, err = zmqConfig.ConfigureProducers(log)
Expect(err).To(BeNil())
})

It("zmq config works", func() {
// ZMQ close is async, this removes the need to sync between tests.
zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285"
log, _ := test.NewNullLogger()
var err error
producers, err = zmqConfig.ConfigureProducers(log)
Expect(err).NotTo(HaveOccurred())
Expect(producers["FS"]).NotTo(BeNil())
})
Expand Down
14 changes: 14 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,17 @@ const BadTopicConfig = `
"host": "127.0.0.1",
"port": "",
}`

const TestZMQConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"zmq": {
"addr": "tcp://127.0.0.1:5288"
},
"records": {
"FS": ["zmq"]
}
}
`
Loading

0 comments on commit d3cd146

Please sign in to comment.