-
Notifications
You must be signed in to change notification settings - Fork 0
/
storage.py
172 lines (135 loc) · 5.34 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from apiclient import discovery
from google.oauth2 import service_account
from datetime import datetime, timedelta
from itertools import zip_longest
from enum import Enum, auto
import pytz
import common as c
# from aqi import aqi
CREDS_FILE = c.get_abs_path('credentials.json')
SCOPES = [
"https://www.googleapis.com/auth/drive",
"https://www.googleapis.com/auth/drive.file",
"https://www.googleapis.com/auth/spreadsheets",
]
TIMEZONE = 'Europe/Sofia'
TS_FMT = "%Y%m%d_%H%M%S"
TS_FMT_PRETTY = "%Y-%m-%d %H:%M:%S" # parsable by Sheets
def build_sheets():
    """Build and return a Google Sheets `spreadsheets` service resource.

    Retries a few times on transient build failures and re-raises the last
    error once the attempts are exhausted.
    """
    credentials = service_account.Credentials \
        .from_service_account_file(CREDS_FILE, scopes=SCOPES)
    max_exception_count = 3
    # Building the service can hiccup on transient network/auth issues,
    # so try a couple of times before giving up.
    for i in range(max_exception_count):
        try:
            return discovery.build(
                'sheets', 'v4', credentials=credentials).spreadsheets()
        # Catch Exception rather than a bare `except:` so that
        # KeyboardInterrupt/SystemExit still propagate immediately.
        except Exception:
            attempt = i + 1
            if attempt == max_exception_count:
                raise
            print('Issues building spreadsheet service, attempt: %d' % attempt)
# Module-level handle to the spreadsheet "values" API.  NOTE: built once at
# import time, so importing this module performs network/auth side effects.
sheet_service = build_sheets().values()
# Aggregation modes for squashing a period of readings into one value.
# Functional Enum API; member values are 1..3, same as auto() produced.
Mode = Enum('Mode', ['avg', 'min', 'max'])
def put(LOCAL_ENV, client, readouts):
    """Append one timestamped row of sensor readouts to the client's sheet.

    When LOCAL_ENV is falsy the server clock is assumed to be UTC and the
    timestamp is converted to the configured local timezone; when truthy the
    row is tagged with a trailing "test" marker instead.
    """
    now = datetime.now()
    if not LOCAL_ENV:
        # Convert the (assumed-UTC) server time into the configured zone.
        now = pytz.utc.localize(now).astimezone(pytz.timezone(TIMEZONE))
    # Row layout: machine timestamp, Sheets-parsable timestamp, readouts...
    row = [now.strftime(TS_FMT), now.strftime(TS_FMT_PRETTY)] + readouts
    if LOCAL_ENV:
        row.append("test")
    sheet_service.append(
        spreadsheetId=client.sheet['id'],
        body={'values': [row]},
        range=client.get_sheet_range(),
        valueInputOption='USER_ENTERED').execute()
# def add_aqi(entry):
# pm25 = Sensor.sds_pm25
# if pm25.name in entry:
# pm25_aqi, pm25_label = aqi(entry[pm25.name], pm25).get()
# entry['pm25_aqi'] = pm25_aqi
# entry['pm25_aqi_label'] = pm25_label.name
# pm10 = Sensor.sds_pm10
# if pm10.name in entry:
# pm10_aqi, pm10_label = aqi(entry[pm10.name], pm10).get()
# entry['pm10_aqi'] = pm10_aqi
# entry['pm10_aqi_label'] = pm10_label.name
# return entry
def filter_per_delta(matrix, delta):
    """Keep only the rows of `matrix` that fall within `delta` of its newest row.

    `matrix` is expected in reverse chronological order (newest first), with
    a TS_FMT timestamp string in column 0.  Returns a (rows, start_date)
    tuple where `start_date` is the cutoff actually applied.
    """
    # Count the timedelta backwards from the newest (first) entry.
    last = datetime.strptime(matrix[0][0], TS_FMT)
    start_date = last - delta
    for i, row in enumerate(matrix):
        if datetime.strptime(row[0], TS_FMT) < start_date:
            # Rows 0..i-1 are inside the window; row i is the first one
            # outside it.  (Fixed off-by-one: the original sliced
            # matrix[:i - 1], which dropped the last in-range row.)
            return matrix[:i], start_date
    # The requested period extends past the oldest entry: return the whole
    # matrix and report the oldest timestamp present as the start date.
    return matrix, datetime.strptime(matrix[-1][0], TS_FMT)
def squash_by_mode(input, mode):
    """Reduce each column list in `input` to one value per the given Mode."""
    # Dispatch table: Mode -> aggregation function applied to one column.
    # Raises KeyError for an unknown mode, same as the original lookup.
    reduce_fn = {
        Mode.avg: c.avg,
        Mode.min: min,
        Mode.max: max,
    }[mode]
    return [reduce_fn(column) for column in input]
def get_last_record(client, entries):
    """Return the most recent row of `entries` as a labelled dict.

    Rows look like [timestamp, timestamp_pretty, value, ...], with values
    ordered to match `client.sensors`.  Numeric values are parsed to float
    so it's easier to calculate AQI on them later.  Unlike the original,
    the caller's rows are left unmodified (no in-place slice assignment).
    """
    raw = entries[-1]
    # Build a parsed copy instead of assigning back into the caller's row.
    last_entry = raw[:2] + [float(x) for x in raw[2:]]
    # `sensor` instead of `id` — avoid shadowing the builtin.
    keys = ['timestamp', 'timestamp_pretty'] + \
        [sensor.name for sensor in client.sensors]
    output = {'client': client.name}
    output.update(dict(zip(keys, last_entry)))
    return output
def get_last_period(client, entries, delta, mode):
    """Aggregate `entries` over the last `delta`, using `mode` (avg/min/max).

    Returns a dict with the client name, the mode used, the start of the
    aggregated period, and one aggregated value per client sensor.
    """
    # Sort newest first so the delta window is counted from the latest row.
    ordered = sorted(entries, key=lambda row: row[0], reverse=True)
    # Drop everything older than the requested window.
    windowed, start_date = filter_per_delta(ordered, delta)
    # Transpose rows -> columns so each sensor's values sit in one list,
    # then discard the two timestamp columns.
    columns = [list(col) for col in zip_longest(*windowed)][2:]
    # Parse every reading to float before aggregating.
    columns = [[float(value) for value in col] for col in columns]
    # Collapse each column to a single value per the requested mode.
    squashed = squash_by_mode(columns, mode)
    readouts = dict(zip([sensor.name for sensor in client.sensors], squashed))
    # Assemble the labelled result: client, mode, period start, readouts.
    result = {
        'client': client.name,
        'mode': mode.name,
        'period_start': start_date.strftime(TS_FMT),
    }
    result.update(readouts)
    return result
def get(LOCAL_ENV, delta, mode, clients):
    """Fetch and summarise the latest readouts for every client.

    A zero `delta` (i.e. no user-inputted period) yields the single newest
    record per client; otherwise records are aggregated over the period per
    `mode`.  Every numeric value is rounded via c.round_num_dict.
    (LOCAL_ENV is accepted for signature symmetry with put(); unused here.)
    """
    results = []
    no_period = delta == timedelta()
    for client in clients:
        rows = sheet_service.get(
            spreadsheetId=client.sheet['id'],
            range=client.get_sheet_range()).execute()['values']
        if no_period:
            results.append(get_last_record(client, rows))
        else:
            results.append(get_last_period(client, rows, delta, mode))
    return [c.round_num_dict(entry) for entry in results]