-
Notifications
You must be signed in to change notification settings - Fork 0
/
project.go
162 lines (131 loc) · 4.72 KB
/
project.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main
import (
"context"
"log/slog"
"sync"
"github.com/shurcooL/githubv4"
)
// GetProjectItems walks every page of items in a GitHub Project and streams the
// items that are not skipped to the returned channel.
//
// Parameters:
//   - ctx: passed to every query; checked after each page to decide whether to stop.
//   - gh: GitHub GraphQL client used to run the paginated query.
//   - projectId: node ID of the GitHub Project.
//   - errChan: receives the error of a failed query; per the pipeline's design the
//     receiver is expected to cancel ctx in response.
//
// Returns the item channel, which is closed when paging ends, and a WaitGroup
// whose counter is incremented once per item sent. The next page is not requested
// until that counter returns to zero, so a downstream stage (UpdateProjectItems in
// this file) must call Done once per item.
func GetProjectItems(ctx context.Context, gh *githubv4.Client, projectId githubv4.ID, errChan chan<- error) (<-chan ProjectItemEdgeFragment, *sync.WaitGroup) {
	var (
		pending sync.WaitGroup
		page    ProjectItemsQuery
	)
	items := make(chan ProjectItemEdgeFragment)
	vars := map[string]any{
		"nodeId": projectId,
		"cursor": (*githubv4.String)(nil),
		// TODO: Fix this
		// The timeline cursor is unused by this query but must still be supplied.
		"timelineCursor": (*githubv4.String)(nil),
	}

	go func() {
		// Closing the channel on every exit path tells the consumer that paging is over.
		defer close(items)

		for {
			// A failed query is reported and ends paging.
			if err := gh.Query(ctx, &page, vars); err != nil {
				errChan <- err
				return
			}

			// Forward each item that should not be skipped, counting it as outstanding.
			for _, edge := range page.Items.Edges {
				if edge.Skip() {
					continue
				}
				pending.Add(1)
				items <- edge
			}

			// Hold here until every item from this page has been marked done.
			pending.Wait()

			// Stop on cancellation or once the last page has been handled.
			if ctx.Err() != nil || !page.HasNextPage() {
				return
			}

			// Advance the cursor so the next iteration fetches the following page.
			vars["cursor"] = page.Items.EndCursor
		}
	}()

	return items, &pending
}
// ProcessProjectItems processes incoming ProjectItemEdgeFragment values, calculates the
// number of upvotes for each, and emits an Update describing the data required to update
// that project item's upvotes.
//
// Parameters:
//   - ctx: passed to every follow-up timeline query.
//   - gh: GitHub GraphQL client used to fetch additional timeline pages.
//   - in: channel of project items to process; each item is handled in its own goroutine.
//   - errChan: receives the error of a failed timeline query.
//
// Returns a channel of Update values. It is closed once in has been closed AND every
// in-flight worker has returned, so a worker can never send on a closed channel.
func ProcessProjectItems(ctx context.Context, gh *githubv4.Client, in <-chan ProjectItemEdgeFragment, errChan chan<- error) <-chan Update {
	out := make(chan Update)

	// workers counts the process goroutines that are still running. Without it, out
	// could be closed while a worker was about to send, which panics.
	var workers sync.WaitGroup

	process := func(item ProjectItemEdgeFragment) {
		defer workers.Done()

		content := item.GetContent()

		// The item carries only the first page of timeline items; fetch the rest
		// before counting upvotes.
		if content.TimelineItems.HasNextPage {
			var query ProjectItemQuery
			variables := map[string]any{
				"nodeId":         item.Id,
				"timelineCursor": content.TimelineItems.EndCursor,
			}

			for {
				slog.Debug("querying for additional timeline items", "node_id", item.Id)
				if err := gh.Query(ctx, &query, variables); err != nil {
					errChan <- err
					// TODO: This doesn't decrement the waitgroup from GetProjectItems
					// (it is not available here), so the pager may wait forever for
					// this item.
					return
				}

				content.TimelineItems.Nodes = append(content.TimelineItems.Nodes, query.GetContent().TimelineItems.Nodes...)
				if !query.HasNextPage() {
					break
				}
				variables["timelineCursor"] = query.GetContent().TimelineItems.EndCursor
			}
		}

		out <- Update{
			Id:      item.Id,
			Upvotes: githubv4.NewFloat(githubv4.Float(content.Upvotes())),
			Cursor:  item.Cursor,
		}
	}

	go func() {
		for item := range in {
			// Add before starting the goroutine so Wait below cannot miss it.
			workers.Add(1)
			go process(item)
		}
		// Only close once no worker can still send.
		workers.Wait()
		close(out)
	}()

	return out
}
// UpdateProjectItems processes incoming Update values and uses them to set each project
// item's upvote count.
//
// Parameters:
//   - ctx: passed to every mutation.
//   - gh: GitHub GraphQL client used to run the mutation.
//   - wg: WaitGroup used for synchronizing pagination; Done is called exactly once for
//     every Update received, whether or not its mutation succeeded.
//   - projectId: node ID of the GitHub Project.
//   - fieldId: node ID of the custom 'upvotes' field on the Project.
//   - in: channel of updates to apply.
//   - errChan: receives the error of the first failed mutation.
//
// Returns a channel that is closed when no further mutations will be attempted: either
// in was closed, or a mutation failed. After a failure the goroutine keeps receiving from
// in (without mutating) and calling wg.Done, so upstream stages waiting on wg or blocked
// sending to in are released instead of deadlocking. Callers should cancel ctx when an
// error arrives on errChan so upstream stops producing work.
func UpdateProjectItems(ctx context.Context, gh *githubv4.Client, wg *sync.WaitGroup, projectId githubv4.ID, fieldId githubv4.ID, in <-chan Update, errChan chan<- error) <-chan struct{} {
	out := make(chan struct{})

	var mutation struct {
		UpdateProjectItemV2FieldValue struct {
			ClientMutationId string
		} `graphql:"updateProjectV2ItemFieldValue(input: $input)"`
	}

	// ProjectID and FieldID are the same for every update; only ItemID and Value
	// change per iteration.
	input := githubv4.UpdateProjectV2ItemFieldValueInput{
		ProjectID: projectId,
		FieldID:   fieldId,
	}

	go func() {
		for update := range in {
			input.ItemID = update.Id
			input.Value = githubv4.ProjectV2FieldValue{Number: update.Upvotes}

			if err := gh.Mutate(ctx, &mutation, input, nil); err != nil {
				// Report first so the receiver can cancel ctx, then release the
				// pager's count for this item before stopping.
				errChan <- err
				wg.Done()
				break
			}

			wg.Done()
			slog.Info("updated project item", "item_id", update.Id, "upvotes", *update.Upvotes)
		}

		// Signal completion at the same point as before: right after the loop ends.
		close(out)

		// After a failure, drain whatever is still in flight so every counted item
		// is marked done. When in is already closed this loop exits immediately.
		for range in {
			wg.Done()
		}
	}()

	return out
}