Merge pull request #1 from axiom-data-science/threading
Threading and cleanup
kwilcox authored Sep 17, 2018
2 parents 3dd6e1d + 6b55a08 commit ff84cc1
Showing 9 changed files with 242 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
*.pyc
.cache
.pytest_cache
10 changes: 2 additions & 8 deletions .travis.yml
@@ -11,7 +11,7 @@ services:

before_install:
- docker pull landoop/fast-data-dev
- docker run -d --net=host -p 50001:50001 -p 50002:50002 -e BROKER_PORT=50001 -e REGISTRY_PORT=50002 -e RUNTESTS=0 -e DISABLE=elastic,hbase -e DISABLE_JMX=1 -e RUNTESTS=0 -e FORWARDLOGS=0 -e SAMPLEDATA=0 --name easyavrotesting landoop/fast-data-dev
- docker run -d --net=host -p 4001:4001 -p 4002:4002 -e BROKER_PORT=4001 -e REGISTRY_PORT=4002 -e RUNTESTS=0 -e DISABLE=elastic,hbase -e DISABLE_JMX=1 -e RUNTESTS=0 -e FORWARDLOGS=0 -e SAMPLEDATA=0 --name easyavrotesting landoop/fast-data-dev

install:
# Install miniconda
@@ -36,15 +36,9 @@ install:
# Create our environment
# ------------------------------------
- ENV_NAME='test-environment'
- conda create --quiet -n $ENV_NAME python=$TRAVIS_PYTHON_VERSION
- conda env create -n $ENV_NAME environment.yml
- source activate $ENV_NAME

# Install testing requirements
# ------------------------------------
- conda install --file requirements.txt
- conda install --file requirements-test.txt
- conda list --export

- python setup.py sdist && version=$(python setup.py --version) && pushd dist && pip install --no-deps easyavro-${version}.tar.gz && popd

script:
21 changes: 5 additions & 16 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM debian:8.6
FROM debian:9.5
MAINTAINER Kyle Wilcox <kyle@axiomdatascience.com>
ENV DEBIAN_FRONTEND noninteractive
ENV LANG C.UTF-8
@@ -18,9 +18,11 @@ RUN apt-get update && apt-get install -y \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* /var/cache/oracle-jdk8-installer

# Copy over environment definition
COPY environment.yml /tmp/environment.yml

# Setup CONDA (https://hub.docker.com/r/continuumio/miniconda3/~/dockerfile/)
ENV MINICONDA_VERSION latest
ARG PYTHON_VERSION=3.6
RUN echo 'export PATH=/opt/conda/bin:$PATH' > /etc/profile.d/conda.sh && \
curl -k -o /miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-$MINICONDA_VERSION-Linux-x86_64.sh && \
/bin/bash /miniconda.sh -b -p /opt/conda && \
@@ -30,24 +32,11 @@ RUN echo 'export PATH=/opt/conda/bin:$PATH' > /etc/profile.d/conda.sh && \
--set changeps1 no \
--set show_channel_urls True \
&& \
/opt/conda/bin/conda config \
--add channels conda-forge \
--add channels axiom-data-science \
&& \
/opt/conda/bin/conda install python=$PYTHON_VERSION && \
/opt/conda/bin/conda env update -n root --file /tmp/environment.yml && \
/opt/conda/bin/conda clean -a -y

ENV PATH /opt/conda/bin:$PATH

# Install python requirements
COPY requirements*.txt /tmp/

RUN conda install -y \
--file /tmp/requirements.txt \
--file /tmp/requirements-test.txt \
&& \
conda clean -a -y

# Copy packrat contents and install
ENV CODE_HOME /easyavro
WORKDIR $CODE_HOME
47 changes: 29 additions & 18 deletions README.md
@@ -1,6 +1,6 @@
# EasyAvro [![Build Status](https://travis-ci.org/axiom-data-science/easyavro.svg?branch=master)](https://travis-ci.org/axiom-data-science/easyavro)

A python helper for producing and consuming `avro` schema'd Kafka topics. Simplicity and the ability to execute a function for each message consumed is the top priority. This is not designed for high throughput.
A python helper for producing and consuming `avro` schema'd Kafka topics. Simplicity and the ability to execute a function for each message consumed is the top priority.


## Installation
@@ -20,8 +20,8 @@ The schemas `my-topic-key` and `my-topic-value` must be available in the schema
from easyavro import EasyAvroProducer

bp = EasyAvroProducer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
kafka_topic='my-topic'
)

@@ -39,8 +39,8 @@ Or pass in your own schemas.
from easyavro import EasyAvroProducer

bp = EasyAvroProducer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
kafka_topic='my-topic',
value_schema=SomeAvroSchemaObject,
key_schema=SomeAvroSchemaObject,
@@ -61,8 +61,8 @@ constructor and use `None` as the value of the key.
from easyavro import EasyAvroProducer

bp = EasyAvroProducer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
kafka_topic='my-topic',
value_schema=SomeAvroSchemaObject,
key_schema='not_used_because_no_keys_in_the_records',
@@ -81,15 +81,25 @@ bp.produce(records)

The defaults are sane. They will pull offsets from the broker and set the topic offset to `largest`. This will pull all new messages that haven't been acknowledged by a consumer with the same `consumer_group` (which translates to the `librdkafka` `group.id` setting).


##### Parameters

* `on_recieve` (`Callable[[str, str], None]`) - Function that is executed (in a new thread) for each retrieved message.
* `on_recieve_timeout` (`int`) - Seconds the `Consumer` will wait for the calls to `on_recieve` to exit before moving on. By default it will wait forever. You should set this to a reasonable maximum number of seconds your `on_recieve` callback will take, to prevent deadlock when the `Consumer` is exiting and trying to clean up its spawned threads.
* `timeout` (`int`) - The `timeout` parameter to the `poll` function in `confluent-kafka`. Controls how long `poll` will block while waiting for messages.
* `loop` (`bool`) - Whether the `Consumer` will keep looping for messages or break after retrieving the first chunk of messages. This is useful when testing.
* `initial_wait` (`int`) - Seconds the `Consumer` should wait before starting to consume. This is useful when testing.
* `cleanup_every` (`int`) - Try to clean up spawned threads after this many messages. A usage sketch with these parameters follows this list.
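
As an illustration only (an editor's sketch, not part of this commit), the parameters above might be used together as follows; the registry/broker addresses and the topic name are placeholders taken from the examples elsewhere in this README:

```python
from easyavro import EasyAvroConsumer

def on_recieve(key: str, value: str) -> None:
    # Runs in its own thread for each consumed message
    print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyAvroConsumer(
    schema_registry_url='http://localhost:4002',
    kafka_brokers=['localhost:4001'],
    consumer_group='easyavro.testing',
    kafka_topic='my-topic'
)

bc.consume(
    on_recieve=on_recieve,
    on_recieve_timeout=10,  # wait up to 10 seconds per callback thread when exiting
    timeout=5,              # poll() blocks for up to 5 seconds
    loop=True,              # keep consuming until interrupted
    initial_wait=2,         # pause 2 seconds before the first poll (useful in tests)
    cleanup_every=500       # prune finished callback threads every 500 messages
)
```

Setting `on_recieve_timeout` bounds how long shutdown can block on slow callbacks, at the cost of leaving any still-running threads behind.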

```python
from easyavro import EasyAvroConsumer

def on_recieve(key: str, value: str) -> None:
print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyAvroConsumer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
consumer_group='easyavro.testing',
kafka_topic='my-topic'
)
@@ -105,8 +115,8 @@ def on_recieve(key: str, value: str) -> None:
print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyAvroConsumer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
consumer_group='easyavro.testing',
kafka_topic='my-topic',
topic_config={'enable.auto.commit': False, 'offset.store.method': 'file'}
@@ -123,8 +133,8 @@ def on_recieve(key: str, value: str) -> None:
print("Got Key:{}\nValue:{}\n".format(key, value))

bc = EasyAvroConsumer(
schema_registry_url='http://localhost:50002',
kafka_brokers=['localhost:50001'],
schema_registry_url='http://localhost:4002',
kafka_brokers=['localhost:4001'],
consumer_group='easyavro.testing',
kafka_topic='my-topic',
offset='earliest'
@@ -141,11 +151,11 @@ There are only integration tests.
```
docker run -d --net=host \
-e ZK_PORT=50000 \
-e BROKER_PORT=50001 \
-e REGISTRY_PORT=50002 \
-e REST_PORT=50003 \
-e CONNECT_PORT=50004 \
-e WEB_PORT=50005 \
-e BROKER_PORT=4001 \
-e REGISTRY_PORT=4002 \
-e REST_PORT=4003 \
-e CONNECT_PORT=4004 \
-e WEB_PORT=4005 \
-e RUNTESTS=0 \
-e DISABLE=elastic,hbase \
-e DISABLE_JMX=1 \
@@ -166,5 +176,6 @@ docker run --net="host" easyavro
#### No Docker

```
conda env create environment.yml
py.test -s -rxs -v
```
7 changes: 7 additions & 0 deletions easyavro/__init__.py
@@ -4,3 +4,10 @@
from .producer import EasyAvroProducer, schema

__version__ = "2.1.0"


__all__ = [
'EasyAvroConsumer',
'EasyAvroProducer',
'schema'
]
51 changes: 48 additions & 3 deletions easyavro/consumer.py
@@ -1,7 +1,10 @@
#!python
# coding=utf-8
import time
import logging
import threading
from typing import List, Callable
from datetime import datetime, timedelta

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
@@ -42,13 +45,28 @@ def __init__(self,

def consume(self,
on_recieve: Callable[[str, str], None],
on_recieve_timeout: int = None,
timeout: int = None,
loop: bool = True) -> None:
loop: bool = True,
initial_wait: int = None,
cleanup_every: int = 1000
) -> None:

if on_recieve is None:
def on_recieve(k, v):
L.info("Recieved message:\nKey: {}\nValue: {}".format(k, v))

if initial_wait is not None:
initial_wait = int(initial_wait)
loops = 0
started = datetime.now()
start_delta = timedelta(seconds=initial_wait)
while (datetime.now() - started) < start_delta:
L.info("Starting in {} seconds".format(initial_wait - loops))
time.sleep(1)
loops += 1

callback_threads = []
self.subscribe([self.kafka_topic])
L.info("Starting consumer...")
try:
@@ -65,12 +83,39 @@ def on_recieve(k, v):
break
else:
L.error(msg.error())
# Call the function we passed in
on_recieve(msg.key(), msg.value())
else:
# Call the function we passed in if we consumed a valid message
t = threading.Thread(
name='EasyAvro-on_recieve',
target=on_recieve,
args=(msg.key(), msg.value())
)
t.start()
callback_threads.append(t)
except SerializerError as e:
L.warning('Message deserialization failed for "{}: {}"'.format(msg, e))
finally:
# Periodically clean up threads to prevent the list of callback_threads
# from becoming absolutely huge on long running Consumers
if len(callback_threads) >= cleanup_every:
for x in callback_threads:
x.join(0)
callback_threads = [
x for x in callback_threads if x.is_alive()
]
cleaned = cleanup_every - len(callback_threads)
L.info('Cleaned up {} completed threads'.format(cleaned))

except KeyboardInterrupt:
L.info("Aborted via keyboard")
finally:
L.info("Waiting for on_recieve callbacks to finish...")
# Block for `on_recieve_timeout` for each thread that isn't finished
[ ct.join(timeout=on_recieve_timeout) for ct in callback_threads ]
# Now see if any threads are still alive (didn't exit after `on_recieve_timeout`)
alive_threads = [ at for at in callback_threads if at.is_alive() ]
for at in alive_threads:
L.warning('{0.name}-{0.ident} never exited and is still running'.format(at))

L.debug("Closing consumer...")
self.close()
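
Editor's note, not part of the diff: the threading pattern introduced above (spawn a thread per `on_recieve` callback, periodically prune finished threads, and join the rest with a timeout on shutdown) can be sketched in isolation roughly as follows; the `handle` callback and the message loop are illustrative stand-ins:

```python
import threading
import time


def handle(key, value):
    # Stand-in for an on_recieve callback; each call runs in its own thread
    time.sleep(0.1)


callback_threads = []
cleanup_every = 1000

for i in range(5000):  # stand-in for the Consumer's poll() loop
    t = threading.Thread(name='EasyAvro-on_recieve', target=handle, args=(i, 'value'))
    t.start()
    callback_threads.append(t)

    # Periodically reap finished threads so the list stays small on
    # long-running consumers
    if len(callback_threads) >= cleanup_every:
        for ct in callback_threads:
            ct.join(0)  # non-blocking; finished threads are simply collected
        callback_threads = [ct for ct in callback_threads if ct.is_alive()]

# On shutdown, give each outstanding callback a bounded time to finish
for ct in callback_threads:
    ct.join(timeout=10)

for ct in (ct for ct in callback_threads if ct.is_alive()):
    print('{0.name}-{0.ident} never exited and is still running'.format(ct))
```

The `join(0)` calls in the periodic cleanup are non-blocking, so pruning never stalls the poll loop; only the final joins on shutdown honor the configured timeout (`on_recieve_timeout` in the consumer above).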