-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathstream.go
97 lines (77 loc) · 2.45 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package observer
import "context"
// Stream represents the list of values a property is updated to. For every
// property update, that value is appended to the list in the order they
// happen. The value is discarded once you advance the stream. Please note
// that Stream is not goroutine safe: you cannot use the same stream on
// multiple goroutines concurrently. If you want to use multiple streams for
// the same property, either use Property.Observe (goroutine-safe) or use
// Stream.Clone (before passing it to another goroutine).
type Stream[T any] interface {
// Value returns the current value for this stream.
Value() T
// Changes returns the channel that is closed when a new value is available.
Changes() chan struct{}
// Next advances this stream to the next state.
// You should never call this unless Changes channel is closed.
Next() T
// HasNext checks whether there is a new value available.
HasNext() bool
// WaitNext waits for Changes to be closed, advances the stream and returns
// the current value.
WaitNext() T
// WaitNextCtx does the same as WaitNext but returns earlier with an error if the given context is cancelled first.
WaitNextCtx(ctx context.Context) (T, error)
// Clone creates a new independent stream from this one but sharing the same
// Property. Updates to the property will be reflected in both streams but
// they may have different values depending on when they advance the stream
// with Next.
Clone() Stream[T]
// Peek return the value in the next state
// You should never call this unless Changes channel is closed.
Peek() T
}
type stream[T any] struct {
state *state[T]
}
func (s *stream[T]) Clone() Stream[T] {
return &stream[T]{state: s.state}
}
func (s *stream[T]) Value() T {
return s.state.value
}
func (s *stream[T]) Changes() chan struct{} {
return s.state.done
}
func (s *stream[T]) Next() T {
s.state = s.state.next
return s.state.value
}
func (s *stream[T]) HasNext() bool {
select {
case <-s.state.done:
return true
default:
return false
}
}
func (s *stream[T]) WaitNext() T {
<-s.state.done
s.state = s.state.next
return s.state.value
}
func (s *stream[T]) WaitNextCtx(ctx context.Context) (T, error) {
select {
case <-s.Changes():
// ensure that context is not canceled, only then advance the stream
if ctx.Err() == nil {
return s.Next(), nil
}
case <-ctx.Done():
}
var zeroVal T
return zeroVal, ctx.Err()
}
func (s *stream[T]) Peek() T {
return s.state.next.value
}