diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index b400bf6dfbbb7..56836b18c6c79 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -2,7 +2,7 @@ import { GlobalConfig } from '@n8n/config'; import * as BullModule from 'bull'; import { mock } from 'jest-mock-extended'; import { InstanceSettings } from 'n8n-core'; -import { ApplicationError } from 'n8n-workflow'; +import { ApplicationError, ExecutionCancelledError } from 'n8n-workflow'; import Container from 'typedi'; import { mockInstance, mockLogger } from '@test/mocking'; @@ -287,6 +287,8 @@ describe('ScalingService', () => { const result = await scalingService.stopJob(job); expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' }); + expect(job.discard).toHaveBeenCalled(); + expect(job.moveToFailed).toHaveBeenCalledWith(new ExecutionCancelledError('123'), true); expect(result).toBe(true); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 5896e2976be42..1963aa9ac9fc1 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -1,8 +1,15 @@ import { GlobalConfig } from '@n8n/config'; import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core'; -import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify, ensureError } from 'n8n-workflow'; +import { + ApplicationError, + BINARY_ENCODING, + sleep, + jsonStringify, + ensureError, + ExecutionCancelledError, +} from 'n8n-workflow'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; -import { strict } from 'node:assert'; +import assert, { strict } from 'node:assert'; import Container, { Service } from 'typedi'; import { ActiveExecutions } from '@/active-executions'; @@ -206,7 +213,8 @@ export class ScalingService { try { if (await job.isActive()) { await job.progress({ kind: 'abort-job' }); // being processed by worker - this.logger.debug('Stopped active job', props); + await job.discard(); // prevent retries + await job.moveToFailed(new ExecutionCancelledError(job.data.executionId), true); // remove from queue return true; } @@ -214,8 +222,15 @@ export class ScalingService { this.logger.debug('Stopped inactive job', props); return true; } catch (error: unknown) { - await job.progress({ kind: 'abort-job' }); - this.logger.error('Failed to stop job', { ...props, error }); + assert(error instanceof Error); + this.logger.error('Failed to stop job', { + ...props, + error: { + message: error.message, + name: error.name, + stack: error.stack, + }, + }); return false; } } diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index bb27fca216882..93917d79874a7 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -66,10 +66,15 @@ export class WorkflowRunner { // // FIXME: This is a quick fix. The proper fix would be to not remove // the execution from the active executions while it's still running. - if (error instanceof ExecutionNotFoundError || error instanceof ExecutionCancelledError) { + if ( + error instanceof ExecutionNotFoundError || + error instanceof ExecutionCancelledError || + error.message.includes('cancelled') + ) { return; } + this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`); this.errorReporter.error(error, { executionId }); const isQueueMode = config.getEnv('executions.mode') === 'queue'; @@ -413,7 +418,6 @@ export class WorkflowRunner { data.workflowData, { retryOf: data.retryOf ? data.retryOf.toString() : undefined }, ); - this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`); await this.processError(error, new Date(), data.executionMode, executionId, hooks); reject(error);