-
Notifications
You must be signed in to change notification settings - Fork 2
/
utility.py
149 lines (129 loc) · 5.55 KB
/
utility.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os, sys, re, shutil
import json
from pathlib import Path
from datetime import *
import urllib.request
from argparse import ArgumentParser, RawTextHelpFormatter, ArgumentTypeError
from enums import *
def get_destination_dir(file_url, folder=None):
    """Return the local path under which ``file_url`` should be stored.

    The root directory is chosen in this order of precedence:

    1. ``folder``, when it is truthy;
    2. the ``STORE_DIRECTORY`` environment variable, when set and non-empty;
    3. the directory containing this source file.

    The result is ``os.path.join(root, file_url)``.
    """
    root = folder or os.environ.get('STORE_DIRECTORY')
    if not root:
        # Nothing configured: fall back to the directory of this module.
        root = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(root, file_url)
def get_download_url(file_url):
    """Return the full download URL: ``BASE_URL`` followed by ``file_url``.

    The two parts are concatenated as-is; no separator is inserted.
    """
    return f"{BASE_URL}{file_url}"
def get_all_symbols(type):
    """Return the list of symbol names reported by Binance's exchangeInfo.

    ``type`` selects which endpoint is queried:

    * ``'um'`` -> ``fapi.binance.com`` (``/fapi/v1/exchangeInfo``)
    * ``'cm'`` -> ``dapi.binance.com`` (``/dapi/v1/exchangeInfo``)
    * anything else -> ``api.binance.com`` (``/api/v3/exchangeInfo``)

    The response body is parsed as JSON and the ``'symbol'`` field of every
    entry under ``'symbols'`` is returned, in response order.

    Network and HTTP errors raised by ``urllib`` propagate to the caller.

    NOTE: the parameter name shadows the ``type`` builtin; it is kept
    unchanged so existing keyword callers keep working.
    """
    if type == 'um':
        url = "https://fapi.binance.com/fapi/v1/exchangeInfo"
    elif type == 'cm':
        url = "https://dapi.binance.com/dapi/v1/exchangeInfo"
    else:
        url = "https://api.binance.com/api/v3/exchangeInfo"

    # Use the response as a context manager so the connection is closed
    # deterministically instead of lingering until garbage collection.
    with urllib.request.urlopen(url) as response:
        payload = json.loads(response.read())

    return [entry['symbol'] for entry in payload['symbols']]
def download_file(base_path, file_name, date_range=None, folder=None):
    """Download ``base_path + file_name`` from the data server and save it.

    Parameters:
        base_path: Remote directory path (as produced by ``get_path``); it is
            also used as the relative directory of the local copy.
        file_name: Name of the file to fetch.
        date_range: Optional range label. When given, spaces are replaced by
            underscores and the result becomes an extra sub-directory of the
            local path (the remote path is unaffected).
        folder: Optional root directory for the local copy; passed on to
            ``get_destination_dir`` (which otherwise uses its own defaults).

    Behaviour:
        * If the local file already exists, a message is printed and nothing
          is downloaded.
        * Otherwise the parent directory of the target file is created and
          the file is streamed to disk, drawing a 50-column progress bar on
          stdout when the server reports a positive Content-Length.
        * ``urllib.error.HTTPError`` is caught and reported as
          "File not found"; other errors (e.g. ``URLError``, ``OSError``)
          propagate.

    Returns ``None``.
    """
    download_path = "{}{}".format(base_path, file_name)

    if date_range:
        base_path = os.path.join(base_path, date_range.replace(" ", "_"))

    # Let get_destination_dir apply ``folder`` exactly once. Joining it here
    # as well duplicated a relative folder in the resulting path
    # (folder/folder/...); for an absolute folder the result is unchanged.
    save_path = get_destination_dir(os.path.join(base_path, file_name), folder)

    if os.path.exists(save_path):
        print("\nfile already exists! {}".format(save_path))
        return

    # make the directory -- derived from save_path itself so that the
    # directory created is always the one the file is written into.
    Path(save_path).parent.mkdir(parents=True, exist_ok=True)

    download_url = get_download_url(download_path)
    try:
        with urllib.request.urlopen(download_url) as dl_file:
            header = dl_file.getheader('content-length')
            # A missing header means the size is unknown: still download,
            # using a default chunk size and without a progress bar.
            total = int(header) if header else 0
            blocksize = max(4096, total // 100)

            with open(save_path, 'wb') as out_file:
                dl_progress = 0
                print("\nFile Download: {}".format(save_path))
                while True:
                    buf = dl_file.read(blocksize)
                    if not buf:
                        break
                    dl_progress += len(buf)
                    out_file.write(buf)
                    if total > 0:
                        # Clamp so the bar keeps its width even if more
                        # bytes arrive than the header announced.
                        done = min(50, int(50 * dl_progress / total))
                        sys.stdout.write("\r[%s%s]" % ('#' * done, '.' * (50 - done)))
                        sys.stdout.flush()
    except urllib.error.HTTPError:
        # Any HTTP error status is reported as a missing file.
        print("\nFile not found: {}".format(download_url))
def convert_to_date_object(d):
    """Convert a ``'YEAR-MONTH-DAY'`` string into a ``datetime.date``.

    The string is split on ``'-'`` and must yield exactly three integer
    fields; otherwise ``ValueError`` is raised (also for out-of-range
    values rejected by ``date``).
    """
    fields = d.split('-')
    year, month, day = map(int, fields)
    return date(year, month, day)
def get_start_end_date_objects(date_range):
    """Split a ``'START END'`` string into a pair of date objects.

    ``date_range`` is split on whitespace and must contain exactly two
    date strings; each one is converted with ``convert_to_date_object``.
    Returns the tuple ``(start_date, end_date)``.
    """
    first_text, last_text = date_range.split()
    return (
        convert_to_date_object(first_text),
        convert_to_date_object(last_text),
    )
def match_date_regex(arg_value, pat=re.compile(r'\d{4}-\d{2}-\d{2}')):
    """argparse ``type=`` callback that accepts only ``YYYY-MM-DD`` strings.

    Parameters:
        arg_value: The raw command-line value.
        pat: Compiled pattern the whole value must match.

    Returns ``arg_value`` unchanged when it matches.

    Raises:
        ArgumentTypeError: if the value does not match the pattern.

    Only the shape of the string is checked here, not whether it names a
    real calendar date.
    """
    # fullmatch() rather than match(): match() anchors only at the start, so
    # values with trailing junk such as "2021-01-05xyz" used to be accepted.
    if not pat.fullmatch(arg_value):
        raise ArgumentTypeError(
            "invalid date {!r}, expected format YYYY-MM-DD".format(arg_value))
    return arg_value
def check_directory(arg_value):
    """argparse ``type=`` callback for the output directory argument.

    If ``arg_value`` does not exist it is returned immediately. If it does
    exist, the user is prompted on stdin until they answer ``y`` or ``n``:
    ``y`` removes the existing tree with ``shutil.rmtree``; ``n`` leaves it
    untouched. In every case ``arg_value`` is returned.
    """
    if not os.path.exists(arg_value):
        return arg_value

    while True:
        answer = input('Folder already exists! Do you want to overwrite it? y/n ')
        if answer not in ('y', 'n'):
            print('Invalid Option!')
            continue
        if answer == 'y':
            shutil.rmtree(arg_value)
        break

    return arg_value
def get_path(trading_type, market_data_type, time_period, symbol, interval=None):
    """Build the slash-terminated data path for a symbol.

    The path starts with ``data/spot`` when ``trading_type`` is ``'spot'``
    and with ``data/futures/<trading_type>`` otherwise, followed by
    ``<time_period>/<market_data_type>/<SYMBOL>/`` (symbol upper-cased).
    When ``interval`` is not ``None`` a trailing ``<interval>/`` component
    is appended.
    """
    if trading_type == 'spot':
        root = 'data/spot'
    else:
        root = f'data/futures/{trading_type}'

    symbol_dir = f'{root}/{time_period}/{market_data_type}/{symbol.upper()}/'
    if interval is None:
        return symbol_dir
    return f'{symbol_dir}{interval}/'
def get_parser(parser_type):
    """Build the command-line parser for a download script.

    ``parser_type`` names the kind of data being downloaded; it is
    interpolated into the description and help texts. When it equals
    ``'klines'`` an extra ``-i`` (intervals) option is registered.

    Options registered for every parser type: ``-s`` symbols, ``-y`` years,
    ``-m`` months, ``-d`` dates, ``-startDate``, ``-endDate``, ``-folder``,
    ``-c`` checksum flag and ``-t`` trading type.

    Returns the configured ``ArgumentParser``.
    """
    description = "This is a script to download historical {} data".format(parser_type)
    parser = ArgumentParser(description=description, formatter_class=RawTextHelpFormatter)

    # (flag, keyword arguments) pairs, registered in this order.
    common_options = (
        ('-s', dict(
            dest='symbols', nargs='+',
            help='Single symbol or multiple symbols separated by space')),
        ('-y', dict(
            dest='years', default=YEARS, nargs='+', choices=YEARS,
            help='Single year or multiple years separated by space\n-y 2019 2021 means to download {} from 2019 and 2021'.format(parser_type))),
        ('-m', dict(
            dest='months', default=MONTHS, nargs='+', type=int, choices=MONTHS,
            help='Single month or multiple months separated by space\n-m 2 12 means to download {} from feb and dec'.format(parser_type))),
        ('-d', dict(
            dest='dates', nargs='+', type=match_date_regex,
            help='Date to download in [YYYY-MM-DD] format\nsingle date or multiple dates separated by space\ndownload past 35 days if no argument is parsed')),
        ('-startDate', dict(
            dest='startDate', type=match_date_regex,
            help='Starting date to download in [YYYY-MM-DD] format')),
        ('-endDate', dict(
            dest='endDate', type=match_date_regex,
            help='Ending date to download in [YYYY-MM-DD] format')),
        ('-folder', dict(
            dest='folder', type=check_directory,
            help='Directory to store the downloaded data')),
        ('-c', dict(
            dest='checksum', default=0, type=int, choices=[0,1],
            help='1 to download checksum file, default 0')),
        ('-t', dict(
            dest='type', default='spot', choices=TRADING_TYPE,
            help='Valid trading types: {}'.format(TRADING_TYPE))),
    )
    for flag, options in common_options:
        parser.add_argument(flag, **options)

    # Kline downloads additionally need the candle interval(s).
    if parser_type == 'klines':
        parser.add_argument(
            '-i', dest='intervals', default=INTERVALS, nargs='+', choices=INTERVALS,
            help='single kline interval or multiple intervals separated by space\n-i 1m 1w means to download klines interval of 1minute and 1week')

    return parser