From 9d8cc25f255e2f7338bb2b02c91a4ce7282ba555 Mon Sep 17 00:00:00 2001 From: Patrick Demers Date: Wed, 20 Dec 2023 17:23:09 -0600 Subject: [PATCH] create http dispatcher --- README.md | 1 + config/config.go | 15 ++ config/config_test.go | 28 ++++ config/test_configs_test.go | 14 ++ datastore/http/http.go | 186 +++++++++++++++++++++++++ datastore/http/http_suite_test.go | 13 ++ datastore/http/http_test.go | 50 +++++++ datastore/simple/logger.go | 50 +------ datastore/simple/logger_test.go | 46 ++---- telemetry/producer.go | 2 + telemetry/record.go | 40 ++++++ telemetry/record_test.go | 77 ++++++++++ test/integration/Dockerfile | 3 + test/integration/config.json | 7 +- test/integration/http_consumer_test.go | 62 +++++++++ test/integration/integration_test.go | 26 ++++ test/integration/test.sh | 2 +- 17 files changed, 540 insertions(+), 82 deletions(-) create mode 100644 datastore/http/http.go create mode 100644 datastore/http/http_suite_test.go create mode 100644 datastore/http/http_test.go create mode 100644 test/integration/http_consumer_test.go diff --git a/README.md b/README.md index a7cd7cb..d1a5158 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,7 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported * Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V` * Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS` * ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) +* HTTP: Send events to an HTTP endpoint. This should **never** be used in production grade systems as failed requests are discarded and there is limited scalability. It is suitable for owners wishing to stream data directly from their own vehicles. [View configuration options](./datastore/http/http.go#L37). * Logger: This is a simple STDOUT logger that serializes the protos to json. >NOTE: To add a new dispatcher, please provide integration tests and updated documentation. diff --git a/config/config.go b/config/config.go index 95e8a2c..211da7b 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ import ( confluent "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/teslamotors/fleet-telemetry/datastore/googlepubsub" + "github.com/teslamotors/fleet-telemetry/datastore/http" "github.com/teslamotors/fleet-telemetry/datastore/kafka" "github.com/teslamotors/fleet-telemetry/datastore/kinesis" "github.com/teslamotors/fleet-telemetry/datastore/simple" @@ -65,6 +66,9 @@ type Config struct { // ZMQ configures a zeromq socket ZMQ *zmq.Config `json:"zmq,omitempty"` + // HTTP is a configuration for HTTP producer + HTTP *http.Config `json:"http,omitempty"` + // Namespace defines a prefix for the kafka/pubsub topic Namespace string `json:"namespace,omitempty"` @@ -251,6 +255,17 @@ func (c *Config) ConfigureProducers(logger *logrus.Logger) (map[string][]telemet producers[telemetry.ZMQ] = zmqProducer } + if _, ok := requiredDispatchers[telemetry.HTTP]; ok { + if c.HTTP == nil { + return nil, errors.New("expected http to be configured") + } + httpProducer, err := http.NewProducer(c.HTTP, c.MetricCollector, c.Namespace, logger) + if err != nil { + return nil, err + } + producers[telemetry.HTTP] = httpProducer + } + dispatchProducerRules := make(map[string][]telemetry.Producer) for recordName, dispatchRules := range c.Records { var dispatchFuncs []telemetry.Producer diff --git a/config/config_test.go b/config/config_test.go index d85098a..218fce1 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -222,6 +222,34 @@ var _ = Describe("Test full application config", func() { }) }) + Context("configure http", func() { + var httpConfig *Config + + BeforeEach(func() { + var err error + httpConfig, err = loadTestApplicationConfig(TestHTTPConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + It("returns an error if http isn't included", func() { + log, _ := test.NewNullLogger() + config.Records = map[string][]telemetry.Dispatcher{"FS": {"http"}} + var err error + producers, err = config.ConfigureProducers(log) + Expect(err).To(MatchError("expected http to be configured")) + Expect(producers).To(BeNil()) + producers, err = httpConfig.ConfigureProducers(log) + Expect(err).To(BeNil()) + }) + + It("http config works", func() { + log, _ := test.NewNullLogger() + producers, err := httpConfig.ConfigureProducers(log) + Expect(err).NotTo(HaveOccurred()) + Expect(producers["FS"]).NotTo(BeNil()) + }) + }) + Context("configureMetricsCollector", func() { It("does not fail when TLS is nil ", func() { log, _ := test.NewNullLogger() diff --git a/config/test_configs_test.go b/config/test_configs_test.go index 13add27..7ac8119 100644 --- a/config/test_configs_test.go +++ b/config/test_configs_test.go @@ -95,3 +95,17 @@ const TestZMQConfig = ` } } ` + +const TestHTTPConfig = ` +{ + "host": "127.0.0.1", + "port": 443, + "status_port": 8080, + "http": { + "url": "https://tesla.com/webhook" + }, + "records": { + "FS": ["http"] + } +} +` diff --git a/datastore/http/http.go b/datastore/http/http.go new file mode 100644 index 0000000..0b76f96 --- /dev/null +++ b/datastore/http/http.go @@ -0,0 +1,186 @@ +package http + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/sirupsen/logrus" + + "github.com/teslamotors/fleet-telemetry/metrics" + "github.com/teslamotors/fleet-telemetry/metrics/adapter" + "github.com/teslamotors/fleet-telemetry/telemetry" +) + +// Producer client to handle http +type Producer struct { + namespace string + metricsCollector metrics.MetricCollector + logger *logrus.Logger + records chan *telemetry.Record + address string + httpClient http.Client +} + +// Metrics stores metrics reported from this package +type Metrics struct { + produceCount adapter.Counter + byteTotal adapter.Counter + errorCount adapter.Counter + bufferSize adapter.Gauge +} + +// Config contains the data necessary to configure an http producer. +type Config struct { + // WorkerCount is the number of http producer routines running. + // This number should be increased if `http_produce_buffer_size` is growing. + WorkerCount int `json:"worker_count"` + + // Address is the address to produce requests to. + Address string `json:"address"` + + // Timeout is the number of seconds to wait for a response. Defaults to 10. + Timeout int `json:"timeout"` +} + +const ( + statusCodePreSendErr = "NOT_SENT" + bufferSize = 128 +) + +var ( + metricsRegistry Metrics + metricsOnce sync.Once +) + +// NewProducer sets up an HTTP producer +func NewProducer(config *Config, metricsCollector metrics.MetricCollector, namespace string, logger *logrus.Logger) (telemetry.Producer, error) { + registerMetricsOnce(metricsCollector) + + err := validateConfig(config) + if err != nil { + return nil, err + } + requests := make(chan *telemetry.Record, bufferSize) + + producer := &Producer{ + namespace: namespace, + metricsCollector: metricsCollector, + records: requests, + address: config.Address, + logger: logger, + httpClient: http.Client{ + Timeout: time.Duration(config.Timeout) * time.Second, + }, + } + + for i := 0; i < config.WorkerCount; i++ { + go producer.worker(requests) + } + + logger.Infof("registered http producer for namespace: %s", namespace) + return producer, nil +} + +// validateConfig validates configuration values and sets defaults if value not set +func validateConfig(c *Config) error { + if c.WorkerCount < 0 { + return errors.New("invalid http worker count") + } + if c.WorkerCount == 0 { + c.WorkerCount = 5 + } + if c.Timeout < 0 { + return errors.New("invalid http timeout") + } + if c.Timeout == 0 { + c.Timeout = 10 + } + return nil +} + +func (p *Producer) worker(records <-chan *telemetry.Record) { + for record := range records { + metricsRegistry.bufferSize.Sub(1, map[string]string{}) + p.sendHTTP(record) + } +} + +func (p *Producer) sendHTTP(record *telemetry.Record) { + url := fmt.Sprintf("%s?namespace=%s&type=%s", p.address, p.namespace, record.TxType) + payloadJSON, err := record.ToJSON() + if err != nil { + p.logError(fmt.Errorf("serialize_body_err %s", err.Error())) + return + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(json.RawMessage(payloadJSON))) + if err != nil { + p.logError(fmt.Errorf("create_request_err %s", err.Error())) + return + } + + req.Header.Set("Content-Type", "application/json") + for key, value := range record.Metadata() { + req.Header.Set(key, value) + } + + resp, err := p.httpClient.Do(req) + if err != nil { + p.logError(fmt.Errorf("send_request_err %s", err.Error())) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 200 && resp.StatusCode <= 299 { + metricsRegistry.produceCount.Inc(map[string]string{"record_type": record.TxType}) + metricsRegistry.byteTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType}) + return + } + p.logError(fmt.Errorf("response_status_code %d", resp.StatusCode)) +} + +// Produce asynchronously sends the record payload to http endpoint +func (p *Producer) Produce(record *telemetry.Record) { + p.records <- record + metricsRegistry.bufferSize.Inc(map[string]string{}) +} + +func (p *Producer) logError(err error) { + p.logger.Errorf("http_producer_err err: %v", err) + metricsRegistry.errorCount.Inc(map[string]string{}) +} + +func registerMetricsOnce(metricsCollector metrics.MetricCollector) { + metricsOnce.Do(func() { registerMetrics(metricsCollector) }) +} + +func registerMetrics(metricsCollector metrics.MetricCollector) { + metricsRegistry.produceCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "http_produce_total", + Help: "The number of records produced to http.", + Labels: []string{"record_type"}, + }) + + metricsRegistry.byteTotal = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "http_produce_total_bytes", + Help: "The number of bytes produced to http.", + Labels: []string{"record_type"}, + }) + + metricsRegistry.errorCount = metricsCollector.RegisterCounter(adapter.CollectorOptions{ + Name: "http_produce_err", + Help: "The number of errors while producing to http.", + Labels: []string{}, + }) + + metricsRegistry.bufferSize = metricsCollector.RegisterGauge(adapter.CollectorOptions{ + Name: "http_produce_buffer_size", + Help: "The number of records waiting to be produced.", + Labels: []string{}, + }) +} diff --git a/datastore/http/http_suite_test.go b/datastore/http/http_suite_test.go new file mode 100644 index 0000000..68e193b --- /dev/null +++ b/datastore/http/http_suite_test.go @@ -0,0 +1,13 @@ +package http_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestConfigs(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Http Producer Suite Tests") +} diff --git a/datastore/http/http_test.go b/datastore/http/http_test.go new file mode 100644 index 0000000..8955351 --- /dev/null +++ b/datastore/http/http_test.go @@ -0,0 +1,50 @@ +package http_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/sirupsen/logrus" + "github.com/teslamotors/fleet-telemetry/datastore/http" + "github.com/teslamotors/fleet-telemetry/metrics" +) + +var _ = Describe("HTTP Producer", func() { + var ( + mockLogger *logrus.Logger + mockCollector metrics.MetricCollector + mockConfig *http.Config + ) + + BeforeEach(func() { + mockLogger = logrus.New() + mockCollector = metrics.NewCollector(nil, mockLogger) + mockConfig = &http.Config{ + WorkerCount: 2, + Address: "https://tesla.com", + Timeout: 5, + } + }) + + Context("NewProducer", func() { + It("creates a new HTTP producer with valid config", func() { + producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) + Expect(err).ToNot(HaveOccurred()) + Expect(producer).ToNot(BeNil()) + }) + + It("returns an error for negative worker count", func() { + mockConfig.WorkerCount = -1 + producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) + Expect(err).To(HaveOccurred()) + Expect(producer).To(BeNil()) + }) + + It("returns an error for negative timeout", func() { + mockConfig.Timeout = -1 + producer, err := http.NewProducer(mockConfig, mockCollector, "test", mockLogger) + Expect(err).To(HaveOccurred()) + Expect(producer).To(BeNil()) + }) + }) +}) diff --git a/datastore/simple/logger.go b/datastore/simple/logger.go index 164bcba..b6573cf 100644 --- a/datastore/simple/logger.go +++ b/datastore/simple/logger.go @@ -1,69 +1,27 @@ package simple import ( - "fmt" - "github.com/sirupsen/logrus" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" - "github.com/teslamotors/fleet-telemetry/protos" "github.com/teslamotors/fleet-telemetry/telemetry" ) // ProtoLogger is a simple protobuf logger type ProtoLogger struct { - logger *logrus.Logger - options protojson.MarshalOptions + logger *logrus.Logger } -var ( - protobufMap = map[string]func() proto.Message{ - "alerts": func() proto.Message { - return &protos.VehicleAlerts{} - }, - "errors": func() proto.Message { - return &protos.VehicleErrors{} - }, - "V": func() proto.Message { - return &protos.Payload{} - }, - } -) - // NewProtoLogger initializes the parameters for protobuf payload logging func NewProtoLogger(logger *logrus.Logger) telemetry.Producer { - return &ProtoLogger{ - logger: logger, - options: protojson.MarshalOptions{ - UseEnumNumbers: false, - EmitUnpopulated: true, - Indent: ""}} - -} - -// GetProtoMessage converts telemetry record to protoMessage based on txType -func (p *ProtoLogger) GetProtoMessage(entry *telemetry.Record) (proto.Message, error) { - msgFunc, ok := protobufMap[entry.TxType] - if !ok { - return nil, fmt.Errorf("no mapping for txType: %s", entry.TxType) - } - message := msgFunc() - err := proto.Unmarshal(entry.Payload(), message) - return message, err + return &ProtoLogger{logger: logger} } // Produce sends the data to the logger func (p *ProtoLogger) Produce(entry *telemetry.Record) { - payload, err := p.GetProtoMessage(entry) + output, err := entry.ToJSON() if err != nil { p.logger.Errorf("logger_proto_unmarshal_error %s %v %s\n", entry.Vin, entry.Metadata(), err.Error()) return } - output, err := p.options.Marshal(payload) - if err != nil { - p.logger.Errorf("logger_json_unmarshal_error %s %v %s\n", entry.Vin, entry.Metadata(), err.Error()) - } else { - p.logger.Infof("logger_json_unmarshal %s %v %s\n", entry.Vin, entry.Metadata(), output) - } + p.logger.Infof("logger_json_unmarshal %s %v %s\n", entry.Vin, entry.Metadata(), output) } diff --git a/datastore/simple/logger_test.go b/datastore/simple/logger_test.go index 19e202b..b47130d 100644 --- a/datastore/simple/logger_test.go +++ b/datastore/simple/logger_test.go @@ -4,6 +4,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/teslamotors/fleet-telemetry/datastore/simple" "github.com/teslamotors/fleet-telemetry/protos" @@ -14,51 +15,28 @@ import ( var _ = Describe("Proto Logger", func() { var ( protoLogger *simple.ProtoLogger + logger *logrus.Logger + hook *test.Hook ) BeforeEach(func() { - logger, _ := test.NewNullLogger() + logger, hook = test.NewNullLogger() protoLogger, _ = simple.NewProtoLogger(logger).(*simple.ProtoLogger) }) - DescribeTable("Proto Parsing", - func(txType string, input proto.Message, verifyOutput func(proto.Message) bool) { + DescribeTable("Produce", + func(txType string, input proto.Message, logContains string) { payloadBytes, err := proto.Marshal(input) Expect(err).NotTo(HaveOccurred()) - output, err := protoLogger.GetProtoMessage(&telemetry.Record{ + protoLogger.Produce(&telemetry.Record{ TxType: txType, PayloadBytes: payloadBytes, }) - Expect(err).NotTo(HaveOccurred()) - Expect(verifyOutput(output)).To(BeTrue()) + Expect(hook.LastEntry().Message).To(ContainSubstring(logContains)) }, - Entry("for txType alerts", "alerts", &protos.VehicleAlerts{Vin: "testAlertVin"}, func(msg proto.Message) bool { - myMsg, ok := msg.(*protos.VehicleAlerts) - if !ok { - return false - } - return myMsg.GetVin() == "testAlertVin" - }), - Entry("for txType errors", "errors", &protos.VehicleErrors{Vin: "testErrorVin"}, func(msg proto.Message) bool { - myMsg, ok := msg.(*protos.VehicleErrors) - if !ok { - return false - } - return myMsg.GetVin() == "testErrorVin" - }), - Entry("for txType V", "V", &protos.Payload{Vin: "testPayloadVIN"}, func(msg proto.Message) bool { - myMsg, ok := msg.(*protos.Payload) - if !ok { - return false - } - return myMsg.GetVin() == "testPayloadVIN" - }), + Entry("for txType alerts", "alerts", &protos.VehicleAlerts{Vin: "testAlertVin"}, "logger_json_unmarshal"), + Entry("for txType errors", "errors", &protos.VehicleErrors{Vin: "testErrorVin"}, "logger_json_unmarshal"), + Entry("for txType V", "V", &protos.Payload{Vin: "testPayloadVIN"}, "logger_json_unmarshal"), + Entry("for txType INVALID", "INVALID", &protos.Payload{Vin: "testPayloadVIN"}, "logger_proto_unmarshal_error"), ) - - It("Doesn't process unknown txtype", func() { - _, err := protoLogger.GetProtoMessage(&telemetry.Record{ - TxType: "badTxType", - }) - Expect(err).To(MatchError("no mapping for txType: badTxType")) - }) }) diff --git a/telemetry/producer.go b/telemetry/producer.go index 74c0180..7fc13f6 100644 --- a/telemetry/producer.go +++ b/telemetry/producer.go @@ -18,6 +18,8 @@ const ( Logger Dispatcher = "logger" // ZMQ registers a zmq logger ZMQ Dispatcher = "zmq" + // HTTP registers an http publisher + HTTP Dispatcher = "http" ) // BuildTopicName creates a topic from a namespace and a recordName diff --git a/telemetry/record.go b/telemetry/record.go index 24c030e..c7807b9 100644 --- a/telemetry/record.go +++ b/telemetry/record.go @@ -6,6 +6,7 @@ import ( "time" "github.com/teslamotors/fleet-telemetry/protos" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -17,6 +18,24 @@ const ( maxSecondsInDuration = 315576000000 ) +var ( + jsonOptions = protojson.MarshalOptions{ + UseEnumNumbers: false, + EmitUnpopulated: true, + Indent: ""} + protobufMap = map[string]func() proto.Message{ + "alerts": func() proto.Message { + return &protos.VehicleAlerts{} + }, + "errors": func() proto.Message { + return &protos.VehicleErrors{} + }, + "V": func() proto.Message { + return &protos.Payload{} + }, + } +) + // Record is a structs that represents the telemetry records vehicles send to the backend // vin is used as kafka produce partitioning key by default, can be configured to random type Record struct { @@ -100,6 +119,27 @@ func (record *Record) Dispatch() { record.Serializer.Dispatch(record) } +// GetProtoMessage converts the record to a proto Message +func (record *Record) GetProtoMessage() (proto.Message, error) { + msgFunc, ok := protobufMap[record.TxType] + if !ok { + return nil, fmt.Errorf("no mapping for txType: %s", record.TxType) + } + message := msgFunc() + err := proto.Unmarshal(record.Payload(), message) + return message, err +} + +// ToJSON serializes the record to a JSON string +func (record *Record) ToJSON() (string, error) { + payload, err := record.GetProtoMessage() + if err != nil { + return "", err + } + bytes, err := jsonOptions.Marshal(payload) + return string(bytes), err +} + func (record *Record) ensureEncoded() { if record.RawBytes == nil && record.Serializer != nil && record.Serializer.Logger() != nil { record.Serializer.Logger().Error("record_RawBytes_blank") diff --git a/telemetry/record_test.go b/telemetry/record_test.go index e397144..d8ad316 100644 --- a/telemetry/record_test.go +++ b/telemetry/record_test.go @@ -222,6 +222,83 @@ var _ = Describe("Socket handler test", func() { Entry("for a valid loc, SW", "(37.412374 S, 122.145867 W)", &protos.LocationValue{Latitude: -37.412374, Longitude: -122.145867}, ""), Entry("for a valid loc, SE", "(37.412374 S, 122.145867 E)", &protos.LocationValue{Latitude: -37.412374, Longitude: 122.145867}, ""), ) + + Describe("GetProtoMessage", func() { + DescribeTable("valid alert types", + func(txType string, input proto.Message, verifyOutput func(proto.Message) bool) { + payloadBytes, err := proto.Marshal(input) + Expect(err).NotTo(HaveOccurred()) + record := &telemetry.Record{ + TxType: txType, + PayloadBytes: payloadBytes, + } + output, err := record.GetProtoMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(verifyOutput(output)).To(BeTrue()) + }, + Entry("for txType alerts", "alerts", &protos.VehicleAlerts{Vin: "testAlertVin"}, func(msg proto.Message) bool { + myMsg, ok := msg.(*protos.VehicleAlerts) + if !ok { + return false + } + return myMsg.GetVin() == "testAlertVin" + }), + Entry("for txType errors", "errors", &protos.VehicleErrors{Vin: "testErrorVin"}, func(msg proto.Message) bool { + myMsg, ok := msg.(*protos.VehicleErrors) + if !ok { + return false + } + return myMsg.GetVin() == "testErrorVin" + }), + Entry("for txType V", "V", &protos.Payload{Vin: "testPayloadVIN"}, func(msg proto.Message) bool { + myMsg, ok := msg.(*protos.Payload) + if !ok { + return false + } + return myMsg.GetVin() == "testPayloadVIN" + }), + ) + + It("errors on unknown txtype", func() { + record := &telemetry.Record{ + TxType: "badTxType", + } + _, err := record.GetProtoMessage() + Expect(err).To(MatchError("no mapping for txType: badTxType")) + }) + }) + + Describe("ToJson", func() { + It("outputs json with all data", func() { + message := messages.StreamMessage{TXID: []byte("1234"), SenderID: []byte("vehicle_device.42"), MessageTopic: []byte("V"), Payload: generatePayload("cybertruck", "42", nil)} + recordMsg, err := message.ToBytes() + Expect(err).NotTo(HaveOccurred()) + + record, err := telemetry.NewRecord(serializer, recordMsg, "1") + Expect(err).NotTo(HaveOccurred()) + Expect(record).NotTo(BeNil()) + + actualJSON, err := record.ToJSON() + Expect(err).NotTo(HaveOccurred()) + Expect(actualJSON).NotTo(BeEmpty()) + + expectedJSON := "{\"data\":[{\"key\":\"VehicleName\",\"value\":{\"stringValue\":\"cybertruck\"}}],\"createdAt\":null,\"vin\":\"42\"}" + Expect(actualJSON).To(MatchJSON(expectedJSON)) + }) + + It("returns error on invalid txType", func() { + message := messages.StreamMessage{TXID: []byte("1234"), SenderID: []byte("vehicle_device.42"), MessageTopic: []byte("INVALID"), Payload: generatePayload("cybertruck", "42", nil)} + recordMsg, err := message.ToBytes() + Expect(err).NotTo(HaveOccurred()) + + record, err := telemetry.NewRecord(serializer, recordMsg, "1") + Expect(err).NotTo(HaveOccurred()) + Expect(record).NotTo(BeNil()) + + _, err = record.ToJSON() + Expect(err).To(MatchError("no mapping for txType: INVALID")) + }) + }) }) func generatePayload(vehicleName string, vin string, timestamp *timestamppb.Timestamp, extraData ...*protos.Datum) []byte { diff --git a/test/integration/Dockerfile b/test/integration/Dockerfile index f14bad1..8a655bc 100644 --- a/test/integration/Dockerfile +++ b/test/integration/Dockerfile @@ -38,5 +38,8 @@ WORKDIR /go/src/fleet-telemetry/test/integration ARG it_target ENV IT_TARGET=${it_target} +# used for testing http publisher +EXPOSE 3000 + # hadolint ignore=DL3025 CMD go test -v ${IT_TARGET} diff --git a/test/integration/config.json b/test/integration/config.json index 54decf5..979dca8 100644 --- a/test/integration/config.json +++ b/test/integration/config.json @@ -26,6 +26,10 @@ "zmq": { "addr": "tcp://*:5284" }, + "http": { + "address": "http://integration-test:3000/webhook", + "worker_count": 1 + }, "monitoring": { "prometheus_metrics_port": 9090, "profiler_port": 4269, @@ -42,7 +46,8 @@ "kinesis", "pubsub", "logger", - "zmq" + "zmq", + "http" ] }, "tls": { diff --git a/test/integration/http_consumer_test.go b/test/integration/http_consumer_test.go new file mode 100644 index 0000000..3acc24b --- /dev/null +++ b/test/integration/http_consumer_test.go @@ -0,0 +1,62 @@ +package integration_test + +import ( + "io" + "net/http" + + "github.com/sirupsen/logrus" + "github.com/teslamotors/fleet-telemetry/telemetry" +) + +type incomingRequest struct { + topic string + data []byte +} + +type TestHTTPConsumer struct { + remote *http.Server + received chan *incomingRequest +} + +func NewTestHTTPConsumer(addr string, logger *logrus.Logger) (*TestHTTPConsumer, error) { + consumer := &TestHTTPConsumer{ + received: make(chan *incomingRequest, 10), + } + + mux := http.NewServeMux() + mux.HandleFunc("/webhook", consumer.handleRequest) + + consumer.remote = &http.Server{ + Addr: addr, + Handler: mux, + } + + go func() { + if err := consumer.remote.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Fatalf("Server error: %s", err) + } + }() + + return consumer, nil +} + +func (c *TestHTTPConsumer) handleRequest(w http.ResponseWriter, r *http.Request) { + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "error reading body", http.StatusInternalServerError) + } + q := r.URL.Query() + c.received <- &incomingRequest{ + topic: telemetry.BuildTopicName(q.Get("namespace"), q.Get("type")), + data: bodyBytes, + } +} + +func (c *TestHTTPConsumer) NextMessage() (topic string, data []byte, err error) { + message := <-c.received + return message.topic, message.data, nil +} + +func (c *TestHTTPConsumer) Close() { + _ = c.remote.Close() +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index f85f9fb..49a3dce 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -2,6 +2,7 @@ package integration_test import ( "crypto/tls" + "encoding/json" "fmt" "io" "net/http" @@ -32,6 +33,7 @@ const ( kafkaBroker = "kafka:9092" pubsubHost = "pubsub:8085" zmqAddr = "tcp://app:5284" + httpAddr = ":3000" kinesisHost = "http://kinesis:4567" kinesisStreamName = "test_V" ) @@ -52,6 +54,7 @@ var _ = Describe("Test messages", Ordered, func() { kinesisConsumer *TestKinesisConsumer kafkaConsumer *kafka.Consumer zmqConsumer *TestZMQConsumer + httpConsumer *TestHTTPConsumer tlsConfig *tls.Config timestamp *timestamppb.Timestamp logger *logrus.Logger @@ -83,6 +86,9 @@ var _ = Describe("Test messages", Ordered, func() { zmqConsumer, err = NewTestZMQConsumer(zmqAddr, vehicleTopic, logger) Expect(err).NotTo(HaveOccurred()) + + httpConsumer, err = NewTestHTTPConsumer(httpAddr, logger) + Expect(err).NotTo(HaveOccurred()) }) AfterAll(func() { @@ -90,6 +96,7 @@ var _ = Describe("Test messages", Ordered, func() { pubsubConsumer.ClearSubscriptions() _ = connection.Close() zmqConsumer.Close() + httpConsumer.Close() os.Clearenv() }) @@ -176,6 +183,25 @@ var _ = Describe("Test messages", Ordered, func() { Expect(topic).To(Equal(vehicleTopic)) VerifyMessageBody(data, vehicleName) }) + + It("reads data from http subscriber", func() { + time.Sleep(3 * time.Second) + err := connection.WriteMessage(websocket.BinaryMessage, payload) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(3 * time.Second) + + topic, data, err := httpConsumer.NextMessage() + Expect(err).NotTo(HaveOccurred()) + Expect(data).NotTo(BeNil()) + Expect(topic).To(Equal(vehicleTopic)) + + var dataMap map[string]interface{} + err = json.Unmarshal(data, &dataMap) + Expect(err).ToNot(HaveOccurred()) + Expect(dataMap["data"]).To(HaveLen(2)) + Expect(dataMap["vin"]).To(Equal(deviceID)) + Expect(dataMap["createdAt"]).To(Equal(timestamp.AsTime().Format(time.RFC3339Nano))) + }) }) // VerifyHTTPSRequest validates API returns 200 status code diff --git a/test/integration/test.sh b/test/integration/test.sh index c0ade82..a145ef7 100755 --- a/test/integration/test.sh +++ b/test/integration/test.sh @@ -1,3 +1,3 @@ #!/bin/sh -docker run --net=app_default fleet-telemetry-integration-tests +docker run --net=app_default --name integration-test --rm fleet-telemetry-integration-tests