Skip to content

Commit

Permalink
Merge pull request #481 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Documentation update (addressing #476 and #474). Also update docker sample to support mosquitto v2+
  • Loading branch information
MattBrittan authored Jan 28, 2021
2 parents eb1be68 + 9071b28 commit 89c3cd2
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 16 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,12 @@ func main() {

* Seemingly random disconnections may be caused by another client connecting to the broker with the same client
identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800405).
* A `MessageHandler` (called when a new message is received) must not block. If you wish to perform a long-running task,
or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected `pingresp
* Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g.
`max_inflight_messages=1` in mosquitto) then set `ClientOptions.SetOrderMatters(false)`. Doing so will avoid the
below issue (deadlocks due to blocking message handlers).
* A `MessageHandler` (called when a new message is received) must not block (unless
`ClientOptions.SetOrderMatters(false)` set). If you wish to perform a long-running task, or publish a message, then
please use a go routine (blocking in the handler is a common cause of unexpected `pingresp
not received, disconnecting` errors).
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible that the broker will deliver retained
messages before `Subscribe` can be called. To process these messages either configure a handler with `AddRoute` or
Expand Down
44 changes: 37 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const (
// information can be found in their respective documentation.
// Numerous connection options may be specified by configuring a
// and then supplying a ClientOptions type.
// Implementations of Client must be safe for concurrent use by multiple
// goroutines
type Client interface {
// IsConnected returns a bool signifying whether
// the client is connected or not.
Expand All @@ -75,26 +77,44 @@ type Client interface {
// Returns a token to track delivery of the message to the broker
Publish(topic string, qos byte, retained bool, payload interface{}) Token
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
// a message is published on the topic provided, or nil for the default handler
// a message is published on the topic provided, or nil for the default handler.
//
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
Subscribe(topic string, qos byte, callback MessageHandler) Token
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
// be executed when a message is published on one of the topics provided, or nil for the
// default handler
// default handler.
//
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
// Unsubscribe will end the subscription from each of the topics provided.
// Messages published to those topics from other clients will no longer be
// received.
Unsubscribe(topics ...string) Token
// AddRoute allows you to add a handler for messages on a specific topic
// without making a subscription. For example having a different handler
// for parts of a wildcard subscription
// for parts of a wildcard subscription or for receiving retained messages
// upon connection (before Sub scribe can be processed).
//
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
AddRoute(topic string, callback MessageHandler)
// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
// in use by the client.
OptionsReader() ClientOptionsReader
}

// client implements the Client interface
// clients are safe for concurrent use by multiple
// goroutines
type client struct {
lastSent atomic.Value // time.Time - the last time a packet was successfully sent to network
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
Expand Down Expand Up @@ -153,6 +173,11 @@ func NewClient(o *ClientOptions) Client {
// AddRoute allows you to add a handler for messages on a specific topic
// without making a subscription. For example having a different handler
// for parts of a wildcard subscription
//
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
func (c *client) AddRoute(topic string, callback MessageHandler) {
if callback != nil {
c.msgRouter.addRoute(topic, callback)
Expand Down Expand Up @@ -686,10 +711,10 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
// a message is published on the topic provided.
//
// Please note: you should try to keep the execution time of the callback to be
// as low as possible, especially when SetOrderMatters(true) (the default) is in
// place. Blocking calls in message handlers might otherwise delay delivery to
// other message handlers.
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
token := newToken(packets.Subscribe).(*SubscribeToken)
DEBUG.Println(CLI, "enter Subscribe")
Expand Down Expand Up @@ -766,6 +791,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke

// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
// be executed when a message is published on one of the topics provided.
//
// If `options.OrderMatters` is true (the default) then `callback` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `callback` must be safe for concurrent use by multiple goroutines.
func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
var err error
token := newToken(packets.Subscribe).(*SubscribeToken)
Expand Down
5 changes: 3 additions & 2 deletions cmd/docker/binds/mosquitto/config/mosquitto.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@

allow_anonymous true # Anyone can connect

# Port to use for the default listener.
#port 1883
# Mosquitto v2+ requires that a listener be definer (otherwise it listens on loopback)
listener 1883


#log_type error
#log_type warning
Expand Down
6 changes: 6 additions & 0 deletions cmd/docker/publisher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
SERVERADDRESS = "tcp://mosquitto:1883"
DELAY = time.Second
CLIENTID = "mqtt_publisher"

WRITETOLOG = true // If true then published messages will be written to the console
)

func main() {
Expand All @@ -32,6 +34,7 @@ func main() {
opts.AddBroker(SERVERADDRESS)
opts.SetClientID(CLIENTID)

opts.SetOrderMatters(false) // Allow out of order messages (use this option unless in order delivery is essential)
opts.ConnectTimeout = time.Second // Minimal delays on connect
opts.WriteTimeout = time.Second // Minimal delays on writes
opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages
Expand Down Expand Up @@ -85,6 +88,9 @@ func main() {
panic(err)
}

if WRITETOLOG {
fmt.Printf("sending message: %s\n", msg)
}
t := client.Publish(TOPIC, QOS, false, msg)
// Handle the token in a go routine so this loop keeps sending messages regardless of delivery status
go func() {
Expand Down
1 change: 1 addition & 0 deletions cmd/docker/subscriber/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func main() {
opts.AddBroker(SERVERADDRESS)
opts.SetClientID(CLIENTID)

opts.SetOrderMatters(false) // Allow out of order messages (use this option unless in order delivery is essential)
opts.ConnectTimeout = time.Second // Minimal delays on connect
opts.WriteTimeout = time.Second // Minimal delays on writes
opts.KeepAlive = 10 // Keepalive every 10 seconds so we quickly detect network outages
Expand Down
17 changes: 15 additions & 2 deletions fvt/docker/runTests.cmd
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
@ECHO OFF
REM Windows CMD file to run golang Paho tests with docker mosquitto instance
cls
:start
docker-compose up -d
REM Docker for windows does not support publishing to 127.0.0.1 so set the address for the tests to use.
set TEST_FVT_ADDR=0.0.0.0
go test -v ../../
REM `--count 1` prevents the system from using cached results. Note that running the tests multiple times may fail
REM because the broker state will not be as expected
go test --count 1 -v ../../
rem go test -race -v ../../
docker-compose down
IF ERRORLEVEL 1 GOTO failed
GOTO successful

:failed
docker-compose down
echo "Error"
exit

:successful
docker-compose down
REM goto start
15 changes: 12 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type OnConnectHandler func(Client)
// the initial connection is lost
type ReconnectHandler func(Client, *ClientOptions)

// ClientOptions contains configurable options for an Client.
// ClientOptions contains configurable options for an Client. Note that these should be set using the
// relevant methods (e.g. `AddBroker`) rather than directly. See those functions for information on usage.
type ClientOptions struct {
Servers []*url.URL
ClientID string
Expand Down Expand Up @@ -89,7 +90,7 @@ type ClientOptions struct {
// default values.
// Port: 1883
// CleanSession: True
// Order: True
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
// KeepAlive: 30 (seconds)
// ConnectTimeout: 30 (seconds)
// MaxReconnectInterval 10 (minutes)
Expand Down Expand Up @@ -203,10 +204,13 @@ func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
}

// SetOrderMatters will set the message routing to guarantee order within
// each QoS level. By default, this value is true. If set to false,
// each QoS level. By default, this value is true. If set to false (recommended),
// this flag indicates that messages can be delivered asynchronously
// from the client to the application and possibly arrive out of order.
// Specifically, the message handler is called in its own go routine.
// Note that setting this to true does not guarantee in-order delivery
// (this is subject to broker settings like `max_inflight_messages=1` in mosquitto)
// and if `true` then handlers must not block.
func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
o.Order = order
return o
Expand Down Expand Up @@ -286,6 +290,11 @@ func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, re

// SetDefaultPublishHandler sets the MessageHandler that will be called when a message
// is received that does not match any known subscriptions.
//
// If `options.OrderMatters` is true (the default) then `defaultHandler` must not block or
// call functions within this package that may block (e.g. `Publish`) other than in
// a new go routine.
// `defaultHandler` must be safe for concurrent use by multiple goroutines.
func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
o.DefaultPublishHandler = defaultHandler
return o
Expand Down

0 comments on commit 89c3cd2

Please sign in to comment.