Skip to content

Commit

Permalink
Added test of stop of consumsion, fixed documentation and updated exa…
Browse files Browse the repository at this point in the history
…mples
  • Loading branch information
Sergio Medina Toledo committed Nov 14, 2016
1 parent 6ac5bd2 commit cda3b3a
Show file tree
Hide file tree
Showing 19 changed files with 170 additions and 111 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ __pycache__/
*.so

# Distribution / packaging
virtualenv
.Python
env/
venv/
Expand Down Expand Up @@ -34,7 +35,7 @@ pip-delete-this-directory.txt
.cache
nosetests.xml
coverage.xml

cover
# Translations
*.mo

Expand All @@ -55,3 +56,4 @@ docs/_build/

# editor stuffs
*.swp
.idea
1 change: 1 addition & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ AUTHORS are (and/or have been)::
* Igor `mastak`
* Hans Lellelid
* `iceboy-sjtu`
* Sergio Medina Toledo
12 changes: 8 additions & 4 deletions aioamqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __init__(self, protocol, channel_id):
self._futures = {}
self._ctag_events = {}

def __del__(self):
for queue in self.consumer_queues.values():
asyncio.ensure_future(queue.put(StopIteration()), loop=self._loop)

def _set_waiter(self, rpc_name):
if rpc_name in self._futures:
raise exceptions.SynchronizationError("Waiter already exists")
Expand Down Expand Up @@ -70,7 +74,7 @@ def connection_closed(self, server_code=None, server_reason=None, exception=None
self.protocol.release_channel_id(self.channel_id)

for queue in self.consumer_queues.values():
yield from queue.put(None)
asyncio.ensure_future(queue.put(StopIteration()), loop=self._loop)
self.close_event.set()

@asyncio.coroutine
Expand Down Expand Up @@ -169,7 +173,7 @@ def close_ok(self, frame):
logger.info("Channel closed")

for queue in self.consumer_queues.values():
yield from queue.put(None)
yield from queue.put(StopIteration())

self.protocol.release_channel_id(self.channel_id)

Expand Down Expand Up @@ -670,7 +674,7 @@ def server_basic_cancel(self, frame):
consumer_tag = frame.arguments['consumer_tag']
self.cancelled_consumers.add(consumer_tag)
consumer_queue = self.consumer_queues[consumer_tag]
yield from consumer_queue.put(None)
yield from consumer_queue.put(StopIteration())
logger.info("consume cancelled received")

@asyncio.coroutine
Expand All @@ -695,7 +699,7 @@ def basic_cancel_ok(self, frame):
future.set_result(results)

consumer_queue = self.consumer_queues[results["consumer_tag"]]
yield from consumer_queue.put(None)
yield from consumer_queue.put(StopIteration())

logger.debug("Cancel ok")

Expand Down
36 changes: 28 additions & 8 deletions aioamqp/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,50 @@

import sys

import logging

PY35 = sys.version_info >= (3, 5)
logger = logging.getLogger(__name__)


class ConsumerStoped(Exception):
pass


class Consumer:
def __init__(self, queue: asyncio.Queue, consumer_tag):
self.queue = queue
self._queue = queue
self.tag = consumer_tag
self.message = None
self._stoped = False

if PY35:
async def __aiter__(self):
return self

async def __anext__(self):
return self.fetch_message()
if not self._stoped:
self.message = await self._queue.get()
if isinstance(self.message, StopIteration):
self._stoped = True
raise StopAsyncIteration()
else:
return self.message
raise StopAsyncIteration()

@asyncio.coroutine
def fetch_message(self):

self.message = yield from self.queue.get()
if self.message:
return self.message
if not self._stoped:
self.message = yield from self._queue.get()
if isinstance(self.message, StopIteration):
self._stoped = True
return False
else:
return True
else:
raise StopIteration()
return False

def get_message(self):
return self.message
if self._stoped:
raise ConsumerStoped()
return self.message
11 changes: 4 additions & 7 deletions aioamqp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
from . import version
from .compat import ensure_future


logger = logging.getLogger(__name__)


class _StreamWriter(asyncio.StreamWriter):

def write(self, data):
ret = super().write(data)
self._protocol._heartbeat_timer_send_reset()
Expand Down Expand Up @@ -135,7 +133,7 @@ def close_ok(self, frame):

@asyncio.coroutine
def start_connection(self, host, port, login, password, virtualhost, ssl=False,
login_method='AMQPLAIN', insist=False):
login_method='AMQPLAIN', insist=False):
"""Initiate a connection at the protocol level
We send `PROTOCOL_HEADER'
"""
Expand Down Expand Up @@ -194,8 +192,8 @@ def start_connection(self, host, port, login, password, virtualhost, ssl=False,
frame = yield from self.get_frame()
yield from self.dispatch_frame(frame)
if (frame.frame_type == amqp_constants.TYPE_METHOD and
frame.class_id == amqp_constants.CLASS_CONNECTION and
frame.method_id == amqp_constants.CONNECTION_CLOSE):
frame.class_id == amqp_constants.CLASS_CONNECTION and
frame.method_id == amqp_constants.CONNECTION_CLOSE):
raise exceptions.AmqpClosedConnection()

# for now, we read server's responses asynchronously
Expand Down Expand Up @@ -376,10 +374,9 @@ def server_close(self, frame):
method_id = response.read_short()
self.stop()
logger.warning("Server closed connection: %s, code=%s, class_id=%s, method_id=%s",
reply_text, reply_code, class_id, method_id)
reply_text, reply_code, class_id, method_id)
self._close_channels(reply_code, reply_text)


@asyncio.coroutine
def tune(self, frame):
decoder = amqp_frame.AmqpDecoder(frame.payload)
Expand Down
34 changes: 32 additions & 2 deletions aioamqp/tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import os
import unittest

import asyncio

from . import testcase
from . import testing
from .. import exceptions

IMPLEMENT_CHANNEL_FLOW = os.environ.get('IMPLEMENT_CHANNEL_FLOW', False)

class ChannelTestCase(testcase.RabbitTestCase, unittest.TestCase):

class ChannelTestCase(testcase.RabbitTestCase, unittest.TestCase):
_multiprocess_can_split_ = True

@testing.coroutine
Expand Down Expand Up @@ -83,9 +85,37 @@ def test_channel_active_inactive_flow(self):
result = yield from channel.flow(active=False)
self.assertFalse(result['active'])

@testing.coroutine
def test_channel_cancel_stops_consumer(self):
# declare
yield from self.channel.queue_declare("q", exclusive=True, no_wait=False)
yield from self.channel.exchange_declare("e", "fanout")
yield from self.channel.queue_bind("q", "e", routing_key='')

class ChannelIdTestCase(testcase.RabbitTestCase, unittest.TestCase):
# get a different channel
channel = yield from self.create_channel()

# publish
yield from channel.publish("coucou", "e", routing_key='', )

consumer_stoped = asyncio.Future()

@asyncio.coroutine
def consumer_task(consumer):
while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()

consumer_stoped.set_result(True)

consumer = yield from channel.basic_consume(queue_name="q")
asyncio.get_event_loop().create_task(consumer_task(consumer))

yield from channel.basic_cancel(consumer.tag)

assert (yield from consumer_stoped)


class ChannelIdTestCase(testcase.RabbitTestCase, unittest.TestCase):
@testing.coroutine
def test_channel_id_release_close(self):
channels_count_start = self.amqp.channels_ids_count
Expand Down
2 changes: 1 addition & 1 deletion aioamqp/version.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__version__ = '0.8.2'
__version__ = '0.9.0'
__packagename__ = 'aioamqp'
22 changes: 12 additions & 10 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ API
Basics
------

There are two principal objects when using aioamqp:
There are three principal objects when using aioamqp:

* The protocol object, used to begin a connection to aioamqp,
* The channel object, used when creating a new channel to effectively use an AMQP channel.
* The consumer object, used for consuming messages from a queue.


Starting a connection
Expand Down Expand Up @@ -141,20 +142,21 @@ When consuming message, you connect to the same queue you previously created::
import asyncio
import aioamqp

@asyncio.coroutine
def callback(body, envelope, properties):
print(body)

channel = yield from protocol.channel()
yield from channel.basic_consume(callback, queue_name="my_queue")
consumer = yield from channel.basic_consume(queue_name="my_queue")

while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()
print(body)

The ``basic_consume`` method tells the server to send us the messages, and will call ``callback`` with amqp response arguments.
The ``basic_consume`` method tells the server to send us the messages, and will add the amqp response arguments to the consumer queue
from where you can get them and process using an async iterator or a while using the ``fetch_message`` and ``get_message`` methods of the consumer class.

The ``consumer_tag`` is the id of your consumer, and the ``delivery_tag`` is the tag used if you want to acknowledge the message.
The ``consumer.tag`` or ``envelope.consumer_tag`` is the id of your consumer, and the ``envelope.delivery_tag`` is the tag used if you want to acknowledge the message.

In the callback:
In the while:

* the first ``body`` parameter is the message
* the ``body`` returned by ``get_message`` is the message
* the ``envelope`` is an instance of envelope.Envelope class which encapsulate a group of amqp parameter such as::

consumer_tag
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Changelog
=========

Aioamqp 0.9.0
-------------

* Changed consumer API from callbacks to coroutines solving the problem of publish inside of a consumer callback

Aioamqp 0.8.2
-------------

Expand Down
16 changes: 12 additions & 4 deletions docs/examples/hello_world.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,21 @@ We have to ensure the queue is created. Queue declaration is indempotant.
yield from channel.queue_declare(queue_name='hello')
To consume a message, the library calls a callback (which **MUST** be a coroutine):
To consume a message is used the Consumer object using an async iter if is python 3.5 or while with ``fetch_message`` and ``get_message`` if is python < 3.5:

.. code-block:: python
@asyncio.coroutine
def callback(channel, body, envelope, properties):
consumer = yield from channel.basic_consume(queue_name='hello', no_ack=True)
while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()
print(body)
yield from channel.basic_consume(callback, queue_name='hello', no_ack=True)
For python 3.5 :

.. code-block:: python
consumer = await channel.basic_consume(queue_name='hello', no_ack=True)
async for channel, body, envelope, properties in consumer:
print(body)
6 changes: 3 additions & 3 deletions docs/examples/rpc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ Note: the client use a `waiter` (an asyncio.Event) which will be set when receiv
Server
------

When unqueing a message, the server will publish a response directly in the callback. The `correlation_id` is used to let the client know it's a response from this request.
When unqueing a message, the server will enqueue a response directly in the consumer loop. The `correlation_id` is used to let the client know it's a response from this request.

.. code-block:: python
@asyncio.coroutine
def on_request(channel, body, envelope, properties):
while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()
n = int(body)
print(" [.] fib(%s)" % n)
Expand Down
8 changes: 4 additions & 4 deletions docs/examples/work_queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ Then, the worker configure the `QOS`: it specifies how the worker unqueues messa
yield from channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
Finaly we have to create a callback that will `ack` the message to mark it as `processed`.
Note: the code in the callback calls `asyncio.sleep` to simulate an asyncio compatible task that takes time.
Finaly we have to create a messages processor that will `ack` the message to mark it as `processed`.
Note: the code in the while calls `asyncio.sleep` to simulate an asyncio compatible task that takes time.
You probably want to block the eventloop to simulate a CPU intensive task using `time.sleep`.

.. code-block:: python
@asyncio.coroutine
def callback(channel, body, envelope, properties):
while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()
print(" [x] Received %r" % body)
yield from asyncio.sleep(body.count(b'.'))
print(" [x] Done")
Expand Down
9 changes: 4 additions & 5 deletions examples/receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@
import aioamqp


@asyncio.coroutine
def callback(channel, body, envelope, properties):
print(" [x] Received %r" % body)

@asyncio.coroutine
def receive():
transport, protocol = yield from aioamqp.connect()
channel = yield from protocol.channel()

yield from channel.queue_declare(queue_name='hello')

yield from channel.basic_consume(callback, queue_name='hello')
consumer = yield from channel.basic_consume(queue_name='hello')

while (yield from consumer.fetch_message()):
channel, body, envelope, properties = consumer.get_message()
print(" [x] Received %r" % body)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(receive())
Expand Down
Loading

0 comments on commit cda3b3a

Please sign in to comment.