From 9849e68eca63ce0ddd7f913ee77d34439568957a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 09:08:09 +0200 Subject: [PATCH] AP-5046 Skip filtering if empty entries. --- packages/outbox-core/lib/outbox.ts | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 52f3b72e..ee944b9f 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -1,5 +1,5 @@ -import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common' +import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' import type { CommonEventDefinition, CommonEventDefinitionPublisherSchemaType, @@ -9,6 +9,7 @@ import type { import { PromisePool } from '@supercharge/promise-pool' import { uuidv7 } from 'uuidv7' import type { OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' import type { OutboxStorage } from './storage' export type OutboxDependencies = { @@ -42,10 +43,8 @@ export class OutboxProcessor { const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount) - const currentEntriesInAccumulator = new Set( - (await outboxAccumulator.getEntries()).map((entry) => entry.id), - ) - const filteredEntries = entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + const filteredEntries = + entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator) await PromisePool.for(filteredEntries) .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) @@ -63,6 +62,16 @@ export class OutboxProcessor { await outboxStorage.flush(outboxAccumulator) await outboxAccumulator.clear() } + + private async getFilteredEntries( + entries: OutboxEntry[], + outboxAccumulator: OutboxAccumulator, + ) { + const currentEntriesInAccumulator = new Set( + (await outboxAccumulator.getEntries()).map((entry) => entry.id), + ) + return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + } } /**