Skip to content

Commit

Permalink
Filter out non-Bemi replication messages
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed May 20, 2024
1 parent 134797d commit e554e1f
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 60 deletions.
22 changes: 9 additions & 13 deletions core/src/fetched-record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import { NatsMessage, decodeData } from './nats'

export const MESSAGE_PREFIX_CONTEXT = '_bemi'
export const MESSAGE_PREFIX_HEARTBEAT = '_bemi_heartbeat'
export const HEARTBEAT_CHANGE_SCHEMA = '_bemi'
export const HEARTBEAT_CHANGE_TABLE = 'heartbeats'
const UNAVAILABLE_VALUE_PLACEHOLDER = '__bemi_unavailable_value'

const parseDebeziumData = (debeziumChange: any, now: Date) => {
Expand Down Expand Up @@ -81,20 +79,26 @@ export class FetchedRecord {
static fromNatsMessage(natsMessage: NatsMessage, now = new Date()) {
const debeziumData = decodeData(natsMessage.data) as any

const messagePrefix = debeziumData.message?.prefix
if (messagePrefix && messagePrefix !== MESSAGE_PREFIX_CONTEXT && messagePrefix !== MESSAGE_PREFIX_HEARTBEAT) {
// Ignore non-Bemi message prefixes
return null
}

return new FetchedRecord({
changeAttributes: parseDebeziumData(debeziumData, now),
subject: natsMessage.subject,
streamSequence: natsMessage.info.streamSequence,
messagePrefix: debeziumData.message?.prefix,
messagePrefix,
})
}

isContextMessage() {
return this.isMessage() && this.messagePrefix === MESSAGE_PREFIX_CONTEXT
}

isHeartbeat() {
return this.isHeartbeatMessage() || this.isHeartbeatChange()
isHeartbeatMessage() {
return this.isMessage() && this.messagePrefix === MESSAGE_PREFIX_HEARTBEAT
}

context() {
Expand All @@ -110,12 +114,4 @@ export class FetchedRecord {
private isMessage() {
return this.changeAttributes.operation === Operation.MESSAGE
}

private isHeartbeatMessage() {
return this.isMessage() && this.messagePrefix === MESSAGE_PREFIX_HEARTBEAT
}

private isHeartbeatChange() {
return this.changeAttributes.schema === HEARTBEAT_CHANGE_SCHEMA && this.changeAttributes.table === HEARTBEAT_CHANGE_TABLE
}
}
2 changes: 1 addition & 1 deletion core/src/ingestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export const runIngestionLoop = async (
// Stitching
const now = new Date()
const natsMessages = Object.values(natsMessageBySequence)
const fetchedRecords = natsMessages.map((m: NatsMessage) => FetchedRecord.fromNatsMessage(m, now))
const fetchedRecords = natsMessages.map((m: NatsMessage) => FetchedRecord.fromNatsMessage(m, now)).filter(r => r) as FetchedRecord[]
const { stitchedFetchedRecords, newFetchedRecordBuffer, ackStreamSequence } = stitchFetchedRecords({
fetchedRecordBuffer: fetchedRecordBuffer.addFetchedRecords(fetchedRecords),
useBuffer,
Expand Down
12 changes: 12 additions & 0 deletions core/src/specs/fetched-record.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,16 @@ describe('fromNatsMessage', () => {
}),
])
})

test('ignores non-Bemi messages', () => {
const natsMessage = buildNatsMessage({
subject: 'bemi-subject',
streamSequence: 1,
data: MESSAGE_DATA.NON_BEMI_MESSAGE,
})

const result = FetchedRecord.fromNatsMessage(natsMessage)

expect(result).toStrictEqual(null)
})
})
16 changes: 0 additions & 16 deletions core/src/specs/fixtures/fetched-records.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { HEARTBEAT_CHANGE_SCHEMA, HEARTBEAT_CHANGE_TABLE } from '../../fetched-record'
import { Operation } from '../../entities/Change'

import { POSITIONS } from './nats-messages'
Expand Down Expand Up @@ -111,21 +110,6 @@ export const CHANGE_ATTRIBUTES = {
before: {},
after: {},
},
HEARTBEAT_CHANGE: {
committedAt: MOCKED_DATE,
createdAt: MOCKED_DATE,
database: "bemi_dev_source",
context: {},
operation: Operation.UPDATE,
position: POSITIONS.HEARTBEAT_CHANGE,
primaryKey: "1",
queuedAt: MOCKED_DATE,
schema: HEARTBEAT_CHANGE_SCHEMA,
table: HEARTBEAT_CHANGE_TABLE,
transactionId: 770,
before: null,
after: { id: 1, last_heartbeat_at: "2024-04-18T20:40:29.086091Z" },
},
TRUNCATE: {
committedAt: MOCKED_DATE,
createdAt: MOCKED_DATE,
Expand Down
21 changes: 20 additions & 1 deletion core/src/specs/fixtures/nats-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,26 @@ export const MESSAGE_DATA = {
op: "u",
ts_ms: 1713472829090,
transaction: null,
}
},
NON_BEMI_MESSAGE: {
op: 'm',
ts_ms: 1706128742602,
source: {
version: '2.5.0-SNAPSHOT',
connector: 'postgresql',
name: 'us-west-1-bemi-dev',
ts_ms: 1706128742600,
snapshot: 'false',
db: 'bemi_dev_source',
sequence: '["371211440","371211568"]',
schema: '',
table: '',
txId: null,
lsn: 94229520,
xmin: null
},
message: { prefix: 'something-else', content: 'cGluZw==' }
},
}

export const buildNatsMessage = (
Expand Down
34 changes: 6 additions & 28 deletions core/src/specs/stitching.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,6 @@ describe('stitchFetchedRecords', () => {
})
})

test('stitches context if it is first, ignores a heartbeat change', () => {
const subject = 'bemi-subject'
const fetchedRecords = [
new FetchedRecord({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE_MESSAGE, messagePrefix: MESSAGE_PREFIX_CONTEXT }),
new FetchedRecord({ subject, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.CREATE }),
new FetchedRecord({ subject, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_CHANGE }),
]

const result = stitchFetchedRecords({
fetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords(fetchedRecords),
useBuffer: true,
})

expect(result).toStrictEqual({
stitchedFetchedRecords: [
findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()),
],
ackStreamSequence: 3,
newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({}),
})
})

test('stitches context if it is second and pauses on the one before last position', () => {
const subject = 'bemi-subject'
const fetchedRecords = [
Expand Down Expand Up @@ -113,14 +91,14 @@ describe('stitchFetchedRecords', () => {
})

describe('when messages from separate subjects', () => {
test('stitches context across multiple subjects with a heartbeat change and pending context', () => {
test('stitches context across multiple subjects with a heartbeat message and pending context', () => {
const subject1 = 'bemi-subject-1'
const subject2 = 'bemi-subject-2'
const updateMessagePosition = CHANGE_ATTRIBUTES.HEARTBEAT_CHANGE.position + 1
const updateMessagePosition = CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.position + 1
const fetchedRecords = [
new FetchedRecord({ subject: subject1, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE_MESSAGE, messagePrefix: MESSAGE_PREFIX_CONTEXT }),
new FetchedRecord({ subject: subject1, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.CREATE }),
new FetchedRecord({ subject: subject2, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_CHANGE }),
new FetchedRecord({ subject: subject2, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
new FetchedRecord({ subject: subject2, streamSequence: 4, changeAttributes: { ...CHANGE_ATTRIBUTES.UPDATE_MESSAGE, position: updateMessagePosition }, messagePrefix: MESSAGE_PREFIX_CONTEXT }),
]

Expand All @@ -144,7 +122,7 @@ describe('stitchFetchedRecords', () => {
})
})

test('stitches context across multiple subjects with a heartbeat change and pending change', () => {
test('stitches context across multiple subjects with a heartbeat message and pending change', () => {
const subject1 = 'bemi-subject-1'
const subject2 = 'bemi-subject-2'
const updatePosition = CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.position + 1
Expand Down Expand Up @@ -175,13 +153,13 @@ describe('stitchFetchedRecords', () => {
})
})

test('stitches context across multiple subjects with a single heartbeat change in one of them', () => {
test('stitches context across multiple subjects with a single heartbeat message in one of them', () => {
const subject1 = 'bemi-subject-1'
const subject2 = 'bemi-subject-2'
const fetchedRecords = [
new FetchedRecord({ subject: subject1, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE_MESSAGE, messagePrefix: MESSAGE_PREFIX_CONTEXT }),
new FetchedRecord({ subject: subject1, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.CREATE }),
new FetchedRecord({ subject: subject2, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_CHANGE }),
new FetchedRecord({ subject: subject2, streamSequence: 3, changeAttributes: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, messagePrefix: MESSAGE_PREFIX_HEARTBEAT }),
]

const result = stitchFetchedRecords({
Expand Down
2 changes: 1 addition & 1 deletion core/src/stitching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export const stitchFetchedRecords = (
const contextFetchedRecord = samePositionFetchedRecords.find(r => r.isContextMessage())

// If it's a heartbeat message/change, use its sequence number
if (fetchedRecord.isHeartbeat()) {
if (fetchedRecord.isHeartbeatMessage()) {
logger.debug(`Ignoring heartbeat message`)
if (!maxSubjectSequence || maxSubjectSequence < fetchedRecord.streamSequence) {
maxSubjectSequence = fetchedRecord.streamSequence
Expand Down
1 change: 1 addition & 0 deletions docs/docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ keywords: ['Bemi Changelog', 'Bemi New Features', 'Postgres Audit Trails', 'Chan
* Allow saving information about a PostgreSQL user who made data changes in the app context
* Integrations
* [GCP Cloud SQL](https://docs.bemi.io/postgresql/source-database#gcp-cloud-sql): describe how to enable logical decoding and connect
* [PowerSync](https://www.powersync.com): make ingestion worker compatible with a separate PostgreSQL replication

## 2024-04

Expand Down

0 comments on commit e554e1f

Please sign in to comment.