diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 247c4ba..8826329 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -848,6 +848,9 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { return } + ctx = withJobContext(ctx, job) + ctx = context.WithValue(ctx, txCtxVarKey, tx) + if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID)) @@ -855,9 +858,6 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) { return } - ctx = withJobContext(ctx, job) - ctx = context.WithValue(ctx, txCtxVarKey, tx) - // check if the job is being retried and increment retry count accordingly if job.Status != internal.JobStatusNew { job.Retries++ diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 9e2bcb5..63d4540 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -999,3 +999,49 @@ func TestGetPQConnectionString(t *testing.T) { }) } } + +// TestJobWithPastDeadline ensures that when a job is scheduled and its deadline is in the past, that the job is updated +// with an error indicating that its deadline was not met +// https://github.com/acaloiaro/neoq/issues/123 +func TestJobWithPastDeadline(t *testing.T) { + connString, _ := prepareAndCleanupDB(t) + const queue = "testing" + maxRetries := 5 + done := make(chan bool) + defer close(done) + + ctx := context.Background() + nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(queue, func(_ context.Context) (err error) { + done <- true + return + }) + + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + // deadline in the past + deadline := time.Now().UTC().Add(time.Duration(-5) * time.Second) + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + Deadline: &deadline, + MaxRetries: &maxRetries, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + if e != nil && !errors.Is(e, jobs.ErrJobExceededDeadline) { + t.Error(err) + } +} diff --git a/flake.nix b/flake.nix index cd68305..b79134a 100644 --- a/flake.nix +++ b/flake.nix @@ -22,7 +22,7 @@ in { devShells = forEachSystem (system: let pkgs = nixpkgs.legacyPackages.${system}; - postgresPort = 5433; + postgresPort = 5434; redisPort = 6380; in { default = devenv.lib.mkShell {