diff --git a/datastore/mqtt/mqtt.go b/datastore/mqtt/mqtt.go index a011b28..9bdf83e 100644 --- a/datastore/mqtt/mqtt.go +++ b/datastore/mqtt/mqtt.go @@ -14,7 +14,8 @@ import ( "github.com/teslamotors/fleet-telemetry/telemetry" ) -type MQTTProducer struct { +// Producer is a telemetry.Producer that sends records to an MQTT broker. +type Producer struct { client pahomqtt.Client config *Config logger *logrus.Logger @@ -25,6 +26,7 @@ type MQTTProducer struct { reliableAckTxTypes map[string]interface{} } +// Config holds the configuration for the MQTT producer. type Config struct { Broker string `json:"broker"` ClientID string `json:"client_id"` @@ -40,6 +42,7 @@ type Config struct { KeepAlive int `json:"keep_alive_seconds"` } +// Metrics holds the metrics for the MQTT producer. type Metrics struct { errorCount adapter.Counter publishCount adapter.Counter @@ -47,6 +50,7 @@ type Metrics struct { reliableAckCount adapter.Counter } +// Default values for the MQTT producer configuration options. const ( DefaultPublishTimeout = 2500 DefaultConnectTimeout = 30000 @@ -61,9 +65,10 @@ var ( metricsOnce sync.Once ) -// Allow us to mock the mqtt.NewClient function for testing +// PahoNewClient allows mocking the mqtt.NewClient function for testing var PahoNewClient = pahomqtt.NewClient +// NewProducer creates a new MQTT producer. func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) { registerMetricsOnce(metrics) @@ -102,7 +107,7 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl client := PahoNewClient(opts) client.Connect() - return &MQTTProducer{ + return &Producer{ client: client, config: config, logger: logger, @@ -114,7 +119,8 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl }, nil } -func (p *MQTTProducer) Produce(rec *telemetry.Record) { +// Produce sends a record to the MQTT broker. +func (p *Producer) Produce(rec *telemetry.Record) { if p.ctx.Err() != nil { return } @@ -182,12 +188,12 @@ func waitTokenTimeout(t pahomqtt.Token, d time.Duration) error { return t.Error() } -func (p *MQTTProducer) updateMetrics(txType string, byteCount int) { +func (p *Producer) updateMetrics(txType string, byteCount int) { metricsRegistry.byteTotal.Add(int64(byteCount), map[string]string{"record_type": txType}) metricsRegistry.publishCount.Inc(map[string]string{"record_type": txType}) } -func (p *MQTTProducer) createLogInfo(rec *telemetry.Record) logrus.LogInfo { +func (p *Producer) createLogInfo(rec *telemetry.Record) logrus.LogInfo { logInfo := logrus.LogInfo{ "topic_name": telemetry.BuildTopicName(p.namespace, rec.TxType), "txid": rec.Txid, @@ -196,7 +202,8 @@ func (p *MQTTProducer) createLogInfo(rec *telemetry.Record) logrus.LogInfo { return logInfo } -func (p *MQTTProducer) ProcessReliableAck(entry *telemetry.Record) { +// ProcessReliableAck marks a message as successfully published. +func (p *Producer) ProcessReliableAck(entry *telemetry.Record) { _, ok := p.reliableAckTxTypes[entry.TxType] if ok { p.ackChan <- entry @@ -204,12 +211,14 @@ func (p *MQTTProducer) ProcessReliableAck(entry *telemetry.Record) { } } -func (p *MQTTProducer) ReportError(message string, err error, logInfo logrus.LogInfo) { +// ReportError logs an error and sends it to Airbrake. +func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) { p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo) p.logger.ErrorLog(message, err, logInfo) } -func (p *MQTTProducer) Close() error { +// Close disconnects from the MQTT client. +func (p *Producer) Close() error { p.client.Disconnect(uint(p.config.DisconnectTimeout)) return nil } diff --git a/datastore/mqtt/mqtt_payload.go b/datastore/mqtt/mqtt_payload.go index 0f4914f..265fc0a 100644 --- a/datastore/mqtt/mqtt_payload.go +++ b/datastore/mqtt/mqtt_payload.go @@ -11,7 +11,7 @@ import ( "google.golang.org/protobuf/reflect/protoreflect" ) -func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *protos.Payload) ([]pahomqtt.Token, error) { +func (p *Producer) processVehicleFields(rec *telemetry.Record, payload *protos.Payload) ([]pahomqtt.Token, error) { var tokens []pahomqtt.Token convertedPayload := p.payloadToMap(payload) for key, value := range convertedPayload { @@ -27,7 +27,7 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot return tokens, nil } -func (p *MQTTProducer) processVehicleAlerts(rec *telemetry.Record, payload *protos.VehicleAlerts) ([]pahomqtt.Token, error) { +func (p *Producer) processVehicleAlerts(rec *telemetry.Record, payload *protos.VehicleAlerts) ([]pahomqtt.Token, error) { tokens := make([]pahomqtt.Token, 0, len(payload.Alerts)*2) alertsHistory := make(map[string][]*protos.VehicleAlert, len(payload.Alerts)) alertsCurrentState := make(map[string]*protos.VehicleAlert, len(payload.Alerts)) @@ -80,7 +80,7 @@ func (p *MQTTProducer) processVehicleAlerts(rec *telemetry.Record, payload *prot return tokens, nil } -func (p *MQTTProducer) processVehicleErrors(rec *telemetry.Record, payload *protos.VehicleErrors) ([]pahomqtt.Token, error) { +func (p *Producer) processVehicleErrors(rec *telemetry.Record, payload *protos.VehicleErrors) ([]pahomqtt.Token, error) { var tokens []pahomqtt.Token for _, vehicleError := range payload.Errors { @@ -99,7 +99,7 @@ func (p *MQTTProducer) processVehicleErrors(rec *telemetry.Record, payload *prot return tokens, nil } -func (p *MQTTProducer) processVehicleConnectivity(rec *telemetry.Record, payload *protos.VehicleConnectivity) ([]pahomqtt.Token, error) { +func (p *Producer) processVehicleConnectivity(rec *telemetry.Record, payload *protos.VehicleConnectivity) ([]pahomqtt.Token, error) { topicName := fmt.Sprintf("%s/%s/connectivity", p.config.TopicBase, rec.Vin) value := map[string]interface{}{ "ConnectionId": payload.GetConnectionId(), @@ -149,7 +149,7 @@ func vehicleErrorToMqttMap(vehicleError *protos.VehicleError) map[string]interfa } // PayloadToMap transforms a Payload into a map for mqtt purposes -func (p *MQTTProducer) payloadToMap(payload *protos.Payload) map[string]interface{} { +func (p *Producer) payloadToMap(payload *protos.Payload) map[string]interface{} { convertedPayload := make(map[string]interface{}, len(payload.Data)) for _, datum := range payload.Data { convertedPayload[datum.Key.String()] = getDatumValue(datum.Value) diff --git a/datastore/mqtt/mqtt_test.go b/datastore/mqtt/mqtt_test.go index 3720968..1e1ce5c 100644 --- a/datastore/mqtt/mqtt_test.go +++ b/datastore/mqtt/mqtt_test.go @@ -105,7 +105,7 @@ func resetPublishedTopics() { publishedTopics = make(map[string][]byte) } -func mockPahoNewClient(o *pahomqtt.ClientOptions) pahomqtt.Client { +func mockPahoNewClient(_ *pahomqtt.ClientOptions) pahomqtt.Client { return &MockMQTTClient{ ConnectFunc: func() pahomqtt.Token { @@ -117,10 +117,10 @@ func mockPahoNewClient(o *pahomqtt.ClientOptions) pahomqtt.Client { IsConnectedFunc: func() bool { return true }, - PublishFunc: func(topic string, qos byte, retained bool, payload interface{}) pahomqtt.Token { + PublishFunc: func(topic string, _ byte, _ bool, payload interface{}) pahomqtt.Token { publishedTopics[topic] = payload.([]byte) return &MockToken{ - WaitTimeoutFunc: func(d time.Duration) bool { return true }, + WaitTimeoutFunc: func(_ time.Duration) bool { return true }, WaitFunc: func() bool { return true }, ErrorFunc: func() error { return nil }, } @@ -329,10 +329,10 @@ var _ = Describe("MQTTProducer", func() { var alert1Current, alert2Current map[string]interface{} var alert1History, alert2History []map[string]interface{} - json.Unmarshal(publishedTopics[alert1CurrentTopic], &alert1Current) - json.Unmarshal(publishedTopics[alert1HistoryTopic], &alert1History) - json.Unmarshal(publishedTopics[alert2CurrentTopic], &alert2Current) - json.Unmarshal(publishedTopics[alert2HistoryTopic], &alert2History) + Expect(json.Unmarshal(publishedTopics[alert1CurrentTopic], &alert1Current)).NotTo(HaveOccurred()) + Expect(json.Unmarshal(publishedTopics[alert1HistoryTopic], &alert1History)).NotTo(HaveOccurred()) + Expect(json.Unmarshal(publishedTopics[alert2CurrentTopic], &alert2Current)).NotTo(HaveOccurred()) + Expect(json.Unmarshal(publishedTopics[alert2HistoryTopic], &alert2History)).NotTo(HaveOccurred()) Expect(alert1Current).To(HaveKey("StartedAt")) Expect(alert1Current).NotTo(HaveKey("EndedAt")) @@ -405,8 +405,8 @@ var _ = Describe("MQTTProducer", func() { Expect(publishedTopics).To(HaveKey(error2Topic)) var error1, error2 map[string]interface{} - json.Unmarshal(publishedTopics[error1Topic], &error1) - json.Unmarshal(publishedTopics[error2Topic], &error2) + Expect(json.Unmarshal(publishedTopics[error1Topic], &error1)).NotTo(HaveOccurred()) + Expect(json.Unmarshal(publishedTopics[error2Topic], &error2)).NotTo(HaveOccurred()) Expect(error1).To(HaveKey("Body")) Expect(error1["Body"]).To(Equal("This is a test error")) @@ -422,7 +422,7 @@ var _ = Describe("MQTTProducer", func() { It("should handle timeouts when publishing MQTT messages", func() { // Mock a slow publish function that always times out - mqtt.PahoNewClient = func(o *pahomqtt.ClientOptions) pahomqtt.Client { + mqtt.PahoNewClient = func(_ *pahomqtt.ClientOptions) pahomqtt.Client { return &MockMQTTClient{ ConnectFunc: func() pahomqtt.Token { return &MockToken{ @@ -433,9 +433,9 @@ var _ = Describe("MQTTProducer", func() { IsConnectedFunc: func() bool { return true }, - PublishFunc: func(topic string, qos byte, retained bool, payload interface{}) pahomqtt.Token { + PublishFunc: func(_ string, _ byte, _ bool, _ interface{}) pahomqtt.Token { return &MockToken{ - WaitTimeoutFunc: func(d time.Duration) bool { return false }, + WaitTimeoutFunc: func(_ time.Duration) bool { return false }, WaitFunc: func() bool { return false }, ErrorFunc: func() error { return pahomqtt.TimedOut }, } diff --git a/test/integration/mqtt_consumer_test.go b/test/integration/mqtt_consumer_test.go index 5a6e0ff..034fe32 100644 --- a/test/integration/mqtt_consumer_test.go +++ b/test/integration/mqtt_consumer_test.go @@ -40,7 +40,7 @@ func NewTestMQTTConsumer(broker, topic string, logger *logrus.Logger) (*TestMQTT return consumer, nil } -func (t *TestMQTTConsumer) messageHandler(client pahomqtt.Client, msg pahomqtt.Message) { +func (t *TestMQTTConsumer) messageHandler(_ pahomqtt.Client, msg pahomqtt.Message) { t.msgChan <- msg.Payload() }