Skip to content

Commit

Permalink
Merge pull request #512 from ChIoT-Tech/master
Browse files Browse the repository at this point in the history
Resolve rare deadlock caused by loss of connection (in particular circumstances)
  • Loading branch information
MattBrittan authored Jun 3, 2021
2 parents 1c925c4 + 9995b72 commit 88d5334
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 1 deletion.
17 changes: 16 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
// Care is needed here because an error elsewhere could trigger a deadlock
sendPubLoop:
for {
select {
case incomingPubChan <- pub:
break sendPubLoop
case err, ok := <-commsErrors:
if !ok { // commsErrors has been closed so we can ignore it
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
case err, ok := <-commsErrors:
if !ok {
commsErrors = nil
Expand Down
101 changes: 101 additions & 0 deletions fvt_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package mqtt

import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1444,3 +1447,101 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {

c.Disconnect(250)
}

// Test_DisconnectWhileProcessingIncomingPublish is a regression test for issue 209: an occasional deadlock when
// the connection is lost unexpectedly while incoming PUBLISH packets are being processed.
//
// This was quite a nasty deadlock which occurred in very rare circumstances; there is no known way of replicating it
// reliably, but the scenario below triggered it fairly consistently when the test was run a decent number of times.
// Following the fix it ran 10,000 times without issue:
//
//	go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
//
// The test floods a subscriber with QoS 1 messages (while that subscriber is also publishing), asks the broker to
// drop the subscriber's connection, and then expects the subscriber's connection-lost handler to fire within one
// second. If it does not, the test fails and dumps the stacks of all goroutines to aid diagnosis.
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
	topic := "/test/DisconnectWhileProcessingIncomingPublish"

	pops := NewClientOptions()
	pops.AddBroker(FVTTCP)
	// pops.SetOrderMatters(false) // Not really needed but consistent...
	pops.SetClientID("dwpip-pub")
	p := NewClient(pops)

	sops := NewClientOptions()
	sops.AddBroker(FVTTCP)
	sops.SetAutoReconnect(false)                 // We don't want the connection to be re-established
	sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
	// sops.SetOrderMatters(false)
	sops.SetClientID("dwpip-sub")
	// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occurred)
	sDisconnected := make(chan struct{})
	sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })

	msgReceived := make(chan struct{})
	var oneMsgReceived sync.Once
	var f MessageHandler = func(client Client, msg Message) {
		// No need to do anything when message received (just want ACK sent ASAP); the Once guards against closing
		// the channel again for every subsequent message.
		oneMsgReceived.Do(func() { close(msgReceived) })
	}

	s := NewClient(sops).(*client) // s = subscriber
	if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
		t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
	}

	if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
		t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
	}

	// Use a go routine to swamp the broker with messages
	if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // p = publisher
		t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
	}
	// We will hammer both the publisher and subscriber with messages
	ctx, cancel := context.WithCancel(context.Background())
	// Ensure the flooding goroutine is stopped on every exit path (including t.Fatalf); calling cancel more than
	// once is harmless, so the explicit calls below remain where ordering matters.
	defer cancel()
	pubDone := make(chan struct{})
	go func() {
		defer close(pubDone)
		for i := 0; ; i++ {
			// Format the payload once, before spawning the goroutine below, so that the goroutine does not read
			// the loop counter while this loop is incrementing it (which would be a data race).
			payload := fmt.Sprintf("test message: %d", i)
			p.Publish(topic, 1, false, payload)
			// After the connection goes down s.Publish will start blocking (this is not ideal but fixing it is a
			// problem for another time)
			go func() { s.Publish(topic+"IGNORE", 1, false, payload) }()

			if ctx.Err() != nil {
				return
			}
		}
	}()

	// Wait until we have received a message (ensuring that the stream of messages has started)
	select {
	case <-msgReceived: // All good
	case <-time.After(time.Second):
		t.Errorf("no messages received")
	}

	// We need the connection to drop; unfortunately using any internal method (`s.conn.Close()` etc) will hide the
	// behaviour because any calls to Read/Write will return immediately. So we just ask the broker to disconnect..
	dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
	err := dm.Write(s.conn)
	if err != nil {
		t.Fatalf("error sending disconnect packet: %s", err)
	}

	// Let's give the library up to a second to shutdown (indicated by the connection lost handler being called)
	select {
	case <-sDisconnected: // All good
	case <-time.After(time.Second):
		cancel()                // no point leaving publisher running
		time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
		buf := make([]byte, 1<<20)
		stacklen := runtime.Stack(buf, true)
		t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
	}

	cancel() // no point leaving publisher running

	select {
	case <-pubDone:
	case <-time.After(time.Second):
		t.Errorf("pubdone not closed within a second")
	}
	p.Disconnect(250) // Close publisher
}

0 comments on commit 88d5334

Please sign in to comment.