"Aplex", short for "asynchronous pool executor", is a Python library for combining asyncio with multiprocessing and threading.
- Aplex helps you run coroutines and functions in other processes or threads with asyncio concurrently and in parallel (if with processes).
- Aplex provides a usage like that of standard library
concurrent.futures
, which is familiar to you and intuitive. - Aplex lets you do load balancing in a simple way if you need.
For general users, use the package manager pip to install aplex.
pip install aplex
For contributors, install with pipenv:
git clone https://github.com/lunluen/aplex.git
cd aplex
pipenv install --dev
or with setuptools:
git clone https://github.com/lunluen/aplex.git
cd aplex
python setup.py develop
Definition to know:
A
work
is acallable
you want to run with asyncio and multiprocessing or threading. It can be a coroutine function or just a function.
In below case, the work
is the coroutine function demo
.
You can submit your work like:
import aiohttp
from aplex import ProcessAsyncPoolExecutor
async def demo(url):
async with aiohttp.request('GET', url) as response:
return response.status
if __name__ == '__main__':
pool = ProcessAsyncPoolExecutor(pool_size=8)
future = pool.submit(demo, 'http://httpbin.org')
print('Status: %d.' % future.result())
Note: If you are running python on windows, if __name__ == '__main__':
is necessary. That's the design of multiprocessing.
Result:
Status: 200
For multiple works, try map
:
iterable = ('http://httpbin.org' for __ in range(10))
for status in pool.map(demo, iterable, timeout=10):
print('Status: %d.' % status)
Aplex allows one to await
results with the event loop that already exists.
It's quite simple.
Just set keyword argument awaitable
to True
!
For example:
pool = ProcessAsyncPoolExecutor(awaitable=True)
Then
future = pool.submit(demo, 'http://httpbin.org')
status = await future
How about map
?
async for status in pool.map(demo, iterable, timeout=10):
print('Status: %d.' % status)
In aplex, each worker running your works is the process or thread on your computer. That is, they have the same capability computing. But, your works might have different workloads. Then you need a load balancer.
Aplex provides some useful load balancers. They are RoundRobin
, Random
, and Average
. The default is RoundRobin
.
Simply set what you want in the keyword argument of contruction:
from aplex import ProcessAsyncPoolExecutor
from aplex.load_balancers import Average
if __name__ == '__main__':
pool = ProcessAsyncPoolExecutor(load_balancer=Average)
Done. So easy. 💯
You can also customize one:
from aplex import LoadBalancer
class MyAwesomeLoadBalancer(LoadBalancer):
def __init__(*args, **kwargs):
super().__init__(*args, **kwargs) # Don't forget this.
awesome_attribute = 'Hello Aplex!'
def get_proper_worker(self):
the_poor_guy = self.workers[0]
return the_poor_guy
See details of how to implement a load balancer at: LoadBalancer | API Reference
By the way, if you think the build-in asyncio loop is too slow:
import uvloop
from aplex import ProcessAsyncPoolExecutor
if __name__ == '__main__':
pool = ProcessAsyncPoolExecutor(worker_loop_factory=uvloop.Loop)
Taking Python3.6 for example, a graceful exit without aplex would be something like this:
try:
loop.run_forever()
finally:
try:
tasks = asyncio.Task.all_tasks()
if tasks:
for task in tasks:
task.cancel()
gather = asyncio.gather(*tasks)
loop.run_until_complete(gather)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
...It's definitely a joke.
Here, just treat pool as a context manager:
with ProcessAsyncPoolExecutor() as pool:
do_something()
or remember to call pool.shutdown()
.
These help you deal with that joke.
...
What? You forget to call pool.shutdown()
?!
Ok, fine. It will shut down automatically when the program exits or it gets garbage-collected.
Scroll up and click Watch - Releases only
and Star
as a thumbs up! 👍
Feel free to open a issue (just don't abuse it).
Anything about aplex is welcome, such like bugs, system design, variable naming, even English grammer of docstrings!
Contribution are welcome.
Asking and advising are also kinds of contribution.
Please see CONTRIBUTING.md