-
Notifications
You must be signed in to change notification settings - Fork 5
/
app.py
107 lines (90 loc) · 3.77 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import json
import sys
import socket
import logging
from datetime import datetime
from flask import request, Flask
from influxdb import InfluxDBClient
from geolib import geohash
# Maximum number of points handed to a single client.write_points() call;
# collect() slices its point lists into batches of this size.
DATAPOINTS_CHUNK = 80000

# Console logging: everything from DEBUG upwards goes to stdout, prefixed
# with a timestamp and the level name.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
handler.setFormatter(formatter)

logger = logging.getLogger("console-output")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

# Flask application object that the route handlers attach to.
# NOTE(review): debug mode is switched on unconditionally, and the server is
# started on all interfaces below -- confirm this never runs on an untrusted
# network.
app = Flask(__name__)
app.debug = True

# InfluxDB connection, opened at import time. The 'db' database is created
# on every start and then selected for all subsequent writes.
client = InfluxDBClient(host='localhost', port=8086)
client.create_database('db')
client.switch_database('db')
def _metric_points(metrics):
    """Convert HealthKit metric entries into InfluxDB point dicts.

    Each datapoint of each metric becomes one point: the metric's "name" is
    the measurement, the datapoint's "date" is the timestamp, numeric values
    become float fields and every other value becomes a string tag.

    Raises:
        KeyError: if a metric lacks "name"/"data" or a datapoint lacks "date".
    """
    points = []
    for metric in metrics:
        for datapoint in metric["data"]:
            tags = {}
            fields = {}
            for key, value in datapoint.items():
                if key == "date":
                    continue
                # bool is a subclass of int; keep booleans on the tag side,
                # exactly as the exact-type comparison used to do.
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    fields[str(key)] = float(value)
                else:
                    tags[str(key)] = str(value)
            points.append({
                "measurement": metric["name"],
                "time": datapoint["date"],
                "tags": tags,
                "fields": fields,
            })
    return points


def _workout_points(workouts):
    """Convert workout GPS routes into InfluxDB points of measurement "workouts".

    Every route point is tagged with an id built from the workout's name,
    start and end. Workouts that carry no "route" contribute no points
    instead of failing the whole request.

    Raises:
        KeyError: if a workout lacks "name"/"start"/"end" or a route point
            lacks "timestamp"/"lat"/"lon".
    """
    points = []
    for workout in workouts:
        tags = {"id": f'{workout["name"]}-{workout["start"]}-{workout["end"]}'}
        for gps_point in workout.get("route") or []:
            # Cast like the metric fields: an integer-valued coordinate would
            # otherwise be written as an InfluxDB integer and clash with the
            # float type of the field.
            lat = float(gps_point["lat"])
            lon = float(gps_point["lon"])
            points.append({
                "measurement": "workouts",
                "time": gps_point["timestamp"],
                "tags": tags,
                "fields": {
                    "lat": lat,
                    "lng": lon,
                    "geohash": geohash.encode(lat, lon, 7),
                },
            })
    return points


def _write_in_chunks(points):
    """Write points to InfluxDB in batches of at most DATAPOINTS_CHUNK."""
    for start in range(0, len(points), DATAPOINTS_CHUNK):
        logger.info("DB Writing chunk")
        client.write_points(points[start:start + DATAPOINTS_CHUNK])


@app.route('/collect', methods=['POST', 'GET'])
def collect():
    """Ingest a HealthKit export posted as JSON and store it in InfluxDB.

    The request body is expected to be a JSON object of the form
    {"data": {"metrics": [...], "workouts": [...]}}; both lists are optional.

    Returns:
        ("Success", 200) once all points are written,
        ("Invalid JSON Received", 400) if the body is not JSON or is not
        shaped as an object with an object under "data",
        ("Server Error", 500) if transforming or writing the data fails.
    """
    logger.info("Request received")

    try:
        healthkit_data = json.loads(request.data)
    except (ValueError, TypeError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return "Invalid JSON Received", 400

    # Valid JSON that is not an object (a list, null, a number, ...) is a
    # client error, not a server failure.
    if not isinstance(healthkit_data, dict):
        return "Invalid JSON Received", 400
    export = healthkit_data.get("data") or {}
    if not isinstance(export, dict):
        return "Invalid JSON Received", 400

    try:
        logger.info("Ingesting Metrics")
        transformed_data = _metric_points(export.get("metrics") or [])
        logger.info("Data Transformation Complete")
        logger.info("Number of data points to write: %s", len(transformed_data))
        logger.info("DB Write Started")
        _write_in_chunks(transformed_data)
        logger.info("DB Metrics Write Complete")

        logger.info("Ingesting Workouts Routes")
        _write_in_chunks(_workout_points(export.get("workouts") or []))
        logger.info("Ingesting Workouts Complete")
    except Exception:
        # Request boundary: log the stack trace and answer with a 500, but
        # let SystemExit/KeyboardInterrupt propagate.
        logger.exception("Caught Exception. See stacktrace for details.")
        return "Server Error", 500

    return "Success", 200
if __name__ == "__main__":
    # Script entry point: announce where the collector can be reached on the
    # local network, then start Flask's built-in server on all interfaces.
    port = 5353
    try:
        ip_address = socket.gethostbyname(socket.gethostname())
    except OSError:
        # socket.gaierror is an OSError; a hostname that does not resolve
        # must not stop the server from starting.
        logger.warning("Could not resolve the local hostname; endpoint address unknown")
    else:
        # Include the port: the server does not listen on the default port 80.
        logger.info(f"Local Network Endpoint: http://{ip_address}:{port}/collect")
    app.run(host='0.0.0.0', port=port)