Scheduling repeated unique jobs #457
Comments
I loosely recollect having to do this in the past... wouldn't defining a ...
I'd prefer not to mix ... Regarding your ...

I tried to work around this by querying the number of queued jobs for this task; if it's exactly 1 (the current job), then re-enqueue the next:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)
    # re-enqueue if there is exactly one queued/running job (this job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # only the current job is left, so enqueue the next one, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")


async def main():
    redis = await create_pool(RedisSettings())
    # startup job with unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```
It seems to work, but I'm not sure if there is a better/native way to do this. I was also wondering what `_job_try` does, but how is it useful?
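A possibly more direct check, assuming each repetition is enqueued with a known `_job_id` (like the fixed `'app.main.startup'` id above), would be `arq.jobs.Job.status()` instead of scanning the whole queue; a minimal sketch, not necessarily the "native" way:

```python
from arq.jobs import Job, JobStatus

async def is_job_pending(redis, job_id: str) -> bool:
    # look up a single job by id instead of listing all queued jobs
    status = await Job(job_id, redis).status()
    return status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
```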
Ah, now I remember that's where I got stuck: how do you re-enqueue a job_id when one is already running with the same id (or if its result is saved, but not explicitly retrieved/deleted)? Regarding `_job_try`: my understanding is that ... If you find a better solution, keen to learn too!
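For what it's worth, `enqueue_job` returns `None` when a job with the same id is already deferred, queued, in progress, or still has a stored result, so the collision is at least detectable; a minimal sketch reusing the pool and fixed id from the snippets above:

```python
job = await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup')
if job is None:
    # a job with this id is still pending or its result is still stored,
    # so arq refused to enqueue a duplicate
    ...
```

Lowering the worker's `keep_result` setting (default 3600 seconds) shortens how long a saved result blocks re-use of the same id.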
Since a job can have more states than just queued, I ended up checking the deferred, queued, and in-progress states:

```python
import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import JobStatus


async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)
    # check if any job with the same function is deferred, queued, or in progress
    pool = ctx['redis']
    all_jobs = await pool.all_job_results()
    in_progress_jobs = [
        job for job in all_jobs
        if job.status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
        and job.function == 'repeated_async_job'
    ]
    if in_progress_jobs:
        return 'done'
    await pool.enqueue_job('repeated_async_job')
    return 'done'


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')


class WorkerSettings:
    functions = [repeated_async_job]


if __name__ == '__main__':
    asyncio.run(main())
```

It does not account for params (i.e. the same job with different parameters), but for this job I don't need that.
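If parameters did matter, the same filter could compare arguments as well, since each entry carries `args` and `kwargs`; a hypothetical variant of the predicate:

```python
def same_job(job, function_name: str, args: tuple, kwargs: dict) -> bool:
    # treat jobs as duplicates only when function name and all arguments match
    return job.function == function_name and job.args == args and job.kwargs == kwargs
```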
@davidhuser are you facing the issue I've filed here (#459) by any chance, or know how to solve it? An in-progress key is created for 60 seconds, even for a cron job that I want to run every 5 or 10 seconds.
Thanks for the great library. Is it possible to schedule repeated unique jobs? My goal is to enqueue a job, ensure only one instance runs at a time, and re-enqueue it immediately after it finishes, possibly with a configurable interval.
I noticed that RQ offers rq-scheduler for this purpose.
Self-enqueuing, as mentioned in #432, might not be ideal.
Currently, I'm using a cron job with the `second` parameter set to fire every 10 seconds. However, the job duration varies.
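For reference, that setup looks roughly like this (`second` accepts a set of seconds within the minute; the exact spacing here is an assumption):

```python
from arq import cron

async def repeated_async_job(ctx):
    ...  # do stuff

class WorkerSettings:
    # fire at seconds 0, 10, 20, 30, 40 and 50 of every minute
    cron_jobs = [cron(repeated_async_job, second={0, 10, 20, 30, 40, 50})]
```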
Would it be better to handle scheduling with a different tool, like APScheduler's AsyncIOScheduler? If so, how would the scheduler know when the job is finished, and would it matter if it runs with multiple workers (e.g. Gunicorn)?
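If APScheduler were used instead, its `max_instances=1` option would cover the single-instance requirement within one process, though each Gunicorn worker would still run its own scheduler; a minimal sketch:

```python
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def repeated_async_job():
    await asyncio.sleep(3)  # do stuff


async def main():
    scheduler = AsyncIOScheduler()
    # max_instances=1 makes APScheduler skip a run while the previous one is still going
    scheduler.add_job(repeated_async_job, 'interval', seconds=10, max_instances=1)
    scheduler.start()
    await asyncio.Event().wait()  # keep the event loop alive


if __name__ == '__main__':
    asyncio.run(main())
```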