-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipe.go
115 lines (92 loc) · 2.14 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package batching
import (
"context"
"runtime"
"sync"
"sync/atomic"
)
// Pipe divides tasks into batches and runs provided function for each of them.
// Pipe returns nil and the first error encountered, or processed results and nil if there were no errors.
// Pipe panics on calling goroutine if provided function panics.
func Pipe[F, R any](
ctx context.Context,
tasks []F,
pipe func(context.Context, []F) ([]R, error),
options ...DoOption,
) ([]R, error) {
doOpts := doOptions{
batchSize: 1,
minSizeForConcurrency: 2,
maxThreads: runtime.NumCPU(),
}
for _, do := range options {
doOpts = do.do(doOpts)
}
if len(tasks) < doOpts.minSizeForConcurrency {
return pipe(ctx, tasks)
}
var (
doCtx, cancel = context.WithCancel(ctx)
semaphore = make(chan struct{}, doOpts.maxThreads)
firstError atomic.Pointer[error]
firstPanic atomic.Pointer[any]
wg sync.WaitGroup
res = make([]R, len(tasks))
index = 0
)
defer cancel()
defer close(semaphore)
for len(tasks) > 0 {
thisBatchSize := doOpts.batchSize
if len(tasks) < thisBatchSize {
thisBatchSize = len(tasks)
}
batch := tasks[:thisBatchSize]
tasks = tasks[thisBatchSize:]
batchStart := index
index += thisBatchSize
select {
case <-doCtx.Done():
if fp := firstPanic.Load(); fp != nil {
panic(*fp)
}
ctxErr := doCtx.Err()
firstError.CompareAndSwap(nil, &ctxErr)
return nil, *firstError.Load()
case semaphore <- struct{}{}:
}
wg.Add(1)
go func() {
defer func() {
if rec := recover(); rec != nil {
firstPanic.CompareAndSwap(nil, &rec)
cancel()
}
select {
case <-doCtx.Done():
ctxErr := doCtx.Err()
firstError.CompareAndSwap(nil, &ctxErr)
case <-semaphore:
}
wg.Done()
}()
batchRes, pipeErr := pipe(doCtx, batch)
if pipeErr != nil {
firstError.CompareAndSwap(nil, &pipeErr)
cancel()
return
}
for i := 0; i < thisBatchSize; i++ {
res[i+batchStart] = batchRes[i]
}
}()
}
wg.Wait()
if fp := firstPanic.Load(); fp != nil {
panic(*fp)
}
if err := firstError.Load(); err != nil {
return nil, *err
}
return res, nil
}