forked from cloudfoundry/go-diodes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
poller_test.go
82 lines (66 loc) · 1.59 KB
/
poller_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package diodes_test
import (
"context"
"sync"
"time"
"code.cloudfoundry.org/go-diodes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Poller", func() {
var (
spy *spyDiode
p *diodes.Poller
)
BeforeEach(func() {
spy = new(spyDiode)
p = diodes.NewPoller(spy, diodes.WithPollingInterval(time.Millisecond))
})
It("returns the available result", func() {
spy.dataList = [][]byte{[]byte("a"), []byte("b")}
Expect(*(*[]byte)(p.Next())).To(Equal([]byte("a")))
Expect(*(*[]byte)(p.Next())).To(Equal([]byte("b")))
})
It("polls the given diode until data is available", func() {
go func() {
time.Sleep(250 * time.Millisecond)
spy.mu.Lock()
defer spy.mu.Unlock()
spy.dataList = [][]byte{[]byte("a")}
}()
Expect(*(*[]byte)(p.Next())).To(Equal([]byte("a")))
})
It("cancels Next() with context", func() {
ctx, cancel := context.WithCancel(context.Background())
p = diodes.NewPoller(spy, diodes.WithPollingContext(ctx))
cancel()
done := make(chan struct{})
go func() {
defer close(done)
p.Next()
}()
Eventually(done).Should(BeClosed())
})
})
type spyDiode struct {
diodes.Diode
mu sync.Mutex
dataList [][]byte
called int
}
func (s *spyDiode) Set(data diodes.GenericDataType) {
s.mu.Lock()
defer s.mu.Unlock()
s.dataList = append(s.dataList, *(*[]byte)(data))
}
func (s *spyDiode) TryNext() (diodes.GenericDataType, bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.called++
if len(s.dataList) == 0 {
return nil, false
}
next := s.dataList[0]
s.dataList = s.dataList[1:]
return diodes.GenericDataType(&next), true
}