🔨 Adding a job counter to address Semaphore issues #408
Conversation
Codecov Report
Additional details and impacted files

@@            Coverage Diff             @@
##             main     #408      +/-   ##
==========================================
- Coverage   98.66%   98.40%   -0.27%
==========================================
  Files          11       11
  Lines        1052     1063      +11
  Branches      199      200       +1
==========================================
+ Hits         1038     1046       +8
- Misses          6        8       +2
- Partials        8        9       +1

Continue to review the full report in Codecov by Sentry.
Thank you! 😊 It would be great if you could add a test that ensures we don't regress in the future. 😊
Could you suggest a test? I couldn't come up with an appropriate one.
Set max_jobs to 1, queue a task and ensure the health check is still logged? This should fail without this implementation, right? I'm not at home, but I'm pretty sure there's a health check test already implemented.
I have set up a test that cancels the job when the max_jobs queue is full. The test is failing on Python 3.7. The logs show that the job was enqueued and cancelled, with a stack trace for a Redis ConnectionError between the two log lines. Can you help me debug when you find some time? I do think just re-running the CI job for 3.7 should resolve this.
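For reference, the regression test suggested above might look roughly like the sketch below. It is not the test added in this PR: it assumes a local Redis instance, pytest-asyncio, arq's public Worker / create_pool / enqueue_job API, and that the health check is logged via the 'arq.worker' logger with a message containing 'recording health' - all of which should be checked against the arq version in use.

import asyncio
import logging

import pytest

from arq.connections import RedisSettings, create_pool
from arq.worker import Worker


async def sleeper(ctx):
    # occupies the single job slot long enough for a health check to come due
    await asyncio.sleep(2)


@pytest.mark.asyncio
async def test_health_check_logged_while_max_jobs_busy(caplog):
    caplog.set_level(logging.INFO, logger='arq.worker')

    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('sleeper')

    worker = Worker(
        functions=[sleeper],
        redis_settings=RedisSettings(),
        max_jobs=1,
        health_check_interval=0.5,  # short interval so a check comes due mid-job
    )
    worker_task = asyncio.ensure_future(worker.main())
    await asyncio.sleep(1.5)  # the job is running and a health check should have fired
    worker_task.cancel()
    await worker.close()

    # Without the job counter the poll loop blocks on the semaphore while the only
    # slot is occupied, so no health check line is logged while the job runs.
    # 'recording health' is an assumption about arq's health-check log message.
    assert any('recording health' in record.getMessage() for record in caplog.records)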
Thank you 😊
This looks good to me.
Samuel is busy with v2 of Pydantic, so I wouldn't expect this to be merged until that happens, but you can always use your own fork in the meantime 😊
if self.job_counter >= self.max_jobs:
    self.sem.release()
    return None
Just return here.
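Applied to the snippet above, the suggestion would read (illustration only, not the committed diff):

if self.job_counter >= self.max_jobs:
    self.sem.release()
    return  # a bare return is enough; the None is implicit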
Hey @JonasKs! Any update on when this can be merged?
Sorry this took some time, I've been a bit busy lately. As for the release, I'm going to have to put that back into @samuelcolvin's hands - not sure what he has planned. 😊
Issue #405
When the maximum number of jobs is running, waiting on the semaphore in the same event loop blocks the heartbeat & cancellation tasks.
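The pattern can be reproduced with the small, self-contained script below. It is a toy model of the worker's poll loop, not arq code: a single coroutine guards its queue read with a semaphore sized at max_jobs and only prints a "heartbeat" afterwards, so once max_jobs jobs hold all the slots the loop stalls and the heartbeat stops.

import asyncio

MAX_JOBS = 2


async def job(sem: asyncio.Semaphore, seconds: float) -> None:
    try:
        await asyncio.sleep(seconds)  # simulate a long-running job
    finally:
        sem.release()


async def poll_loop() -> None:
    sem = asyncio.Semaphore(MAX_JOBS)
    tick = 0
    while True:
        async with sem:  # blocks for good once all MAX_JOBS slots are taken
            pass         # (this is where the queue would be polled)
        for _ in range(MAX_JOBS):
            await sem.acquire()                # one slot per started job
            asyncio.create_task(job(sem, 60))
        tick += 1
        print(f'heartbeat {tick}')             # never printed after the first pass
        await asyncio.sleep(0.5)


async def main() -> None:
    try:
        await asyncio.wait_for(poll_loop(), timeout=5)
    except asyncio.TimeoutError:
        print('poll loop is stuck waiting for the semaphore; the heartbeat has stopped')


asyncio.run(main())

Running it prints 'heartbeat 1' and then nothing more until the timeout fires.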
Possible solution
- Increase the semaphore to max_jobs + 1
- Add a job_counter that increments when the semaphore is acquired and decrements when the semaphore is released
- When job_counter == max_jobs, release the semaphore immediately and return from the start_jobs function, basically foregoing starting new jobs

The solution seems to work. The counter is threadsafe because we always increment or decrement it before releasing the semaphore. Quick test app below:
- arq_app.py
- start_workers.py
- worker.log for the first heartbeat happening
- cancellations.py
- worker.log for the latest heartbeat + cancellation + new job
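To make the proposed mechanism concrete, here is the same toy poll loop from above with the counter applied. This is a sketch of the idea only, not the actual arq diff or the attached test app: the semaphore gets max_jobs + 1 slots so the polling pass can always get through it, while job_counter is what actually caps concurrent jobs.

import asyncio

MAX_JOBS = 2


class Poller:
    def __init__(self) -> None:
        # max_jobs + 1 slots: one is always left for the polling pass itself,
        # so the heartbeat can no longer be starved by running jobs
        self.sem = asyncio.Semaphore(MAX_JOBS + 1)
        self.job_counter = 0

    async def job(self, seconds: float) -> None:
        try:
            await asyncio.sleep(seconds)  # simulate a long-running job
        finally:
            self.job_counter -= 1         # decrement before releasing the slot
            self.sem.release()

    async def start_jobs(self, job_ids) -> None:
        for _ in job_ids:
            await self.sem.acquire()
            if self.job_counter >= MAX_JOBS:
                # every real job slot is busy: hand the slot back and skip
                # starting new jobs instead of blocking the whole poll loop
                self.sem.release()
                return
            self.job_counter += 1
            asyncio.create_task(self.job(60))

    async def poll_loop(self) -> None:
        tick = 0
        while True:
            async with self.sem:            # always passable thanks to the spare slot
                job_ids = ['a', 'b', 'c']   # pretend the queue always has work
            await self.start_jobs(job_ids)
            tick += 1
            print(f'heartbeat {tick}')      # keeps printing with MAX_JOBS jobs running
            await asyncio.sleep(0.5)


async def main() -> None:
    try:
        await asyncio.wait_for(Poller().poll_loop(), timeout=3)
    except asyncio.TimeoutError:
        pass  # the demo just runs for a few seconds and stops


asyncio.run(main())

Running it prints a heartbeat every cycle even though both job slots stay occupied, which is the behaviour this PR aims for in arq's worker.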