Skip to content

Commit

Permalink
Fix failed test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
wind-c committed Dec 14, 2023
1 parent 1a9fa37 commit 5ec4ce2
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (a *Agent) processOutboundConnect(pk *packets.Packet) {
// pickNodes pick nodes, if the filter is shared, select a node at random
func (a *Agent) pickNodes(filter string, sharedFilters map[string]bool) (ns []string) {
tmpNs := a.raftPeer.Lookup(filter)
if len(tmpNs) == 0 {
if tmpNs == nil || len(tmpNs) == 0 {
return ns
}

Expand Down
18 changes: 12 additions & 6 deletions cluster/raft/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (k *KV) Add(key, value string) (new bool) {
}

// Del return true if the array corresponding to key is deleted
// If the value is "", the key-values pair is deleted
func (k *KV) Del(key, value string) (empty bool) {
k.Lock()
defer k.Unlock()
Expand All @@ -74,29 +75,34 @@ func (k *KV) Del(key, value string) (empty bool) {
}
}

if value == "" || len(vs) == 0 {
if value == "" || len(k.data[key]) == 0 {
delete(k.data, key)
empty = true
}
}
return
}

// DelByValue delete the specified value from the key-values array
// and delete the key-value pair if the key-values array is empty
func (k *KV) DelByValue(value string) int {
k.Lock()
defer k.Unlock()
c := 0
if value == "" {
return c
}

for f, vs := range k.data {
for i, v := range vs {
if v == value {
if len(vs) == 1 {
delete(k.data, f)
} else {
k.data[f] = append(vs[:i], vs[i+1:]...)
}
k.data[f] = append(vs[:i], vs[i+1:]...)
c++
}
}
if len(k.data[f]) == 0 {
delete(k.data, f)
}
}
return c
}
61 changes: 61 additions & 0 deletions cluster/raft/kv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package raft

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestKV_Add(t *testing.T) {
kv := NewKV()

new := kv.Add("key1", "value1")
require.Equal(t, true, new)
new = kv.Add("key1", "value2")
require.Equal(t, false, new)
ln := len(kv.Get("key1"))
require.Equal(t, 2, ln)

new = kv.Add("key2", "value1")
ln = len(kv.Get("key1"))
require.Equal(t, true, new)
require.Equal(t, 2, ln)
}

func TestKV_Del(t *testing.T) {
kv := NewKV()

kv.Add("key1", "value1")
kv.Add("key1", "value2")
ln := len(kv.Get("key1"))
require.Equal(t, 2, ln)

empty := kv.Del("key1", "value3")
require.Equal(t, false, empty)
empty = kv.Del("key1", "value2")
require.Equal(t, false, empty)
empty = kv.Del("key1", "value1")
require.Equal(t, true, empty)
}

func TestKV_DelByValue(t *testing.T) {
kv := NewKV()

// Test for value exists in multiple keys
kv.Add("key1", "value1")
kv.Add("key2", "value1")
kv.Add("key3", "value1")
c := kv.DelByValue("value1")
require.Equal(t, 3, c)
vs := kv.Get("key1")
require.Empty(t, nil, vs)
ln := len(kv.data)
require.Equal(t, 0, ln)

kv.Add("key4", "value4")
kv.Add("key5", "value5")
kv.DelByValue("value4")
ln = len(kv.data)
require.Equal(t, 1, ln)
vs = kv.Get("key5")
require.EqualValues(t, []string{"value5"}, vs)
}
18 changes: 18 additions & 0 deletions cluster/topics/trie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,21 @@ func BenchmarkIsolateParticle(b *testing.B) {
isolateParticle("path/to/my/mqtt", 3)
}
}

func TestConvertSharedFilter(t *testing.T) {
srcFilter := "$share/group1/filter/1"

group, destFilter := convertSharedFilter(srcFilter)
require.Equal(t, "group1/", group)
require.Equal(t, "filter/1", destFilter)
}

func TestRestoreShareFilter(t *testing.T) {
filter := "filter"
sharedGroups := []string{"Group1/", "Group2/", "Group3/"}

result := restoreShareFilter(filter, sharedGroups)
expectedResult := []string{"$share/Group1/filter", "$share/Group2/filter", "$share/Group3/filter"}

require.EqualValues(t, expectedResult, result)
}
26 changes: 13 additions & 13 deletions mqtt/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,7 @@ func TestPublishToSubscribersSelfNoLocal(t *testing.T) {
go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
pkx.Origin = cl.ID
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
time.Sleep(time.Millisecond)
_ = w.Close()
}()
Expand Down Expand Up @@ -1747,7 +1747,7 @@ func TestPublishToSubscribers(t *testing.T) {
}()

go func() {
s.PublishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
time.Sleep(time.Millisecond)
_ = w1.Close()
_ = w2.Close()
Expand Down Expand Up @@ -1791,7 +1791,7 @@ func TestPublishToSubscribersMessageExpiryDelta(t *testing.T) {
go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
pkx.Created = time.Now().Unix() - 30
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
time.Sleep(time.Millisecond)
_ = w1.Close()
}()
Expand All @@ -1815,7 +1815,7 @@ func TestPublishToSubscribersIdentifiers(t *testing.T) {
require.True(t, subbed)

go func() {
s.PublishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
time.Sleep(time.Millisecond)
_ = w.Close()
}()
Expand All @@ -1840,7 +1840,7 @@ func TestPublishToSubscribersPkIgnore(t *testing.T) {
go func() {
pk := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
pk.Ignore = true
s.PublishToSubscribers(pk)
s.publishToSubscribers(pk)
time.Sleep(time.Millisecond)
_ = w.Close()
}()
Expand Down Expand Up @@ -2071,7 +2071,7 @@ func TestPublishToSubscribersExhaustedSendQuota(t *testing.T) {
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
time.Sleep(time.Millisecond)
_ = w.Close()
}
Expand All @@ -2093,7 +2093,7 @@ func TestPublishToSubscribersExhaustedPacketIDs(t *testing.T) {
_ = r.Close()
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishQos1).Packet
pkx.PacketID = 0
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
time.Sleep(time.Millisecond)
_ = w.Close()
}
Expand All @@ -2109,7 +2109,7 @@ func TestPublishToSubscribersNoConnection(t *testing.T) {
// coverage: subscriber publish errors are non-returnable
// can we hook into zerolog ?
_ = r.Close()
s.PublishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
s.publishToSubscribers(*packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet)
time.Sleep(time.Millisecond)
_ = w.Close()
}
Expand Down Expand Up @@ -2566,7 +2566,7 @@ func TestServerProcessOutboundQos2Flow(t *testing.T) {
}()

if i == 0 {
s.PublishToSubscribers(*tx.in.Packet)
s.publishToSubscribers(*tx.in.Packet)
} else {
err := s.processPacket(cl, *tx.in.Packet)
require.NoError(t, err)
Expand Down Expand Up @@ -3495,7 +3495,7 @@ func TestPublishToInlineSubscriber(t *testing.T) {

go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
}()

require.Equal(t, true, <-finishCh)
Expand Down Expand Up @@ -3528,10 +3528,10 @@ func TestPublishToInlineSubscribersDifferentFilter(t *testing.T) {

go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)

pkx = *packets.TPacketData[packets.Publish].Get(packets.TPublishCopyBasic).Packet
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
}()

for i := 0; i < subNumber; i++ {
Expand Down Expand Up @@ -3566,7 +3566,7 @@ func TestPublishToInlineSubscribersDifferentIdentifier(t *testing.T) {

go func() {
pkx := *packets.TPacketData[packets.Publish].Get(packets.TPublishBasic).Packet
s.PublishToSubscribers(pkx)
s.publishToSubscribers(pkx)
}()

for i := 0; i < subNumber; i++ {
Expand Down

0 comments on commit 5ec4ce2

Please sign in to comment.