-
-
Notifications
You must be signed in to change notification settings - Fork 24
/
exampleDLQConsumer.ts
39 lines (33 loc) · 1.27 KB
/
exampleDLQConsumer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* eslint-disable no-console */
import { DeadLetterQueue } from 'kafka-penguin';
const client = require('./clientConfig.ts');
const topic = 'test-topic-DLQ';
// This allows the consumer to evaluate each message according to a condition
// The callback must return a boolean value
const callback = (message) => {
try {
if (typeof message.value === 'string') {
return true;
}
} catch (e) {
return false;
}
return true;
};
// Set up the Dead Letter Queue (DLQ) strategy
// with a configured KafkaJS client, a topic, and the evaluating callback
const exampleDLQConsumer = new DeadLetterQueue(client, topic, callback);
// Initialize a consumer from the new instance of the Dead Letter Queue strategy
const consumerDLQ = exampleDLQConsumer.consumer({ groupId: 'testID' });
// Connecting the consumer creates a DLQ topic in case of bad messages
// If the callback returns false, the strategy moves the message to the topic specific DLQ
// The consumer is able to keep consuming good messages from the topic
consumerDLQ.connect()
.then(consumerDLQ.subscribe())
.then(() => consumerDLQ.run({
eachMessage: ({ message }) => {
if (message.value.length < 5) return true;
return false;
},
}))
.catch((e) => console.log(`Error message from consumer: ${e}`));