Skip to content

Commit

Permalink
Merge pull request #187 from vgarvardt/feat/spanWorkOneNoJob
Browse files Browse the repository at this point in the history
feat: added support for WorkOne no-job tracing span
  • Loading branch information
vgarvardt authored May 26, 2023
2 parents dca0883 + 950a58b commit 0531bac
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 5 deletions.
26 changes: 21 additions & 5 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type Worker struct {
mDuration metric.Int64Histogram

panicStackBufSize int
spanWorkOneNoJob bool
}

// NewWorker returns a Worker that fetches Jobs from the Client and executes
Expand Down Expand Up @@ -180,11 +181,18 @@ func (w *Worker) runLoop(ctx context.Context) error {

// WorkOne tries to consume single message from the queue.
func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
j, err := w.pollFunc(ctx, w.queue)
ctx, span := w.tracer.Start(ctx, "Worker.WorkOne")
// worker option is set to generate spans even when no job is found - let it be
if w.spanWorkOneNoJob {
defer span.End()
}

j, err := w.pollFunc(ctx, w.queue)
if err != nil {
span.RecordError(fmt.Errorf("woker failed to lock a job: %w", err))
w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(""), attrSuccess.Bool(false)))
w.logger.Error("Worker failed to lock a job", adapter.Err(err))

for _, hook := range w.hooksJobLocked {
hook(ctx, nil, err)
}
Expand All @@ -194,11 +202,17 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
return // no job was available
}

// at this point we have a job, so we need to ensure that span will be generated
if !w.spanWorkOneNoJob {
defer span.End()
}

processingStartedAt := time.Now()
ctx, span := w.tracer.Start(ctx, "Worker.WorkOne", trace.WithAttributes(
span.SetAttributes(
attribute.String("job-id", j.ID.String()),
attribute.String("job-queue", j.Queue),
attribute.String("job-type", j.Type),
))
defer span.End()
)

ll := w.logger.With(adapter.F("job-id", j.ID.String()), adapter.F("job-type", j.Type))

Expand Down Expand Up @@ -226,7 +240,7 @@ func (w *Worker) WorkOne(ctx context.Context) (didWork bool) {
if !ok {
w.mWorked.Add(ctx, 1, metric.WithAttributes(attrJobType.String(j.Type), attrSuccess.Bool(false)))

span.RecordError(errors.New("job with unknown type"))
span.RecordError(fmt.Errorf("job with unknown type: %q", j.Type))
ll.Error("Got a job with unknown type")

errUnknownType := fmt.Errorf("worker[id=%s] unknown job type: %q", w.id, j.Type)
Expand Down Expand Up @@ -349,6 +363,7 @@ type WorkerPool struct {
hooksJobDone []HookFunc

panicStackBufSize int
spanWorkOneNoJob bool
}

// NewWorkerPool creates a new WorkerPool with count workers using the Client c.
Expand Down Expand Up @@ -394,6 +409,7 @@ func NewWorkerPool(c *Client, wm WorkMap, poolSize int, options ...WorkerPoolOpt
WithWorkerHooksUnknownJobType(w.hooksUnknownJobType...),
WithWorkerHooksJobDone(w.hooksJobDone...),
WithWorkerPanicStackBufSize(w.panicStackBufSize),
WithWorkerSpanWorkOneNoJob(w.spanWorkOneNoJob),
)

if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions worker_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ func WithWorkerGracefulShutdown(handlerCtx func() context.Context) WorkerOption
}
}

// WithWorkerSpanWorkOneNoJob enables tracing span generation for every try to get one.
// When set to true - generates a span for every DB poll, even when no job was acquired. This may
// generate a lot of empty spans, but may help with some debugging, so use carefully.
func WithWorkerSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerOption {
return func(w *Worker) {
w.spanWorkOneNoJob = spanWorkOneNoJob
}
}

// WithPoolPollInterval overrides default poll interval with the given value.
// Poll interval is the "sleep" duration if there were no jobs found in the DB.
func WithPoolPollInterval(d time.Duration) WorkerPoolOption {
Expand Down Expand Up @@ -207,3 +216,12 @@ func WithPoolPanicStackBufSize(size int) WorkerPoolOption {
w.panicStackBufSize = size
}
}

// WithPoolSpanWorkOneNoJob enables tracing span generation for every try to get one.
// When set to true - generates a span for every DB poll, even when no job was acquired. This may
// generate a lot of empty spans, but may help with some debugging, so use carefully.
func WithPoolSpanWorkOneNoJob(spanWorkOneNoJob bool) WorkerPoolOption {
return func(w *WorkerPool) {
w.spanWorkOneNoJob = spanWorkOneNoJob
}
}
24 changes: 24 additions & 0 deletions worker_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,16 @@ func TestWithWorkerHooksJobDone(t *testing.T) {
require.Equal(t, 3, hook.counter)
}

func TestWithWorkerSpanWorkOneNoJob(t *testing.T) {
workerWOutSpanWorkOneNoJob, err := NewWorker(nil, dummyWM)
require.NoError(t, err)
assert.False(t, workerWOutSpanWorkOneNoJob.spanWorkOneNoJob)

workerWithSpanWorkOneNoJob, err := NewWorker(nil, dummyWM, WithWorkerSpanWorkOneNoJob(true))
require.NoError(t, err)
assert.True(t, workerWithSpanWorkOneNoJob.spanWorkOneNoJob)
}

func TestWithPoolHooksJobLocked(t *testing.T) {
ctx := context.Background()
hook := new(dummyHook)
Expand Down Expand Up @@ -392,3 +402,17 @@ func TestWithPoolPanicStackBufSize(t *testing.T) {
assert.Equal(t, 12345, w.panicStackBufSize)
}
}

func TestWithPoolSpanWorkOneNoJob(t *testing.T) {
poolWOutSpanWorkOneNoJob, err := NewWorkerPool(nil, dummyWM, 2)
require.NoError(t, err)
for _, w := range poolWOutSpanWorkOneNoJob.workers {
assert.False(t, w.spanWorkOneNoJob)
}

poolWithSpanWorkOneNoJob, err := NewWorkerPool(nil, dummyWM, 2, WithPoolSpanWorkOneNoJob(true))
require.NoError(t, err)
for _, w := range poolWithSpanWorkOneNoJob.workers {
assert.True(t, w.spanWorkOneNoJob)
}
}

0 comments on commit 0531bac

Please sign in to comment.