diff --git a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts index 6511ae4d035a0..1b7df31473156 100644 --- a/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts +++ b/packages/cli/src/concurrency/__tests__/concurrency-control.service.test.ts @@ -1,6 +1,7 @@ import { mock } from 'jest-mock-extended'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; +import type { ConcurrencyType } from '@/concurrency/concurrency-control.service'; import { CLOUD_TEMP_PRODUCTION_LIMIT, CLOUD_TEMP_REPORTABLE_THRESHOLDS, @@ -24,61 +25,69 @@ describe('ConcurrencyControlService', () => { afterEach(() => { config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); config.set('executions.mode', 'integrated'); jest.clearAllMocks(); }); describe('constructor', () => { - it('should be enabled if production cap is positive', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 1); - - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); - - /** - * Assert - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(true); - // @ts-expect-error Private property - expect(service.productionQueue).toBeDefined(); - }); - - it('should throw if production cap is 0', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', 0); + it.each(['production', 'evaluation'])( + 'should be enabled if %s cap is positive', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 1); - try { /** * Act */ - new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); - } catch (error) { + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + /** * Assert */ - expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); - } - }); + // @ts-expect-error Private property + expect(service.isEnabled).toBe(true); + // @ts-expect-error Private property + expect(service.queues.get(type)).toBeDefined(); + }, + ); + + it.each(['production', 'evaluation'])( + 'should throw if %s cap is 0', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 0); + + try { + /** + * Act + */ + new ConcurrencyControlService(logger, executionRepository, telemetry, eventService); + } catch (error) { + /** + * Assert + */ + expect(error).toBeInstanceOf(InvalidConcurrencyLimitError); + } + }, + ); - it('should be disabled if production cap is -1', () => { + it('should be disabled if both production and evaluation caps are -1', () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', -1); + config.set('executions.concurrency.evaluationLimit', -1); /** * Act @@ -97,28 +106,31 @@ describe('ConcurrencyControlService', () => { expect(service.isEnabled).toBe(false); }); - it('should be disabled if production cap is lower than -1', () => { - /** - * Arrange - */ - config.set('executions.concurrency.productionLimit', -2); + it.each(['production', 'evaluation'])( + 'should be disabled if %s cap is lower than -1', + (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, -2); - /** - * Act - */ - const service = new ConcurrencyControlService( - logger, - executionRepository, - telemetry, - eventService, - ); + /** + * Act + */ + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); - /** - * Act - */ - // @ts-expect-error Private property - expect(service.isEnabled).toBe(false); - }); + /** + * Act + */ + // @ts-expect-error Private property + expect(service.isEnabled).toBe(false); + }, + ); it('should be disabled on queue mode', () => { /** @@ -203,6 +215,31 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).toHaveBeenCalled(); }); + + it('should enqueue on evaluation mode', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(enqueueSpy).toHaveBeenCalled(); + }); }); describe('release', () => { @@ -258,6 +295,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).toHaveBeenCalled(); }); + + it('should dequeue on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -316,43 +378,125 @@ describe('ConcurrencyControlService', () => { expect(removeSpy).toHaveBeenCalled(); }, ); + + it('should remove an execution on evaluation mode', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', 1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenCalled(); + }); }); describe('removeAll', () => { - it('should remove all executions from the production queue', async () => { + it.each(['production', 'evaluation'])( + 'should remove all executions from the %s queue', + async (type: ConcurrencyType) => { + /** + * Arrange + */ + config.set(`executions.concurrency.${type}Limit`, 2); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + + jest + .spyOn(ConcurrencyQueue.prototype, 'getAll') + .mockReturnValueOnce(new Set(['1', '2', '3'])); + + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + await service.removeAll({ + '1': mock(), + '2': mock(), + '3': mock(), + }); + + /** + * Assert + */ + expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); + expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); + expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + }, + ); + }); + + describe('get queue', () => { + it('should choose the production queue', async () => { /** * Arrange */ config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); + /** + * Act + */ const service = new ConcurrencyControlService( logger, executionRepository, telemetry, eventService, ); + // @ts-expect-error Private property + const queue = service.getQueue('webhook'); - jest - .spyOn(ConcurrencyQueue.prototype, 'getAll') - .mockReturnValueOnce(new Set(['1', '2', '3'])); + /** + * Assert + */ + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('production')); + }); - const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + it('should choose the evaluation queue', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.productionLimit', 2); + config.set('executions.concurrency.evaluationLimit', 2); /** * Act */ - await service.removeAll({ - '1': mock(), - '2': mock(), - '3': mock(), - }); + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + // @ts-expect-error Private property + const queue = service.getQueue('evaluation'); /** * Assert */ - expect(removeSpy).toHaveBeenNthCalledWith(1, '1'); - expect(removeSpy).toHaveBeenNthCalledWith(2, '2'); - expect(removeSpy).toHaveBeenNthCalledWith(3, '3'); + // @ts-expect-error Private property + expect(queue).toEqual(service.queues.get('evaluation')); }); }); }); @@ -388,6 +532,32 @@ describe('ConcurrencyControlService', () => { */ expect(enqueueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', async () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const enqueueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'enqueue'); + + /** + * Act + */ + await service.throttle({ mode: 'evaluation', executionId: '1' }); + await service.throttle({ mode: 'evaluation', executionId: '2' }); + + /** + * Assert + */ + expect(enqueueSpy).not.toHaveBeenCalled(); + }); }); describe('release', () => { @@ -415,6 +585,31 @@ describe('ConcurrencyControlService', () => { */ expect(dequeueSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const dequeueSpy = jest.spyOn(ConcurrencyQueue.prototype, 'dequeue'); + + /** + * Act + */ + service.release({ mode: 'evaluation' }); + + /** + * Assert + */ + expect(dequeueSpy).not.toHaveBeenCalled(); + }); }); describe('remove', () => { @@ -442,6 +637,31 @@ describe('ConcurrencyControlService', () => { */ expect(removeSpy).not.toHaveBeenCalled(); }); + + it('should do nothing for evaluation executions', () => { + /** + * Arrange + */ + config.set('executions.concurrency.evaluationLimit', -1); + + const service = new ConcurrencyControlService( + logger, + executionRepository, + telemetry, + eventService, + ); + const removeSpy = jest.spyOn(ConcurrencyQueue.prototype, 'remove'); + + /** + * Act + */ + service.remove({ mode: 'evaluation', executionId: '1' }); + + /** + * Assert + */ + expect(removeSpy).not.toHaveBeenCalled(); + }); }); }); @@ -470,14 +690,17 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); /** * Assert */ - expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { threshold }); + expect(telemetry.track).toHaveBeenCalledWith('User hit concurrency limit', { + threshold, + concurrencyType: 'production', + }); }, ); @@ -500,7 +723,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); @@ -532,7 +755,7 @@ describe('ConcurrencyControlService', () => { * Act */ // @ts-expect-error Private property - service.productionQueue.emit('concurrency-check', { + service.queues.get('production').emit('concurrency-check', { capacity: CLOUD_TEMP_PRODUCTION_LIMIT - threshold, }); diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 0daef59b49723..060375457484e 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -1,3 +1,4 @@ +import { capitalize } from 'lodash'; import type { WorkflowExecuteMode as ExecutionMode } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -15,21 +16,15 @@ import { ConcurrencyQueue } from './concurrency-queue'; export const CLOUD_TEMP_PRODUCTION_LIMIT = 999; export const CLOUD_TEMP_REPORTABLE_THRESHOLDS = [5, 10, 20, 50, 100, 200]; +export type ConcurrencyType = 'production' | 'evaluation'; + @Service() export class ConcurrencyControlService { private isEnabled: boolean; - // private readonly limits: Map; - - private readonly productionLimit: number; - - private readonly evaluationLimit: number; - - // private readonly queues: Map; + private readonly limits: Map; - private readonly productionQueue: ConcurrencyQueue; - - private readonly evaluationQueue: ConcurrencyQueue; + private readonly queues: Map; private readonly limitsToReport = CLOUD_TEMP_REPORTABLE_THRESHOLDS.map( (t) => CLOUD_TEMP_PRODUCTION_LIMIT - t, @@ -43,76 +38,66 @@ export class ConcurrencyControlService { ) { this.logger = this.logger.scoped('concurrency'); - this.productionLimit = config.getEnv('executions.concurrency.productionLimit'); - - this.evaluationLimit = config.getEnv('executions.concurrency.evaluationLimit'); + this.limits = new Map([ + ['production', config.getEnv('executions.concurrency.productionLimit')], + ['evaluation', config.getEnv('executions.concurrency.evaluationLimit')], + ]); - if (this.productionLimit === 0) { - throw new InvalidConcurrencyLimitError(this.productionLimit); - } - - if (this.evaluationLimit === 0) { - throw new InvalidConcurrencyLimitError(this.evaluationLimit); - } - - if (this.productionLimit < -1) { - this.productionLimit = -1; - } + this.limits.forEach((limit, type) => { + if (limit === 0) { + throw new InvalidConcurrencyLimitError(limit); + } - if (this.evaluationLimit < -1) { - this.evaluationLimit = -1; - } + if (limit < -1) { + this.limits.set(type, -1); + } + }); - if (this.productionLimit === -1 || config.getEnv('executions.mode') === 'queue') { + if ( + Array.from(this.limits.values()).every((limit) => limit === -1) || + config.getEnv('executions.mode') === 'queue' + ) { this.isEnabled = false; return; } - this.productionQueue = new ConcurrencyQueue(this.productionLimit); - - this.evaluationQueue = new ConcurrencyQueue(this.evaluationLimit); + this.queues = new Map(); + this.limits.forEach((limit, type) => { + this.queues.set(type, new ConcurrencyQueue(limit)); + }); this.logInit(); this.isEnabled = true; - this.productionQueue.on('concurrency-check', ({ capacity }) => { - if (this.shouldReport(capacity)) { - this.telemetry.track('User hit concurrency limit', { - threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, - }); - } - }); - - this.productionQueue.on('execution-throttled', ({ executionId }) => { - this.logger.debug('Execution throttled', { executionId }); - this.eventService.emit('execution-throttled', { executionId }); - }); - - this.productionQueue.on('execution-released', async (executionId) => { - this.logger.debug('Execution released', { executionId }); - }); - - this.evaluationQueue.on('execution-throttled', ({ executionId }) => { - this.logger.debug('Evaluation execution throttled', { executionId }); - this.eventService.emit('execution-throttled', { executionId }); - }); - - this.evaluationQueue.on('execution-released', async (executionId) => { - this.logger.debug('Evaluation execution released', { executionId }); + this.queues.forEach((queue, type) => { + queue.on('concurrency-check', ({ capacity }) => { + if (this.shouldReport(capacity)) { + this.telemetry.track('User hit concurrency limit', { + threshold: CLOUD_TEMP_PRODUCTION_LIMIT - capacity, + concurrencyType: type, + }); + } + }); + + queue.on('execution-throttled', ({ executionId }) => { + this.logger.debug('Execution throttled', { executionId, type }); + this.eventService.emit('execution-throttled', { executionId, type }); + }); + + queue.on('execution-released', async (executionId) => { + this.logger.debug('Execution released', { executionId, type }); + }); }); } /** - * Check whether an execution is in the production queue. + * Check whether an execution is in any of the queues. */ has(executionId: string) { if (!this.isEnabled) return false; - return ( - this.productionQueue.getAll().has(executionId) || - this.evaluationQueue.getAll().has(executionId) - ); + return Array.from(this.queues.values()).some((queue) => queue.getAll().has(executionId)); } /** @@ -121,23 +106,23 @@ export class ConcurrencyControlService { async throttle({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - await this.evaluationQueue.enqueue(executionId); - } else { - await this.productionQueue.enqueue(executionId); + const queue = this.getQueue(mode); + + if (queue) { + await queue.enqueue(executionId); } } /** - * Release capacity back so the next execution in the production queue can proceed. + * Release capacity back so the next execution in the queue can proceed. */ release({ mode }: { mode: ExecutionMode }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - this.evaluationQueue.dequeue(); - } else { - this.productionQueue.dequeue(); + const queue = this.getQueue(mode); + + if (queue) { + queue.dequeue(); } } @@ -147,10 +132,10 @@ export class ConcurrencyControlService { remove({ mode, executionId }: { mode: ExecutionMode; executionId: string }) { if (!this.isEnabled || this.isUnlimited(mode)) return; - if (mode === 'evaluation') { - this.evaluationQueue.remove(executionId); - } else { - this.productionQueue.remove(executionId); + const queue = this.getQueue(mode); + + if (queue) { + queue.remove(executionId); } } @@ -162,17 +147,13 @@ export class ConcurrencyControlService { async removeAll(activeExecutions: { [executionId: string]: IExecutingWorkflowData }) { if (!this.isEnabled) return; - const enqueuedProductionIds = this.productionQueue.getAll(); - - for (const id of enqueuedProductionIds) { - this.productionQueue.remove(id); - } + this.queues.forEach((queue) => { + const enqueuedExecutionIds = queue.getAll(); - const enqueuedEvaluationIds = this.evaluationQueue.getAll(); - - for (const id of enqueuedEvaluationIds) { - this.evaluationQueue.remove(id); - } + for (const id of enqueuedExecutionIds) { + queue.remove(id); + } + }); const executionIds = Object.entries(activeExecutions) .filter(([_, execution]) => execution.status === 'new' && execution.responsePromise) @@ -196,22 +177,30 @@ export class ConcurrencyControlService { private logInit() { this.logger.debug('Enabled'); - this.logger.debug( - [ - 'Production execution concurrency is', - this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.productionLimit.toString(), - ].join(' '), - ); - - this.logger.debug( - [ - 'Evaluation execution concurrency is', - this.productionLimit === -1 ? 'unlimited' : 'limited to ' + this.evaluationLimit.toString(), - ].join(' '), - ); + this.limits.forEach((limit, type) => { + this.logger.debug( + [ + `${capitalize(type)} execution concurrency is`, + limit === -1 ? 'unlimited' : 'limited to ' + limit.toString(), + ].join(' '), + ); + }); } private isUnlimited(mode: ExecutionMode) { + const queue = this.getQueue(mode); + + return queue === undefined; + } + + private shouldReport(capacity: number) { + return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); + } + + /** + * Get the concurrency queue based on the execution mode. + */ + private getQueue(mode: ExecutionMode) { if ( mode === 'error' || mode === 'integrated' || @@ -220,25 +209,13 @@ export class ConcurrencyControlService { mode === 'manual' || mode === 'retry' ) { - return true; + return undefined; } - if (mode === 'webhook' || mode === 'trigger') return this.productionLimit === -1; + if (mode === 'webhook' || mode === 'trigger') return this.queues.get('production'); - if (mode === 'evaluation') return this.evaluationLimit === -1; + if (mode === 'evaluation') return this.queues.get('evaluation'); throw new UnknownExecutionModeError(mode); } - - private shouldReport(capacity: number) { - return config.getEnv('deployment.type') === 'cloud' && this.limitsToReport.includes(capacity); - } - - // private getQueue(mode: ExecutionMode) { - // if (['production', 'evaluation'].includes(mode)) { - // return this.queues.get(mode); - // } - // - // throw new UnknownExecutionModeError(mode); - // } } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 79ab2b5f04890..629e0a212c9a3 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -38,7 +38,7 @@ export const schema = { evaluationLimit: { doc: 'Max evaluation executions allowed to run concurrently. Default is `1`.', format: Number, - default: 1, + default: -1, env: 'N8N_CONCURRENCY_EVALUATION_LIMIT', }, }, diff --git a/packages/cli/src/events/maps/relay.event-map.ts b/packages/cli/src/events/maps/relay.event-map.ts index 0e725645715e2..4794bdedbf553 100644 --- a/packages/cli/src/events/maps/relay.event-map.ts +++ b/packages/cli/src/events/maps/relay.event-map.ts @@ -12,6 +12,7 @@ import type { GlobalRole, User } from '@/databases/entities/user'; import type { IWorkflowDb } from '@/interfaces'; import type { AiEventMap } from './ai.event-map'; +import { ConcurrencyType } from '@/concurrency/concurrency-control.service'; export type UserLike = { id: string; @@ -338,6 +339,7 @@ export type RelayEventMap = { 'execution-throttled': { executionId: string; + type: ConcurrencyType; }; 'execution-started-during-bootup': {