Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Prevent __default__ jobs in scaling mode #12402

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/cli/src/scaling/__tests__/scaling.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { GlobalConfig } from '@n8n/config';
import * as BullModule from 'bull';
import { mock } from 'jest-mock-extended';
import { InstanceSettings } from 'n8n-core';
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, ExecutionCancelledError } from 'n8n-workflow';
import Container from 'typedi';

import { mockInstance, mockLogger } from '@test/mocking';
Expand Down Expand Up @@ -287,6 +287,8 @@ describe('ScalingService', () => {
const result = await scalingService.stopJob(job);

expect(job.progress).toHaveBeenCalledWith({ kind: 'abort-job' });
expect(job.discard).toHaveBeenCalled();
expect(job.moveToFailed).toHaveBeenCalledWith(new ExecutionCancelledError('123'), true);
expect(result).toBe(true);
});

Expand Down
25 changes: 20 additions & 5 deletions packages/cli/src/scaling/scaling.service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { GlobalConfig } from '@n8n/config';
import { ErrorReporter, InstanceSettings, Logger } from 'n8n-core';
import { ApplicationError, BINARY_ENCODING, sleep, jsonStringify, ensureError } from 'n8n-workflow';
import {
ApplicationError,
BINARY_ENCODING,
sleep,
jsonStringify,
ensureError,
ExecutionCancelledError,
} from 'n8n-workflow';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import { strict } from 'node:assert';
import assert, { strict } from 'node:assert';
import Container, { Service } from 'typedi';

import { ActiveExecutions } from '@/active-executions';
Expand Down Expand Up @@ -206,16 +213,24 @@ export class ScalingService {
try {
if (await job.isActive()) {
await job.progress({ kind: 'abort-job' }); // being processed by worker
this.logger.debug('Stopped active job', props);
await job.discard(); // prevent retries
await job.moveToFailed(new ExecutionCancelledError(job.data.executionId), true); // remove from queue
return true;
}

await job.remove(); // not yet picked up, or waiting for next pickup (stalled)
this.logger.debug('Stopped inactive job', props);
return true;
} catch (error: unknown) {
await job.progress({ kind: 'abort-job' });
this.logger.error('Failed to stop job', { ...props, error });
assert(error instanceof Error);
this.logger.error('Failed to stop job', {
...props,
error: {
message: error.message,
name: error.name,
stack: error.stack,
},
});
return false;
}
}
Expand Down
8 changes: 6 additions & 2 deletions packages/cli/src/workflow-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ export class WorkflowRunner {
//
// FIXME: This is a quick fix. The proper fix would be to not remove
// the execution from the active executions while it's still running.
if (error instanceof ExecutionNotFoundError || error instanceof ExecutionCancelledError) {
if (
error instanceof ExecutionNotFoundError ||
error instanceof ExecutionCancelledError ||
error.message.includes('cancelled')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

) {
return;
}

this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
this.errorReporter.error(error, { executionId });

const isQueueMode = config.getEnv('executions.mode') === 'queue';
Expand Down Expand Up @@ -413,7 +418,6 @@ export class WorkflowRunner {
data.workflowData,
{ retryOf: data.retryOf ? data.retryOf.toString() : undefined },
);
this.logger.error(`Problem with execution ${executionId}: ${error.message}. Aborting.`);
await this.processError(error, new Date(), data.executionMode, executionId, hooks);

reject(error);
Expand Down
Loading