
Message consumption is totally NOT async #150

Closed

Skorpyon opened this issue Jul 23, 2017 · 1 comment

Comments


Skorpyon commented Jul 23, 2017

Today I was surprised, because aioamqp doesn't launch message callbacks asynchronously in the event loop.

A short explanation:

protocol.py

@asyncio.coroutine
def run(self):
    while not self.stop_now.done():
        try:
            yield from self.dispatch_frame()   # <-- HERE WE WAIT FOR THE NEXT FRAME
        except exceptions.AmqpClosedConnection as exc:
            ...

async def dispatch_frame(self, frame=None):
    """Dispatch the received frame to the corresponding handler"""
    ...
    if frame.channel is not 0:
        channel = self.channels.get(frame.channel)
        if channel is not None:
            await channel.dispatch_frame(frame)  # <-- HERE WE WAIT UNTIL THE CHANNEL HAS DISPATCHED THE FRAME
        else:
            logger.info("Unknown channel %s", frame.channel)
        return
    ...

channel.py

@asyncio.coroutine
def basic_deliver(self, frame):
    ...
    yield from callback(self, body, envelope, properties)  # <-- HERE WE FINALLY WAIT UNTIL THE CALLBACK IS DONE

So each time a frame arrives, aioamqp launches the callback and waits until it returns a result.
If your callback contains heavy logic, working with a database or doing other long work (as mine does), you will be surprised to find your whole system stuck in a single callback, not even receiving new messages from the AMQP broker.

You can easily check this with the basic example code from the docs; just add an asyncio.sleep():

import asyncio

async def callback(channel, body, envelope, properties):
    print(body)
    await asyncio.sleep(30, loop=channel._loop)

await channel.basic_consume(callback, queue_name='hello', no_ack=True)

Now feed the hello queue a hundred messages.
You will see it print the first body and then sleep for 30 seconds. All subsequent messages are lost or delayed for an unpredictable amount of time.
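
The same behaviour is easy to reproduce without a broker at all. Here is a small self-contained sketch (the names are invented for illustration, this is not aioamqp code) contrasting a dispatch loop that awaits a slow callback inline with one that schedules it as a task:

import asyncio

async def slow_callback(n):
    print('start', n)
    await asyncio.sleep(1)   # stands in for DB queries or other long work
    print('done', n)

async def inline_dispatch():
    # Mirrors what aioamqp does today: the next "frame" is not processed
    # until the previous callback has returned (~3 seconds total here).
    for n in range(3):
        await slow_callback(n)

async def task_dispatch():
    # The workaround: schedule each callback and keep going (~1 second total).
    tasks = [asyncio.ensure_future(slow_callback(n)) for n in range(3)]
    await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(inline_dispatch())
loop.run_until_complete(task_dispatch())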

For now I have solved it temporarily this way:
channel.py

@asyncio.coroutine
def basic_deliver(self, frame):
    ...
    self._loop.create_task(callback(self, body, envelope, properties))

It simply creates a new task on the loop for the callback, does not wait for it to finish, and frees the loop for the next message.

I'm not very experienced with asyncio, so maybe there is a more elegant way.
But right now aioamqp is not usable because of this issue.

It really needs a deep check for the same problem in message publishing and the internal logic.
I hope you fix it ASAP, because a few of my projects depend entirely on your excellent library.
Regards.

RemiCardona (Contributor) commented:

Hi @Skorpyon,

You've hit one of the major API limitations of aioamqp as it stands today: callbacks.

Here are a few things you should know:

  • handling tasks is no easy business. As per the Python documentation, tasks should be properly cared for and cleaned up. As a library, aioamqp has no way of knowing what kind of code is going to run in callbacks. So from our point of view, it's much safer not to create any tasks for application code.
  • we've been trying to get away from callbacks altogether (alas, aioamqp is not our top priority) and move towards a much more obvious API (see [WIP] First implementation of a new consumer API #118 for more information). Ideally, it would look like this:
consumer = yield from channel.basic_consume(...)
while True:
    body, envelope, props = yield from consumer
    <application logic goes here>

Until we manage to get there, creating tasks is a user responsibility to avoid blocking aioamqp's processing loop. So you'll have to call self._loop.create_task(...) from your own callback. Yes, I understand it doesn't feel right, but it is the best solution.
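
In practice that would look something like this (an untested sketch, the handler names are just placeholders): register a thin callback that only schedules the real work as a task, and keep a reference to each task so it can be awaited or cancelled on shutdown, per the caveat above:

import asyncio

_tasks = set()   # keep references so tasks can be cleaned up on shutdown

async def process_message(channel, body, envelope, properties):
    # long-running application logic (DB calls, etc.) goes here
    await asyncio.sleep(30)

async def callback(channel, body, envelope, properties):
    # return immediately so aioamqp's frame loop is never blocked
    task = channel._loop.create_task(
        process_message(channel, body, envelope, properties))
    _tasks.add(task)
    task.add_done_callback(_tasks.discard)

# registration stays exactly the same:
# yield from channel.basic_consume(callback, queue_name='hello', no_ack=True)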

Hope that helps,

Cheers
