From 1faf89b548eb4355789bf73ab6be48edfc82fbd2 Mon Sep 17 00:00:00 2001 From: Kamil Wylegala Date: Mon, 16 Sep 2024 10:02:14 +0200 Subject: [PATCH] AP-5046 outbox-core package for transactional outbox pattern (#204) --- packages/outbox-core/README.md | 63 +++++ packages/outbox-core/index.ts | 4 + packages/outbox-core/lib/accumulators.ts | 73 +++++ packages/outbox-core/lib/objects.ts | 25 ++ packages/outbox-core/lib/outbox.ts | 150 ++++++++++ packages/outbox-core/lib/storage.ts | 32 +++ packages/outbox-core/package.json | 61 +++++ .../outbox-core/test/InMemoryOutboxStorage.ts | 74 +++++ packages/outbox-core/test/outbox.spec.ts | 256 ++++++++++++++++++ packages/outbox-core/tsconfig.json | 27 ++ packages/outbox-core/tsconfig.release.json | 5 + packages/outbox-core/vitest.config.mts | 23 ++ 12 files changed, 793 insertions(+) create mode 100644 packages/outbox-core/README.md create mode 100644 packages/outbox-core/index.ts create mode 100644 packages/outbox-core/lib/accumulators.ts create mode 100644 packages/outbox-core/lib/objects.ts create mode 100644 packages/outbox-core/lib/outbox.ts create mode 100644 packages/outbox-core/lib/storage.ts create mode 100644 packages/outbox-core/package.json create mode 100644 packages/outbox-core/test/InMemoryOutboxStorage.ts create mode 100644 packages/outbox-core/test/outbox.spec.ts create mode 100644 packages/outbox-core/tsconfig.json create mode 100644 packages/outbox-core/tsconfig.release.json create mode 100644 packages/outbox-core/vitest.config.mts diff --git a/packages/outbox-core/README.md b/packages/outbox-core/README.md new file mode 100644 index 00000000..a1665a9d --- /dev/null +++ b/packages/outbox-core/README.md @@ -0,0 +1,63 @@ +# outbox-core + +Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages. + +## Installation + +```bash +npm i -S @message-queue-toolkit/outbox-core +``` + +## Usage + +To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class: + +```typescript +import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core'; + +const job = new OutboxPeriodicJob( + //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit + outboxStorage, + //Default available accumulator for gathering outbox entries as the process job is progressing. + new InMemoryOutboxAccumulator(), + //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core + eventEmitter, + //See PeriodicJobDependencies from @lokalise/background-jobs-common + dependencies, + //Retry count, how many times outbox entries should be retried to be processed + 3, + //emitBatchSize - how many outbox entries should be emitted at once + 10, + //internalInMs - how often the job should be executed, e.g. below it runs every 1sec + 1000 +) +``` + +Job will take care of processing outbox entries emitted by: +```typescript +import { + type CommonEventDefinition, + enrichMessageSchemaWithBase, +} from '@message-queue-toolkit/schemas' + +const MyEvents = { + created: { + ...enrichMessageSchemaWithBase( + 'entity.created', + z.object({ + message: z.string(), + }), + ), + }, +} as const satisfies Record + +type MySupportedEvents = (typeof TestEvents)[keyof typeof TestEvents][] + +const emitter = new OutboxEventEmitter( + //Same instance of outbox storage that is used by OutboxPeriodicJob + outboxStorage +) + +//It pushes the entry to the storage, later will be picked up by the OutboxPeriodicJob +await emitter.emit(/* args */) +``` diff --git a/packages/outbox-core/index.ts b/packages/outbox-core/index.ts new file mode 100644 index 00000000..a0ff82c1 --- /dev/null +++ b/packages/outbox-core/index.ts @@ -0,0 +1,4 @@ +export * from './lib/outbox' +export * from './lib/objects' +export * from './lib/accumulators' +export * from './lib/storage' diff --git a/packages/outbox-core/lib/accumulators.ts b/packages/outbox-core/lib/accumulators.ts new file mode 100644 index 00000000..d5d06081 --- /dev/null +++ b/packages/outbox-core/lib/accumulators.ts @@ -0,0 +1,73 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxEntry } from './objects.ts' + +/** + * Accumulator is responsible for storing outbox entries in two cases: + * - successfully dispatched event + * - failed events + * + * Thanks to this, we can use aggregated result and persist in the storage in batches. + */ +export interface OutboxAccumulator { + /** + * Accumulates successfully dispatched event. + * @param outboxEntry + */ + add(outboxEntry: OutboxEntry): Promise + + /** + * Accumulates failed event. + * @param outboxEntry + */ + addFailure(outboxEntry: OutboxEntry): Promise + + /** + * Returns all entries that should be persisted as successful ones. + */ + getEntries(): Promise[]> + + /** + * Returns all entries that should be persisted as failed ones. Such entries will be retried + their retryCount will be incremented. + */ + getFailedEntries(): Promise[]> + + /** + * After running clear(), no entries should be returned by getEntries() and getFailedEntries(). + * + * clear() is always called after flush() in OutboxStorage. + */ + clear(): Promise +} + +export class InMemoryOutboxAccumulator + implements OutboxAccumulator +{ + private entries: OutboxEntry[] = [] + private failedEntries: OutboxEntry[] = [] + + public add(outboxEntry: OutboxEntry) { + this.entries.push(outboxEntry) + + return Promise.resolve() + } + + public addFailure(outboxEntry: OutboxEntry) { + this.failedEntries.push(outboxEntry) + + return Promise.resolve() + } + + getEntries(): Promise[]> { + return Promise.resolve(this.entries) + } + + getFailedEntries(): Promise[]> { + return Promise.resolve(this.failedEntries) + } + + public clear(): Promise { + this.entries = [] + this.failedEntries = [] + return Promise.resolve() + } +} diff --git a/packages/outbox-core/lib/objects.ts b/packages/outbox-core/lib/objects.ts new file mode 100644 index 00000000..2ea6762b --- /dev/null +++ b/packages/outbox-core/lib/objects.ts @@ -0,0 +1,25 @@ +import type { + CommonEventDefinition, + CommonEventDefinitionPublisherSchemaType, + ConsumerMessageMetadataType, +} from '@message-queue-toolkit/schemas' + +/** + * Status of the outbox entry. + * - CREATED - entry was created and is waiting to be processed to publish actual event + * - ACKED - entry was picked up by outbox job and is being processed + * - SUCCESS - entry was successfully processed, event was published + * - FAILED - entry processing failed, it will be retried + */ +export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED' + +export type OutboxEntry = { + id: string + event: SupportedEvent + data: Omit, 'type'> + precedingMessageMetadata?: Partial + status: OutboxEntryStatus + created: Date + updated?: Date + retryCount: number +} diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts new file mode 100644 index 00000000..4d7b7079 --- /dev/null +++ b/packages/outbox-core/lib/outbox.ts @@ -0,0 +1,150 @@ +import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common' +import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common' +import type { + CommonEventDefinition, + CommonEventDefinitionPublisherSchemaType, + ConsumerMessageMetadataType, + DomainEventEmitter, +} from '@message-queue-toolkit/core' +import { PromisePool } from '@supercharge/promise-pool' +import { uuidv7 } from 'uuidv7' +import type { OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' +import type { OutboxStorage } from './storage' + +export type OutboxDependencies = { + outboxStorage: OutboxStorage + outboxAccumulator: OutboxAccumulator + eventEmitter: DomainEventEmitter +} + +export type OutboxProcessorConfiguration = { + maxRetryCount: number + emitBatchSize: number +} + +export type OutboxConfiguration = { + jobIntervalInMs: number +} & OutboxProcessorConfiguration + +/** + * Main logic for handling outbox entries. + * + * If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up. + */ +export class OutboxProcessor { + constructor( + private readonly outboxDependencies: OutboxDependencies, + private readonly outboxProcessorConfiguration: OutboxProcessorConfiguration, + ) {} + + public async processOutboxEntries(context: JobExecutionContext) { + const { outboxStorage, eventEmitter, outboxAccumulator } = this.outboxDependencies + + const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount) + + const filteredEntries = + entries.length === 0 ? entries : await this.getFilteredEntries(entries, outboxAccumulator) + + await PromisePool.for(filteredEntries) + .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) + .process(async (entry) => { + try { + await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + await outboxAccumulator.add(entry) + } catch (e) { + context.logger.error({ error: e }, 'Failed to process outbox entry.') + + await outboxAccumulator.addFailure(entry) + } + }) + + await outboxStorage.flush(outboxAccumulator) + await outboxAccumulator.clear() + } + + private async getFilteredEntries( + entries: OutboxEntry[], + outboxAccumulator: OutboxAccumulator, + ) { + const currentEntriesInAccumulator = new Set( + (await outboxAccumulator.getEntries()).map((entry) => entry.id), + ) + return entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id)) + } +} + +/** + * Periodic job that processes outbox entries every "intervalInMs". If processing takes longer than defined interval, another subsequent job WILL NOT be started. + * + * When event is published, and then entry is accumulated into SUCCESS group. If processing fails, entry is accumulated as FAILED and will be retried. + * + * Max retry count is defined by the user. + */ +/* c8 ignore start */ +export class OutboxPeriodicJob< + SupportedEvents extends CommonEventDefinition[], +> extends AbstractPeriodicJob { + private readonly outboxProcessor: OutboxProcessor + + constructor( + outboxDependencies: OutboxDependencies, + outboxConfiguration: OutboxConfiguration, + dependencies: PeriodicJobDependencies, + ) { + super( + { + jobId: 'OutboxJob', + schedule: { + intervalInMs: outboxConfiguration.jobIntervalInMs, + }, + singleConsumerMode: { + enabled: true, + }, + }, + { + redis: dependencies.redis, + logger: dependencies.logger, + transactionObservabilityManager: dependencies.transactionObservabilityManager, + errorReporter: dependencies.errorReporter, + scheduler: dependencies.scheduler, + }, + ) + + this.outboxProcessor = new OutboxProcessor( + outboxDependencies, + outboxConfiguration, + ) + } + + protected async processInternal(context: JobExecutionContext): Promise { + await this.outboxProcessor.processOutboxEntries(context) + } +} +/* c8 ignore stop */ + +export class OutboxEventEmitter { + constructor(private storage: OutboxStorage) {} + + /** + * Persists outbox entry in persistence layer, later it will be picked up by outbox job. + * @param supportedEvent + * @param data + * @param precedingMessageMetadata + */ + public async emit( + supportedEvent: SupportedEvent, + data: Omit, 'type'>, + precedingMessageMetadata?: Partial, + ) { + await this.storage.createEntry({ + id: uuidv7(), + event: supportedEvent, + data, + precedingMessageMetadata, + status: 'CREATED', + created: new Date(), + retryCount: 0, + }) + } +} diff --git a/packages/outbox-core/lib/storage.ts b/packages/outbox-core/lib/storage.ts new file mode 100644 index 00000000..c98ee764 --- /dev/null +++ b/packages/outbox-core/lib/storage.ts @@ -0,0 +1,32 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxAccumulator } from './accumulators' +import type { OutboxEntry } from './objects' + +/** + * Takes care of persisting and retrieving outbox entries. + * + * Implementation is required: + * - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction + * - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter) + * - returned entries should not include the ones with 'SUCCESS' status + */ +export interface OutboxStorage { + createEntry( + outboxEntry: OutboxEntry, + ): Promise> + + /** + * Responsible for taking all entries from the accumulator and persisting them in the storage. + * + * - Items that are in OutboxAccumulator::getEntries MUST be changed to SUCCESS status and `updatedAt` field needs to be set. + * - Items that are in OutboxAccumulator::getFailedEntries MUST be changed to FAILED status, `updatedAt` field needs to be set and retryCount needs to be incremented. + */ + flush(outboxAccumulator: OutboxAccumulator): Promise + + /** + * Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times. + * + * For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned. + */ + getEntries(maxRetryCount: number): Promise[]> +} diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json new file mode 100644 index 00000000..99972529 --- /dev/null +++ b/packages/outbox-core/package.json @@ -0,0 +1,61 @@ +{ + "name": "@message-queue-toolkit/outbox-core", + "version": "0.1.0", + "private": false, + "license": "MIT", + "description": "Outbox pattern implementation for message queue toolkit", + "maintainers": [ + { + "name": "Igor Savin", + "email": "kibertoad@gmail.com" + } + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "del-cli dist && tsc", + "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", + "test": "vitest", + "test:coverage": "npm test -- --coverage", + "test:ci": "npm run test:coverage", + "lint": "biome check . && tsc --project tsconfig.json --noEmit", + "lint:fix": "biome check --write .", + "prepublishOnly": "npm run build:release" + }, + "dependencies": { + "@lokalise/background-jobs-common": "^7.6.1", + "@supercharge/promise-pool": "^3.2.0", + "uuidv7": "^1.0.2" + }, + "peerDependencies": { + "@message-queue-toolkit/core": ">=14.0.0", + "@message-queue-toolkit/schemas": ">=4.0.0" + }, + "devDependencies": { + "@biomejs/biome": "1.8.3", + "@kibertoad/biome-config": "^1.2.1", + "@types/node": "^22.0.0", + "@vitest/coverage-v8": "^2.0.4", + "del-cli": "^5.1.0", + "typescript": "^5.5.3", + "vitest": "^2.0.4", + "zod": "^3.23.8" + }, + "homepage": "https://github.com/kibertoad/message-queue-toolkit", + "repository": { + "type": "git", + "url": "git://github.com/kibertoad/message-queue-toolkit.git" + }, + "keywords": [ + "message", + "queue", + "queues", + "abstract", + "common", + "utils", + "notification", + "outbox", + "pattern" + ], + "files": ["README.md", "LICENSE", "dist/*"] +} diff --git a/packages/outbox-core/test/InMemoryOutboxStorage.ts b/packages/outbox-core/test/InMemoryOutboxStorage.ts new file mode 100644 index 00000000..5f7ed8b5 --- /dev/null +++ b/packages/outbox-core/test/InMemoryOutboxStorage.ts @@ -0,0 +1,74 @@ +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { OutboxAccumulator } from '../lib/accumulators' +import type { OutboxEntry } from '../lib/objects' +import type { OutboxStorage } from '../lib/storage' + +export class InMemoryOutboxStorage + implements OutboxStorage +{ + public entries: OutboxEntry[] = [] + + createEntry( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = [...this.entries, outboxEntry] + + return Promise.resolve(outboxEntry) + } + + getEntries(maxRetryCount: number): Promise[]> { + const entries = this.entries.filter((entry) => { + return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount + }) + + return Promise.resolve(entries) + } + + update( + outboxEntry: OutboxEntry, + ): Promise> { + this.entries = this.entries.map((entry) => { + if (entry.id === outboxEntry.id) { + return outboxEntry + } + return entry + }) + + return Promise.resolve(outboxEntry) + } + + public async flush(outboxAccumulator: OutboxAccumulator): Promise { + let successEntries = await outboxAccumulator.getEntries() + successEntries = successEntries.map((entry) => { + return { + ...entry, + status: 'SUCCESS', + updateAt: new Date(), + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + + let failedEntries = await outboxAccumulator.getFailedEntries() + failedEntries = failedEntries.map((entry) => { + return { + ...entry, + status: 'FAILED', + updateAt: new Date(), + retryCount: entry.retryCount + 1, + } + }) + this.entries = this.entries.map((entry) => { + const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id) + if (foundEntry) { + return foundEntry + } + return entry + }) + } +} diff --git a/packages/outbox-core/test/outbox.spec.ts b/packages/outbox-core/test/outbox.spec.ts new file mode 100644 index 00000000..ec1950c5 --- /dev/null +++ b/packages/outbox-core/test/outbox.spec.ts @@ -0,0 +1,256 @@ +import { randomUUID } from 'node:crypto' +import { + CommonMetadataFiller, + DomainEventEmitter, + EventRegistry, +} from '@message-queue-toolkit/core' +import { + type CommonEventDefinition, + type CommonEventDefinitionPublisherSchemaType, + enrichMessageSchemaWithBase, +} from '@message-queue-toolkit/schemas' +import pino, { type Logger } from 'pino' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { z } from 'zod' +import { InMemoryOutboxAccumulator } from '../lib/accumulators' +import { type OutboxDependencies, OutboxEventEmitter, OutboxProcessor } from '../lib/outbox' +import { InMemoryOutboxStorage } from './InMemoryOutboxStorage' + +const TestEvents = { + created: { + ...enrichMessageSchemaWithBase( + 'entity.created', + z.object({ + message: z.string(), + }), + ), + }, + + updated: { + ...enrichMessageSchemaWithBase( + 'entity.updated', + z.object({ + message: z.string(), + }), + ), + }, +} as const satisfies Record + +type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] + +const createdEventPayload: CommonEventDefinitionPublisherSchemaType = { + payload: { + message: 'msg', + }, + type: 'entity.created', + metadata: { + originatedFrom: 'service', + producedBy: 'producer', + schemaVersion: '1', + correlationId: randomUUID(), + }, +} + +const TestLogger: Logger = pino() + +const MAX_RETRY_COUNT = 2 + +describe('outbox', () => { + let outboxProcessor: OutboxProcessor + let eventEmitter: DomainEventEmitter + let outboxEventEmitter: OutboxEventEmitter + let outboxStorage: InMemoryOutboxStorage + let inMemoryOutboxAccumulator: InMemoryOutboxAccumulator + + beforeEach(() => { + eventEmitter = new DomainEventEmitter({ + logger: TestLogger, + errorReporter: { report: () => {} }, + eventRegistry: new EventRegistry(Object.values(TestEvents)), + metadataFiller: new CommonMetadataFiller({ + serviceId: 'test', + }), + }) + + outboxStorage = new InMemoryOutboxStorage() + outboxEventEmitter = new OutboxEventEmitter(outboxStorage) + inMemoryOutboxAccumulator = new InMemoryOutboxAccumulator() + outboxProcessor = new OutboxProcessor( + { + outboxStorage, + //@ts-ignore + outboxAccumulator: inMemoryOutboxAccumulator, + eventEmitter, + } satisfies OutboxDependencies, + { maxRetryCount: MAX_RETRY_COUNT, emitBatchSize: 1 }, + ) + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + it('saves outbox entry to storage', async () => { + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + const entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) + + expect(entries).toHaveLength(1) + }) + + it('saves outbox entry and process it', async () => { + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + const entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) + + expect(entries).toHaveLength(0) + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + }, + ]) + }) + + it('saves outbox entry and process it with error and retries', async () => { + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + mockedEventEmitter.mockImplementationOnce(() => { + throw new Error('Could not emit event.') + }) + mockedEventEmitter.mockImplementationOnce(() => + Promise.resolve({ + ...createdEventPayload, + id: randomUUID(), + timestamp: new Date().toISOString(), + metadata: { + schemaVersion: '1', + producedBy: 'test', + originatedFrom: 'service', + correlationId: randomUUID(), + }, + }), + ) + + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + let entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) + expect(entries).toHaveLength(1) + expect(outboxStorage.entries).toMatchObject([ + { + status: 'FAILED', + retryCount: 1, + }, + ]) + + //Now let's process again successfully + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + entries = await outboxStorage.getEntries(MAX_RETRY_COUNT) + expect(entries).toHaveLength(0) //Nothing to process anymore + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + retryCount: 1, + }, + ]) + }) + + it('no longer processes the event if exceeded retry count', async () => { + //Let's always fail the event + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + mockedEventEmitter.mockImplementation(() => { + throw new Error('Could not emit event.') + }) + + //Persist the event + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + //Initially event is present in outbox storage. + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +1 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Still present + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +2 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Stil present + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(1) + + //Retry +3 + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + //Now it's gone, we no longer try to process it + expect(await outboxStorage.getEntries(MAX_RETRY_COUNT)).toHaveLength(0) + + expect(outboxStorage.entries).toMatchObject([ + { + status: 'FAILED', + retryCount: 3, + }, + ]) + }) + + it("doesn't emit event again if it's already present in accumulator", async () => { + const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') + + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + correlationId: randomUUID(), + }) + + await inMemoryOutboxAccumulator.add(outboxStorage.entries[0]) + + await outboxProcessor.processOutboxEntries({ + logger: TestLogger, + reqId: randomUUID(), + executorId: randomUUID(), + }) + + //We pretended that event was emitted in previous run by adding state to accumulator + expect(mockedEventEmitter).toHaveBeenCalledTimes(0) + + //But after the loop, if successful, it should be marked as success anyway + expect(outboxStorage.entries).toMatchObject([ + { + status: 'SUCCESS', + }, + ]) + //And accumulator should be cleared + expect(await inMemoryOutboxAccumulator.getEntries()).toHaveLength(0) + }) +}) diff --git a/packages/outbox-core/tsconfig.json b/packages/outbox-core/tsconfig.json new file mode 100644 index 00000000..9cd7c80a --- /dev/null +++ b/packages/outbox-core/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "outDir": "dist", + "module": "commonjs", + "target": "ES2022", + "lib": ["ES2022", "dom"], + "sourceMap": true, + "declaration": true, + "declarationMap": false, + "types": ["node", "vitest/globals"], + "strict": true, + "moduleResolution": "node", + "noUnusedLocals": false, + "noUnusedParameters": false, + "noFallthroughCasesInSwitch": true, + "strictNullChecks": true, + "importHelpers": true, + "baseUrl": ".", + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["lib/**/*.ts", "test/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/outbox-core/tsconfig.release.json b/packages/outbox-core/tsconfig.release.json new file mode 100644 index 00000000..93ab99f8 --- /dev/null +++ b/packages/outbox-core/tsconfig.release.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["lib/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist", "lib/**/*.spec.ts"] +} diff --git a/packages/outbox-core/vitest.config.mts b/packages/outbox-core/vitest.config.mts new file mode 100644 index 00000000..2bcce478 --- /dev/null +++ b/packages/outbox-core/vitest.config.mts @@ -0,0 +1,23 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + globals: true, + watch: false, + environment: 'node', + reporters: ['default'], + coverage: { + provider: 'v8', + include: ['lib/**/*.ts'], + exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*'], + reporter: ['text'], + all: true, + thresholds: { + lines: 100, + functions: 100, + branches: 91.66, + statements: 100, + }, + }, + }, +})