Skip to content

Commit

Permalink
fix mqtt linting errors
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickdemers6 committed Jan 10, 2025
1 parent 3813455 commit 9f3403f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 27 deletions.
27 changes: 18 additions & 9 deletions datastore/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
"github.com/teslamotors/fleet-telemetry/telemetry"
)

type MQTTProducer struct {
// Producer is a telemetry.Producer that sends records to an MQTT broker.
type Producer struct {
client pahomqtt.Client
config *Config
logger *logrus.Logger
Expand All @@ -25,6 +26,7 @@ type MQTTProducer struct {
reliableAckTxTypes map[string]interface{}
}

// Config holds the configuration for the MQTT producer.
type Config struct {
Broker string `json:"broker"`
ClientID string `json:"client_id"`
Expand All @@ -40,13 +42,15 @@ type Config struct {
KeepAlive int `json:"keep_alive_seconds"`
}

// Metrics holds the metrics for the MQTT producer.
type Metrics struct {
errorCount adapter.Counter
publishCount adapter.Counter
byteTotal adapter.Counter
reliableAckCount adapter.Counter
}

// Default values for the MQTT producer configuration options.
const (
DefaultPublishTimeout = 2500
DefaultConnectTimeout = 30000
Expand All @@ -61,9 +65,10 @@ var (
metricsOnce sync.Once
)

// Allow us to mock the mqtt.NewClient function for testing
// PahoNewClient allows mocking the mqtt.NewClient function for testing
var PahoNewClient = pahomqtt.NewClient

// NewProducer creates a new MQTT producer.
func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricCollector, namespace string, airbrakeHandler *airbrake.Handler, ackChan chan (*telemetry.Record), reliableAckTxTypes map[string]interface{}, logger *logrus.Logger) (telemetry.Producer, error) {
registerMetricsOnce(metrics)

Expand Down Expand Up @@ -102,7 +107,7 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl
client := PahoNewClient(opts)
client.Connect()

return &MQTTProducer{
return &Producer{
client: client,
config: config,
logger: logger,
Expand All @@ -114,7 +119,8 @@ func NewProducer(ctx context.Context, config *Config, metrics metrics.MetricColl
}, nil
}

func (p *MQTTProducer) Produce(rec *telemetry.Record) {
// Produce sends a record to the MQTT broker.
func (p *Producer) Produce(rec *telemetry.Record) {
if p.ctx.Err() != nil {
return
}
Expand Down Expand Up @@ -182,12 +188,12 @@ func waitTokenTimeout(t pahomqtt.Token, d time.Duration) error {
return t.Error()
}

func (p *MQTTProducer) updateMetrics(txType string, byteCount int) {
func (p *Producer) updateMetrics(txType string, byteCount int) {
metricsRegistry.byteTotal.Add(int64(byteCount), map[string]string{"record_type": txType})
metricsRegistry.publishCount.Inc(map[string]string{"record_type": txType})
}

func (p *MQTTProducer) createLogInfo(rec *telemetry.Record) logrus.LogInfo {
func (p *Producer) createLogInfo(rec *telemetry.Record) logrus.LogInfo {
logInfo := logrus.LogInfo{
"topic_name": telemetry.BuildTopicName(p.namespace, rec.TxType),
"txid": rec.Txid,
Expand All @@ -196,20 +202,23 @@ func (p *MQTTProducer) createLogInfo(rec *telemetry.Record) logrus.LogInfo {
return logInfo
}

func (p *MQTTProducer) ProcessReliableAck(entry *telemetry.Record) {
// ProcessReliableAck marks a message as successfully published.
func (p *Producer) ProcessReliableAck(entry *telemetry.Record) {
_, ok := p.reliableAckTxTypes[entry.TxType]
if ok {
p.ackChan <- entry
metricsRegistry.reliableAckCount.Inc(map[string]string{"record_type": entry.TxType})
}
}

func (p *MQTTProducer) ReportError(message string, err error, logInfo logrus.LogInfo) {
// ReportError logs an error and sends it to Airbrake.
func (p *Producer) ReportError(message string, err error, logInfo logrus.LogInfo) {
p.airbrakeHandler.ReportLogMessage(logrus.ERROR, message, err, logInfo)
p.logger.ErrorLog(message, err, logInfo)
}

func (p *MQTTProducer) Close() error {
// Close disconnects from the MQTT client.
func (p *Producer) Close() error {
p.client.Disconnect(uint(p.config.DisconnectTimeout))
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions datastore/mqtt/mqtt_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
)

func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *protos.Payload) ([]pahomqtt.Token, error) {
func (p *Producer) processVehicleFields(rec *telemetry.Record, payload *protos.Payload) ([]pahomqtt.Token, error) {
var tokens []pahomqtt.Token
convertedPayload := p.payloadToMap(payload)
for key, value := range convertedPayload {
Expand All @@ -27,7 +27,7 @@ func (p *MQTTProducer) processVehicleFields(rec *telemetry.Record, payload *prot
return tokens, nil
}

func (p *MQTTProducer) processVehicleAlerts(rec *telemetry.Record, payload *protos.VehicleAlerts) ([]pahomqtt.Token, error) {
func (p *Producer) processVehicleAlerts(rec *telemetry.Record, payload *protos.VehicleAlerts) ([]pahomqtt.Token, error) {
tokens := make([]pahomqtt.Token, 0, len(payload.Alerts)*2)
alertsHistory := make(map[string][]*protos.VehicleAlert, len(payload.Alerts))
alertsCurrentState := make(map[string]*protos.VehicleAlert, len(payload.Alerts))
Expand Down Expand Up @@ -80,7 +80,7 @@ func (p *MQTTProducer) processVehicleAlerts(rec *telemetry.Record, payload *prot
return tokens, nil
}

func (p *MQTTProducer) processVehicleErrors(rec *telemetry.Record, payload *protos.VehicleErrors) ([]pahomqtt.Token, error) {
func (p *Producer) processVehicleErrors(rec *telemetry.Record, payload *protos.VehicleErrors) ([]pahomqtt.Token, error) {
var tokens []pahomqtt.Token

for _, vehicleError := range payload.Errors {
Expand All @@ -99,7 +99,7 @@ func (p *MQTTProducer) processVehicleErrors(rec *telemetry.Record, payload *prot
return tokens, nil
}

func (p *MQTTProducer) processVehicleConnectivity(rec *telemetry.Record, payload *protos.VehicleConnectivity) ([]pahomqtt.Token, error) {
func (p *Producer) processVehicleConnectivity(rec *telemetry.Record, payload *protos.VehicleConnectivity) ([]pahomqtt.Token, error) {
topicName := fmt.Sprintf("%s/%s/connectivity", p.config.TopicBase, rec.Vin)
value := map[string]interface{}{
"ConnectionId": payload.GetConnectionId(),
Expand Down Expand Up @@ -149,7 +149,7 @@ func vehicleErrorToMqttMap(vehicleError *protos.VehicleError) map[string]interfa
}

// PayloadToMap transforms a Payload into a map for mqtt purposes
func (p *MQTTProducer) payloadToMap(payload *protos.Payload) map[string]interface{} {
func (p *Producer) payloadToMap(payload *protos.Payload) map[string]interface{} {
convertedPayload := make(map[string]interface{}, len(payload.Data))
for _, datum := range payload.Data {
convertedPayload[datum.Key.String()] = getDatumValue(datum.Value)
Expand Down
24 changes: 12 additions & 12 deletions datastore/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func resetPublishedTopics() {
publishedTopics = make(map[string][]byte)
}

func mockPahoNewClient(o *pahomqtt.ClientOptions) pahomqtt.Client {
func mockPahoNewClient(_ *pahomqtt.ClientOptions) pahomqtt.Client {
return &MockMQTTClient{

ConnectFunc: func() pahomqtt.Token {
Expand All @@ -117,10 +117,10 @@ func mockPahoNewClient(o *pahomqtt.ClientOptions) pahomqtt.Client {
IsConnectedFunc: func() bool {
return true
},
PublishFunc: func(topic string, qos byte, retained bool, payload interface{}) pahomqtt.Token {
PublishFunc: func(topic string, _ byte, _ bool, payload interface{}) pahomqtt.Token {
publishedTopics[topic] = payload.([]byte)
return &MockToken{
WaitTimeoutFunc: func(d time.Duration) bool { return true },
WaitTimeoutFunc: func(_ time.Duration) bool { return true },
WaitFunc: func() bool { return true },
ErrorFunc: func() error { return nil },
}
Expand Down Expand Up @@ -329,10 +329,10 @@ var _ = Describe("MQTTProducer", func() {

var alert1Current, alert2Current map[string]interface{}
var alert1History, alert2History []map[string]interface{}
json.Unmarshal(publishedTopics[alert1CurrentTopic], &alert1Current)
json.Unmarshal(publishedTopics[alert1HistoryTopic], &alert1History)
json.Unmarshal(publishedTopics[alert2CurrentTopic], &alert2Current)
json.Unmarshal(publishedTopics[alert2HistoryTopic], &alert2History)
Expect(json.Unmarshal(publishedTopics[alert1CurrentTopic], &alert1Current)).NotTo(HaveOccurred())
Expect(json.Unmarshal(publishedTopics[alert1HistoryTopic], &alert1History)).NotTo(HaveOccurred())
Expect(json.Unmarshal(publishedTopics[alert2CurrentTopic], &alert2Current)).NotTo(HaveOccurred())
Expect(json.Unmarshal(publishedTopics[alert2HistoryTopic], &alert2History)).NotTo(HaveOccurred())

Expect(alert1Current).To(HaveKey("StartedAt"))
Expect(alert1Current).NotTo(HaveKey("EndedAt"))
Expand Down Expand Up @@ -405,8 +405,8 @@ var _ = Describe("MQTTProducer", func() {
Expect(publishedTopics).To(HaveKey(error2Topic))

var error1, error2 map[string]interface{}
json.Unmarshal(publishedTopics[error1Topic], &error1)
json.Unmarshal(publishedTopics[error2Topic], &error2)
Expect(json.Unmarshal(publishedTopics[error1Topic], &error1)).NotTo(HaveOccurred())
Expect(json.Unmarshal(publishedTopics[error2Topic], &error2)).NotTo(HaveOccurred())

Expect(error1).To(HaveKey("Body"))
Expect(error1["Body"]).To(Equal("This is a test error"))
Expand All @@ -422,7 +422,7 @@ var _ = Describe("MQTTProducer", func() {

It("should handle timeouts when publishing MQTT messages", func() {
// Mock a slow publish function that always times out
mqtt.PahoNewClient = func(o *pahomqtt.ClientOptions) pahomqtt.Client {
mqtt.PahoNewClient = func(_ *pahomqtt.ClientOptions) pahomqtt.Client {
return &MockMQTTClient{
ConnectFunc: func() pahomqtt.Token {
return &MockToken{
Expand All @@ -433,9 +433,9 @@ var _ = Describe("MQTTProducer", func() {
IsConnectedFunc: func() bool {
return true
},
PublishFunc: func(topic string, qos byte, retained bool, payload interface{}) pahomqtt.Token {
PublishFunc: func(_ string, _ byte, _ bool, _ interface{}) pahomqtt.Token {
return &MockToken{
WaitTimeoutFunc: func(d time.Duration) bool { return false },
WaitTimeoutFunc: func(_ time.Duration) bool { return false },
WaitFunc: func() bool { return false },
ErrorFunc: func() error { return pahomqtt.TimedOut },
}
Expand Down
2 changes: 1 addition & 1 deletion test/integration/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewTestMQTTConsumer(broker, topic string, logger *logrus.Logger) (*TestMQTT
return consumer, nil
}

func (t *TestMQTTConsumer) messageHandler(client pahomqtt.Client, msg pahomqtt.Message) {
func (t *TestMQTTConsumer) messageHandler(_ pahomqtt.Client, msg pahomqtt.Message) {
t.msgChan <- msg.Payload()
}

Expand Down

0 comments on commit 9f3403f

Please sign in to comment.