Skip to content

Commit

Permalink
Ack with higher frequency if the buffer is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Feb 7, 2024
1 parent 527527f commit 2f7f711
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export const runIngestionLoop = async (
`Fetched: ${messages.length}`,
`Saving: ${stitchedChangeMessages.length}`,
`Pending: ${changeMessagesBuffer.size()}`,
`Ack sequence: #${ackStreamSequence}`,
`Ack sequence: ${ackStreamSequence ? `#${ackStreamSequence}` : 'none'}`,
`Last sequence: #${lastStreamSequence}`,
].join('. '))

Expand Down
26 changes: 24 additions & 2 deletions core/src/specs/stitching.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
process.env.LOG_LEVEL = 'INFO'

import { stitchChangeMessages } from '../stitching';
import { ChangeMessage, MESSAGE_PREFIX_CONTEXT, MESSAGE_PREFIX_HEARTBEAT } from '../change-message';
import { ChangeMessagesBuffer } from '../change-message-buffer';
Expand Down Expand Up @@ -32,7 +34,7 @@ describe('stitchChangeMessages', () => {
stitchedChangeMessages: [
findChangeMessage(changeMessages, 2).setContext(findChangeMessage(changeMessages, 1).context()),
],
ackStreamSequence: 2,
ackStreamSequence: 3,
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
})
})
Expand Down Expand Up @@ -66,6 +68,26 @@ describe('stitchChangeMessages', () => {
}),
})
})

test('acks the last heartbeat message if the buffer is empty', () => {
const subject = 'bemi-subject'
const changeMessages = [
new ChangeMessage({ subject, streamSequence: 1, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 1 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
new ChangeMessage({ subject, streamSequence: 3, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 3 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
new ChangeMessage({ subject, streamSequence: 2, changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 2 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
]

const result = stitchChangeMessages({
changeMessagesBuffer: new ChangeMessagesBuffer().addChangeMessages(changeMessages),
useBuffer: true,
})

expect(result).toStrictEqual({
stitchedChangeMessages: [],
ackStreamSequence: 3,
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
})
})
})

describe('when messages in separate batches', () => {
Expand Down Expand Up @@ -199,7 +221,7 @@ describe('stitchChangeMessages', () => {
stitchedChangeMessages: [
findChangeMessage(changeMessages1, 1),
],
ackStreamSequence: 1,
ackStreamSequence: 2,
newChangeMessagesBuffer: ChangeMessagesBuffer.fromStore({}),
})
})
Expand Down
7 changes: 6 additions & 1 deletion core/src/stitching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ export const stitchChangeMessages = (
let stitchedChangeMessages: ChangeMessage[] = []
let ackSequenceBySubject: { [key: string]: string | undefined } = {}
let newChangeMessagesBuffer = new ChangeMessagesBuffer()
let maxSequence: number | undefined = undefined

changeMessagesBuffer.forEach((subject, sortedChangeMessages) => {
let ackSequence: number | undefined = undefined

if (sortedChangeMessages.length && (!maxSequence || sortedChangeMessages[sortedChangeMessages.length - 1].streamSequence > maxSequence)) {
maxSequence = sortedChangeMessages[sortedChangeMessages.length - 1].streamSequence
}

sortedChangeMessages.forEach((changeMessage) => {
const position = changeMessage.changeAttributes.position.toString()
const changeMessages = changeMessagesBuffer.changeMessagesByPosition(subject, position)
Expand Down Expand Up @@ -77,6 +82,6 @@ export const stitchChangeMessages = (
return {
stitchedChangeMessages,
newChangeMessagesBuffer,
ackStreamSequence,
ackStreamSequence: newChangeMessagesBuffer.size() ? ackStreamSequence : maxSequence,
}
}

0 comments on commit 2f7f711

Please sign in to comment.