Skip to content

Commit

Permalink
Resolve rare deadlock that could occur when network error occurs whil…
Browse files Browse the repository at this point in the history
…e multiple other operations are in progress.

Ref #509
  • Loading branch information
MattBrittan committed Jun 3, 2021
1 parent 1c925c4 commit 9995b72
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 1 deletion.
17 changes: 16 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
// Care is needed here because an error elsewhere could trigger a deadlock
sendPubLoop:
for {
select {
case incomingPubChan <- pub:
break sendPubLoop
case err, ok := <-commsErrors:
if !ok { // commsErrors has been closed so we can ignore it
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
case err, ok := <-commsErrors:
if !ok {
commsErrors = nil
Expand Down
101 changes: 101 additions & 0 deletions fvt_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package mqtt

import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1444,3 +1447,101 @@ func Test_ResumeSubsWithReconnect(t *testing.T) {

c.Disconnect(250)
}

// Issue 209 - occasional deadlock when connections are lost unexpectedly
// This was quite a nasty deadlock which occurred in very rare circumstances; I could not come up with a reliable way of
// replicating this but the below would cause it to happen fairly consistently (when the test was run a decent number
// of times). Following the fix it ran 10,000 times without issue.
// go test -count 10000 -run DisconnectWhileProcessingIncomingPublish
func Test_DisconnectWhileProcessingIncomingPublish(t *testing.T) {
topic := "/test/DisconnectWhileProcessingIncomingPublish"

pops := NewClientOptions()
pops.AddBroker(FVTTCP)
// pops.SetOrderMatters(false) // Not really needed but consistent...
pops.SetClientID("dwpip-pub")
p := NewClient(pops)

sops := NewClientOptions()
sops.AddBroker(FVTTCP)
sops.SetAutoReconnect(false) // We dont want the connection to be re-established
sops.SetWriteTimeout(500 * time.Millisecond) // We will be sending a lot of publish messages and want go routines to clear...
// sops.SetOrderMatters(false)
sops.SetClientID("dwpip-sub")
// We need to know when the subscriber has lost its connection (this indicates that the deadlock has not occured)
sDisconnected := make(chan struct{})
sops.SetConnectionLostHandler(func(Client, error) { close(sDisconnected) })

msgReceived := make(chan struct{})
var oneMsgReceived sync.Once
var f MessageHandler = func(client Client, msg Message) {
// No need to do anything when message received (just want ACK sent ASAP)
oneMsgReceived.Do(func() { close(msgReceived) })
}

s := NewClient(sops).(*client) // s = subscriber
if sToken := s.Connect(); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Connect(): %v", sToken.Error())
}

if sToken := s.Subscribe(topic, 1, f); sToken.Wait() && sToken.Error() != nil {
t.Fatalf("Error on subscriber Client.Subscribe(): %v", sToken.Error())
}

// Use a go routine to swamp the broker with messages
if pToken := p.Connect(); pToken.Wait() && pToken.Error() != nil { // p = publisher
t.Fatalf("Error on publisher Client.Connect(): %v", pToken.Error())
}
// We will hammer both the publisher and subscriber with messages
ctx, cancel := context.WithCancel(context.Background())
pubDone := make(chan struct{})
go func() {
defer close(pubDone)
i := 0
for {
p.Publish(topic, 1, false, fmt.Sprintf("test message: %d", i))
// After the connection goes down s.Publish will start blocking (this is not ideal but fixing its a problem for another time)
go func() { s.Publish(topic+"IGNORE", 1, false, fmt.Sprintf("test message: %d", i)) }()
i++

if ctx.Err() != nil {
return
}
}
}()

// Wait until we have received a message (ensuring that the stream of messages has started)
select {
case <-msgReceived: // All good
case <-time.After(time.Second):
t.Errorf("no messages received")
}

// We need the connection to drop; unfortunately using any internal method (`s.conn.Close()` etc) will hide the
// behaviour because any calls to Read/Write will return immediately. So we just ask the broker to disconnect..
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
err := dm.Write(s.conn)
if err != nil {
t.Fatalf("error dending disconnect packet: %s", err)
}

// Lets give the library up to a second to shutdown (indicated by the status changing)
select {
case <-sDisconnected: // All good
case <-time.After(time.Second):
cancel() // no point leaving publisher running
time.Sleep(time.Second) // Allow publish calls to timeout (otherwise there will be tons of go routines running!)
buf := make([]byte, 1<<20)
stacklen := runtime.Stack(buf, true)
t.Fatalf("connection was not lost as expected - probable deadlock. Stacktrace follows: %s", buf[:stacklen])
}

cancel() // no point leaving publisher running

select {
case <-pubDone:
case <-time.After(time.Second):
t.Errorf("pubdone not closed within a second")
}
p.Disconnect(250) // Close publisher
}

0 comments on commit 9995b72

Please sign in to comment.