-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasynco.py
57 lines (41 loc) · 1.35 KB
/
asynco.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import heapq
import time
from collections import deque
# Problem: How to achieve concurrency without threads ?
# Issue: Figure out how to switch between tasks.
class Scheduler:
def __init__(self):
self.ready = deque() # Functions ready to execute
self.sleeping = []
self.sequence = 0
def call_soon(self, func):
self.ready.append(func)
def call_later(self, delay, func):
self.sequence += 1
deadline = time.time() + delay # Expiration time
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
deadline, _, func = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft()
func()
sched = Scheduler()
def countdown(n):
if n > 0:
print("Down " + str(n))
sched.call_later(4, lambda: countdown(n - 1))
def countup(stop):
def _run(x):
if x < stop:
print("Up " + str(x))
sched.call_later(1, lambda: _run(x + 1))
_run(0)
sched.call_soon(lambda: countdown(5))
sched.call_soon(lambda: countup(20))
sched.run()