From bb51cc25a41205e8fc0c5280c2f155f2c28f0810 Mon Sep 17 00:00:00 2001 From: Vladimir Garvardt Date: Mon, 3 Jul 2023 07:24:06 +0200 Subject: [PATCH] feat: added job ttl option --- worker.go | 12 +++++++++++- worker_option.go | 16 ++++++++++++++++ worker_option_test.go | 26 ++++++++++++++++++++++++++ worker_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) diff --git a/worker.go b/worker.go index e3c79fa..1ae2986 100644 --- a/worker.go +++ b/worker.go @@ -73,6 +73,7 @@ type Worker struct { running bool pollStrategy PollStrategy pollFunc pollFunc + jobTTL time.Duration graceful bool gracefulCtx func() context.Context @@ -256,7 +257,14 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { return } - if err = wf(ctx, j); err != nil { + handlerCtx := ctx + cancel := context.CancelFunc(func() {}) + if w.jobTTL > 0 { + handlerCtx, cancel = context.WithTimeout(ctx, w.jobTTL) + } + defer cancel() + + if err = wf(handlerCtx, j); err != nil { w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false))) for _, hook := range w.hooksJobDone { @@ -351,6 +359,7 @@ type WorkerPool struct { mu sync.Mutex running bool pollStrategy PollStrategy + jobTTL time.Duration graceful bool gracefulCtx func() context.Context @@ -410,6 +419,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt WithWorkerHooksJobDone(w.hooksJobDone...), WithWorkerPanicStackBufSize(w.panicStackBufSize), WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob), + WithWorkerJobTTL(w.jobTTL), ) if err != nil { diff --git a/worker_option.go b/worker_option.go index f5e534e..caece43 100644 --- a/worker_option.go +++ b/worker_option.go @@ -128,6 +128,14 @@ func WithWorkerSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerOption { } } +// WithWorkerJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context, +// so it is up to the job implementation to handle context cancellation properly. +func WithWorkerJobTTL(d time.Duration) WorkerOption { + return func(w *Worker) { + w.jobTTL = d + } +} + // WithPoolPollInterval overrides default poll interval with the given value. // Poll interval is the "sleep" duration if there were no jobs found in the DB. func WithPoolPollInterval(d time.Duration) WorkerPoolOption { @@ -225,3 +233,11 @@ func WithPoolSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerPoolOption { w.spanWorkOneNoJob = spanWorkOneNoJob } } + +// WithPoolJobTTL sets max time a job can run. Implementation-wise the job runs with the timeout context, +// so it is up to the job implementation to handle context cancellation properly. +func WithPoolJobTTL(d time.Duration) WorkerPoolOption { + return func(w *WorkerPool) { + w.jobTTL = d + } +} diff --git a/worker_option_test.go b/worker_option_test.go index a487592..f4543db 100644 --- a/worker_option_test.go +++ b/worker_option_test.go @@ -286,6 +286,16 @@ func TestWithWorkerSpanWorkOneNoJob(t *testing.T) { assert.True(t, workerWithSpanWorkOneNoJob.spanWorkOneNoJob) } +func TestWithWorkerJobTTL(t *testing.T) { + workerWOutJobTTL, err := NewWorker(nil, dummyWM) + require.NoError(t, err) + assert.Equal(t, time.Duration(0), workerWOutJobTTL.jobTTL) + + workerWithJobTTL, err := NewWorker(nil, dummyWM, WithWorkerJobTTL(5*time.Minute)) + require.NoError(t, err) + assert.Equal(t, 5*time.Minute, workerWithJobTTL.jobTTL) +} + func TestWithPoolHooksJobLocked(t *testing.T) { ctx := context.Background() hook := new(dummyHook) @@ -416,3 +426,19 @@ func TestWithPoolSpanWorkOneNoJob(t *testing.T) { assert.True(t, w.spanWorkOneNoJob) } } + +func TestWithPoolJobTTL(t *testing.T) { + poolWOutJobTTL, err := NewWorkerPool(nil, dummyWM, 2) + require.NoError(t, err) + assert.Equal(t, time.Duration(0), poolWOutJobTTL.jobTTL) + for _, w := range poolWOutJobTTL.workers { + assert.Equal(t, time.Duration(0), w.jobTTL) + } + + poolWithJobTTL, err := NewWorkerPool(nil, dummyWM, 2, WithPoolJobTTL(10*time.Minute)) + require.NoError(t, err) + assert.Equal(t, 10*time.Minute, poolWithJobTTL.jobTTL) + for _, w := range poolWithJobTTL.workers { + assert.Equal(t, 10*time.Minute, w.jobTTL) + } +} diff --git a/worker_test.go b/worker_test.go index d411601..3801c97 100644 --- a/worker_test.go +++ b/worker_test.go @@ -600,6 +600,10 @@ func TestNewWorker_GracefulShutdown(t *testing.T) { require.NoError(t, err) chDone := make(chan bool) + t.Cleanup(func() { + close(chDone) + }) + go func() { err := wNonGraceful.Run(ctxNonGraceful) assert.NoError(t, err) @@ -628,6 +632,45 @@ func TestNewWorker_GracefulShutdown(t *testing.T) { require.False(t, jobCancelled) } +func TestNewWorker_JobTTL(t *testing.T) { + connPool := adapterTesting.OpenTestPoolLibPQ(t) + + c, err := NewClient(connPool) + require.NoError(t, err) + + var jobCancelled bool + wm := WorkMap{ + "MyJob": func(ctx context.Context, j *Job) error { + select { + case <-ctx.Done(): + jobCancelled = true + case <-time.After(5 * time.Second): + jobCancelled = false + } + + return nil + }, + } + + wNoJobTTL, err := NewWorker(c, wm) + require.NoError(t, err) + wWithJobTTL, err := NewWorker(c, wm, WithWorkerJobTTL(2*time.Second)) + require.NoError(t, err) + + err = c.Enqueue(context.Background(), &Job{Type: "MyJob"}) + require.NoError(t, err) + err = c.Enqueue(context.Background(), &Job{Type: "MyJob"}) + require.NoError(t, err) + + didWork := wNoJobTTL.WorkOne(context.Background()) + require.True(t, didWork) + require.False(t, jobCancelled) + + didWork = wWithJobTTL.WorkOne(context.Background()) + require.True(t, didWork) + require.True(t, jobCancelled) +} + func TestNewWorkerPool_GracefulShutdown(t *testing.T) { connPool := adapterTesting.OpenTestPoolLibPQ(t)