From 2f7f711cd587a96b83cdaaace5dea107f320da42 Mon Sep 17 00:00:00 2001 From: exAspArk Date: Wed, 7 Feb 2024 16:05:19 -0500 Subject: [PATCH] Ack with higher frequency if the buffer is empty --- core/src/ingestion.ts | 2 +- core/src/specs/stitching.spec.ts | 26 ++++++++++++++++++++++++-- core/src/stitching.ts | 7 ++++++- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/core/src/ingestion.ts b/core/src/ingestion.ts index 0997a27..05ec912 100644 --- a/core/src/ingestion.ts +++ b/core/src/ingestion.ts @@ -102,7 +102,7 @@ export const runIngestionLoop = async ( `Fetched: ${messages.length}`, `Saving: ${stitchedChangeMessages.length}`, `Pending: ${changeMessagesBuffer.size()}`, - `Ack sequence: #${ackStreamSequence}`, + `Ack sequence: ${ackStreamSequence ? `#${ackStreamSequence}` : 'none'}`, `Last sequence: #${lastStreamSequence}`, ].join('. ')) diff --git a/core/src/specs/stitching.spec.ts b/core/src/specs/stitching.spec.ts index 2acc2d0..c77e37b 100644 --- a/core/src/specs/stitching.spec.ts +++ b/core/src/specs/stitching.spec.ts @@ -1,3 +1,5 @@ +process.env.LOG_LEVEL = 'INFO' + import { stitchChangeMessages } from '../stitching'; import { ChangeMessage, MESSAGE_PREFIX_CONTEXT, MESSAGE_PREFIX_HEARTBEAT } from '../change-message'; import { ChangeMessagesBuffer } from '../change-message-buffer'; @@ -32,7 +34,7 @@ describe('stitchChangeMessages', () => { stitchedChangeMessages: [ findChangeMessage(changeMessages, 2).setContext(findChangeMessage(changeMessages, 1).context()), ], - ackStreamSequence: 2, + ackStreamSequence: 3, newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), }) }) @@ -66,6 +68,26 @@ describe('stitchChangeMessages', () => { }), }) }) + + test('acks the last heartbeat message if the buffer is empty', () => { + const subject = 'bemi-subject' + const changeMessages = [ + new ChangeMessage({ subject, streamSequence: 1, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 1 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }), + new ChangeMessage({ subject, streamSequence: 3, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 3 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }), + new ChangeMessage({ subject, streamSequence: 2, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 2 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }), + ] + + const result = stitchChangeMessages({ + changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages), + useBuffer: true, + }) + + expect(result).toStrictEqual({ + stitchedChangeMessages: [], + ackStreamSequence: 3, + newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), + }) + }) }) describe('when messages in separate batches', () => { @@ -199,7 +221,7 @@ describe('stitchChangeMessages', () => { stitchedChangeMessages: [ findChangeMessage(changeMessages1, 1), ], - ackStreamSequence: 1, + ackStreamSequence: 2, newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}), }) }) diff --git a/core/src/stitching.ts b/core/src/stitching.ts index a910f0d..d527471 100644 --- a/core/src/stitching.ts +++ b/core/src/stitching.ts @@ -9,10 +9,15 @@ export const stitchChangeMessages = ( let stitchedChangeMessages: ChangeMessage[] = [] let ackSequenceBySubject: { [key: string]: string | undefined } = {} let newChangeMessagesBuffer = new ChangeMessagesBuffer() + let maxSequence: number | undefined = undefined changeMessagesBuffer.forEach((subject, sortedChangeMessages) => { let ackSequence: number | undefined = undefined + if (sortedChangeMessages.length && (!maxSequence || sortedChangeMessages[sortedChangeMessages.length - 1].streamSequence > maxSequence)) { + maxSequence = sortedChangeMessages[sortedChangeMessages.length - 1].streamSequence + } + sortedChangeMessages.forEach((changeMessage) => { const position = changeMessage.changeAttributes.position.toString() const changeMessages = changeMessagesBuffer.changeMessagesByPosition(subject, position) @@ -77,6 +82,6 @@ export const stitchChangeMessages = ( return { stitchedChangeMessages, newChangeMessagesBuffer, - ackStreamSequence, + ackStreamSequence: newChangeMessagesBuffer.size() ? ackStreamSequence : maxSequence, } }