From d3cd146f82df2070b61a362dae122039aaa23ddb Mon Sep 17 00:00:00 2001 From: Jordan Bonecutter Date: Mon, 30 Oct 2023 14:54:57 -0700 Subject: [PATCH] add zmq (#66) --- .github/workflows/build.yml | 3 + Dockerfile | 21 ++- Dockerfile.integration | 2 +- Makefile | 1 + README.md | 10 ++ config/config.go | 15 ++ config/config_test.go | 59 ++++++- config/test_configs_test.go | 14 ++ datastore/zmq/zmq.go | 211 ++++++++++++++++++++++++++ docker-compose.yml | 1 + go.mod | 1 + go.sum | 2 + server/streaming/server_test.go | 17 +++ telemetry/producer.go | 2 + test/integration/Dockerfile | 22 ++- test/integration/config.json | 6 +- test/integration/integration_test.go | 17 +++ test/integration/zmq_consumer_test.go | 54 +++++++ 18 files changed, 450 insertions(+), 8 deletions(-) create mode 100644 datastore/zmq/zmq.go create mode 100644 test/integration/zmq_consumer_test.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5211fdd..d3abbe0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,6 +22,9 @@ jobs: make format git diff --exit-code + - name: Install ZMQ + run: sudo apt update -y && sudo apt upgrade -y && sudo apt install libsodium-dev libzmq3-dev + - name: Test run: make test diff --git a/Dockerfile b/Dockerfile index c89b153..be0d2be 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,34 @@ # Start by building the application. FROM golang:1.20.5-bullseye as build +# build libsodium (dep of libzmq) +WORKDIR /build +RUN wget https://github.com/jedisct1/libsodium/releases/download/1.0.19-RELEASE/libsodium-1.0.19.tar.gz +RUN tar -xzvf libsodium-1.0.19.tar.gz +WORKDIR /build/libsodium-stable +RUN ./configure --disable-shared --enable-static +RUN make -j`nproc` +RUN make install + +# build libzmq (dep of zmq datastore) +WORKDIR /build +RUN wget https://github.com/zeromq/libzmq/releases/download/v4.3.4/zeromq-4.3.4.tar.gz +RUN tar -xvf zeromq-4.3.4.tar.gz +WORKDIR /build/zeromq-4.3.4 +RUN ./configure --enable-static --disable-shared --disable-Werror +RUN make -j`nproc` +RUN make install + WORKDIR /go/src/fleet-telemetry COPY . . ENV CGO_ENABLED=1 +ENV CGO_LDFLAGS="-lstdc++" RUN make # hadolint ignore=DL3006 -FROM gcr.io/distroless/base-debian11 +FROM gcr.io/distroless/cc-debian11 WORKDIR / COPY --from=build /go/bin/fleet-telemetry / diff --git a/Dockerfile.integration b/Dockerfile.integration index 54e1070..1ae2c47 100644 --- a/Dockerfile.integration +++ b/Dockerfile.integration @@ -1,6 +1,6 @@ # start the fleet-telemetry server # hadolint ignore=DL3006 -FROM gcr.io/distroless/base-debian11 +FROM gcr.io/distroless/cc-debian11 WORKDIR / COPY --from=fleet-telemetry-integration-tests /go/bin/fleet-telemetry / diff --git a/Makefile b/Makefile index 9548297..918400f 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,7 @@ ALPHA_IMAGE_NAME=fleet-telemetry-server-aplha:v0.0.1 ALPHA_IMAGE_COMPRESSED_FILENAME := $(subst :,-, $(ALPHA_IMAGE_NAME)) GO_FLAGS ?= +GO_FLAGS += --ldflags 'extldflags="-static"' ifneq (,$(findstring darwin/arm,$(VERSION))) GO_FLAGS += -tags dynamic diff --git a/README.md b/README.md index de861ee..c414b33 100644 --- a/README.md +++ b/README.md @@ -142,6 +142,7 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` * Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V` * Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS` +* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) * Logger: This is a simple STDOUT logger that serializes the protos to json. >NOTE: To add a new dispatcher, please provide integration tests and updated documentation. @@ -197,6 +198,15 @@ make: *** [install] Error 1 ``` A reference to libcrypto is not set properly. To resolve find the reference to libcrypto by pkgconfig and set et the PKG_CONFIG_PATH accordingly. +libzmq is missing. Install with: +```sh +sudo apt install -y libsodium-dev libzmq3-dev +``` +Or for macOS: +```sh +brew install libsodium zmq +``` + ## Integration Tests (Optional): If you want to recreate fake certs for your test: `make generate-certs` diff --git a/config/config.go b/config/config.go index 97abcf6..95e8a2c 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ import ( "github.com/teslamotors/fleet-telemetry/datastore/kafka" "github.com/teslamotors/fleet-telemetry/datastore/kinesis" "github.com/teslamotors/fleet-telemetry/datastore/simple" + "github.com/teslamotors/fleet-telemetry/datastore/zmq" "github.com/teslamotors/fleet-telemetry/metrics" "github.com/teslamotors/fleet-telemetry/telemetry" ) @@ -61,6 +62,9 @@ type Config struct { // Pubsub is a configuration for the Google Pubsub Pubsub *Pubsub `json:"pubsub,omitempty"` + // ZMQ configures a zeromq socket + ZMQ *zmq.Config `json:"zmq,omitempty"` + // Namespace defines a prefix for the kafka/pubsub topic Namespace string `json:"namespace,omitempty"` @@ -236,6 +240,17 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet producers[telemetry.Kinesis] = kinesis } + if _, ok := requiredDispatchers[telemetry.ZMQ]; ok { + if c.ZMQ == nil { + return nil, errors.New("Expected ZMQ to be configured") + } + zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, logger) + if err != nil { + return nil, err + } + producers[telemetry.ZMQ] = zmqProducer + } + dispatchProducerRules := make(map[string][]telemetry.Producer) for recordName, dispatchRules := range c.Records { var dispatchFuncs []telemetry.Producer diff --git a/config/config_test.go b/config/config_test.go index eebc0a1..d85098a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -18,7 +18,10 @@ import ( var _ = Describe("Test full application config", func() { - var config *Config + var ( + config *Config + producers map[string][]telemetry.Producer + ) BeforeEach(func() { config = &Config{ @@ -43,6 +46,21 @@ var _ = Describe("Test full application config", func() { } }) + AfterEach(func() { + os.Clearenv() + type Closer interface { + Close() error + } + for _, typeProducers := range producers { + for _, producer := range typeProducers { + if closer, ok := producer.(Closer); ok { + err := closer.Close() + Expect(err).NotTo(HaveOccurred()) + } + } + } + }) + Context("ExtractServiceTLSConfig", func() { It("fails when TLS is nil ", func() { config = &Config{} @@ -104,7 +122,7 @@ var _ = Describe("Test full application config", func() { config, err := loadTestApplicationConfig(TestSmallConfig) Expect(err).NotTo(HaveOccurred()) - producers, err := config.ConfigureProducers(log) + producers, err = config.ConfigureProducers(log) Expect(err).NotTo(HaveOccurred()) Expect(producers["FS"]).To(HaveLen(1)) @@ -119,7 +137,8 @@ var _ = Describe("Test full application config", func() { log, _ := test.NewNullLogger() config.Records = map[string][]telemetry.Dispatcher{"FS": {"kinesis"}} - producers, err := config.ConfigureProducers(log) + var err error + producers, err = config.ConfigureProducers(log) Expect(err).To(MatchError("Expected Kinesis to be configured")) Expect(producers).To(BeNil()) }) @@ -165,7 +184,39 @@ var _ = Describe("Test full application config", func() { It("pubsub config works", func() { log, _ := test.NewNullLogger() _ = os.Setenv("PUBSUB_EMULATOR_HOST", "some_url") - producers, err := pubsubConfig.ConfigureProducers(log) + var err error + producers, err = pubsubConfig.ConfigureProducers(log) + Expect(err).NotTo(HaveOccurred()) + Expect(producers["FS"]).NotTo(BeNil()) + }) + }) + + Context("configure zmq", func() { + var zmqConfig *Config + + BeforeEach(func() { + var err error + zmqConfig, err = loadTestApplicationConfig(TestZMQConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + It("returns an error if zmq isn't included", func() { + log, _ := test.NewNullLogger() + config.Records = map[string][]telemetry.Dispatcher{"FS": {"zmq"}} + var err error + producers, err = config.ConfigureProducers(log) + Expect(err).To(MatchError("Expected ZMQ to be configured")) + Expect(producers).To(BeNil()) + producers, err = zmqConfig.ConfigureProducers(log) + Expect(err).To(BeNil()) + }) + + It("zmq config works", func() { + // ZMQ close is async, this removes the need to sync between tests. + zmqConfig.ZMQ.Addr = "tcp://127.0.0.1:5285" + log, _ := test.NewNullLogger() + var err error + producers, err = zmqConfig.ConfigureProducers(log) Expect(err).NotTo(HaveOccurred()) Expect(producers["FS"]).NotTo(BeNil()) }) diff --git a/config/test_configs_test.go b/config/test_configs_test.go index 1b90534..13add27 100644 --- a/config/test_configs_test.go +++ b/config/test_configs_test.go @@ -81,3 +81,17 @@ const BadTopicConfig = ` "host": "127.0.0.1", "port": "", }` + +const TestZMQConfig = ` +{ + "host": "127.0.0.1", + "port": 443, + "status_port": 8080, + "zmq": { + "addr": "tcp://127.0.0.1:5288" + }, + "records": { + "FS": ["zmq"] + } +} +` diff --git a/datastore/zmq/zmq.go b/datastore/zmq/zmq.go new file mode 100644 index 0000000..602f805 --- /dev/null +++ b/datastore/zmq/zmq.go @@ -0,0 +1,211 @@ +package zmq + +import ( + "context" + "encoding/json" + "os" + "sync" + + "github.com/pebbe/zmq4" + "github.com/sirupsen/logrus" + "github.com/teslamotors/fleet-telemetry/metrics" + "github.com/teslamotors/fleet-telemetry/metrics/adapter" + "github.com/teslamotors/fleet-telemetry/telemetry" +) + +// Config contains the data necessary to configure a zmq producer. +type Config struct { + // Addr is the address to which to producer will attempt to bind. + Addr string `json:"addr"` + + // ServerKeyJSONPath is the path to a file which contains the server's secret + // key as a json. This key can be generated using zmq4.NewCurveKeypair + ServerKeyJSONPath string `json:"server_key_json_path"` + + // AllowedPublicKeysJSONPath is the path to a file which contains a list of + // allowed public keys for client connections. This field is optional. + AllowedPublicKeysJSONPath string `json:"allowed_public_keys_json_path"` + + // Verbose controls if verbose logging is enabled for the socket. + Verbose bool `json:"verbose"` +} + +// KeyJSON contains z85 key data +type KeyJSON struct { + // Secret is the secret key encoded as a 40 char z85 string. + Secret string `json:"secret"` + + // Public is the public key encoded as a 40 char z85 string. + Public string `json:"public"` +} + +// Metrics stores metrics reported from this package +type Metrics struct { + errorCount adapter.Counter + publishCount adapter.Counter + byteTotal adapter.Counter +} + +var ( + metricsRegistry Metrics + metricsOnce sync.Once +) + +// MonitorSocketAddr is the address at which the socket monitor connects to the +// ZMQ publisher socket. +const MonitorSocketAddr = "inproc://zmq_socket_monitor.rep" + +// ZMQProducer implements the telemetry.Producer interface by publishing to a +// bound zmq socket. +type ZMQProducer struct { + namespace string + ctx context.Context + sock *zmq4.Socket + logger *logrus.Logger +} + +// Publish the record to the socket. +func (p *ZMQProducer) Produce(rec *telemetry.Record) { + if p.ctx.Err() != nil { + return + } + if nBytes, err := p.sock.SendMessage(telemetry.BuildTopicName(p.namespace, rec.TxType), rec.Payload()); err != nil { + metricsRegistry.errorCount.Inc(map[string]string{"record_type": rec.TxType}) + p.logger.Errorf("Failed sending log on zmq socket: %s", err.Error()) + } else { + metricsRegistry.byteTotal.Add(int64(nBytes), map[string]string{"record_type": rec.TxType}) + metricsRegistry.publishCount.Inc(map[string]string{"record_type": rec.TxType}) + } +} + +// Close the underlying socket. +func (p *ZMQProducer) Close() error { + if p.sock != nil { + if err := p.sock.Close(); err != nil { + return err + } + } + p.sock = nil + return nil +} + +// NewProducer creates a ZMQProducer with the given config. +func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, logger *logrus.Logger) (producer telemetry.Producer, err error) { + registerMetricsOnce(metrics) + sock, err := zmq4.NewSocket(zmq4.PUB) + if err != nil { + return + } + + if config.Verbose { + ready := make(chan struct{}) + if err = logSocketInBackground(sock, logger, MonitorSocketAddr, ready, ctx); err != nil { + return + } + <-ready + } + + if config.ServerKeyJSONPath != "" { + var fi *os.File + fi, err = os.Open(config.ServerKeyJSONPath) + if err != nil { + return nil, err + } + defer fi.Close() + key := KeyJSON{} + if err = json.NewDecoder(fi).Decode(&key); err != nil { + return nil, err + } + + if len(config.AllowedPublicKeysJSONPath) > 0 { + fi2, err := os.Open(config.AllowedPublicKeysJSONPath) + if err != nil { + return nil, err + } + defer fi2.Close() + + var keys []string + if err = json.NewDecoder(fi2).Decode(&keys); err != nil { + return nil, err + } + + zmq4.AuthCurveAdd("*", keys...) + } + + if err = zmq4.AuthStart(); err != nil { + return + } + + if err = sock.ServerAuthCurve("*", key.Secret); err != nil { + return + } + } + + if err = sock.Bind(config.Addr); err != nil { + return + } + + return &ZMQProducer{ + namespace, ctx, sock, logger, + }, nil +} + +// logSocketInBackground logs the socket activity in the background. +func logSocketInBackground(target *zmq4.Socket, logger *logrus.Logger, addr string, ready chan<- struct{}, ctx context.Context) error { + if err := target.Monitor(addr, zmq4.EVENT_ALL); err != nil { + return err + } + + monitor, err := zmq4.NewSocket(zmq4.PAIR) + if err != nil { + return err + } + + if err := monitor.Connect(addr); err != nil { + return err + } + + go func() { + ready <- struct{}{} + defer monitor.Close() + for { + if ctx.Err() != nil { + return + } + + eventType, addr, value, err := monitor.RecvEvent(0) + if err != nil { + logger.Errorf("Failed to receive event on zmq socket: %s", err) + continue + } + + logger.Debugf("ZMQ socket event: %v %v %v", eventType, addr, value) + } + }() + + return nil +} + +func registerMetrics(metricsCollector metrics.MetricCollector) { + metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "zmq_err", + Help: "The number of errors while producing to ZMQ.", + Labels: []string{"record_type"}, + }) + + metricsRegistry.publishCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "zmq_publish_total", + Help: "The number of messages published to ZMQ.", + Labels: []string{"record_type"}, + }) + + metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "zmq_publish_total_bytes", + Help: "The number of bytes published to ZMQ.", + Labels: []string{"record_type"}, + }) +} + +func registerMetricsOnce(metricsCollector metrics.MetricCollector) { + metricsOnce.Do(func() { registerMetrics(metricsCollector) }) +} diff --git a/docker-compose.yml b/docker-compose.yml index 51bfd14..f93b86b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: ports: - "4443:4443" - "4269:4269" + - "5284:5284" - "8080:8080" - "9090:9090" environment: diff --git a/go.mod b/go.mod index 0978d49..d1f2c17 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/googleapis/gax-go/v2 v2.8.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/pebbe/zmq4 v1.2.10 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.9.0 // indirect diff --git a/go.sum b/go.sum index dadc841..ed2b788 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,8 @@ github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs= github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkAt4OrFo= github.com/onsi/gomega v1.24.0 h1:+0glovB9Jd6z3VR+ScSwQqXVTIfJcGA9UBM8yzQxhqg= github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= +github.com/pebbe/zmq4 v1.2.10 h1:wQkqRZ3CZeABIeidr3e8uQZMMH5YAykA/WN0L5zkd1c= +github.com/pebbe/zmq4 v1.2.10/go.mod h1:nqnPueOapVhE2wItZ0uOErngczsJdLOGkebMxaO8r48= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/server/streaming/server_test.go b/server/streaming/server_test.go index 562960d..c836a38 100644 --- a/server/streaming/server_test.go +++ b/server/streaming/server_test.go @@ -19,6 +19,23 @@ import ( var _ = Describe("Socket handler test", func() { + var producerRules map[string][]telemetry.Producer + + AfterEach(func() { + type Closer interface { + Close() error + } + + for _, typeProducers := range producerRules { + for _, producer := range typeProducers { + if closer, ok := producer.(Closer); ok { + err := closer.Close() + Expect(err).NotTo(HaveOccurred()) + } + } + } + }) + It("ServeBinaryWs test", func() { logger, hook := test.NewNullLogger() conf := &config.Config{ diff --git a/telemetry/producer.go b/telemetry/producer.go index b53506d..74c0180 100644 --- a/telemetry/producer.go +++ b/telemetry/producer.go @@ -16,6 +16,8 @@ const ( Kinesis Dispatcher = "kinesis" // Logger registers a simple logger Logger Dispatcher = "logger" + // ZMQ registers a zmq logger + ZMQ Dispatcher = "zmq" ) // BuildTopicName creates a topic from a namespace and a recordName diff --git a/test/integration/Dockerfile b/test/integration/Dockerfile index b3cb0e4..f14bad1 100644 --- a/test/integration/Dockerfile +++ b/test/integration/Dockerfile @@ -1,7 +1,26 @@ # build executable and run integration tests FROM golang:1.20.5-bullseye +# build libsodium (dep of libzmq) +WORKDIR /build +RUN wget https://github.com/jedisct1/libsodium/releases/download/1.0.19-RELEASE/libsodium-1.0.19.tar.gz +RUN tar -xzvf libsodium-1.0.19.tar.gz +WORKDIR /build/libsodium-stable +RUN ./configure --disable-shared --enable-static +RUN make -j`nproc` +RUN make install + +# build libzmq (dep of zmq datastore) +WORKDIR /build +RUN wget https://github.com/zeromq/libzmq/releases/download/v4.3.4/zeromq-4.3.4.tar.gz +RUN tar -xvf zeromq-4.3.4.tar.gz +WORKDIR /build/zeromq-4.3.4 +RUN ./configure --enable-static --disable-shared --disable-Werror +RUN make -j`nproc` +RUN make install + ENV CGO_ENABLED=1 +ENV CGO_LDFLAGS="-lstdc++" WORKDIR /go/src/fleet-telemetry @@ -10,8 +29,9 @@ RUN go mod download COPY . ./ + RUN --mount=type=cache,target=/root/.cache/go-build \ -go build -o /go/bin/fleet-telemetry cmd/main.go +make build WORKDIR /go/src/fleet-telemetry/test/integration diff --git a/test/integration/config.json b/test/integration/config.json index ba140bd..54decf5 100644 --- a/test/integration/config.json +++ b/test/integration/config.json @@ -23,6 +23,9 @@ "V": "test_V" } }, + "zmq": { + "addr": "tcp://*:5284" + }, "monitoring": { "prometheus_metrics_port": 9090, "profiler_port": 4269, @@ -38,7 +41,8 @@ "kafka", "kinesis", "pubsub", - "logger" + "logger", + "zmq" ] }, "tls": { diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index bbc900f..f85f9fb 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -31,6 +31,7 @@ const ( kafkaGroup = "test-kafka-consumer" kafkaBroker = "kafka:9092" pubsubHost = "pubsub:8085" + zmqAddr = "tcp://app:5284" kinesisHost = "http://kinesis:4567" kinesisStreamName = "test_V" ) @@ -50,6 +51,7 @@ var _ = Describe("Test messages", Ordered, func() { pubsubConsumer *TestConsumer kinesisConsumer *TestKinesisConsumer kafkaConsumer *kafka.Consumer + zmqConsumer *TestZMQConsumer tlsConfig *tls.Config timestamp *timestamppb.Timestamp logger *logrus.Logger @@ -78,12 +80,16 @@ var _ = Describe("Test messages", Ordered, func() { "auto.offset.reset": "earliest", }) Expect(err).NotTo(HaveOccurred()) + + zmqConsumer, err = NewTestZMQConsumer(zmqAddr, vehicleTopic, logger) + Expect(err).NotTo(HaveOccurred()) }) AfterAll(func() { _ = kafkaConsumer.Close() pubsubConsumer.ClearSubscriptions() _ = connection.Close() + zmqConsumer.Close() os.Clearenv() }) @@ -159,6 +165,17 @@ var _ = Describe("Test messages", Ordered, func() { }, time.Second*5, time.Millisecond*100).Should(BeNil()) VerifyMessageBody(record.Data, vehicleName) }) + + It("reads data from zmq subscriber", func() { + err := connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) + + topic, data, err := zmqConsumer.NextMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(data).NotTo(BeNil()) + Expect(topic).To(Equal(vehicleTopic)) + VerifyMessageBody(data, vehicleName) + }) }) // VerifyHTTPSRequest validates API returns 200 status code diff --git a/test/integration/zmq_consumer_test.go b/test/integration/zmq_consumer_test.go new file mode 100644 index 0000000..22bcdcb --- /dev/null +++ b/test/integration/zmq_consumer_test.go @@ -0,0 +1,54 @@ +package integration_test + +import ( + "github.com/pebbe/zmq4" + "github.com/sirupsen/logrus" +) + +type TestZMQConsumer struct { + sock *zmq4.Socket +} + +func NewTestZMQConsumer(addr string, subscription string, logger *logrus.Logger) (*TestZMQConsumer, error) { + sock, err := zmq4.NewSocket(zmq4.SUB) + if err != nil { + return nil, err + } + + if err := sock.SetSubscribe(subscription); err != nil { + return nil, err + } + + if err := sock.Connect(addr); err != nil { + return nil, err + } + + return &TestZMQConsumer{sock}, nil +} + +func (t *TestZMQConsumer) Close() { + t.sock.Close() +} + +type malformedZMQMessage struct{} + +func (malformedZMQMessage) Error() string { + return "Malformed message" +} + +var ErrMalformedZMQMessage malformedZMQMessage + +func (t *TestZMQConsumer) NextMessage() (topic string, data []byte, err error) { + messages, err := t.sock.RecvMessageBytes(zmq4.Flag(0)) + if err != nil { + return "", nil, err + } + + if len(messages) != 2 { + return "", nil, ErrMalformedZMQMessage + } + + topic = string(messages[0]) + data = messages[1] + return +}