Skip to content

Commit

Permalink
use custom executor for awatch and arun_process
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Apr 6, 2018
1 parent 24485dd commit ec33e8a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
27 changes: 18 additions & 9 deletions watchgod/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import signal
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from multiprocessing import Process
from pathlib import Path
Expand Down Expand Up @@ -59,14 +60,17 @@ class awatch:
3.5 doesn't support yield in coroutines so we need all this fluff. Yawwwwn.
"""
__slots__ = '_loop', '_path', '_watcher_cls', '_debounce', '_min_sleep', '_stop_event', '_start', '_w', 'lock'
__slots__ = (
'_loop', '_path', '_watcher_cls', '_debounce', '_min_sleep', '_stop_event', '_start', '_w', 'lock', '_executor'
)

def __init__(self, path: Union[Path, str], *,
watcher_cls: Type[AllWatcher]=DefaultWatcher,
debounce=400,
min_sleep=100,
stop_event: asyncio.Event=None):
self._loop = asyncio.get_event_loop()
self._executor = ThreadPoolExecutor(max_workers=4)
self._path = path
self._watcher_cls = watcher_cls
self._debounce = debounce
Expand All @@ -82,7 +86,7 @@ def __aiter__(self):

async def __anext__(self):
if not self._w:
self._w = await self._loop.run_in_executor(None, self._watcher_cls, self._path)
self._w = await self.run_in_executor(self._watcher_cls, self._path)
while True:
if self._stop_event and self._stop_event.is_set():
raise StopAsyncIteration()
Expand All @@ -92,14 +96,20 @@ async def __anext__(self):
await asyncio.sleep(sleep_time / 1000)

self._start = unix_ms()
changes = await self._loop.run_in_executor(None, self._w.check)
changes = await self.run_in_executor(self._w.check)
if logger.isEnabledFor(logging.DEBUG):
logger.debug('time=%0.0fms files=%d changes=%d',
unix_ms() - self._start, len(self._w.files), len(changes))

if changes:
return changes

async def run_in_executor(self, func, *args):
return await self._loop.run_in_executor(self._executor, func, *args)

def __del__(self):
self._executor.shutdown()


def _start_process(target, args, kwargs):
process = Process(target=target, args=args, kwargs=kwargs or {})
Expand Down Expand Up @@ -154,15 +164,14 @@ async def arun_process(path: Union[Path, str], target: Callable, *,
"""
Run a function in a subprocess using multiprocessing.Process, restart it whenever files change in path.
"""

loop = asyncio.get_event_loop()
watcher = awatch(path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep)
start_process = partial(_start_process, target=target, args=args, kwargs=kwargs)
process = await loop.run_in_executor(None, start_process)
process = await watcher.run_in_executor(start_process)
reloads = 0

async for changes in awatch(path, watcher_cls=watcher_cls, debounce=debounce, min_sleep=min_sleep):
async for changes in watcher:
callback and await callback(changes)
await loop.run_in_executor(None, _stop_process, process)
process = await loop.run_in_executor(None, start_process)
await watcher.run_in_executor(_stop_process, process)
process = await watcher.run_in_executor(start_process)
reloads += 1
return reloads
2 changes: 1 addition & 1 deletion watchgod/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

__all__ = ['VERSION']

VERSION = StrictVersion('0.1.0')
VERSION = StrictVersion('0.1.1')

0 comments on commit ec33e8a

Please sign in to comment.