Aplex Quickstart


Translations: 简体中文 | 繁體中文

"Aplex", short for "asynchronous pool executor", is a Python library for combining asyncio with multiprocessing and threading.

  • Aplex helps you run coroutines and functions in other processes or threads with asyncio, concurrently and, when using processes, in parallel.
  • Aplex provides an interface similar to the standard library's concurrent.futures, so it should feel familiar and intuitive.
  • Aplex lets you do load balancing in a simple way if you need it.

Installation

For general users, use the package manager pip to install aplex.

pip install aplex

For contributors, install with pipenv:

git clone https://github.com/lunluen/aplex.git
cd aplex
pipenv install --dev

or with setuptools:

git clone https://github.com/lunluen/aplex.git
cd aplex
python setup.py develop

Usage

One definition to know:

A work is a callable you want to run with asyncio and multiprocessing or threading. It can be a coroutine function or just a plain function.

In the example below, the work is the coroutine function demo.

Submit

You can submit your work like:

import aiohttp
from aplex import ProcessAsyncPoolExecutor

async def demo(url):
    async with aiohttp.request('GET', url) as response:
        return response.status

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(pool_size=8)
    future = pool.submit(demo, 'http://httpbin.org')
    print('Status: %d.' % future.result())

Note: If you are running Python on Windows, the if __name__ == '__main__': guard is necessary. That is how multiprocessing is designed.

Result:

Status: 200.
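To make the idea of a "work" more concrete, here is a minimal standard-library sketch of running either kind of callable in a worker thread. It is not aplex's implementation; run_work, async_double, plain_double, and demo_results are hypothetical names used only for illustration. A coroutine function gets a fresh event loop in the worker, while a plain function is called directly.

```python
import asyncio
import inspect
from concurrent.futures import ThreadPoolExecutor


def run_work(work, *args):
    """Run one work inside a worker thread and return its result."""
    if inspect.iscoroutinefunction(work):
        # A coroutine function needs an event loop. asyncio.run creates
        # a fresh loop in this thread and closes it when the work is done.
        return asyncio.run(work(*args))
    # A plain function is simply called.
    return work(*args)


async def async_double(x):
    await asyncio.sleep(0.01)  # stands in for real asynchronous I/O
    return x * 2


def plain_double(x):
    return x * 2


def demo_results():
    # Submit both kinds of work to the same thread pool.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(run_work, async_double, 21),
            pool.submit(run_work, plain_double, 4),
        ]
        return [future.result() for future in futures]


if __name__ == '__main__':
    print(demo_results())
```

This sketch starts a new loop for every coroutine, which is wasteful. A pool that keeps one long-lived loop per worker can run many coroutines on it concurrently.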

Map

For multiple works, try map:

iterable = ('http://httpbin.org' for __ in range(10))
for status in pool.map(demo, iterable, timeout=10):
    print('Status: %d.' % status)

Awaiting results

Aplex lets you await results from an event loop that already exists. It's quite simple.

Just set the keyword argument awaitable to True!

For example:

pool = ProcessAsyncPoolExecutor(awaitable=True)

Then

future = pool.submit(demo, 'http://httpbin.org')
status = await future

How about map?

async for status in pool.map(demo, iterable, timeout=10):
    print('Status: %d.' % status)
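For comparison, this is roughly what bridging a pool future into a running event loop looks like with only the standard library. It is a sketch of the general technique, not a description of how aplex implements awaitable=True; slow_square and main are hypothetical names.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.01)  # stands in for blocking work
    return x * x


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        # pool.submit returns a concurrent.futures.Future, which cannot be
        # awaited directly. asyncio.wrap_future adapts it to the running loop.
        single = await asyncio.wrap_future(pool.submit(slow_square, 7))

        # Several submissions can be awaited together with gather,
        # which returns results in submission order.
        wrapped = [asyncio.wrap_future(pool.submit(slow_square, n))
                   for n in range(3)]
        many = await asyncio.gather(*wrapped)
    return single, many


if __name__ == '__main__':
    print(asyncio.run(main()))
```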

Load balancing

In aplex, each worker running your works is a process or thread on your computer, so all workers have the same computing capability. Your works, however, might have different workloads. That is when you need a load balancer.

Aplex provides some useful load balancers: RoundRobin, Random, and Average. The default is RoundRobin.

Simply pass the one you want as a keyword argument to the constructor:

from aplex import ProcessAsyncPoolExecutor
from aplex.load_balancers import Average

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(load_balancer=Average)

Done. So easy. 💯

You can also customize one:

from aplex import LoadBalancer

class MyAwesomeLoadBalancer(LoadBalancer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # Don't forget this.
        self.awesome_attribute = 'Hello Aplex!'

    def get_proper_worker(self):
        the_poor_guy = self.workers[0]
        return the_poor_guy

See details of how to implement a load balancer at: LoadBalancer | API Reference
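To illustrate what different balancing strategies do, here is a self-contained sketch that does not use aplex. The class names and the pick/done methods are hypothetical and are not aplex's LoadBalancer interface. LeastLoadedPicker is only one plausible reading of what a load-aware strategy aims for and may not match the built-in Average balancer.

```python
import itertools
import random


class RoundRobinPicker:
    """Hand out workers in a fixed repeating order."""

    def __init__(self, workers):
        self._cycle = itertools.cycle(workers)

    def pick(self):
        return next(self._cycle)


class RandomPicker:
    """Hand out a uniformly random worker."""

    def __init__(self, workers, seed=None):
        self._workers = list(workers)
        self._rng = random.Random(seed)

    def pick(self):
        return self._rng.choice(self._workers)


class LeastLoadedPicker:
    """Hand out the worker with the fewest unfinished works."""

    def __init__(self, workers):
        self._load = {worker: 0 for worker in workers}

    def pick(self):
        # min() returns the first worker among ties, in insertion order.
        worker = min(self._load, key=self._load.get)
        self._load[worker] += 1
        return worker

    def done(self, worker):
        # Call this when a work assigned to `worker` has finished.
        self._load[worker] -= 1


if __name__ == '__main__':
    picker = LeastLoadedPicker(['w0', 'w1'])
    first, second = picker.pick(), picker.pick()
    picker.done(second)
    print(first, second, picker.pick())
```

Round robin ignores how long each work takes, so a worker that receives several slow works can fall behind. A load-aware picker avoids that at the cost of tracking per-worker state.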

Worker loop factory

By the way, if you think the built-in asyncio loop is too slow:

import uvloop
from aplex import ProcessAsyncPoolExecutor

if __name__ == '__main__':
    pool = ProcessAsyncPoolExecutor(worker_loop_factory=uvloop.Loop)
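A loop factory is just a callable that returns a new event loop. The sketch below shows how a worker could use one. It relies only on the standard library and is not aplex's internal code; run_in_worker and add are hypothetical names, and asyncio.new_event_loop stands in for a third-party factory.

```python
import asyncio
import threading


def run_in_worker(loop_factory, coro_func, *args):
    """Run coro_func(*args) in a new thread, on a loop built by loop_factory."""
    outcome = {}

    def target():
        loop = loop_factory()  # any zero-argument callable returning a loop
        try:
            outcome['value'] = loop.run_until_complete(coro_func(*args))
        finally:
            loop.close()

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return outcome['value']


async def add(a, b):
    await asyncio.sleep(0)  # yield to the loop once
    return a + b


if __name__ == '__main__':
    print(run_in_worker(asyncio.new_event_loop, add, 1, 2))
```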

Graceful Exit

Taking Python 3.6 as an example, a graceful exit without aplex would look something like this:

try:
    loop.run_forever()
finally:
    try:
        tasks = asyncio.Task.all_tasks()
        if tasks:
            for task in tasks:
                task.cancel()
            # Collect CancelledError results instead of re-raising them.
            gather = asyncio.gather(*tasks, return_exceptions=True)
            loop.run_until_complete(gather)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        loop.close()

...It's definitely a joke.

Here, just treat pool as a context manager:

with ProcessAsyncPoolExecutor() as pool:
    do_something()

or remember to call pool.shutdown(). These help you deal with that joke.

...

What? You forgot to call pool.shutdown()?!

Ok, fine. It will shut down automatically when the program exits or it gets garbage-collected.
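On newer Python versions the manual cleanup looks slightly different, because asyncio.Task.all_tasks() was deprecated in Python 3.7 and removed in 3.9 in favor of asyncio.all_tasks(). The following sketch shows the same steps for Python 3.7 and later. worker and run_and_clean_up are hypothetical names, and a short sleep replaces loop.run_forever() so that the example terminates.

```python
import asyncio


async def worker(log, name):
    try:
        await asyncio.sleep(3600)  # pretend to be a long-running task
    except asyncio.CancelledError:
        log.append(name)  # record that cleanup ran for this task
        raise


def run_and_clean_up():
    log = []
    loop = asyncio.new_event_loop()
    try:
        for name in ('a', 'b'):
            loop.create_task(worker(log, name))
        # Stand-in for loop.run_forever(): run briefly so the tasks start.
        loop.run_until_complete(asyncio.sleep(0.01))
    finally:
        try:
            # Cancel every unfinished task and wait for it to unwind.
            tasks = asyncio.all_tasks(loop)
            for task in tasks:
                task.cancel()
            if tasks:
                loop.run_until_complete(
                    asyncio.gather(*tasks, return_exceptions=True))
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
    return sorted(log)


if __name__ == '__main__':
    print(run_and_clean_up())
```

This is the kind of boilerplate that the context manager form is meant to spare you.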

Like this?

Scroll up and click Watch - Releases only and Star as a thumbs up! 👍

Any feedback?

Feel free to open an issue (just don't abuse it).

Anything about aplex is welcome, such as bugs, system design, variable naming, even the English grammar of docstrings!

How to contribute

Contributions are welcome.

Asking questions and giving advice are also kinds of contribution.

Please see CONTRIBUTING.md

License

MIT