diff --git a/worker.go b/worker.go index 103de9e..e3c79fa 100644 --- a/worker.go +++ b/worker.go @@ -88,6 +88,7 @@ type Worker struct { mDuration metric.Int64Histogram panicStackBufSize int + spanWorkOneNoJob bool } // NewWorker returns a Worker that fetches Jobs from the Client and executes @@ -180,11 +181,18 @@ func (w *Worker) runLoop(ctx context.Context) error { // WorkOne tries to consume single message from the queue. func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { - j, err := w.pollFunc(ctx, w.queue) + ctx, span := w.tracer.Start(ctx, "Worker.WorkOne") + // worker option is set to generate spans even when no job is found - let it be + if w.spanWorkOneNoJob { + defer span.End() + } + j, err := w.pollFunc(ctx, w.queue) if err != nil { + span.RecordError(fmt.Errorf("woker failed to lock a job: %w", err)) w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(""), attrSuccess.Bool(false))) w.logger.Error("Worker failed to lock a job", adapter.Err(err)) + for _, hook := range w.hooksJobLocked { hook(ctx, nil, err) } @@ -194,11 +202,17 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { return // no job was available } + // at this point we have a job, so we need to ensure that span will be generated + if !w.spanWorkOneNoJob { + defer span.End() + } + processingStartedAt := time.Now() - ctx, span := w.tracer.Start(ctx, "Worker.WorkOne", trace.WithAttributes( + span.SetAttributes( + attribute.String("job-id", j.ID.String()), + attribute.String("job-queue", j.Queue), attribute.String("job-type", j.Type), - )) - defer span.End() + ) ll := w.logger.With(adapter.F("job-id", j.ID.String()), adapter.F("job-type", j.Type)) @@ -226,7 +240,7 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) { if !ok { w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false))) - span.RecordError(errors.New("job with unknown type")) + span.RecordError(fmt.Errorf("job with unknown type: %q", j.Type)) ll.Error("Got a job with unknown type") errUnknownType := fmt.Errorf("worker[id=%s] unknown job type: %q", w.id, j.Type) @@ -349,6 +363,7 @@ type WorkerPool struct { hooksJobDone []HookFunc panicStackBufSize int + spanWorkOneNoJob bool } // NewWorkerPool creates a new WorkerPool with count workers using the Client c. @@ -394,6 +409,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt WithWorkerHooksUnknownJobType(w.hooksUnknownJobType...), WithWorkerHooksJobDone(w.hooksJobDone...), WithWorkerPanicStackBufSize(w.panicStackBufSize), + WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob), ) if err != nil { diff --git a/worker_option.go b/worker_option.go index a79d6be..f5e534e 100644 --- a/worker_option.go +++ b/worker_option.go @@ -119,6 +119,15 @@ func WithWorkerGracefulShutdown(handlerCtx func() context.Context) WorkerOption } } +// WithWorkerSpanWorkOneNoJob enables tracing span generation for every try to get one. +// When set to true - generates a span for every DB poll, even when no job was acquired. This may +// generate a lot of empty spans, but may help with some debugging, so use carefully. +func WithWorkerSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerOption { + return func(w *Worker) { + w.spanWorkOneNoJob = spanWorkOneNoJob + } +} + // WithPoolPollInterval overrides default poll interval with the given value. // Poll interval is the "sleep" duration if there were no jobs found in the DB. func WithPoolPollInterval(d time.Duration) WorkerPoolOption { @@ -207,3 +216,12 @@ func WithPoolPanicStackBufSize(size int) WorkerPoolOption { w.panicStackBufSize = size } } + +// WithPoolSpanWorkOneNoJob enables tracing span generation for every try to get one. +// When set to true - generates a span for every DB poll, even when no job was acquired. This may +// generate a lot of empty spans, but may help with some debugging, so use carefully. +func WithPoolSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerPoolOption { + return func(w *WorkerPool) { + w.spanWorkOneNoJob = spanWorkOneNoJob + } +} diff --git a/worker_option_test.go b/worker_option_test.go index 2d27358..a487592 100644 --- a/worker_option_test.go +++ b/worker_option_test.go @@ -276,6 +276,16 @@ func TestWithWorkerHooksJobDone(t *testing.T) { require.Equal(t, 3, hook.counter) } +func TestWithWorkerSpanWorkOneNoJob(t *testing.T) { + workerWOutSpanWorkOneNoJob, err := NewWorker(nil, dummyWM) + require.NoError(t, err) + assert.False(t, workerWOutSpanWorkOneNoJob.spanWorkOneNoJob) + + workerWithSpanWorkOneNoJob, err := NewWorker(nil, dummyWM, WithWorkerSpanWorkOneNoJob(true)) + require.NoError(t, err) + assert.True(t, workerWithSpanWorkOneNoJob.spanWorkOneNoJob) +} + func TestWithPoolHooksJobLocked(t *testing.T) { ctx := context.Background() hook := new(dummyHook) @@ -392,3 +402,17 @@ func TestWithPoolPanicStackBufSize(t *testing.T) { assert.Equal(t, 12345, w.panicStackBufSize) } } + +func TestWithPoolSpanWorkOneNoJob(t *testing.T) { + poolWOutSpanWorkOneNoJob, err := NewWorkerPool(nil, dummyWM, 2) + require.NoError(t, err) + for _, w := range poolWOutSpanWorkOneNoJob.workers { + assert.False(t, w.spanWorkOneNoJob) + } + + poolWithSpanWorkOneNoJob, err := NewWorkerPool(nil, dummyWM, 2, WithPoolSpanWorkOneNoJob(true)) + require.NoError(t, err) + for _, w := range poolWithSpanWorkOneNoJob.workers { + assert.True(t, w.spanWorkOneNoJob) + } +}