-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsynchronizer_test.py
195 lines (161 loc) · 7.62 KB
/
synchronizer_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import asyncio
import json
from datetime import datetime, timedelta
from gpiozero import DigitalOutputDevice
import ntplib
from concurrent.futures import ThreadPoolExecutor
import logging
import requests
import time
def wait_for_internet_connection():
primary_url = "http://www.google.com"
fallback_url = "http://www.cloudflare.com"
while True:
try:
response = requests.get(primary_url, timeout=5)
# If the request was successful, break out of the loop
if response.status_code == 200:
logger.info("Internet Connection Successful!!")
break
elif response.status_code == 429:
logger.warning("Too many requests, Changing to fall back url!!")
primary_url = fallback_url
time.sleep(5)
else :
logger.warning(f"Unexpected status code : {response.status_code}")
except (requests.ConnectionError,requests.Timeout) as e:
# If the connection failed, wait for a bit and then retry
logger.error("No internet connection. Waiting to retry...")
time.sleep(5)
time.sleep(5)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# NTP setup
PRIMARY_NTP_SERVER = 'time.nist.gov'
BACKUP_NTP_SERVER = 'pool.ntp.org'
executor = ThreadPoolExecutor(max_workers=2)
class EventScheduler:
def __init__(self, offset=timedelta(seconds=0)):
self.offset = offset
self.events = []
self.devices = {} # Store DigitalOutputDevice instances
async def update_offset(self):
client = ntplib.NTPClient()
for server in [PRIMARY_NTP_SERVER, BACKUP_NTP_SERVER]:
try:
response = await asyncio.get_event_loop().run_in_executor(executor, lambda: client.request(server, version=3, timeout=5))
self.offset = timedelta(seconds=response.offset)
logger.info(f"Offset updated from {server}: {self.offset.total_seconds()} seconds")
return
except Exception as e:
logger.error(f"Error updating offset from {server}: {e}")
logger.warning("Failed to update offset from both servers; continuing with last known offset.")
def load_events_from_json(self, file_path):
with open(file_path, 'r') as file:
try:
event_data = json.load(file)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON from {file_path}: {e}")
return
now = datetime.utcnow() + self.offset
for event in event_data:
try:
event_datetime = self.calculate_initial_datetime(event, now)
except Exception as e:
logger.error(f"Error processing event {event}: {e}")
continue
pin = event.get('pin')
duration = event.get('duration', 0.1)
initial_value = event.get('pin_initial_value', False)
if pin is not None:
self.setup_device(pin, initial_value)
callback = self.get_callback(event['callback'], pin, duration)
description = event.get('description', 'No description provided')
repeat = event.get('repeat', 'none')
self.add_event(event_datetime, callback, description, repeat)
def calculate_initial_datetime(self, event, now):
if event['repeat'] == 'none':
event_time = datetime.fromisoformat(event['event_time'])
if now <= event_time:
return event_time
else:
raise ValueError("The specified event time is in the past.")
else:
time_parts = datetime.strptime(event['event_time'], '%H:%M:%S').time()
return self.adjust_datetime(now, time_parts, event['repeat'])
def adjust_datetime(self, now, time_parts, repeat):
if repeat == 'day':
result = datetime.combine(now.date(), time_parts)
return result + timedelta(days=1) if result <= now else result
elif repeat == 'hour':
result = now.replace(minute=time_parts.minute, second=time_parts.second, microsecond=0)
return result + timedelta(hours=1) if result <= now else result
elif repeat == 'minute':
result = now.replace(second=time_parts.second, microsecond=0)
return result + timedelta(minutes=1) if result <= now else result
def setup_device(self, pin, initial_value):
if pin not in self.devices:
self.devices[pin] = DigitalOutputDevice(pin, initial_value=initial_value)
def get_callback(self, callback_name, pin, duration):
if pin is not None:
if callback_name == "turn_on_starter":
return lambda: self.turn_on_starter(pin)
elif callback_name == "turn_off_starter":
return lambda: self.turn_off_starter(pin)
elif callback_name == "give_pulse":
return lambda: self.give_pulse(pin, duration)
return lambda: self.update_offset()
async def turn_on_starter(self, pin):
device = self.devices[pin]
device.off()
logger.info(f"Starter on pin {pin} activated.")
async def turn_off_starter(self, pin):
device = self.devices[pin]
device.on()
logger.info(f"Starter on pin {pin} deactivated.")
async def give_pulse(self, pin, duration):
device = self.devices[pin]
device.on()
await asyncio.sleep(duration)
device.off()
logger.info(f"Pulse given on pin {pin}.")
def add_event(self, event_time, callback, description, repeat):
self.events.append((event_time, callback, description, repeat))
self.events.sort(key=lambda x: x[0])
async def run(self):
while True:
if not self.events:
await asyncio.sleep(1)
continue
now = datetime.utcnow() + self.offset
next_event_time, next_callback, description, repeat = self.events[0]
sleep_duration = (next_event_time - now).total_seconds() - 0.5 if (next_event_time - now).total_seconds() > 0.5 else 0
if sleep_duration > 0:
await asyncio.sleep(sleep_duration)
now = datetime.utcnow() + self.offset
if now >= next_event_time:
try:
await next_callback()
logger.info(f"Executing event: {description} at time: {now}")
except Exception as e:
logger.error(f"Error executing {description}: {e}")
self.events.pop(0) # Remove the executed event
if repeat != 'none':
if repeat == 'hour':
next_event_time += timedelta(hours=1)
elif repeat == 'day':
next_event_time += timedelta(days=1)
elif repeat == 'minute':
next_event_time += timedelta(minutes=1)
self.add_event(next_event_time, next_callback, description, repeat) # Reschedule if needed
async def main():
logger.info("Started the Scheduler Program")
wait_for_internet_connection()
scheduler = EventScheduler()
await scheduler.update_offset()
scheduler.load_events_from_json('events.json')
asyncio.create_task(scheduler.run())
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(main())