diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 3a074db..2479968 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -321,7 +321,7 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) { } func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) { - ctx = withJobContext(ctx, job) + ctx = jobs.WithJobContext(ctx, job) m.logger.Debug( "handling job", @@ -381,8 +381,3 @@ func (m *MemBackend) removeFutureJob(jobID int64) { m.futureJobs.Delete(job.ID) } } - -// withJobContext creates a new context with the Job set -func withJobContext(ctx context.Context, j *jobs.Job) context.Context { - return context.WithValue(ctx, internal.JobCtxVarKey, j) -} diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index ca78ca1..c1aba83 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -885,7 +885,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { return } - ctx = withJobContext(ctx, job) + ctx = jobs.WithJobContext(ctx, job) ctx = context.WithValue(ctx, txCtxVarKey, tx) if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { @@ -1068,11 +1068,6 @@ func (p *PgBackend) acquire(ctx context.Context) (conn *pgxpool.Conn, err error) } } -// withJobContext creates a new context with the Job set -func withJobContext(ctx context.Context, j *jobs.Job) context.Context { - return context.WithValue(ctx, internal.JobCtxVarKey, j) -} - func GetPQConnectionString(connectionString string) (string, error) { pgxCfg, err := pgx.ParseConfig(connectionString) if err != nil { diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index a463673..4de9ae4 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -242,7 +242,7 @@ func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { RunAfter: ti.NextProcessAt, } - ctx = withJobContext(ctx, job) + ctx = jobs.WithJobContext(ctx, job) err = handler.Exec(ctx, h) if err != nil { b.logger.Error("error handling job", slog.Any("error", err)) @@ -349,8 +349,3 @@ func (b *RedisBackend) Shutdown(_ context.Context) { b.client.Close() b.server.Shutdown() } - -// withJobContext creates a new context with the Job set -func withJobContext(ctx context.Context, j *jobs.Job) context.Context { - return context.WithValue(ctx, internal.JobCtxVarKey, j) -} diff --git a/jobs/jobs.go b/jobs/jobs.go index 16bb7dd..a5210bf 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -75,6 +75,11 @@ func FingerprintJob(j *Job) (err error) { return } +// WithJobContext adds a job to the provided context +func WithJobContext(ctx context.Context, job *Job) context.Context { + return context.WithValue(ctx, internal.JobCtxVarKey, job) +} + // FromContext fetches the job from a context if the job context variable is set func FromContext(ctx context.Context) (j *Job, err error) { var ok bool