forked from bitly/go-hostpool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
selector.go
125 lines (110 loc) · 2.56 KB
/
selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package hostpool
import (
"log"
"sync"
"time"
)
// Selector is the strategy a host pool uses to choose hosts and to track
// their health. standardSelector is the implementation visible here.
type Selector interface {
// Init configures the selector with the given host names.
Init([]string)
// SelectNextHost returns the name of the host that should be tried next.
SelectNextHost() string
// MakeHostResponse wraps the named host in a HostPoolResponse.
MakeHostResponse(string) HostPoolResponse
// MarkHost records the outcome of using the named host: a nil error
// means success, a non-nil error means failure.
MarkHost(string, error)
// ResetAll marks every host as alive again.
ResetAll()
}
// standardSelector is a Selector that walks its hosts in round-robin order,
// skipping any host whose canTryHost check fails.
type standardSelector struct {
// Embedded lock guarding the fields below. The exported methods visible
// here always take the write lock (Lock/Unlock).
sync.RWMutex
// hosts indexes host entries by host name.
hosts map[string]*hostEntry
// hostList holds the same entries in the order passed to Init; it is
// the sequence used for round-robin selection.
hostList []*hostEntry
// initialRetryDelay is the retry delay assigned to a host when it is
// first marked dead (set to 30s by initInternal).
initialRetryDelay time.Duration
// maxRetryInterval is passed to hostEntry.willRetryHost (set to 900s by
// initInternal); presumably an upper bound on the retry delay - confirm
// against hostEntry.
maxRetryInterval time.Duration
// nextHostIndex is the position in hostList where the next round-robin
// scan starts.
nextHostIndex int
}
// Init replaces the selector's host set with hosts. The selector's lock is
// held while the internal tables are rebuilt.
func (s *standardSelector) Init(hosts []string) {
	s.Lock()
	defer s.Unlock()
	s.initInternal(hosts)
}
// initInternal rebuilds the selector state for the given host names: it sets
// the retry timing defaults (30s initial delay, 15min maximum interval) and
// creates one hostEntry per name, stored both in the by-name map and in the
// ordered list. It does no locking; Init calls it with the lock held.
func (s *standardSelector) initInternal(hosts []string) {
	s.initialRetryDelay = 30 * time.Second
	s.maxRetryInterval = 15 * time.Minute

	byName := make(map[string]*hostEntry, len(hosts))
	ordered := make([]*hostEntry, 0, len(hosts))
	for _, name := range hosts {
		entry := &hostEntry{host: name, retryDelay: s.initialRetryDelay}
		byName[name] = entry
		ordered = append(ordered, entry)
	}

	s.hosts = byName
	s.hostList = ordered
}
// SelectNextHost returns the name of the next host to try, chosen by
// getRoundRobin while holding the selector's lock.
//
// The unlock is deferred so the lock is released even if selection panics;
// otherwise a recovered panic would leave the selector locked forever.
func (s *standardSelector) SelectNextHost() string {
	s.Lock()
	defer s.Unlock()
	return s.getRoundRobin()
}
// getRoundRobin returns the name of the next host that canTryHost accepts,
// scanning hostList once starting at nextHostIndex and wrapping around.
// On success nextHostIndex is advanced past the chosen host.
//
// If no host is accepted, every host is reset via doResetAll and the first
// host in the list is returned. If the selector has no hosts at all, the
// empty string is returned.
//
// It does no locking; the caller must already hold the selector's lock.
func (s *standardSelector) getRoundRobin() string {
	hostCount := len(s.hostList)
	if hostCount == 0 {
		// Nothing to select from. Without this guard the fallback below
		// would index hostList[0] and panic.
		return ""
	}

	now := time.Now()
	for i := range s.hostList {
		// iterate in sequence from where we last left off
		currentIndex := (i + s.nextHostIndex) % hostCount
		h := s.hostList[currentIndex]
		if h.canTryHost(now) {
			s.nextHostIndex = currentIndex + 1
			return h.host
		}
	}

	// all hosts are down. re-add them
	log.Println("all hosts are down, resetting ...")
	s.doResetAll()
	s.nextHostIndex = 0
	return s.hostList[0].host
}
// MakeHostResponse returns a HostPoolResponse bound to host and to this
// selector. If host is currently marked dead and its nextRetry time has
// already passed, willRetryHost is called on it first with the selector's
// maxRetryInterval.
//
// An unknown host is fatal: log.Fatalf logs the message and exits the process.
func (s *standardSelector) MakeHostResponse(host string) HostPoolResponse {
	s.Lock()
	defer s.Unlock()

	entry, known := s.hosts[host]
	if !known {
		log.Fatalf("host %s not in HostPool", host)
	}

	if entry.dead {
		// A dead host is being handed out after its retry time elapsed.
		if now := time.Now(); entry.nextRetry.Before(now) {
			entry.willRetryHost(s.maxRetryInterval)
		}
	}

	return &standardHostPoolResponse{host: host, ss: s}
}
// MarkHost records the result of using host. A nil err marks the host alive.
// A non-nil err marks a currently-alive host dead, resets its retry count and
// retry delay to their initial values, and schedules its next retry one
// initial delay from now; a host that is already dead is left unchanged.
//
// An unknown host is fatal: log.Fatalf logs the message and exits the process.
func (s *standardSelector) MarkHost(host string, err error) {
	s.Lock()
	defer s.Unlock()

	entry, known := s.hosts[host]
	if !known {
		log.Fatalf("host %s not in HostPool", host)
	}

	switch {
	case err == nil:
		// success - the host is alive
		entry.dead = false
	case !entry.dead:
		// first failure since the host was last alive - start the retry cycle
		entry.dead = true
		entry.retryCount = 0
		entry.retryDelay = s.initialRetryDelay
		entry.nextRetry = time.Now().Add(entry.retryDelay)
	}
}
// ResetAll marks every host as alive again. It acquires the selector's lock
// and delegates the actual work to doResetAll.
func (s *standardSelector) ResetAll() {
s.Lock()
defer s.Unlock()
s.doResetAll()
}
// this actually performs the logic to reset,
// and should only be called when the lock has
// already been acquired
func (s *standardSelector) doResetAll() {
for _, h := range s.hosts {
h.dead = false
}
}