This package implements an MQTT (Message Queuing Telemetry Transport) producer for the Fleet Telemetry system. MQTT is particularly well-suited for fleet telemetry systems due to its lightweight, publish-subscribe architecture.
The MQTT datastore allows the Fleet Telemetry system to publish vehicle data, alerts, errors and connectivity to an MQTT broker. It uses the Paho MQTT client library for Go and implements the telemetry.Producer
interface.
-
Separate topics for different data types: We use distinct topic structures for metrics, alerts, errors and connectivity to allow easy filtering and processing by subscribers.
-
Individual field publishing: Each metric field is published as a separate MQTT message, allowing for granular updates and subscriptions.
-
Current state and history for alerts: We maintain both the current state and history of alerts, supporting both clients that require real-time monitoring and clients that require historical analysis.
-
Configurable QoS and retention: The MQTT QoS level and message retention can be configured to balance between performance and reliability.
-
Reliable acknowledgment support: The producer supports reliable acknowledgment for specified transaction types. However, it's important to note that the entire packet from the vehicle will be not be acknowledged if any of the related MQTT publish operations fail. This ensures data integrity by preventing partial updates and allows for retrying the complete set of data in case of any publishing issues.
The MQTT producer is configured using a JSON object with the following fields:
broker
: (string) The MQTT broker "host:port". (for example "localhost:1883")client_id
: (string) A unique identifier for the MQTT client.username
: (string) The username for MQTT broker authentication. (optional)password
: (string) The password for MQTT broker authentication. (optional)topic_base
: (string) The base topic for all MQTT messages.qos
: (number) The Quality of Service level (0, 1, or 2). Default: 0retained
: (boolean) Whether messages should be retained by the broker. Default: falseconnect_timeout_ms
: (number) Connection timeout in milliseconds. Default: 30000publish_timeout_ms
: (number) Publish operation timeout in milliseconds. Default: 2500disconnect_timeout_ms
: (number) Disconnection timeout in milliseconds. Default: 250connect_retry_interval_ms
: (number) Interval between connection retry attempts in milliseconds. Default: 10000keep_alive_seconds
: (number) Keep-alive interval in seconds. Default: 30
Example configuration:
{
"mqtt": {
"broker": "localhost:1883",
"client_id": "fleet-telemetry",
"username": "your_username",
"password": "your_password",
"topic_base": "telemetry",
"qos": 1,
"retained": false,
"connect_timeout_ms": 30000,
"publish_timeout_ms": 2500,
"disconnect_timeout_ms": 250,
"connect_retry_interval_ms": 10000,
"keep_alive_seconds": 30
}
}
The MQTT producer will use default values for any omitted fields as specified above.
- Metrics:
<topic_base>/<VIN>/v/<field_name>
- Alerts (current state):
<topic_base>/<VIN>/alerts/<alert_name>/current
- Alerts (history):
<topic_base>/<VIN>/alerts/<alert_name>/history
- Errors:
<topic_base>/<VIN>/errors/<error_name>
- Connectivity:
<topic_base>/<VIN>/connectivity
All payloads are JSON encoded. Please note that the metric field values are also JSON encoded.
- Metrics:
<field_value>
- Alerts:
{"Name": <string>, "StartedAt": <timestamp>, "EndedAt": <timestamp>, "Audiences": [<string>]}
- Errors:
{"Name": <string>, "Body": <string>, "Tags": {<string>: <string>}, "CreatedAt": <timestamp>}
- Connectivity:
{"ConnectionId": <string>, "Status": <string>, "CreatedAt": <timestamp>}
Note: The field contents and type are determined by the car. Fields may have their types updated with different software and vehicle versions to optimize for precision or space. For example, a float value like the vehicle's speed might be received as 12.3 (numeric) in one version and as "12.3" (string) in another version.
- The producer implements reconnection logic with configurable retry intervals.
- Publish operations have a configurable timeout to prevent blocking indefinitely.
- The producer supports reliable acknowledgment for specified transaction types, ensuring critical data is not lost.
- Each field is published as a separate MQTT message, which can increase network traffic but allows for more granular subscriptions.
- QoS levels can be configured to balance between performance and reliability.
- The producer uses goroutines to handle message publishing asynchronously.