From 9831dbcf86f2bd09b5dfb7e4a0cc4e9791ca567b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Thu, 12 Sep 2024 12:55:24 +0200 Subject: [PATCH] fix(core): Restore queue listeners for `webhook` process (#10781) --- .../scaling/__tests__/scaling.service.test.ts | 28 +++++++++++++++---- packages/cli/src/scaling/scaling.service.ts | 8 +++--- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/packages/cli/src/scaling/__tests__/scaling.service.test.ts b/packages/cli/src/scaling/__tests__/scaling.service.test.ts index 0395ce5577780..9fe36a36370c8 100644 --- a/packages/cli/src/scaling/__tests__/scaling.service.test.ts +++ b/packages/cli/src/scaling/__tests__/scaling.service.test.ts @@ -51,7 +51,7 @@ describe('ScalingService', () => { let scalingService: ScalingService; - let registerMainListenersSpy: jest.SpyInstance; + let registerMainOrWebhookListenersSpy: jest.SpyInstance; let registerWorkerListenersSpy: jest.SpyInstance; let scheduleQueueRecoverySpy: jest.SpyInstance; let stopQueueRecoverySpy: jest.SpyInstance; @@ -86,8 +86,11 @@ describe('ScalingService', () => { // @ts-expect-error Private method ScalingService.prototype.scheduleQueueRecovery = jest.fn(); - // @ts-expect-error Private method - registerMainListenersSpy = jest.spyOn(scalingService, 'registerMainListeners'); + registerMainOrWebhookListenersSpy = jest.spyOn( + scalingService, + // @ts-expect-error Private method + 'registerMainOrWebhookListeners', + ); // @ts-expect-error Private method registerWorkerListenersSpy = jest.spyOn(scalingService, 'registerWorkerListeners'); // @ts-expect-error Private method @@ -102,7 +105,7 @@ describe('ScalingService', () => { await scalingService.setupQueue(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); - expect(registerMainListenersSpy).toHaveBeenCalled(); + expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled(); expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); expect(scheduleQueueRecoverySpy).toHaveBeenCalled(); }); @@ -115,7 +118,7 @@ describe('ScalingService', () => { await scalingService.setupQueue(); expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); - expect(registerMainListenersSpy).toHaveBeenCalled(); + expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled(); expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); expect(scheduleQueueRecoverySpy).not.toHaveBeenCalled(); }); @@ -130,7 +133,20 @@ describe('ScalingService', () => { expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); expect(registerWorkerListenersSpy).toHaveBeenCalled(); - expect(registerMainListenersSpy).not.toHaveBeenCalled(); + expect(registerMainOrWebhookListenersSpy).not.toHaveBeenCalled(); + }); + }); + + describe('webhook', () => { + it('should set up a queue + listeners', async () => { + // @ts-expect-error Private field + scalingService.instanceType = 'webhook'; + + await scalingService.setupQueue(); + + expect(Bull).toHaveBeenCalledWith(...bullConstructorArgs); + expect(registerWorkerListenersSpy).not.toHaveBeenCalled(); + expect(registerMainOrWebhookListenersSpy).toHaveBeenCalled(); }); }); }); diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 0803ac1ced796..77d5323b0f89b 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -209,8 +209,8 @@ export class ScalingService { throw error; }); - if (this.instanceType === 'main') { - this.registerMainListeners(); + if (this.instanceType === 'main' || this.instanceType === 'webhook') { + this.registerMainOrWebhookListeners(); } else if (this.instanceType === 'worker') { this.registerWorkerListeners(); } @@ -246,9 +246,9 @@ export class ScalingService { } /** - * Register listeners on a `main` process for Bull queue events. + * Register listeners on a `main` or `webhook` process for Bull queue events. */ - private registerMainListeners() { + private registerMainOrWebhookListeners() { this.queue.on('global:progress', (_jobId: JobId, msg: unknown) => { if (!this.isPubSubMessage(msg)) return;