-
Notifications
You must be signed in to change notification settings - Fork 0
/
streaming.py
52 lines (47 loc) · 1.89 KB
/
streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import requests
import json
from event import TickEvent
class StreamingForexPrices(object):
def __init__(
self, domain, access_token,
account_id, instruments, events_queue
):
self.domain = domain
self.access_token = access_token
self.account_id = account_id
self.instruments = instruments
self.events_queue = events_queue
def connect_to_stream(self):
try:
requests.packages.urllib3.disable_warnings()
s = requests.Session()
url = "https://" + self.domain + "/v1/prices"
headers = {'Authorization' : 'Bearer ' + self.access_token}
params = {'instruments' : self.instruments, 'accountId' : self.account_id}
req = requests.Request('GET', url, headers=headers, params=params)
pre = req.prepare()
resp = s.send(pre, stream=True, verify=False)
return resp
except Exception as e:
s.close()
print ("Caught exception when connecting to stream\n" + str(e))
def stream_to_queue(self):
response = self.connect_to_stream()
if response.status_code != 200:
return
for line in response.iter_lines(1):
if line:
try:
dline = line.decode('utf-8')
msg = json.loads(dline)
except Exception as e:
print ("Caught exception when converting message into json\n" + str(e))
return
if "instrument" in msg or "tick" in msg:
print (msg)
instrument = msg["tick"]["instrument"]
time = msg["tick"]["time"]
bid = msg["tick"]["bid"]
ask = msg["tick"]["ask"]
tev = TickEvent(instrument, time, bid, ask)
self.events_queue.put(tev)