Add Redis Streams option for job delivery #451

Open
Wants to merge 3 commits into main.
8 changes: 7 additions & 1 deletion arq/cli.py
@@ -16,6 +16,7 @@
from .typing import WorkerSettingsType

burst_help = 'Batch mode: exit once no jobs are found in any queue.'
stream_help = 'Stream mode: use redis streams for job delivery. Does not support batch mode.'
health_check_help = 'Health Check: run a health check and exit.'
watch_help = 'Watch a directory and reload the worker upon changes.'
verbose_help = 'Enable verbose output.'
@@ -26,11 +27,14 @@
@click.version_option(VERSION, '-V', '--version', prog_name='arq')
@click.argument('worker-settings', type=str, required=True)
@click.option('--burst/--no-burst', default=None, help=burst_help)
@click.option('--stream/--no-stream', default=None, help=stream_help)
@click.option('--check', is_flag=True, help=health_check_help)
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
@click.option('--custom-log-dict', type=str, help=logdict_help)
def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None:
def cli(
*, worker_settings: str, burst: bool, stream: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str
) -> None:
"""
Job queues in python with asyncio and redis.

@@ -48,6 +52,8 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose:
exit(check_health(worker_settings_))
else:
kwargs = {} if burst is None else {'burst': burst}
if stream:
kwargs['stream'] = stream
if watch:
asyncio.run(watch_reload(watch, worker_settings_))
else:
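
The new --stream flag is forwarded to the worker unchanged, so arq WorkerSettings --stream amounts to the following programmatic call. This is a minimal sketch assuming this branch; myapp.settings.WorkerSettings is a placeholder for the user's usual settings class:

# Sketch: start a worker in stream mode without going through the CLI.
# WorkerSettings is a placeholder import for the user's settings class.
from arq.worker import run_worker

from myapp.settings import WorkerSettings

run_worker(WorkerSettings, stream=True)
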
6 changes: 5 additions & 1 deletion arq/connections.py
@@ -13,7 +13,7 @@
from redis.asyncio.sentinel import Sentinel
from redis.exceptions import RedisError, WatchError

from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix
from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix, stream_prefix
from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job
from .utils import timestamp_ms, to_ms, to_unix_ms

@@ -120,6 +120,7 @@ async def enqueue_job(
self,
function: str,
*args: Any,
_use_stream: bool = False,
_job_id: Optional[str] = None,
_queue_name: Optional[str] = None,
_defer_until: Optional[datetime] = None,
@@ -133,6 +134,7 @@ async def enqueue_job(

:param function: Name of the function to call
:param args: args to pass to the function
:param _use_stream: queue the job through Redis Streams; stream mode must be enabled on the worker
:param _job_id: ID of the job, can be used to enforce job uniqueness
:param _queue_name: queue of the job, can be used to create job in different queue
:param _defer_until: datetime at which to run the job
@@ -171,6 +173,8 @@

job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
pipe.multi()
if _use_stream:
pipe.xadd(stream_prefix + _queue_name, {job_key_prefix: job})
pipe.psetex(job_key, expires_ms, job)
pipe.zadd(_queue_name, {job_id: score})
try:
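
On the enqueue side the only visible change is the new _use_stream keyword. A minimal sketch assuming this branch; 'download_content' is a placeholder task name:

# Sketch: enqueue a job through the stream path.
# _use_stream=True adds an XADD to the pipeline on top of the usual
# PSETEX/ZADD, so it only changes delivery when the worker runs in stream mode.
import asyncio

from arq.connections import RedisSettings, create_pool


async def main() -> None:
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('download_content', 'https://example.com', _use_stream=True)


asyncio.run(main())
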
3 changes: 3 additions & 0 deletions arq/constants.py
@@ -1,4 +1,7 @@
default_queue_name = 'arq:queue'
default_worker_name = 'arq:worker'
default_worker_group = 'arq:workers'
stream_prefix = 'arq:stream:'
job_key_prefix = 'arq:job:'
in_progress_key_prefix = 'arq:in-progress:'
result_key_prefix = 'arq:result:'
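
The worker builds the stream key as stream_prefix + queue_name, so with the defaults the key layout works out as below (a worked example assuming this branch):

# Worked example of the key names introduced by the new constants.
from arq.constants import default_queue_name, default_worker_group, stream_prefix

stream_name = stream_prefix + default_queue_name  # 'arq:stream:arq:queue'
group_name = default_worker_group                 # 'arq:workers'
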
59 changes: 49 additions & 10 deletions arq/worker.py
@@ -8,7 +8,7 @@
from functools import partial
from signal import Signals
from time import time
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Set, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Sequence, Set, Tuple, Union, cast

from redis.exceptions import ResponseError, WatchError

@@ -20,13 +20,16 @@
abort_job_max_age,
abort_jobs_ss,
default_queue_name,
default_worker_group,
default_worker_name,
expires_extra_ms,
health_check_key_suffix,
in_progress_key_prefix,
job_key_prefix,
keep_cronjob_progress,
result_key_prefix,
retry_key_prefix,
stream_prefix,
)
from .utils import (
args_to_string,
@@ -144,10 +147,13 @@ class Worker:
:param functions: list of functions to register, can either be raw coroutine functions or the
result of :func:`arq.worker.func`.
:param queue_name: queue name to get jobs from
:param worker_name: unique name to identify this worker
:param worker_group: worker group that this worker belongs to
:param cron_jobs: list of cron jobs to run, use :func:`arq.cron.cron` to create them
:param redis_settings: settings for creating a redis connection
:param redis_pool: existing redis pool, generally None
:param burst: whether to stop the worker once all jobs have been run
:param stream: whether to constantly listen for new jobs from a redis stream
:param on_startup: coroutine function to run at startup
:param on_shutdown: coroutine function to run at shutdown
:param on_job_start: coroutine function to run on job start
@@ -188,10 +194,13 @@ def __init__(
functions: Sequence[Union[Function, 'WorkerCoroutine']] = (),
*,
queue_name: Optional[str] = default_queue_name,
worker_name: Optional[str] = None,
worker_group: Optional[str] = None,
cron_jobs: Optional[Sequence[CronJob]] = None,
redis_settings: Optional[RedisSettings] = None,
redis_pool: Optional[ArqRedis] = None,
burst: bool = False,
stream: bool = False,
on_startup: Optional['StartupShutdown'] = None,
on_shutdown: Optional['StartupShutdown'] = None,
on_job_start: Optional['StartupShutdown'] = None,
@@ -234,6 +243,10 @@ def __init__(
if len(self.functions) == 0:
raise RuntimeError('at least one function or cron_job must be registered')
self.burst = burst
self.stream = stream
if stream is True:
self.worker_name = worker_name if worker_name is not None else default_worker_name
self.worker_group = worker_group if worker_group is not None else default_worker_group
self.on_startup = on_startup
self.on_shutdown = on_shutdown
self.on_job_start = on_job_start
@@ -357,17 +370,43 @@ async def main(self) -> None:
if self.on_startup:
await self.on_startup(self.ctx)

async for _ in poll(self.poll_delay_s):
await self._poll_iteration()
if self.stream is False:
async for _ in poll(self.poll_delay_s):
await self._poll_iteration()

if self.burst:
if 0 <= self.max_burst_jobs <= self._jobs_started():
await asyncio.gather(*self.tasks.values())
return None
queued_jobs = await self.pool.zcard(self.queue_name)
if queued_jobs == 0:
await asyncio.gather(*self.tasks.values())
return None
else:
stream_name = stream_prefix + self.queue_name

with contextlib.suppress(ResponseError):
await self.pool.xgroup_create(stream_name, self.worker_group, '$', mkstream=True)
logger.info('Stream consumer group created with name: %s', self.worker_group)

async def read_messages(name: Literal['>', '0']) -> None:
if event := await self.pool.xreadgroup(
consumername=self.worker_name, groupname=self.worker_group, streams={stream_name: name}, block=0
):
await self._poll_iteration()

acknowledge = cast(Callable[..., Any], self.pool.xack)
for message in event[0][1]:
await acknowledge(stream_name, self.worker_group, message[0])

# Heartbeat before blocking, otherwise the health check fails before the first message is received
await self.heart_beat()

if self.burst:
if 0 <= self.max_burst_jobs <= self._jobs_started():
await asyncio.gather(*self.tasks.values())
return None
queued_jobs = await self.pool.zcard(self.queue_name)
if queued_jobs == 0:
await asyncio.gather(*self.tasks.values())
return None
await read_messages('0')
else:
while True:
await read_messages('>')

async def _poll_iteration(self) -> None:
"""
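
Programmatically, the new keyword arguments could be wired up roughly as follows. This is a sketch assuming this branch; fetch_data is a placeholder job, and worker_name should be unique per process so each consumer in the group is distinct:

# Sketch: a stream-mode worker with an explicit consumer name and group.
# Workers sharing worker_group read from the same consumer group, so each
# stream entry is delivered to exactly one of them.
from arq.connections import RedisSettings
from arq.worker import Worker, func


async def fetch_data(ctx, url):
    # placeholder job
    return url


worker = Worker(
    functions=[func(fetch_data)],
    redis_settings=RedisSettings(),
    stream=True,                 # block on XREADGROUP instead of polling on a timer
    worker_name='worker-1',      # consumer name; defaults to default_worker_name
    worker_group='arq:workers',  # consumer group; defaults to default_worker_group
)
worker.run()
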
26 changes: 26 additions & 0 deletions tests/conftest.py
@@ -79,6 +79,32 @@ def create(functions=[], burst=True, poll_delay=0, max_jobs=10, arq_redis=arq_re
await worker_.close()


poll_worker = worker


@pytest.fixture
async def stream_worker(arq_redis):
worker_: Worker = None

def create(functions=[], burst=True, stream=True, poll_delay=0, max_jobs=10, arq_redis=arq_redis, **kwargs):
nonlocal worker_
worker_ = Worker(
functions=functions,
redis_pool=arq_redis,
burst=burst,
stream=stream,
poll_delay=poll_delay,
max_jobs=max_jobs,
**kwargs,
)
return worker_

yield create

if worker_:
await worker_.close()


@pytest.fixture
async def worker_retry(arq_redis_retry):
worker_retry_: Worker = None
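
A small illustrative test built on the new fixture, asserting only the defaults introduced in Worker.__init__ above (the foobar coroutine is a placeholder):

# Illustrative test: worker_name/worker_group defaults are only set
# when stream mode is enabled.
from arq.constants import default_worker_group, default_worker_name


async def foobar(ctx):
    return 42


async def test_stream_worker_defaults(stream_worker):
    worker = stream_worker(functions=[foobar])
    assert worker.stream is True
    assert worker.worker_name == default_worker_name
    assert worker.worker_group == default_worker_group
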