-
Notifications
You must be signed in to change notification settings - Fork 1
/
MiningDAO.go.diff
201 lines (191 loc) · 7.76 KB
/
MiningDAO.go.diff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go
index 3177a877e..6b77b3e58 100644
--- a/eth/fetcher/block_fetcher.go
+++ b/eth/fetcher/block_fetcher.go
@@ -97,6 +97,9 @@ type chainInsertFn func(types.Blocks) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
+// peerStatsUpdateFn is a callback type for reporting peer events for stats update.
+type peerStatsUpdateFn func(peer string, event string)
+
// blockAnnounce is the hash notification of the availability of a new block in the
// network.
type blockAnnounce struct {
@@ -187,6 +190,7 @@ type BlockFetcher struct {
insertHeaders headersInsertFn // Injects a batch of headers into the chain
insertChain chainInsertFn // Injects a batch of blocks into the chain
dropPeer peerDropFn // Drops a peer for misbehaving
+ updatePeerStats peerStatsUpdateFn // Callback to update peer stats
// Testing hooks
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
@@ -197,7 +201,7 @@ type BlockFetcher struct {
}
// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
-func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
+func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn, updatePeerStats peerStatsUpdateFn) *BlockFetcher {
return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
@@ -222,6 +226,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
+ updatePeerStats: updatePeerStats,
}
}
@@ -788,6 +793,11 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block) {
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
+
+ if f.updatePeerStats != nil {
+ f.updatePeerStats(peer, "importBlocks")
+ }
+
go func() {
defer func() { f.done <- hash }()
diff --git a/eth/handler.go b/eth/handler.go
index aff4871af..7e449fb59 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -108,6 +108,7 @@ type handler struct {
blockFetcher *fetcher.BlockFetcher
txFetcher *fetcher.TxFetcher
peers *peerSet
+ peersStats *peerSetStats
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
@@ -143,6 +144,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
+ h.peersStats = newPeerSetStats(h.peers)
if config.Sync == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
// block is ahead, so fast sync was enabled for this node at a certain point.
@@ -219,7 +221,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return n, err
}
- h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer)
+ h.blockFetcher = fetcher.NewBlockFetcher(false, nil, h.chain.GetBlockByHash, validator, h.BroadcastBlock, heighter, nil, inserter, h.removePeer, h.peersStats.UpdatePeerStats)
fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
@@ -438,6 +440,14 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := h.peers.peersWithoutBlock(hash)
+ if propagate {
+ h.peersStats.ReportPeersWithoutBlock(peers)
+ }
+ // Print peers stats every 100 blocks
+ if block.NumberU64() % 100 == 0 {
+ h.peersStats.LogStats()
+ }
+
// If propagation is requested, send to a subset of the peer
if propagate {
// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
@@ -448,12 +458,20 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
- // Send the block to a subset of our peers
- transfer := peers[:int(math.Sqrt(float64(len(peers))))]
- for _, peer := range transfer {
- peer.AsyncSendNewBlock(block, td)
+ // Prioritize sending to trusted peers
+ log.Warn("Broadcasting block to trusted peers", "number", block.Number(), "hash", hash)
+ for _, peer := range peers {
+ if peer.Peer.Info().Network.Trusted {
+ peer.AsyncSendNewBlock(block, td)
+ }
+ }
+ // Send to all remaining peers as well
+ for _, peer := range peers {
+ if !peer.Peer.Info().Network.Trusted {
+ peer.AsyncSendNewBlock(block, td)
+ }
}
- log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ log.Trace("Propagated block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
// Otherwise if the block is indeed in out own chain, announce it
diff --git a/eth/peerset.go b/eth/peerset.go
index 1e864a8e4..8d5293e41 100644
--- a/eth/peerset.go
+++ b/eth/peerset.go
@@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
+ "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)
@@ -257,3 +258,54 @@ func (ps *peerSet) close() {
}
ps.closed = true
}
+
+type peerSetStats struct {
+ ps *peerSet
+ importedBlocksFrom map[string]int
+ totalBroadcastsTo map[string]int
+ broadcastsToWithoutBlock map[string]int
+ lock sync.RWMutex
+}
+
+func newPeerSetStats(ps *peerSet) *peerSetStats {
+ return &peerSetStats{
+ ps: ps,
+ importedBlocksFrom: make(map[string]int),
+ totalBroadcastsTo: make(map[string]int),
+ broadcastsToWithoutBlock: make(map[string]int),
+ }
+}
+
+func (s *peerSetStats) ReportPeersWithoutBlock(peers []*ethPeer) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ for _, p := range s.ps.peers {
+ s.totalBroadcastsTo[p.Peer.ID()] += 1
+ }
+ for _, p := range peers {
+ s.broadcastsToWithoutBlock[p.Peer.ID()] += 1
+ }
+}
+
+func (s *peerSetStats) UpdatePeerStats(peer string, event string) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ if event == "importBlocks" {
+ s.importedBlocksFrom[peer] += 1
+ }
+}
+
+func (s *peerSetStats) LogStats() {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ log.Info("Peers:")
+ for _, p := range s.ps.peers {
+ log.Info("Peer: ", "id", p.Peer.ID(), "enode", p.Peer.Node().URLv4(),
+ "importedBlocksFrom", s.importedBlocksFrom[p.Peer.ID()],
+ "totalBroadcastsTo", s.totalBroadcastsTo[p.Peer.ID()],
+ "broadcastsToWithoutBlock", s.broadcastsToWithoutBlock[p.Peer.ID()])
+ }
+}
diff --git a/les/fetcher.go b/les/fetcher.go
index a6d1c93c4..38a4f8417 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -181,7 +181,7 @@ func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *se
chaindb: chaindb,
chain: chain,
reqDist: reqDist,
- fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper),
+ fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper, nil),
peers: make(map[enode.ID]*fetcherPeer),
synchronise: syncFn,
announceCh: make(chan *announce),