-
Notifications
You must be signed in to change notification settings - Fork 54
/
p2pt.js
375 lines (313 loc) Β· 9.62 KB
/
p2pt.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
/**
* Peer 2 Peer WebRTC connections with WebTorrent Trackers as signalling server
* Copyright Subin Siby <mail@subinsb.com>, 2020
* Licensed under MIT
*/
import WebSocketTracker from 'bittorrent-tracker/lib/client/websocket-tracker.js'
import EventEmitter from 'events'
import Debug from 'debug'
import { randomBytes, arr2hex, hex2bin, hex2arr, hash, arr2text } from 'uint8-util'
// Logger created by the `debug` package under the 'p2pt' namespace
const debug = Debug('p2pt')
/**
* Marker character prepended to every message produced by `send()`.
* Incoming data whose first character is this marker is parsed as a JSON
* envelope; other data is only surfaced through the raw 'data' event.
*/
const JSON_MESSAGE_IDENTIFIER = '^'
/**
* Maximum length (in string characters) of the message payload carried by a
* single chunk; longer messages are split into several chunks by `send()`.
* The JSON envelope and the marker character are added on top of this.
* Chosen as roughly 16KB with Chromium's WebRTC data channel limits in mind:
* https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Using_data_channels#Concerns_with_large_messages
*/
const MAX_MESSAGE_LENGTH = 16000
export default class P2PT extends EventEmitter {
  /**
   * Create a client. Trackers are only contacted once `start()` (or
   * `addTracker()`) is called.
   *
   * @param array announceURLs List of announce tracker URLs
   * @param string identifierString Identifier used to discover peers in the network
   */
  constructor (announceURLs = [], identifierString = '') {
    super()

    this.announceURLs = announceURLs
    // Tracker clients, keyed by the index of their URL in `announceURLs`
    this.trackers = {}
    // peer id -> { channel name -> peer object }
    this.peers = {}
    // message id -> chunks received so far for that message
    this.msgChunks = {}
    // peer id -> { message id -> resolve function of the pending send() }
    this.responseWaiting = {}

    if (identifierString) { this.setIdentifier(identifierString) }

    this._peerIdBuffer = randomBytes(20)
    this._peerId = arr2hex(this._peerIdBuffer)
    this._peerIdBinary = hex2bin(this._peerId)

    debug('my peer id: ' + this._peerId)
  }

  /**
   * Set the identifier string used to discover peers in the network.
   * `this.infoHash` is set synchronously to the pending hash promise; the
   * derived buffer/binary forms are assigned once it resolves.
   *
   * @param string identifierString
   */
  async setIdentifier (identifierString) {
    this.identifierString = identifierString
    this.infoHash = hash(identifierString, 'hex')

    // Await the digest once so both derived forms are assigned in the same
    // tick, before anything else waiting on `this.infoHash` can resume.
    const infoHashHex = (await this.infoHash).toLowerCase()
    this._infoHashBuffer = hex2arr(infoHashHex)
    this._infoHashBinary = hex2bin(infoHashHex)
  }

  /**
   * Connect to network and start discovering peers.
   *
   * Registers listeners on this object for the 'peer', 'update' and
   * 'warning' events (presumably emitted by the tracker clients, which are
   * handed `this` as their client) and re-emits them as 'peerconnect',
   * 'peerclose', 'data', 'msg', 'trackerconnect' and 'trackerwarning'.
   */
  async start () {
    await this.infoHash

    this.on('peer', peer => {
      let newpeer = false
      if (!this.peers[peer.id]) {
        newpeer = true
        this.peers[peer.id] = {}
        this.responseWaiting[peer.id] = {}
      }

      peer.on('connect', () => {
        /**
         * Multiple data channels to one peer is possible
         * The `peer` object actually refers to a peer with a data channel. Even though it may have same `id` (peerID) property, the data channel will be different. Different trackers giving the same "peer" will give the `peer` object with different channels.
         * We will store all channels as backups in case any one of them fails
         * A peer is removed if all data channels become unavailable
         */
        // The bookkeeping may have been dropped by _removePeer in the
        // meantime (e.g. another channel to this peer failed first).
        if (!this.peers[peer.id]) {
          this.peers[peer.id] = {}
        }
        if (!this.responseWaiting[peer.id]) {
          this.responseWaiting[peer.id] = {}
        }
        this.peers[peer.id][peer.channelName] = peer

        if (newpeer) {
          this.emit('peerconnect', peer)
        }
      })

      peer.on('data', data => {
        // Raw data is always surfaced, whether or not it is one of our envelopes
        this.emit('data', peer, data)

        if (ArrayBuffer.isView(data)) data = arr2text(data)

        debug('got a message from ' + peer.id)

        if (data[0] !== JSON_MESSAGE_IDENTIFIER) return

        try {
          data = JSON.parse(data.slice(1))

          // A respond function
          peer.respond = this._peerRespond(peer, data.id)

          let msg = this._chunkHandler(data)

          // Still waiting for more chunks
          if (msg === false) return

          if (data.o) {
            msg = JSON.parse(msg)
          }

          /**
           * If there's someone waiting for a response, call them
           */
          const waiting = this.responseWaiting[peer.id]
          if (waiting && waiting[data.id]) {
            waiting[data.id]([peer, msg])
            delete waiting[data.id]
          } else {
            this.emit('msg', peer, msg)
          }
          this._destroyChunks(data.id)
        } catch (e) {
          console.log(e)
        }
      })

      peer.on('error', err => {
        this._removePeer(peer)
        debug('Error in connection : ' + err)
      })

      peer.on('close', () => {
        this._removePeer(peer)
        debug('Connection closed with ' + peer.id)
      })
    })

    // Tracker responded to the announce request
    this.on('update', response => {
      const tracker = this.trackers[this.announceURLs.indexOf(response.announce)]

      this.emit(
        'trackerconnect',
        tracker,
        this.getTrackerStats()
      )
    })

    // Errors in tracker connection
    this.on('warning', err => {
      this.emit(
        'trackerwarning',
        err,
        this.getTrackerStats()
      )
    })

    this._fetchPeers()
  }

  /**
   * Add a tracker and announce to it immediately
   * @param string announceURL Tracker Announce URL
   * @throws Error if the URL is already in the tracker list
   */
  addTracker (announceURL) {
    if (this.announceURLs.indexOf(announceURL) !== -1) {
      throw new Error('Tracker already added')
    }

    // push() returns the new length; trackers are keyed by the URL's index
    const key = this.announceURLs.push(announceURL) - 1

    const tracker = new WebSocketTracker(this, announceURL)
    this.trackers[key] = tracker
    tracker.announce(this._defaultAnnounceOpts())
  }

  /**
   * Remove a tracker without destroying peers
   * @param string announceURL Tracker Announce URL
   * @throws Error if the URL is not in the tracker list
   */
  removeTracker (announceURL) {
    const key = this.announceURLs.indexOf(announceURL)

    if (key === -1) {
      throw new Error('Tracker does not exist')
    }

    // No tracker object exists yet if the URL was never connected
    const tracker = this.trackers[key]
    if (tracker) {
      // hack to not destroy peers
      tracker.peers = []
      tracker.destroy()
      delete this.trackers[key]
    }

    // Leave a hole instead of splicing so the keys of the remaining
    // trackers keep matching their index in `announceURLs`
    delete this.announceURLs[key]
  }

  /**
   * Forget one channel of a peer; remove the peer from the list (and emit
   * 'peerclose') if all channels are closed
   * @param Peer peer Peer object whose channel went away
   * @returns false when the peer is not known, otherwise nothing
   */
  _removePeer (peer) {
    if (!this.peers[peer.id]) { return false }

    delete this.peers[peer.id][peer.channelName]

    // All data channels are gone. Peer lost
    if (Object.keys(this.peers[peer.id]).length === 0) {
      this.emit('peerclose', peer)

      delete this.responseWaiting[peer.id]
      delete this.peers[peer.id]
    }
  }

  /**
   * Send a msg and get response for it
   *
   * The message is wrapped in a JSON envelope and split into chunks of at
   * most MAX_MESSAGE_LENGTH characters. An empty string is sent as a single
   * empty chunk.
   *
   * @param Peer peer simple-peer object to send msg to
   * @param string|object msg Message to send; objects are JSON-encoded
   * @param integer msgID ID of message if it's a response to a previous message
   * @returns Promise resolved with `[peer, msg]` once a message with the same
   *          ID comes back from the peer; rejected if sending throws
   */
  send (peer, msg, msgID = '') {
    return new Promise((resolve, reject) => {
      const data = {
        id: msgID !== '' ? msgID : Math.floor(Math.random() * 100000 + 100000),
        msg
      }

      if (typeof msg === 'object') {
        data.msg = JSON.stringify(msg)
        data.o = 1 // indicating object
      }

      let channel = peer
      try {
        /**
         * Maybe peer channel is closed, so use a different channel if available
         * Array should atleast have one channel, otherwise peer connection is closed
         */
        if (!channel.connected) {
          const channels = this.peers[peer.id]
          for (const name in channels) {
            channel = channels[name]
            if (channel.connected) break
          }
        }

        if (!this.responseWaiting[channel.id]) {
          this.responseWaiting[channel.id] = {}
        }
        this.responseWaiting[channel.id][data.id] = resolve
      } catch (e) {
        return reject(Error('Connection to peer closed' + e))
      }

      let chunks = 0
      let remaining = data.msg
      // do/while: even an empty message must produce one chunk flagged
      // `last`, otherwise the receiver never sees it
      do {
        data.c = chunks
        data.msg = remaining.slice(0, MAX_MESSAGE_LENGTH)
        remaining = remaining.slice(MAX_MESSAGE_LENGTH)

        if (!remaining) { data.last = true }

        channel.send(JSON_MESSAGE_IDENTIFIER + JSON.stringify(data))
        chunks++
      } while (remaining.length > 0)

      debug('sent a message to ' + channel.id)
    })
  }

  /**
   * Request more peers by announcing again on every tracker
   * @returns Promise resolved with the current peer list (`this.peers`)
   */
  requestMorePeers () {
    return new Promise(resolve => {
      for (const key in this.trackers) {
        this.trackers[key].announce(this._defaultAnnounceOpts())
      }
      resolve(this.peers)
    })
  }

  /**
   * Get basic stats about tracker connections
   * @returns object `{ connected, total }`: trackers whose socket reports
   *          `connected`, and tracker URLs currently in the list
   */
  getTrackerStats () {
    let connectedCount = 0
    for (const key in this.trackers) {
      const socket = this.trackers[key].socket
      if (socket && socket.connected) {
        connectedCount++
      }
    }

    return {
      connected: connectedCount,
      // removeTracker() leaves holes in the list; count only real entries
      total: Object.keys(this.announceURLs).length
    }
  }

  /**
   * Destroy object: destroys every stored peer channel and every tracker
   */
  destroy () {
    for (const peerId in this.peers) {
      for (const channelName in this.peers[peerId]) {
        this.peers[peerId][channelName].destroy()
      }
    }
    for (const key in this.trackers) {
      this.trackers[key].destroy()
    }
  }

  /**
   * A custom function binded on Peer object to easily respond back to message
   * @param Peer peer Peer to send msg to
   * @param integer msgID Message ID
   * @returns function taking the response msg and returning the `send()` promise
   */
  _peerRespond (peer, msgID) {
    return msg => {
      return this.send(peer, msg, msgID)
    }
  }

  /**
   * Handle msg chunks. Returns false until the last chunk is received. Finally returns the entire msg
   * @param object data Parsed envelope (`id`, `c` chunk index, `msg`, optional `last`)
   */
  _chunkHandler (data) {
    if (!this.msgChunks[data.id]) {
      this.msgChunks[data.id] = []
    }

    this.msgChunks[data.id][data.c] = data.msg

    if (!data.last) {
      return false
    }
    return this.msgChunks[data.id].join('')
  }

  /**
   * Remove all stored chunks of a particular message
   * @param integer msgID Message ID
   */
  _destroyChunks (msgID) {
    delete this.msgChunks[msgID]
  }

  /**
   * Default announce options; fills in `numwant`, `uploaded` and
   * `downloaded` on the given object when they are missing
   * @param object opts Options
   * @returns object the same `opts` object
   */
  _defaultAnnounceOpts (opts = {}) {
    if (opts.numwant == null) opts.numwant = 50
    if (opts.uploaded == null) opts.uploaded = 0
    if (opts.downloaded == null) opts.downloaded = 0

    return opts
  }

  /**
   * Initialize trackers and fetch peers
   */
  _fetchPeers () {
    // forEach skips the holes left behind by removeTracker()
    this.announceURLs.forEach((announceURL, key) => {
      // Reuse a tracker already created by addTracker() instead of
      // overwriting (and leaking) it
      if (!this.trackers[key]) {
        this.trackers[key] = new WebSocketTracker(this, announceURL)
      }
      this.trackers[key].announce(this._defaultAnnounceOpts())
    })
  }
}