-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·61 lines (51 loc) · 1.89 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python
"""An exporter to fetch a field's value and count it for Prometheus."""
from time import sleep
import prometheus_client as prom
import sentry_sdk
from broker import KafkaHandler
from config import Config
# Settings read from the project configuration; each value is read from
# its own Config() instance.
service = Config().service
counter_name = Config().counter_name
header_fields = Config().header_fields

# The counter is labelled by the configured header fields plus the fixed
# "service" and "topic" labels.
_metric_name = "{}_{}".format(service, counter_name)
_metric_help = "The {}'s {} counter".format(service, header_fields)
_label_names = header_fields + ["service", "topic"]
counter = prom.Counter(_metric_name, _metric_help, _label_names)

exposed_port = Config().port

# Sentry reporting is optional: it is initialised only when the
# configuration provides a sentry section.
sentry = Config().sentry
if sentry is not None:
    _sentry_options = {
        "dsn": sentry.get("dsn"),
        "release": sentry.get("release"),
        "environment": sentry.get("environment"),
        "send_default_pii": sentry.get("send-default-pii"),
        "debug": sentry.get("debug"),
        "traces_sample_rate": sentry.get("traces-sample-rate"),
    }
    sentry_sdk.init(**_sentry_options)

# Printed and sent to Sentry once the daemon is up.
welcome_message = (
    "Daemon started successfully on port {} ...".format(exposed_port)
)
if __name__ == "__main__":
    # Start the metrics HTTP endpoint, then announce start-up on stdout
    # and to Sentry before entering the consume loop.
    prom.start_http_server(exposed_port)
    consumer = KafkaHandler()
    print(welcome_message)
    sentry_sdk.capture_message(welcome_message)

    for message in consumer.consume_loop():
        # Step 1: build the label values from the message headers
        # (decoded as UTF-8) plus the fixed "service" and "topic" labels.
        try:
            headers = {k: v.decode('utf-8')
                       for k, v in dict(message.headers()).items()}
            headers['service'] = service
            headers['topic'] = message.topic()
        except Exception as e:
            print("ERROR: ", e)
            # NOTE(review): sentry_sdk.set_context documents a dict value;
            # message.headers() is presumably a list of tuples or None --
            # confirm the context is rendered as expected.
            sentry_sdk.set_context("message-headers", message.headers())
            sentry_sdk.capture_exception(e)
            # Do not count this message. Falling through would either hit
            # an unbound `headers` (first message: NameError, raised again
            # inside the handler below and killing the loop) or re-count
            # the previous message's labels.
            continue
        finally:
            # Runs on success and on the `continue` path alike, so the
            # message is committed whether or not its headers decoded.
            consumer.try_commit()

        # Step 2: increment the counter child matching these labels.
        # labels() raises when the header names do not match the
        # configured label names; report that and keep consuming.
        try:
            counter.labels(**headers).inc()
        except Exception as e:
            print("ERROR: Error in mapping headers to configured ones", e)
            sentry_sdk.set_context("prometheus-labels", headers)
            sentry_sdk.capture_exception(e)