From 64756c4c7c39ff74f921695dea2915fecfeb2747 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Tue, 3 Sep 2024 16:11:52 +0200 Subject: [PATCH 01/31] AP-5046 New package. --- packages/outbox-core/index.ts | 1 + packages/outbox-core/lib/outbox.ts | 9 ++++ packages/outbox-core/package.json | 61 ++++++++++++++++++++++ packages/outbox-core/tsconfig.json | 27 ++++++++++ packages/outbox-core/tsconfig.release.json | 5 ++ packages/outbox-core/vitest.config.mts | 23 ++++++++ 6 files changed, 126 insertions(+) create mode 100644 packages/outbox-core/index.ts create mode 100644 packages/outbox-core/lib/outbox.ts create mode 100644 packages/outbox-core/package.json create mode 100644 packages/outbox-core/tsconfig.json create mode 100644 packages/outbox-core/tsconfig.release.json create mode 100644 packages/outbox-core/vitest.config.mts diff --git a/packages/outbox-core/index.ts b/packages/outbox-core/index.ts new file mode 100644 index 00000000..cc01044a --- /dev/null +++ b/packages/outbox-core/index.ts @@ -0,0 +1 @@ +export * from './lib/outbox' diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts new file mode 100644 index 00000000..d4aa8b92 --- /dev/null +++ b/packages/outbox-core/lib/outbox.ts @@ -0,0 +1,9 @@ +export interface OutboxStorage { + saveMessage(message: unknown): Promise + getMessage(): Promise + deleteMessage(): Promise +} + +export class OutboxProcessor { + +} diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json new file mode 100644 index 00000000..9d8449a5 --- /dev/null +++ b/packages/outbox-core/package.json @@ -0,0 +1,61 @@ +{ + "name": "@message-queue-toolkit/outbox-core", + "version": "0.1.0", + "private": false, + "license": "MIT", + "description": "Outbox pattern implementation for message queue toolkit", + "maintainers": [ + { + "name": "Igor Savin", + "email": "kibertoad@gmail.com" + } + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "del-cli dist && tsc", + "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", + "test": "vitest", + "test:coverage": "npm test -- --coverage", + "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", + "lint": "biome check . && tsc --project tsconfig.json --noEmit", + "lint:fix": "biome check --write .", + "docker:start:dev": "docker compose up -d", + "docker:stop:dev": "docker compose down", + "prepublishOnly": "npm run build:release" + }, + "dependencies": { + "@lokalise/background-jobs-common": "^7.6.1" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=14.0.0" + }, + "devDependencies": { + "@message-queue-toolkit/core": "*", + "@biomejs/biome": "1.8.3", + "@kibertoad/biome-config": "^1.2.1", + "@types/node": "^22.0.0", + "@vitest/coverage-v8": "^2.0.4", + "del-cli": "^5.1.0", + "typescript": "^5.5.3", + "vitest": "^2.0.4" + }, + "homepage": "https://github.com/kibertoad/message-queue-toolkit", + "repository": { + "type": "git", + "url": "git://github.com/kibertoad/message-queue-toolkit.git" + }, + "keywords": [ + "message", + "queue", + "queues", + "abstract", + "common", + "utils", + "notification", + "s3", + "store", + "claim-check" + ], + "files": ["README.md", "LICENSE", "dist/*"] +} diff --git a/packages/outbox-core/tsconfig.json b/packages/outbox-core/tsconfig.json new file mode 100644 index 00000000..9cd7c80a --- /dev/null +++ b/packages/outbox-core/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "outDir": "dist", + "module": "commonjs", + "target": "ES2022", + "lib": ["ES2022", "dom"], + "sourceMap": true, + "declaration": true, + "declarationMap": false, + "types": ["node", "vitest/globals"], + "strict": true, + "moduleResolution": "node", + "noUnusedLocals": false, + "noUnusedParameters": false, + "noFallthroughCasesInSwitch": true, + "strictNullChecks": true, + "importHelpers": true, + "baseUrl": ".", + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["lib/**/*.ts", "test/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/outbox-core/tsconfig.release.json b/packages/outbox-core/tsconfig.release.json new file mode 100644 index 00000000..93ab99f8 --- /dev/null +++ b/packages/outbox-core/tsconfig.release.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["lib/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist", "lib/**/*.spec.ts"] +} diff --git a/packages/outbox-core/vitest.config.mts b/packages/outbox-core/vitest.config.mts new file mode 100644 index 00000000..2bcce478 --- /dev/null +++ b/packages/outbox-core/vitest.config.mts @@ -0,0 +1,23 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + globals: true, + watch: false, + environment: 'node', + reporters: ['default'], + coverage: { + provider: 'v8', + include: ['lib/**/*.ts'], + exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*'], + reporter: ['text'], + all: true, + thresholds: { + lines: 100, + functions: 100, + branches: 91.66, + statements: 100, + }, + }, + }, +}) From cf8755023c63a8ede1c1eadebbd2ba67fbb5fac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 4 Sep 2024 11:15:33 +0200 Subject: [PATCH 02/31] AP-5046 Implementation + interfaces snippets. --- packages/outbox-core/lib/outbox.ts | 145 ++++++++++++++++++++++++++++- packages/outbox-core/package.json | 5 +- 2 files changed, 143 insertions(+), 7 deletions(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index d4aa8b92..7f4ae983 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -1,9 +1,144 @@ -export interface OutboxStorage { - saveMessage(message: unknown): Promise - getMessage(): Promise - deleteMessage(): Promise +import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' +import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common/dist/periodic-jobs/periodicJobTypes' +import { generateUuid7 } from '@lokalise/id-utils' +import type { + CommonEventDefinition, + CommonEventDefinitionPublisherSchemaType, + ConsumerMessageMetadataType, + DomainEventEmitter, +} from '@message-queue-toolkit/core' + +/** + * Status of the outbox entry. + * - CREATED - entry was created and is waiting to be processed to publish actual event + * - ACKED - entry was picked up by outbox job and is being processed + * - SUCCESS - entry was successfully processed, event was published + * - FAILED - entry processing failed, it will be retried + */ +export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED' + +export type OutboxEntry = { + id: string + event: SupportedEvent + data: Omit, 'type'> + precedingMessageMetadata?: Partial + status: OutboxEntryStatus + created: Date + updated?: Date + retryCount: number +} + +/** + * Takes care of persisting and retrieving outbox entries. + * + * Implementation is required: + * - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction + * - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter) + * - returned entries should not include the ones with 'SUCCESS' status + */ +export interface OutboxStorage { + create( + outboxEntry: OutboxEntry, + ): Promise> + + update( + outboxEntry: OutboxEntry, + ): Promise> + + /** + * Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times. + * + * For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned. + */ + getEntries(maxRetryCount: number): Promise[]> +} + +/** + * Periodic job that processes outbox entries every second. If processing takes longer than 1 second, another subsequent job WILL NOT be started. + * + * Each entry is ACKed, then event is published, and then entry is marked as SUCCESS. If processing fails, entry is marked as FAILED and will be retried. + * + * Max retry count is defined by the user. + */ +export class OutboxPeriodicJob< + SupportedEvents extends CommonEventDefinition[], +> extends AbstractPeriodicJob { + constructor( + private readonly outboxStorage: OutboxStorage, + private readonly eventEmitter: DomainEventEmitter, + private readonly maxRetryCount: number, + dependencies: PeriodicJobDependencies, + ) { + super( + { + jobId: 'OutboxJob', + schedule: { + intervalInMs: 1000, + }, + singleConsumerMode: { + enabled: true, + }, + }, + { + redis: dependencies.redis, + logger: dependencies.logger, + transactionObservabilityManager: dependencies.transactionObservabilityManager, + errorReporter: dependencies.errorReporter, + scheduler: dependencies.scheduler, + }, + ) + } + + protected async processInternal(context: JobExecutionContext): Promise { + const entries = await this.outboxStorage.getEntries(this.maxRetryCount) + + for (const entry of entries) { + try { + const updatedEntry = await this.outboxStorage.update({ + ...entry, + updated: new Date(), + status: 'ACKED', + }) + + await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + + await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' }) + } catch (e) { + context.logger.error({ error: e }, 'Failed to process outbox entry.') + + await this.outboxStorage.update({ + ...entry, + updated: new Date(), + status: 'FAILED', + retryCount: entry.retryCount + 1, + }) + } + } + } } -export class OutboxProcessor { +export class OutboxEventEmitter { + constructor(private storage: OutboxStorage) {} + /** + * Persists outbox entry in persistence layer, later it will be picked up by outbox job. + * @param supportedEvent + * @param data + * @param precedingMessageMetadata + */ + public async emit( + supportedEvent: SupportedEvent, + data: Omit, 'type'>, + precedingMessageMetadata?: Partial, + ) { + await this.storage.create({ + id: generateUuid7(), + event: supportedEvent, + data, + precedingMessageMetadata, + status: 'CREATED', + created: new Date(), + retryCount: 0, + }) + } } diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index 9d8449a5..fb6dd096 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -25,15 +25,16 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/background-jobs-common": "^7.6.1" + "@lokalise/background-jobs-common": "^7.6.1", + "@lokalise/id-utils": "^2.2.0" }, "peerDependencies": { "@message-queue-toolkit/core": ">=14.0.0" }, "devDependencies": { - "@message-queue-toolkit/core": "*", "@biomejs/biome": "1.8.3", "@kibertoad/biome-config": "^1.2.1", + "@message-queue-toolkit/core": "*", "@types/node": "^22.0.0", "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", From 047dc08d85473e0407c5a18a2df34765be43cebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 4 Sep 2024 12:59:25 +0200 Subject: [PATCH 03/31] Tests WIP. --- packages/outbox-core/lib/outbox.spec.ts | 32 +++++++++++ packages/outbox-core/lib/outbox.ts | 74 ++++++++++++++++--------- packages/outbox-core/package.json | 7 ++- 3 files changed, 83 insertions(+), 30 deletions(-) create mode 100644 packages/outbox-core/lib/outbox.spec.ts diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts new file mode 100644 index 00000000..1e7cabe3 --- /dev/null +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -0,0 +1,32 @@ +import { + type SnsAwareEventDefinition, + enrichEventSchemaWithBase, +} from '@message-queue-toolkit/schemas' +import { describe } from 'vitest' +import { z } from 'zod' +import type { OutboxStorage } from './outbox' + +const TEST_EVENT_SCHEMA = z.object({ + name: z.string(), + age: z.number(), +}) + +const testEvents = { + 'test-event.something-happened': { + ...enrichEventSchemaWithBase('test-event.something-happened', TEST_EVENT_SCHEMA), + snsTopic: 'TEST_TOPIC', + producedBy: ['unit tes'], + }, +} satisfies Record + +console.log(testEvents) + +const testEventsArray = [...Object.values(testEvents)] satisfies SnsAwareEventDefinition[] + +type TestEvents = typeof testEventsArray + +class InMemoryOutbox implements OutboxStorage {} + +describe('outbox', () => { + it('saves outbox entry to process it later', async () => {}) +}) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 7f4ae983..a7649031 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -53,6 +53,41 @@ export interface OutboxStorage getEntries(maxRetryCount: number): Promise[]> } +export class OutboxProcessor { + constructor( + private readonly outboxStorage: OutboxStorage, + private readonly eventEmitter: DomainEventEmitter, + private readonly maxRetryCount: number, + ) {} + + public async processOutboxEntries(context: JobExecutionContext) { + const entries = await this.outboxStorage.getEntries(this.maxRetryCount) + + for (const entry of entries) { + try { + const updatedEntry = await this.outboxStorage.update({ + ...entry, + updated: new Date(), + status: 'ACKED', + }) + + await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + + await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' }) + } catch (e) { + context.logger.error({ error: e }, 'Failed to process outbox entry.') + + await this.outboxStorage.update({ + ...entry, + updated: new Date(), + status: 'FAILED', + retryCount: entry.retryCount + 1, + }) + } + } + } +} + /** * Periodic job that processes outbox entries every second. If processing takes longer than 1 second, another subsequent job WILL NOT be started. * @@ -63,10 +98,12 @@ export interface OutboxStorage export class OutboxPeriodicJob< SupportedEvents extends CommonEventDefinition[], > extends AbstractPeriodicJob { + private readonly outboxProcessor: OutboxProcessor + constructor( - private readonly outboxStorage: OutboxStorage, - private readonly eventEmitter: DomainEventEmitter, - private readonly maxRetryCount: number, + outboxStorage: OutboxStorage, + eventEmitter: DomainEventEmitter, + maxRetryCount: number, dependencies: PeriodicJobDependencies, ) { super( @@ -87,33 +124,16 @@ export class OutboxPeriodicJob< scheduler: dependencies.scheduler, }, ) + + this.outboxProcessor = new OutboxProcessor( + outboxStorage, + eventEmitter, + maxRetryCount, + ) } protected async processInternal(context: JobExecutionContext): Promise { - const entries = await this.outboxStorage.getEntries(this.maxRetryCount) - - for (const entry of entries) { - try { - const updatedEntry = await this.outboxStorage.update({ - ...entry, - updated: new Date(), - status: 'ACKED', - }) - - await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) - - await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' }) - } catch (e) { - context.logger.error({ error: e }, 'Failed to process outbox entry.') - - await this.outboxStorage.update({ - ...entry, - updated: new Date(), - status: 'FAILED', - retryCount: entry.retryCount + 1, - }) - } - } + await this.outboxProcessor.processOutboxEntries(context) } } diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index fb6dd096..da440c86 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -29,17 +29,18 @@ "@lokalise/id-utils": "^2.2.0" }, "peerDependencies": { - "@message-queue-toolkit/core": ">=14.0.0" + "@message-queue-toolkit/core": ">=14.0.0", + "@message-queue-toolkit/schemas": ">=4.0.0" }, "devDependencies": { "@biomejs/biome": "1.8.3", "@kibertoad/biome-config": "^1.2.1", - "@message-queue-toolkit/core": "*", "@types/node": "^22.0.0", "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", "typescript": "^5.5.3", - "vitest": "^2.0.4" + "vitest": "^2.0.4", + "zod": "^3.23.8" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { From 9a9bf28858ffb5be5420e1e24bef575cb024a7ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 4 Sep 2024 15:19:24 +0200 Subject: [PATCH 04/31] WIP tests --- packages/outbox-core/lib/outbox.spec.ts | 176 +++++++++++++++++++++--- 1 file changed, 157 insertions(+), 19 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 1e7cabe3..79f33322 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -1,32 +1,170 @@ +import { randomUUID } from 'node:crypto' import { - type SnsAwareEventDefinition, - enrichEventSchemaWithBase, + CommonMetadataFiller, + DomainEventEmitter, + EventRegistry, +} from '@message-queue-toolkit/core' +import { + type CommonEventDefinition, + type CommonEventDefinitionPublisherSchemaType, + enrichMessageSchemaWithBase, } from '@message-queue-toolkit/schemas' -import { describe } from 'vitest' +import pino, { type Logger } from 'pino' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' -import type { OutboxStorage } from './outbox' +import { type OutboxEntry, OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox' -const TEST_EVENT_SCHEMA = z.object({ - name: z.string(), - age: z.number(), -}) +const TestEvents = { + created: { + ...enrichMessageSchemaWithBase( + 'entity.created', + z.object({ + message: z.string(), + }), + ), + }, + + updated: { + ...enrichMessageSchemaWithBase( + 'entity.updated', + z.object({ + message: z.string(), + }), + ), + }, +} as const satisfies Record + +type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] + +const createdEventPayload: CommonEventDefinitionPublisherSchemaType = { + payload: { + message: 'msg', + }, + type: 'entity.created', + metadata: { + originatedFrom: 'service', + producedBy: 'producer', + schemaVersion: '1', + correlationId: randomUUID(), + }, +} -const testEvents = { - 'test-event.something-happened': { - ...enrichEventSchemaWithBase('test-event.something-happened', TEST_EVENT_SCHEMA), - snsTopic: 'TEST_TOPIC', - producedBy: ['unit tes'], +const updatedEventPayload: CommonEventDefinitionPublisherSchemaType = { + ...createdEventPayload, + type: 'entity.updated', +} + +const expectedCreatedPayload = { + id: expect.any(String), + timestamp: expect.any(String), + payload: { + message: 'msg', + }, + type: 'entity.created', + metadata: { + correlationId: expect.any(String), + originatedFrom: 'service', + producedBy: 'producer', + schemaVersion: '1', }, -} satisfies Record +} + +const expectedUpdatedPayload = { + ...expectedCreatedPayload, + type: 'entity.updated', +} + +const TestLogger: Logger = pino() + +class InMemoryOutboxStorage + implements OutboxStorage +{ + public entries: OutboxEntry[] = [] -console.log(testEvents) + create( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = [...this.entries, outboxEntry] -const testEventsArray = [...Object.values(testEvents)] satisfies SnsAwareEventDefinition[] + return Promise.resolve(outboxEntry) + } -type TestEvents = typeof testEventsArray + getEntries(maxRetryCount: number): Promise[]> { + const entries = this.entries.filter((entry) => { + return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount + }) -class InMemoryOutbox implements OutboxStorage {} + return Promise.resolve(entries) + } + + update( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = this.entries.map((entry) => { + if (entry.id === outboxEntry.id) { + return outboxEntry + } + return entry + }) + + return Promise.resolve(outboxEntry) + } +} describe('outbox', () => { - it('saves outbox entry to process it later', async () => {}) + let outboxProcessor: OutboxProcessor + let eventEmitter: DomainEventEmitter + let outboxEventEmitter: OutboxEventEmitter + let outboxStorage: InMemoryOutboxStorage + + beforeEach(() => { + eventEmitter = new DomainEventEmitter({ + logger: TestLogger, + errorReporter: { report: () => {} }, + eventRegistry: new EventRegistry(Object.values(TestEvents)), + metadataFiller: new CommonMetadataFiller({ + serviceId: 'test', + }), + }) + + outboxStorage = new InMemoryOutboxStorage() + outboxEventEmitter = new OutboxEventEmitter(outboxStorage) + outboxProcessor = new OutboxProcessor(outboxStorage, eventEmitter, 2) + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('saves outbox entry to storage', async () => { + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + const entries = await outboxStorage.getEntries(2) + + expect(entries).toHaveLength(1) + }) + + it('saves outbox entry and process it', async () => { + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + const entries = await outboxStorage.getEntries(2) + + expect(entries).toHaveLength(0) + + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + }, + ]) + }) }) From c3b08eac87063b0cb88825d2521141fd6ce9ce09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 4 Sep 2024 15:28:40 +0200 Subject: [PATCH 05/31] another test --- packages/outbox-core/lib/outbox.spec.ts | 54 +++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 79f33322..326f7be2 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -160,10 +160,64 @@ describe('outbox', () => { const entries = await outboxStorage.getEntries(2) expect(entries).toHaveLength(0) + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + }, + ]) + }) + + it('saves outbox entry and process it with error and retries', async () => { + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + mockedEventEmitter.mockImplementationOnce(() => { + throw new Error('Could not emit event.') + }) + mockedEventEmitter.mockImplementationOnce(() => + Promise.resolve({ + ...createdEventPayload, + id: randomUUID(), + timestamp: new Date().toISOString(), + metadata: { + schemaVersion: '1', + producedBy: 'test', + originatedFrom: 'service', + correlationId: randomUUID(), + }, + }), + ) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + let entries = await outboxStorage.getEntries(2) + expect(entries).toHaveLength(1) + expect(outboxStorage.entries).toMatchObject([ + { + status: 'FAILED', + retryCount: 1, + }, + ]) + + //Now let's process again successfully + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + entries = await outboxStorage.getEntries(2) + expect(entries).toHaveLength(0) //Nothing to process anymore expect(outboxStorage.entries).toMatchObject([ { status: 'SUCCESS', + retryCount: 1, }, ]) }) From 043c6266b3985f784f189cc564657e00fd7fcb0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 10:49:06 +0200 Subject: [PATCH 06/31] AP-5046 Test. --- packages/outbox-core/lib/outbox.spec.ts | 74 ++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 9 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 326f7be2..7f9955ea 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -111,6 +111,8 @@ class InMemoryOutboxStorage } } +const MAX_RETRY_COUNT = 2 + describe('outbox', () => { let outboxProcessor: OutboxProcessor let eventEmitter: DomainEventEmitter @@ -129,7 +131,11 @@ describe('outbox', () => { outboxStorage = new InMemoryOutboxStorage() outboxEventEmitter = new OutboxEventEmitter(outboxStorage) - outboxProcessor = new OutboxProcessor(outboxStorage, eventEmitter, 2) + outboxProcessor = new OutboxProcessor( + outboxStorage, + eventEmitter, + MAX_RETRY_COUNT, + ) }) afterEach(() => { @@ -141,7 +147,7 @@ describe('outbox', () => { correlationId: randomUUID(), }) - const entries = await outboxStorage.getEntries(2) + const entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) expect(entries).toHaveLength(1) }) @@ -157,7 +163,7 @@ describe('outbox', () => { executorId: randomUUID(), }) - const entries = await outboxStorage.getEntries(2) + const entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) expect(entries).toHaveLength(0) expect(outboxStorage.entries).toMatchObject([ @@ -168,10 +174,6 @@ describe('outbox', () => { }) it('saves outbox entry and process it with error and retries', async () => { - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { - correlationId: randomUUID(), - }) - const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') mockedEventEmitter.mockImplementationOnce(() => { throw new Error('Could not emit event.') @@ -190,13 +192,17 @@ describe('outbox', () => { }), ) + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + await outboxProcessor.processOutboxEntries({ logger: TestLogger, reqId: randomUUID(), executorId: randomUUID(), }) - let entries = await outboxStorage.getEntries(2) + let entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) expect(entries).toHaveLength(1) expect(outboxStorage.entries).toMatchObject([ { @@ -212,7 +218,7 @@ describe('outbox', () => { executorId: randomUUID(), }) - entries = await outboxStorage.getEntries(2) + entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) expect(entries).toHaveLength(0) //Nothing to process anymore expect(outboxStorage.entries).toMatchObject([ { @@ -221,4 +227,54 @@ describe('outbox', () => { }, ]) }) + + it('no longer processes the event if exceeded retry count', async () => { + //Let's always fail the event + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + mockedEventEmitter.mockImplementation(() => { + throw new Error('Could not emit event.') + }) + + //Persist the event + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + //Initially event is present in outbox storage. + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +1 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Still present + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +2 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Stil present + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +3 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Now it's gone, we no longer try to process it + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(0) + + expect(outboxStorage.entries).toMatchObject([ + { + status: 'FAILED', + retryCount: 3, + }, + ]) + }) }) From 67808d5502a4d93c4dfca444d39d5126cca089c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 13:25:36 +0200 Subject: [PATCH 07/31] AP-5046 Comment. --- packages/outbox-core/lib/outbox.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index a7649031..8b91e547 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -26,6 +26,7 @@ export type OutboxEntry = { created: Date updated?: Date retryCount: number + lockedUntil?: Date } /** @@ -53,6 +54,11 @@ export interface OutboxStorage getEntries(maxRetryCount: number): Promise[]> } +/** + * Main logic for handling outbox entries. + * + * If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up. + */ export class OutboxProcessor { constructor( private readonly outboxStorage: OutboxStorage, From cae5c3496514f4830fa830ed03e2b38a36192462 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 14:34:48 +0200 Subject: [PATCH 08/31] Configurable intervalInMs via constructor. --- packages/outbox-core/lib/outbox.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 8b91e547..6d89eba3 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -95,7 +95,7 @@ export class OutboxProcessor { } /** - * Periodic job that processes outbox entries every second. If processing takes longer than 1 second, another subsequent job WILL NOT be started. + * Periodic job that processes outbox entries every "intervalInMs". If processing takes longer than defined interval, another subsequent job WILL NOT be started. * * Each entry is ACKed, then event is published, and then entry is marked as SUCCESS. If processing fails, entry is marked as FAILED and will be retried. * @@ -109,14 +109,15 @@ export class OutboxPeriodicJob< constructor( outboxStorage: OutboxStorage, eventEmitter: DomainEventEmitter, - maxRetryCount: number, dependencies: PeriodicJobDependencies, + maxRetryCount: number, + intervalInMs: number, ) { super( { jobId: 'OutboxJob', schedule: { - intervalInMs: 1000, + intervalInMs: intervalInMs, }, singleConsumerMode: { enabled: true, From 9ac95faccd9934f97ef4b3e0a2c98c5509690309 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 14:44:33 +0200 Subject: [PATCH 09/31] Fixed import. --- packages/outbox-core/lib/outbox.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 6d89eba3..0cf7d3e0 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -1,5 +1,5 @@ import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' -import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common/dist/periodic-jobs/periodicJobTypes' +import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common' import { generateUuid7 } from '@lokalise/id-utils' import type { CommonEventDefinition, From 3dcc037db7667188098e06cb7556436510d72297 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 14:48:56 +0200 Subject: [PATCH 10/31] No locked until for now. --- packages/outbox-core/lib/outbox.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 0cf7d3e0..ed4ec447 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -26,7 +26,6 @@ export type OutboxEntry = { created: Date updated?: Date retryCount: number - lockedUntil?: Date } /** From 4fe8053fa3bc6f438c2e9fb195e13e4d5e30c9db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 16:00:18 +0200 Subject: [PATCH 11/31] Direct uuidv7 lib usage. --- packages/outbox-core/lib/outbox.spec.ts | 25 ------------------------- packages/outbox-core/lib/outbox.ts | 4 ++-- packages/outbox-core/package.json | 2 +- 3 files changed, 3 insertions(+), 28 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 7f9955ea..6edee772 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -49,31 +49,6 @@ const createdEventPayload: CommonEventDefinitionPublisherSchemaType = { - ...createdEventPayload, - type: 'entity.updated', -} - -const expectedCreatedPayload = { - id: expect.any(String), - timestamp: expect.any(String), - payload: { - message: 'msg', - }, - type: 'entity.created', - metadata: { - correlationId: expect.any(String), - originatedFrom: 'service', - producedBy: 'producer', - schemaVersion: '1', - }, -} - -const expectedUpdatedPayload = { - ...expectedCreatedPayload, - type: 'entity.updated', -} - const TestLogger: Logger = pino() class InMemoryOutboxStorage diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index ed4ec447..8fc77087 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -1,12 +1,12 @@ import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common' -import { generateUuid7 } from '@lokalise/id-utils' import type { CommonEventDefinition, CommonEventDefinitionPublisherSchemaType, ConsumerMessageMetadataType, DomainEventEmitter, } from '@message-queue-toolkit/core' +import { uuidv7 } from 'uuidv7' /** * Status of the outbox entry. @@ -158,7 +158,7 @@ export class OutboxEventEmitter precedingMessageMetadata?: Partial, ) { await this.storage.create({ - id: generateUuid7(), + id: uuidv7(), event: supportedEvent, data, precedingMessageMetadata, diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index da440c86..bcc2a4ee 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -26,7 +26,7 @@ }, "dependencies": { "@lokalise/background-jobs-common": "^7.6.1", - "@lokalise/id-utils": "^2.2.0" + "uuidv7": "^1.0.2" }, "peerDependencies": { "@message-queue-toolkit/core": ">=14.0.0", From eb6aa5e6dfbf9d01b308cc595fb02968ccd49f06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 5 Sep 2024 16:20:18 +0200 Subject: [PATCH 12/31] Updated keywords. --- packages/outbox-core/package.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index bcc2a4ee..3e4ebceb 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -55,9 +55,8 @@ "common", "utils", "notification", - "s3", - "store", - "claim-check" + "outbox", + "pattern" ], "files": ["README.md", "LICENSE", "dist/*"] } From 3333be54f67c1e2dcf47f208ca0c108c85db01ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 6 Sep 2024 12:42:08 +0200 Subject: [PATCH 13/31] AP-5046 PromisePool --- packages/outbox-core/README.md | 3 + packages/outbox-core/lib/outbox.spec.ts | 49 +++++++++++++- packages/outbox-core/lib/outbox.ts | 90 +++++++++++++++++++------ packages/outbox-core/package.json | 1 + 4 files changed, 120 insertions(+), 23 deletions(-) create mode 100644 packages/outbox-core/README.md diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md new file mode 100644 index 00000000..5f2b9dfe --- /dev/null +++ b/packages/outbox-core/README.md @@ -0,0 +1,3 @@ +# outbox-core + +WIP diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 6edee772..b1a11f51 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -12,7 +12,14 @@ import { import pino, { type Logger } from 'pino' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' -import { type OutboxEntry, OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox' +import { + InMemoryOutboxAccumulator, + type OutboxAccumulator, + type OutboxEntry, + OutboxEventEmitter, + OutboxProcessor, + type OutboxStorage, +} from './outbox' const TestEvents = { created: { @@ -84,6 +91,43 @@ class InMemoryOutboxStorage return Promise.resolve(outboxEntry) } + + public async flush(outboxAccumulator: OutboxAccumulator): Promise { + let successEntries = await outboxAccumulator.getEntries() + successEntries = successEntries.map((entry) => { + return { + ...entry, + status: 'SUCCESS', + updateAt: new Date(), + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + + let failedEntries = await outboxAccumulator.getFailedEntries() + failedEntries = failedEntries.map((entry) => { + return { + ...entry, + status: 'FAILED', + updateAt: new Date(), + retryCount: entry.retryCount + 1, + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + + outboxAccumulator.clear() + } } const MAX_RETRY_COUNT = 2 @@ -108,8 +152,11 @@ describe('outbox', () => { outboxEventEmitter = new OutboxEventEmitter(outboxStorage) outboxProcessor = new OutboxProcessor( outboxStorage, + //@ts-ignore + new InMemoryOutboxAccumulator(), eventEmitter, MAX_RETRY_COUNT, + 1, ) }) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 8fc77087..12504e1e 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -6,6 +6,7 @@ import type { ConsumerMessageMetadataType, DomainEventEmitter, } from '@message-queue-toolkit/core' +import { PromisePool } from '@supercharge/promise-pool' import { uuidv7 } from 'uuidv7' /** @@ -28,6 +29,51 @@ export type OutboxEntry = { retryCount: number } +export interface OutboxAccumulator { + add(outboxEntry: OutboxEntry): Promise + + addFailure(outboxEntry: OutboxEntry): Promise + + getEntries(): Promise[]> + + getFailedEntries(): Promise[]> + + clear(): Promise +} + +export class InMemoryOutboxAccumulator + implements OutboxAccumulator +{ + private entries: OutboxEntry[] = [] + private failedEntries: OutboxEntry[] = [] + + public add(outboxEntry: OutboxEntry) { + this.entries = [...this.entries, outboxEntry] + + return Promise.resolve() + } + + public addFailure(outboxEntry: OutboxEntry) { + this.failedEntries = [...this.failedEntries, outboxEntry] + + return Promise.resolve() + } + + getEntries(): Promise[]> { + return Promise.resolve(this.entries) + } + + getFailedEntries(): Promise[]> { + return Promise.resolve(this.failedEntries) + } + + public clear(): Promise { + this.entries = [] + this.failedEntries = [] + return Promise.resolve() + } +} + /** * Takes care of persisting and retrieving outbox entries. * @@ -41,6 +87,8 @@ export interface OutboxStorage outboxEntry: OutboxEntry, ): Promise> + flush(outboxAccumulator: OutboxAccumulator): Promise + update( outboxEntry: OutboxEntry, ): Promise> @@ -61,35 +109,29 @@ export interface OutboxStorage export class OutboxProcessor { constructor( private readonly outboxStorage: OutboxStorage, + private readonly outboxAccumulator: OutboxAccumulator, private readonly eventEmitter: DomainEventEmitter, private readonly maxRetryCount: number, + private readonly emitBatchSize: number, ) {} public async processOutboxEntries(context: JobExecutionContext) { const entries = await this.outboxStorage.getEntries(this.maxRetryCount) - for (const entry of entries) { - try { - const updatedEntry = await this.outboxStorage.update({ - ...entry, - updated: new Date(), - status: 'ACKED', - }) - - await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) - - await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' }) - } catch (e) { - context.logger.error({ error: e }, 'Failed to process outbox entry.') - - await this.outboxStorage.update({ - ...entry, - updated: new Date(), - status: 'FAILED', - retryCount: entry.retryCount + 1, - }) - } - } + await PromisePool.for(entries) + .withConcurrency(this.emitBatchSize) + .process(async (entry) => { + try { + await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + await this.outboxAccumulator.add(entry) + } catch (e) { + context.logger.error({ error: e }, 'Failed to process outbox entry.') + + await this.outboxAccumulator.addFailure(entry) + } + }) + + await this.outboxStorage.flush(this.outboxAccumulator) } } @@ -107,9 +149,11 @@ export class OutboxPeriodicJob< constructor( outboxStorage: OutboxStorage, + outboxAccumulator: OutboxAccumulator, eventEmitter: DomainEventEmitter, dependencies: PeriodicJobDependencies, maxRetryCount: number, + emitBatchSize: number, intervalInMs: number, ) { super( @@ -133,8 +177,10 @@ export class OutboxPeriodicJob< this.outboxProcessor = new OutboxProcessor( outboxStorage, + outboxAccumulator, eventEmitter, maxRetryCount, + emitBatchSize, ) } diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index 3e4ebceb..94aff34d 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -26,6 +26,7 @@ }, "dependencies": { "@lokalise/background-jobs-common": "^7.6.1", + "@supercharge/promise-pool": "^3.2.0", "uuidv7": "^1.0.2" }, "peerDependencies": { From 31522bbac6018dd3a612e0f2a684f7014aeea092 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 6 Sep 2024 12:46:48 +0200 Subject: [PATCH 14/31] Divided into files. --- packages/outbox-core/lib/accumulators.ts | 47 ++++++++++++ packages/outbox-core/lib/objects.ts | 25 +++++++ packages/outbox-core/lib/outbox.spec.ts | 13 +--- packages/outbox-core/lib/outbox.ts | 95 +----------------------- packages/outbox-core/lib/storage.ts | 30 ++++++++ 5 files changed, 108 insertions(+), 102 deletions(-) create mode 100644 packages/outbox-core/lib/accumulators.ts create mode 100644 packages/outbox-core/lib/objects.ts create mode 100644 packages/outbox-core/lib/storage.ts diff --git a/packages/outbox-core/lib/accumulators.ts b/packages/outbox-core/lib/accumulators.ts new file mode 100644 index 00000000..6360410c --- /dev/null +++ b/packages/outbox-core/lib/accumulators.ts @@ -0,0 +1,47 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxEntry } from './objects.ts' + +export interface OutboxAccumulator { + add(outboxEntry: OutboxEntry): Promise + + addFailure(outboxEntry: OutboxEntry): Promise + + getEntries(): Promise[]> + + getFailedEntries(): Promise[]> + + clear(): Promise +} + +export class InMemoryOutboxAccumulator + implements OutboxAccumulator +{ + private entries: OutboxEntry[] = [] + private failedEntries: OutboxEntry[] = [] + + public add(outboxEntry: OutboxEntry) { + this.entries = [...this.entries, outboxEntry] + + return Promise.resolve() + } + + public addFailure(outboxEntry: OutboxEntry) { + this.failedEntries = [...this.failedEntries, outboxEntry] + + return Promise.resolve() + } + + getEntries(): Promise[]> { + return Promise.resolve(this.entries) + } + + getFailedEntries(): Promise[]> { + return Promise.resolve(this.failedEntries) + } + + public clear(): Promise { + this.entries = [] + this.failedEntries = [] + return Promise.resolve() + } +} diff --git a/packages/outbox-core/lib/objects.ts b/packages/outbox-core/lib/objects.ts new file mode 100644 index 00000000..2ea6762b --- /dev/null +++ b/packages/outbox-core/lib/objects.ts @@ -0,0 +1,25 @@ +import type { + CommonEventDefinition, + CommonEventDefinitionPublisherSchemaType, + ConsumerMessageMetadataType, +} from '@message-queue-toolkit/schemas' + +/** + * Status of the outbox entry. + * - CREATED - entry was created and is waiting to be processed to publish actual event + * - ACKED - entry was picked up by outbox job and is being processed + * - SUCCESS - entry was successfully processed, event was published + * - FAILED - entry processing failed, it will be retried + */ +export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED' + +export type OutboxEntry = { + id: string + event: SupportedEvent + data: Omit, 'type'> + precedingMessageMetadata?: Partial + status: OutboxEntryStatus + created: Date + updated?: Date + retryCount: number +} diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index b1a11f51..e0ebbe13 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -12,14 +12,9 @@ import { import pino, { type Logger } from 'pino' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' -import { - InMemoryOutboxAccumulator, - type OutboxAccumulator, - type OutboxEntry, - OutboxEventEmitter, - OutboxProcessor, - type OutboxStorage, -} from './outbox' +import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' +import { OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox' const TestEvents = { created: { @@ -125,8 +120,6 @@ class InMemoryOutboxStorage } return entry }) - - outboxAccumulator.clear() } } diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 12504e1e..f7184793 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -8,98 +8,8 @@ import type { } from '@message-queue-toolkit/core' import { PromisePool } from '@supercharge/promise-pool' import { uuidv7 } from 'uuidv7' - -/** - * Status of the outbox entry. - * - CREATED - entry was created and is waiting to be processed to publish actual event - * - ACKED - entry was picked up by outbox job and is being processed - * - SUCCESS - entry was successfully processed, event was published - * - FAILED - entry processing failed, it will be retried - */ -export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED' - -export type OutboxEntry = { - id: string - event: SupportedEvent - data: Omit, 'type'> - precedingMessageMetadata?: Partial - status: OutboxEntryStatus - created: Date - updated?: Date - retryCount: number -} - -export interface OutboxAccumulator { - add(outboxEntry: OutboxEntry): Promise - - addFailure(outboxEntry: OutboxEntry): Promise - - getEntries(): Promise[]> - - getFailedEntries(): Promise[]> - - clear(): Promise -} - -export class InMemoryOutboxAccumulator - implements OutboxAccumulator -{ - private entries: OutboxEntry[] = [] - private failedEntries: OutboxEntry[] = [] - - public add(outboxEntry: OutboxEntry) { - this.entries = [...this.entries, outboxEntry] - - return Promise.resolve() - } - - public addFailure(outboxEntry: OutboxEntry) { - this.failedEntries = [...this.failedEntries, outboxEntry] - - return Promise.resolve() - } - - getEntries(): Promise[]> { - return Promise.resolve(this.entries) - } - - getFailedEntries(): Promise[]> { - return Promise.resolve(this.failedEntries) - } - - public clear(): Promise { - this.entries = [] - this.failedEntries = [] - return Promise.resolve() - } -} - -/** - * Takes care of persisting and retrieving outbox entries. - * - * Implementation is required: - * - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction - * - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter) - * - returned entries should not include the ones with 'SUCCESS' status - */ -export interface OutboxStorage { - create( - outboxEntry: OutboxEntry, - ): Promise> - - flush(outboxAccumulator: OutboxAccumulator): Promise - - update( - outboxEntry: OutboxEntry, - ): Promise> - - /** - * Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times. - * - * For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned. - */ - getEntries(maxRetryCount: number): Promise[]> -} +import type { OutboxAccumulator } from './accumulators' +import type { OutboxStorage } from './storage' /** * Main logic for handling outbox entries. @@ -132,6 +42,7 @@ export class OutboxProcessor { }) await this.outboxStorage.flush(this.outboxAccumulator) + await this.outboxAccumulator.clear() } } diff --git a/packages/outbox-core/lib/storage.ts b/packages/outbox-core/lib/storage.ts new file mode 100644 index 00000000..c78a6db5 --- /dev/null +++ b/packages/outbox-core/lib/storage.ts @@ -0,0 +1,30 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' + +/** + * Takes care of persisting and retrieving outbox entries. + * + * Implementation is required: + * - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction + * - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter) + * - returned entries should not include the ones with 'SUCCESS' status + */ +export interface OutboxStorage { + create( + outboxEntry: OutboxEntry, + ): Promise> + + flush(outboxAccumulator: OutboxAccumulator): Promise + + update( + outboxEntry: OutboxEntry, + ): Promise> + + /** + * Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times. + * + * For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned. + */ + getEntries(maxRetryCount: number): Promise[]> +} From be3f37bd3e9efd8d4d626653b617e11e81024305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 6 Sep 2024 14:08:54 +0200 Subject: [PATCH 15/31] Removed update method from storage. --- packages/outbox-core/lib/outbox.spec.ts | 3 ++- packages/outbox-core/lib/storage.ts | 4 ---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index e0ebbe13..99a4f95c 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -14,7 +14,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators' import type { OutboxEntry } from './objects' -import { OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox' +import { OutboxEventEmitter, OutboxProcessor } from './outbox' +import type { OutboxStorage } from './storage' const TestEvents = { created: { diff --git a/packages/outbox-core/lib/storage.ts b/packages/outbox-core/lib/storage.ts index c78a6db5..617bf022 100644 --- a/packages/outbox-core/lib/storage.ts +++ b/packages/outbox-core/lib/storage.ts @@ -17,10 +17,6 @@ export interface OutboxStorage flush(outboxAccumulator: OutboxAccumulator): Promise - update( - outboxEntry: OutboxEntry, - ): Promise> - /** * Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times. * From d460478eb10252fa4fdfca3c9b58b7e7bd43f615 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 6 Sep 2024 14:13:26 +0200 Subject: [PATCH 16/31] JS doc for flush method. --- packages/outbox-core/lib/storage.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/outbox-core/lib/storage.ts b/packages/outbox-core/lib/storage.ts index 617bf022..08493067 100644 --- a/packages/outbox-core/lib/storage.ts +++ b/packages/outbox-core/lib/storage.ts @@ -15,6 +15,12 @@ export interface OutboxStorage outboxEntry: OutboxEntry, ): Promise> + /** + * Responsible for taking all entries from the accumulator and persisting them in the storage. + * + * - Items that are in OutboxAccumulator::getEntries MUST be changed to SUCCESS status and `updatedAt` field needs to be set. + * - Items that are in OutboxAccumulator::getFailedEntries MUST be changed to FAILED status, `updatedAt` field needs to be set and retryCount needs to be incremented. + */ flush(outboxAccumulator: OutboxAccumulator): Promise /** From 1d0ef624e1a1d91cbbac04205642da83a3b72469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 09:21:50 +0200 Subject: [PATCH 17/31] AP-5046 Readme + extra comments. --- packages/outbox-core/README.md | 41 +++++++++++++++++++++++- packages/outbox-core/lib/accumulators.ts | 26 +++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md index 5f2b9dfe..61ca7352 100644 --- a/packages/outbox-core/README.md +++ b/packages/outbox-core/README.md @@ -1,3 +1,42 @@ # outbox-core -WIP +Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages. + +## Installation + +```bash +npm i -S outbox-core +``` + +## Usage + +To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class: + +```typescript +import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core'; + +const job = new OutboxPeriodicJob( + //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit + outboxStorage, + //Default available accumulator for gathering outbox entries as the process job is progressing. + new InMemoryOutboxAccumulator(), //Default accumulator + //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core + eventEmitter, + //See PeriodicJobDependencies from @lokalise/background-jobs-common + dependencies, + //Retry count, how many times outbox entries should be retried to be processed + 3, + //emitBatchSize - how many outbox entries should be emitted at once + 10, + //internalInMs - how often the job should be executed, e.g. below it runs every 1sec + 1000 +) +``` + +Job will take care of processing outbox entries emitted by: +```typescript +const emitter = new OutboxEventEmitter( + //Same instance of outbox storage that is used by OutboxPeriodicJob + outboxStorage +) +``` diff --git a/packages/outbox-core/lib/accumulators.ts b/packages/outbox-core/lib/accumulators.ts index 6360410c..de0228a6 100644 --- a/packages/outbox-core/lib/accumulators.ts +++ b/packages/outbox-core/lib/accumulators.ts @@ -1,15 +1,41 @@ import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' import type { OutboxEntry } from './objects.ts' +/** + * Accumulator is responsible for storing outbox entries in two cases: + * - successfully dispatched event + * - failed events + * + * Thanks to this, we can use aggregated result and persist in the storage in batches. + */ export interface OutboxAccumulator { + /** + * Accumulates successfully dispatched event. + * @param outboxEntry + */ add(outboxEntry: OutboxEntry): Promise + /** + * Accumulates failed event. + * @param outboxEntry + */ addFailure(outboxEntry: OutboxEntry): Promise + /** + * It's meant to be used by OutboxStorage::flush() to get all entries that should be persisted as successful ones. + */ getEntries(): Promise[]> + /** + * Also used by OutboxStorage::flush() to get all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented. + */ getFailedEntries(): Promise[]> + /** + * After running clear(), no entries should be returned by getEntries() and getFailedEntries(). + * + * clear() is always called after flush() in OutboxStorage. + */ clear(): Promise } From fffca57a4476b6bd8693c7fcafaee1bb961dbfd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 09:27:45 +0200 Subject: [PATCH 18/31] AP-5046 Readme + extra comments. --- packages/outbox-core/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md index 61ca7352..a2818067 100644 --- a/packages/outbox-core/README.md +++ b/packages/outbox-core/README.md @@ -5,7 +5,7 @@ Main package that contains the core functionality of the Outbox pattern to provi ## Installation ```bash -npm i -S outbox-core +npm i -S @message-queue-toolkit/outbox-core ``` ## Usage From d45a3747b17d214311975a41461fbab4cce0131c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 09:29:55 +0200 Subject: [PATCH 19/31] AP-5046 Readme + extra comments. --- packages/outbox-core/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md index a2818067..91e9c541 100644 --- a/packages/outbox-core/README.md +++ b/packages/outbox-core/README.md @@ -19,7 +19,7 @@ const job = new OutboxPeriodicJob( //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit outboxStorage, //Default available accumulator for gathering outbox entries as the process job is progressing. - new InMemoryOutboxAccumulator(), //Default accumulator + new InMemoryOutboxAccumulator(), //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core eventEmitter, //See PeriodicJobDependencies from @lokalise/background-jobs-common From 037c8f1d2e51f3e05b74ccb824593465a5274a5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 09:36:41 +0200 Subject: [PATCH 20/31] Updated comment. --- packages/outbox-core/lib/outbox.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index f7184793..d38fde00 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -49,7 +49,7 @@ export class OutboxProcessor { /** * Periodic job that processes outbox entries every "intervalInMs". If processing takes longer than defined interval, another subsequent job WILL NOT be started. * - * Each entry is ACKed, then event is published, and then entry is marked as SUCCESS. If processing fails, entry is marked as FAILED and will be retried. + * When event is published, and then entry is accumulated into SUCCESS group. If processing fails, entry is accumulated as FAILED and will be retried. * * Max retry count is defined by the user. */ From 29bc350437791d948056444ca8d927be16ea0cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 10:39:13 +0200 Subject: [PATCH 21/31] AP-5046 config + dependencies object. --- packages/outbox-core/lib/outbox.spec.ts | 12 ++++--- packages/outbox-core/lib/outbox.ts | 48 ++++++++++++++----------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 99a4f95c..42348782 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -14,7 +14,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators' import type { OutboxEntry } from './objects' -import { OutboxEventEmitter, OutboxProcessor } from './outbox' +import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from './outbox' import type { OutboxStorage } from './storage' const TestEvents = { @@ -145,10 +145,12 @@ describe('outbox', () => { outboxStorage = new InMemoryOutboxStorage() outboxEventEmitter = new OutboxEventEmitter(outboxStorage) outboxProcessor = new OutboxProcessor( - outboxStorage, - //@ts-ignore - new InMemoryOutboxAccumulator(), - eventEmitter, + { + outboxStorage, + //@ts-ignore + outboxAccumulator: new InMemoryOutboxAccumulator(), + eventEmitter, + } satisfies OutboxDependencies, MAX_RETRY_COUNT, 1, ) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index d38fde00..82880fc6 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -11,6 +11,18 @@ import { uuidv7 } from 'uuidv7' import type { OutboxAccumulator } from './accumulators' import type { OutboxStorage } from './storage' +export type OutboxDependencies = { + outboxStorage: OutboxStorage + outboxAccumulator: OutboxAccumulator + eventEmitter: DomainEventEmitter +} + +export type OutboxConfiguration = { + maxRetryCount: number + emitBatchSize: number + jobIntervalInMs: number +} + /** * Main logic for handling outbox entries. * @@ -18,31 +30,31 @@ import type { OutboxStorage } from './storage' */ export class OutboxProcessor { constructor( - private readonly outboxStorage: OutboxStorage, - private readonly outboxAccumulator: OutboxAccumulator, - private readonly eventEmitter: DomainEventEmitter, + private readonly outboxDependencies: OutboxDependencies, private readonly maxRetryCount: number, private readonly emitBatchSize: number, ) {} public async processOutboxEntries(context: JobExecutionContext) { - const entries = await this.outboxStorage.getEntries(this.maxRetryCount) + const { outboxStorage, eventEmitter, outboxAccumulator } = this.outboxDependencies + + const entries = await outboxStorage.getEntries(this.maxRetryCount) await PromisePool.for(entries) .withConcurrency(this.emitBatchSize) .process(async (entry) => { try { - await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) - await this.outboxAccumulator.add(entry) + await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + await outboxAccumulator.add(entry) } catch (e) { context.logger.error({ error: e }, 'Failed to process outbox entry.') - await this.outboxAccumulator.addFailure(entry) + await outboxAccumulator.addFailure(entry) } }) - await this.outboxStorage.flush(this.outboxAccumulator) - await this.outboxAccumulator.clear() + await outboxStorage.flush(outboxAccumulator) + await outboxAccumulator.clear() } } @@ -59,19 +71,15 @@ export class OutboxPeriodicJob< private readonly outboxProcessor: OutboxProcessor constructor( - outboxStorage: OutboxStorage, - outboxAccumulator: OutboxAccumulator, - eventEmitter: DomainEventEmitter, + outboxDependencies: OutboxDependencies, + outboxConfiguration: OutboxConfiguration, dependencies: PeriodicJobDependencies, - maxRetryCount: number, - emitBatchSize: number, - intervalInMs: number, ) { super( { jobId: 'OutboxJob', schedule: { - intervalInMs: intervalInMs, + intervalInMs: outboxConfiguration.jobIntervalInMs, }, singleConsumerMode: { enabled: true, @@ -87,11 +95,9 @@ export class OutboxPeriodicJob< ) this.outboxProcessor = new OutboxProcessor( - outboxStorage, - outboxAccumulator, - eventEmitter, - maxRetryCount, - emitBatchSize, + outboxDependencies, + outboxConfiguration.maxRetryCount, + outboxConfiguration.emitBatchSize, ) } From 94bbe85387da9635e8d017c7b1dabd296c841d2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 10:44:25 +0200 Subject: [PATCH 22/31] AP-5046 OutboxProcessorConfig. --- packages/outbox-core/lib/outbox.spec.ts | 3 +-- packages/outbox-core/lib/outbox.ts | 17 +++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 42348782..7fdd67ed 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -151,8 +151,7 @@ describe('outbox', () => { outboxAccumulator: new InMemoryOutboxAccumulator(), eventEmitter, } satisfies OutboxDependencies, - MAX_RETRY_COUNT, - 1, + { maxRetryCount: MAX_RETRY_COUNT, emitBatchSize: 1 }, ) }) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 82880fc6..2f201e40 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -17,12 +17,15 @@ export type OutboxDependencies eventEmitter: DomainEventEmitter } -export type OutboxConfiguration = { +export type OutboxProcessorConfiguration = { maxRetryCount: number emitBatchSize: number - jobIntervalInMs: number } +export type OutboxConfiguration = { + jobIntervalInMs: number +} & OutboxProcessorConfiguration + /** * Main logic for handling outbox entries. * @@ -31,17 +34,16 @@ export type OutboxConfiguration = { export class OutboxProcessor { constructor( private readonly outboxDependencies: OutboxDependencies, - private readonly maxRetryCount: number, - private readonly emitBatchSize: number, + private readonly outboxProcessorConfiguration: OutboxProcessorConfiguration, ) {} public async processOutboxEntries(context: JobExecutionContext) { const { outboxStorage, eventEmitter, outboxAccumulator } = this.outboxDependencies - const entries = await outboxStorage.getEntries(this.maxRetryCount) + const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount) await PromisePool.for(entries) - .withConcurrency(this.emitBatchSize) + .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) .process(async (entry) => { try { await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) @@ -96,8 +98,7 @@ export class OutboxPeriodicJob< this.outboxProcessor = new OutboxProcessor( outboxDependencies, - outboxConfiguration.maxRetryCount, - outboxConfiguration.emitBatchSize, + outboxConfiguration, ) } From db1e1e3bb779341d58aaf4088a13e259333329c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 12:36:16 +0200 Subject: [PATCH 23/31] Prefilter entries if any exist in accumulator. --- packages/outbox-core/lib/outbox.spec.ts | 32 ++++++++++++++++++++++++- packages/outbox-core/lib/outbox.ts | 7 +++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index 7fdd67ed..f5d6c450 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -131,6 +131,7 @@ describe('outbox', () => { let eventEmitter: DomainEventEmitter let outboxEventEmitter: OutboxEventEmitter let outboxStorage: InMemoryOutboxStorage + let inMemoryOutboxAccumulator: InMemoryOutboxAccumulator beforeEach(() => { eventEmitter = new DomainEventEmitter({ @@ -144,11 +145,12 @@ describe('outbox', () => { outboxStorage = new InMemoryOutboxStorage() outboxEventEmitter = new OutboxEventEmitter(outboxStorage) + inMemoryOutboxAccumulator = new InMemoryOutboxAccumulator() outboxProcessor = new OutboxProcessor( { outboxStorage, //@ts-ignore - outboxAccumulator: new InMemoryOutboxAccumulator(), + outboxAccumulator: inMemoryOutboxAccumulator, eventEmitter, } satisfies OutboxDependencies, { maxRetryCount: MAX_RETRY_COUNT, emitBatchSize: 1 }, @@ -294,4 +296,32 @@ describe('outbox', () => { }, ]) }) + + it("doesn't emit event again if it's already present in accumulator", async () => { + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + await inMemoryOutboxAccumulator.add(outboxStorage.entries[0]) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + //We pretended that event was emitted in previous run by adding state to accumulator + expect(mockedEventEmitter).toHaveBeenCalledTimes(0) + + //But after the loop, if successful, it should be marked as success anyway + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + }, + ]) + //And accumulator should be cleared + expect(await inMemoryOutboxAccumulator.getEntries()).toHaveLength(0) + }) }) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 2f201e40..bcb11f6e 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -42,7 +42,12 @@ export class OutboxProcessor { const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount) - await PromisePool.for(entries) + const currentEntriesInAccumulator = new Set( + (await outboxAccumulator.getEntries()).map((entry) => entry.id), + ) + const filteredEntries = entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + + await PromisePool.for(filteredEntries) .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) .process(async (entry) => { try { From 34dff000250b742f24a715bec953ee259064a3ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 12:52:17 +0200 Subject: [PATCH 24/31] AP-5046 createEntry. --- packages/outbox-core/lib/outbox.spec.ts | 2 +- packages/outbox-core/lib/outbox.ts | 2 +- packages/outbox-core/lib/storage.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/lib/outbox.spec.ts index f5d6c450..34b94c45 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/lib/outbox.spec.ts @@ -59,7 +59,7 @@ class InMemoryOutboxStorage { public entries: OutboxEntry[] = [] - create( + createEntry( outboxEntry: OutboxEntry, ): Promise> { this.entries = [...this.entries, outboxEntry] diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index bcb11f6e..52f3b72e 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -126,7 +126,7 @@ export class OutboxEventEmitter data: Omit, 'type'>, precedingMessageMetadata?: Partial, ) { - await this.storage.create({ + await this.storage.createEntry({ id: uuidv7(), event: supportedEvent, data, diff --git a/packages/outbox-core/lib/storage.ts b/packages/outbox-core/lib/storage.ts index 08493067..c98ee764 100644 --- a/packages/outbox-core/lib/storage.ts +++ b/packages/outbox-core/lib/storage.ts @@ -11,7 +11,7 @@ import type { OutboxEntry } from './objects' * - returned entries should not include the ones with 'SUCCESS' status */ export interface OutboxStorage { - create( + createEntry( outboxEntry: OutboxEntry, ): Promise> From e7a80decfdcc78b153b08047660e1d490b622356 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 14:50:43 +0200 Subject: [PATCH 25/31] Extra exports. --- packages/outbox-core/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/outbox-core/index.ts b/packages/outbox-core/index.ts index cc01044a..a0ff82c1 100644 --- a/packages/outbox-core/index.ts +++ b/packages/outbox-core/index.ts @@ -1 +1,4 @@ export * from './lib/outbox' +export * from './lib/objects' +export * from './lib/accumulators' +export * from './lib/storage' From b79bc83493c60877748cf12a66bf3e24feb94d60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 08:51:11 +0200 Subject: [PATCH 26/31] Extended readme. --- packages/outbox-core/README.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md index 91e9c541..a1665a9d 100644 --- a/packages/outbox-core/README.md +++ b/packages/outbox-core/README.md @@ -35,8 +35,29 @@ const job = new OutboxPeriodicJob( Job will take care of processing outbox entries emitted by: ```typescript -const emitter = new OutboxEventEmitter( +import { + type CommonEventDefinition, + enrichMessageSchemaWithBase, +} from '@message-queue-toolkit/schemas' + +const MyEvents = { + created: { + ...enrichMessageSchemaWithBase( + 'entity.created', + z.object({ + message: z.string(), + }), + ), + }, +} as const satisfies Record + +type MySupportedEvents = (typeof TestEvents)[keyof typeof TestEvents][] + +const emitter = new OutboxEventEmitter( //Same instance of outbox storage that is used by OutboxPeriodicJob outboxStorage ) + +//It pushes the entry to the storage, later will be picked up by the OutboxPeriodicJob +await emitter.emit(/* args */) ``` From e35c33480fdc1447f04c95f9950c118e55a8d0ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 08:52:18 +0200 Subject: [PATCH 27/31] Updated comment. --- packages/outbox-core/lib/accumulators.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/outbox-core/lib/accumulators.ts b/packages/outbox-core/lib/accumulators.ts index de0228a6..6cd1b4cb 100644 --- a/packages/outbox-core/lib/accumulators.ts +++ b/packages/outbox-core/lib/accumulators.ts @@ -22,12 +22,12 @@ export interface OutboxAccumulator): Promise /** - * It's meant to be used by OutboxStorage::flush() to get all entries that should be persisted as successful ones. + * Returns all entries that should be persisted as successful ones. */ getEntries(): Promise[]> /** - * Also used by OutboxStorage::flush() to get all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented. + * Returns all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented. */ getFailedEntries(): Promise[]> From 23d380c1bbe8ab6d909b595e64971d1a8862c479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 08:54:49 +0200 Subject: [PATCH 28/31] Push to array. --- packages/outbox-core/lib/accumulators.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/outbox-core/lib/accumulators.ts b/packages/outbox-core/lib/accumulators.ts index 6cd1b4cb..d5d06081 100644 --- a/packages/outbox-core/lib/accumulators.ts +++ b/packages/outbox-core/lib/accumulators.ts @@ -46,13 +46,13 @@ export class InMemoryOutboxAccumulator[] = [] public add(outboxEntry: OutboxEntry) { - this.entries = [...this.entries, outboxEntry] + this.entries.push(outboxEntry) return Promise.resolve() } public addFailure(outboxEntry: OutboxEntry) { - this.failedEntries = [...this.failedEntries, outboxEntry] + this.failedEntries.push(outboxEntry) return Promise.resolve() } From 6792331b2730147a8fcb44d2c259feaa646849cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 08:58:04 +0200 Subject: [PATCH 29/31] Dedicated directory for tests. --- .../outbox-core/test/InMemoryOutboxStorage.ts | 74 ++++++++++++++++++ .../outbox-core/{lib => test}/outbox.spec.ts | 77 +------------------ 2 files changed, 77 insertions(+), 74 deletions(-) create mode 100644 packages/outbox-core/test/InMemoryOutboxStorage.ts rename packages/outbox-core/{lib => test}/outbox.spec.ts (76%) diff --git a/packages/outbox-core/test/InMemoryOutboxStorage.ts b/packages/outbox-core/test/InMemoryOutboxStorage.ts new file mode 100644 index 00000000..5f7ed8b5 --- /dev/null +++ b/packages/outbox-core/test/InMemoryOutboxStorage.ts @@ -0,0 +1,74 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxAccumulator } from '../lib/accumulators' +import type { OutboxEntry } from '../lib/objects' +import type { OutboxStorage } from '../lib/storage' + +export class InMemoryOutboxStorage + implements OutboxStorage +{ + public entries: OutboxEntry[] = [] + + createEntry( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = [...this.entries, outboxEntry] + + return Promise.resolve(outboxEntry) + } + + getEntries(maxRetryCount: number): Promise[]> { + const entries = this.entries.filter((entry) => { + return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount + }) + + return Promise.resolve(entries) + } + + update( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = this.entries.map((entry) => { + if (entry.id === outboxEntry.id) { + return outboxEntry + } + return entry + }) + + return Promise.resolve(outboxEntry) + } + + public async flush(outboxAccumulator: OutboxAccumulator): Promise { + let successEntries = await outboxAccumulator.getEntries() + successEntries = successEntries.map((entry) => { + return { + ...entry, + status: 'SUCCESS', + updateAt: new Date(), + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + + let failedEntries = await outboxAccumulator.getFailedEntries() + failedEntries = failedEntries.map((entry) => { + return { + ...entry, + status: 'FAILED', + updateAt: new Date(), + retryCount: entry.retryCount + 1, + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + } +} diff --git a/packages/outbox-core/lib/outbox.spec.ts b/packages/outbox-core/test/outbox.spec.ts similarity index 76% rename from packages/outbox-core/lib/outbox.spec.ts rename to packages/outbox-core/test/outbox.spec.ts index 34b94c45..ec1950c5 100644 --- a/packages/outbox-core/lib/outbox.spec.ts +++ b/packages/outbox-core/test/outbox.spec.ts @@ -12,10 +12,9 @@ import { import pino, { type Logger } from 'pino' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { z } from 'zod' -import { InMemoryOutboxAccumulator, type OutboxAccumulator } from './accumulators' -import type { OutboxEntry } from './objects' -import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from './outbox' -import type { OutboxStorage } from './storage' +import { InMemoryOutboxAccumulator } from '../lib/accumulators' +import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from '../lib/outbox' +import { InMemoryOutboxStorage } from './InMemoryOutboxStorage' const TestEvents = { created: { @@ -54,76 +53,6 @@ const createdEventPayload: CommonEventDefinitionPublisherSchemaType - implements OutboxStorage -{ - public entries: OutboxEntry[] = [] - - createEntry( - outboxEntry: OutboxEntry, - ): Promise> { - this.entries = [...this.entries, outboxEntry] - - return Promise.resolve(outboxEntry) - } - - getEntries(maxRetryCount: number): Promise[]> { - const entries = this.entries.filter((entry) => { - return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount - }) - - return Promise.resolve(entries) - } - - update( - outboxEntry: OutboxEntry, - ): Promise> { - this.entries = this.entries.map((entry) => { - if (entry.id === outboxEntry.id) { - return outboxEntry - } - return entry - }) - - return Promise.resolve(outboxEntry) - } - - public async flush(outboxAccumulator: OutboxAccumulator): Promise { - let successEntries = await outboxAccumulator.getEntries() - successEntries = successEntries.map((entry) => { - return { - ...entry, - status: 'SUCCESS', - updateAt: new Date(), - } - }) - this.entries = this.entries.map((entry) => { - const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id) - if (foundEntry) { - return foundEntry - } - return entry - }) - - let failedEntries = await outboxAccumulator.getFailedEntries() - failedEntries = failedEntries.map((entry) => { - return { - ...entry, - status: 'FAILED', - updateAt: new Date(), - retryCount: entry.retryCount + 1, - } - }) - this.entries = this.entries.map((entry) => { - const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id) - if (foundEntry) { - return foundEntry - } - return entry - }) - } -} - const MAX_RETRY_COUNT = 2 describe('outbox', () => { From 9849e68eca63ce0ddd7f913ee77d34439568957a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 09:08:09 +0200 Subject: [PATCH 30/31] AP-5046 Skip filtering if empty entries. --- packages/outbox-core/lib/outbox.ts | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 52f3b72e..ee944b9f 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -1,5 +1,5 @@ -import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common' +import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' import type { CommonEventDefinition, CommonEventDefinitionPublisherSchemaType, @@ -9,6 +9,7 @@ import type { import { PromisePool } from '@supercharge/promise-pool' import { uuidv7 } from 'uuidv7' import type { OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' import type { OutboxStorage } from './storage' export type OutboxDependencies = { @@ -42,10 +43,8 @@ export class OutboxProcessor { const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount) - const currentEntriesInAccumulator = new Set( - (await outboxAccumulator.getEntries()).map((entry) => entry.id), - ) - const filteredEntries = entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + const filteredEntries = + entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator) await PromisePool.for(filteredEntries) .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) @@ -63,6 +62,16 @@ export class OutboxProcessor { await outboxStorage.flush(outboxAccumulator) await outboxAccumulator.clear() } + + private async getFilteredEntries( + entries: OutboxEntry[], + outboxAccumulator: OutboxAccumulator, + ) { + const currentEntriesInAccumulator = new Set( + (await outboxAccumulator.getEntries()).map((entry) => entry.id), + ) + return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + } } /** From 239914ea053bb0f449b1d1803377662a064ecd6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 09:43:11 +0200 Subject: [PATCH 31/31] AP-5046 Updated package json + ignore job definition cc. --- packages/outbox-core/lib/outbox.ts | 2 ++ packages/outbox-core/package.json | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index ee944b9f..4d7b7079 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -81,6 +81,7 @@ export class OutboxProcessor { * * Max retry count is defined by the user. */ +/* c8 ignore start */ export class OutboxPeriodicJob< SupportedEvents extends CommonEventDefinition[], > extends AbstractPeriodicJob { @@ -120,6 +121,7 @@ export class OutboxPeriodicJob< await this.outboxProcessor.processOutboxEntries(context) } } +/* c8 ignore stop */ export class OutboxEventEmitter { constructor(private storage: OutboxStorage) {} diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index 94aff34d..99972529 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -17,11 +17,9 @@ "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "test": "vitest", "test:coverage": "npm test -- --coverage", - "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", + "test:ci": "npm run test:coverage", "lint": "biome check . && tsc --project tsconfig.json --noEmit", "lint:fix": "biome check --write .", - "docker:start:dev": "docker compose up -d", - "docker:stop:dev": "docker compose down", "prepublishOnly": "npm run build:release" }, "dependencies": {