-
Notifications
You must be signed in to change notification settings - Fork 2
/
boxqueue_mailbox.go
288 lines (240 loc) · 6.04 KB
/
boxqueue_mailbox.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package actorkit
import (
"sync"
"sync/atomic"
"github.com/gokit/errors"
)
// ErrPushFailed is returned when mailbox has reached storage limit.
var ErrPushFailed = errors.New("failed to push into mailbox")
// ErrMailboxEmpty is returned when mailbox is empty of pending envelopes.
var ErrMailboxEmpty = errors.New("mailbox is empty")
var (
_ Mailbox = &BoxQueue{}
nodePool = sync.Pool{New: func() interface{} {
return new(node)
}}
)
// Strategy defines a int type to represent a giving strategy.
type Strategy int
// constants.
const (
DropNew Strategy = iota
DropOld
)
type node struct {
addr Addr
value *Envelope
next *node
prev *node
}
// BoxQueue defines a queue implementation safe for concurrent-use
// across go-routines, which provides ability to requeue, pop and push
// new envelop messages. BoxQueue uses lock to guarantee safe concurrent use.
type BoxQueue struct {
bm sync.Mutex
pushCond *sync.Cond
head *node
tail *node
capped int
total int64
strategy Strategy
invoker MailInvoker
}
// BoundedBoxQueue returns a new instance of a unbounded box queue.
// Items will be queue till the capped is reached and then old items
// will be dropped till queue has enough space for new item.
// A cap value of -1 means there will be no maximum limit
// of allow messages in queue.
func BoundedBoxQueue(capped int, method Strategy, invoker MailInvoker) *BoxQueue {
bq := &BoxQueue{
capped: capped,
strategy: method,
invoker: invoker,
}
bq.pushCond = sync.NewCond(&bq.bm)
return bq
}
// UnboundedBoxQueue returns a new instance of a unbounded box queue.
// Items will be queue endlessly.
func UnboundedBoxQueue(invoker MailInvoker) *BoxQueue {
bq := &BoxQueue{
capped: -1,
invoker: invoker,
}
bq.pushCond = sync.NewCond(&bq.bm)
return bq
}
// Signal sends a signal to all listening go-routines to
// attempt checks for new message.
func (bq *BoxQueue) Signal() {
bq.pushCond.Broadcast()
}
// Clear resets and deletes all elements pending within queue
func (bq *BoxQueue) Clear() {
bq.pushCond.L.Lock()
if bq.isEmpty() {
bq.pushCond.L.Unlock()
return
}
bq.tail = nil
bq.head = nil
bq.pushCond.L.Unlock()
bq.pushCond.Broadcast()
}
// Wait will block current goroutine till there is a message pushed into
// the queue, allowing you to effectively rely on it as a schedule and processing
// signal for when messages are in queue.
func (bq *BoxQueue) Wait() {
bq.pushCond.L.Lock()
if !bq.isEmpty() {
bq.pushCond.L.Unlock()
return
}
bq.pushCond.Wait()
bq.pushCond.L.Unlock()
}
// Push adds the item to the back of the queue.
//
// Push can be safely called from multiple goroutines.
// Based on strategy if capped, then a message will be dropped.
func (bq *BoxQueue) Push(addr Addr, env Envelope) error {
available := int(atomic.LoadInt64(&bq.total))
if bq.capped != -1 && available >= bq.capped {
if bq.invoker != nil {
bq.invoker.InvokedFull()
}
switch bq.strategy {
case DropNew:
if bq.invoker != nil {
bq.invoker.InvokedDropped(addr, env)
}
return errors.Wrap(ErrPushFailed, "")
case DropOld:
if addrs, envs, err := bq.Pop(); err == nil {
if bq.invoker != nil {
bq.invoker.InvokedDropped(addrs, envs)
}
}
}
}
atomic.AddInt64(&bq.total, 1)
n := nodePool.Get().(*node)
n.value = &env
n.addr = addr
if bq.invoker != nil {
bq.invoker.InvokedReceived(addr, env)
}
bq.pushCond.L.Lock()
if bq.head == nil && bq.tail == nil {
bq.head, bq.tail = n, n
bq.pushCond.L.Unlock()
bq.pushCond.Broadcast()
return nil
}
bq.tail.next = n
n.prev = bq.tail
bq.tail = n
bq.pushCond.L.Unlock()
bq.pushCond.Broadcast()
return nil
}
// Unpop adds back item to the font of the queue.
//
// Unpop can be safely called from multiple goroutines.
// If queue is capped and max was reached, then last added
// message is removed to make space for message to be added back.
// This means strategy will be ignored since this is an attempt
// to re-add an item back into the top of the queue.
func (bq *BoxQueue) Unpop(addr Addr, env Envelope) {
available := int(atomic.LoadInt64(&bq.total))
if bq.capped != -1 && available >= bq.capped {
bq.unshift()
}
atomic.AddInt64(&bq.total, 1)
n := nodePool.Get().(*node)
n.value = &env
n.addr = addr
if bq.invoker != nil {
bq.invoker.InvokedReceived(addr, env)
}
bq.pushCond.L.Lock()
head := bq.head
if head != nil {
n.next = head
bq.head = n
bq.pushCond.L.Unlock()
bq.pushCond.Broadcast()
return
}
bq.head = n
bq.tail = n
bq.pushCond.L.Unlock()
bq.pushCond.Broadcast()
}
// Pop removes the item from the front of the queue.
//
// Pop can be safely called from multiple goroutines.
func (bq *BoxQueue) Pop() (Addr, Envelope, error) {
bq.pushCond.L.Lock()
head := bq.head
if head != nil {
atomic.AddInt64(&bq.total, -1)
v := head.value
addr := head.addr
if bq.invoker != nil {
bq.invoker.InvokedDispatched(addr, *v)
}
bq.head = head.next
if bq.tail == head {
bq.tail = bq.head
}
head.next = nil
head.prev = nil
head.addr = nil
head.value = nil
bq.pushCond.L.Unlock()
nodePool.Put(head)
return addr, *v, nil
}
bq.pushCond.L.Unlock()
if bq.invoker != nil {
bq.invoker.InvokedEmpty()
}
return nil, Envelope{}, errors.Wrap(ErrMailboxEmpty, "empty mailbox")
}
// unshift discards the tail of queue, allowing new space.
func (bq *BoxQueue) unshift() {
bq.pushCond.L.Lock()
tail := bq.tail
if tail != nil {
atomic.AddInt64(&bq.total, -1)
bq.tail = tail.prev
if tail == bq.head {
bq.head = bq.tail
}
tail.next = nil
tail.prev = nil
tail.value = nil
}
bq.pushCond.L.Unlock()
return
}
// Cap returns current cap of items.
func (bq *BoxQueue) Cap() int {
return bq.capped
}
// Total returns total of item in mailbox.
func (bq *BoxQueue) Total() int {
return int(atomic.LoadInt64(&bq.total))
}
// IsEmpty returns true/false if the queue is empty.
func (bq *BoxQueue) IsEmpty() bool {
var empty bool
bq.pushCond.L.Lock()
empty = bq.isEmpty()
bq.pushCond.L.Unlock()
return empty
}
func (bq *BoxQueue) isEmpty() bool {
return bq.head == nil && bq.tail == nil
}