Skip to content

Commit

Permalink
Concurrent consumers support (#232)
Browse files Browse the repository at this point in the history
* Allow to specify number of concurrent consumers to create

* linting

* improved test flow, added concurrentConsumersAmount parameter to README

* tests fix

* linting

* concurrent consumers test for SnsSqsPermissionConsumer

* Update README.md

Co-authored-by: Igor Savin <iselwin@gmail.com>

* Prepare to release @message-queue-toolkit/core 17.2.3

* Prepare to release @message-queue-toolkit/sqs 17.3.0

* Fixed import

* Prepare to release @message-queue-toolkit/sns 18.1.1

* Updated sns package version

---------

Co-authored-by: Igor Savin <iselwin@gmail.com>
  • Loading branch information
kjamrog and kibertoad authored Dec 13, 2024
1 parent c46d63f commit 2c9bedf
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 30 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Multi-schema consumers support multiple message types via handler configs. They
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading).
* `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ describe('AmqpPermissionConsumer', () => {
await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed')

expect(logger.loggedMessages.length).toBe(5)
expect(logger.loggedMessages).toEqual([
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
{
id: '1',
Expand Down
16 changes: 14 additions & 2 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,27 @@ export abstract class AbstractQueueService<
}

protected logProcessedMessage(
_message: MessagePayloadSchemas | null,
message: MessagePayloadSchemas | null,
processingResult: MessageProcessingResult,
messageId?: string,
) {
const messageTimestamp = message ? this.tryToExtractTimestamp(message) : undefined
const messageProcessingMilliseconds = messageTimestamp
? Date.now() - messageTimestamp.getTime()
: undefined

const messageType =
message && this.messageTypeField in message
? // @ts-ignore
message[this.messageTypeField]
: undefined

this.logger.debug(
{
processingResult,
messageId,
messageProcessingTime: messageProcessingMilliseconds,
messageType,
},
`Finished processing message ${messageId ?? `(unknown id)`}`,
)
Expand Down Expand Up @@ -206,7 +219,6 @@ export abstract class AbstractQueueService<
if (this.logMessages) {
// @ts-ignore
const resolvedMessageId: string | undefined = message?.[this.messageIdField] ?? messageId

this.logProcessedMessage(message, processingResult, resolvedMessageId)
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/core",
"version": "17.2.1",
"version": "17.2.3",
"private": false,
"license": "MIT",
"description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Expand Down
4 changes: 2 additions & 2 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "18.0.1",
"version": "18.1.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down Expand Up @@ -35,7 +35,7 @@
"@aws-sdk/client-sts": "^3.632.0",
"@message-queue-toolkit/core": ">=15.0.0",
"@message-queue-toolkit/schemas": ">=2.0.0",
"@message-queue-toolkit/sqs": "^17.0.0"
"@message-queue-toolkit/sqs": "^17.3.0"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.670.0",
Expand Down
64 changes: 62 additions & 2 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns'
import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import { type AwilixContainer, asValue } from 'awilix'
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { type AwilixContainer, asFunction, asValue } from 'awilix'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'

import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils'
import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
Expand All @@ -13,6 +13,7 @@ import type { Dependencies } from '../utils/testContext'

import type { STSClient } from '@aws-sdk/client-sts'
import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer'
import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas'

describe('SnsSqsPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -706,6 +707,65 @@ describe('SnsSqsPermissionConsumer', () => {
})
})

describe('multiple consumers', () => {
let diContainer: AwilixContainer<Dependencies>

let publisher: SnsPermissionPublisher
let consumer: SnsSqsPermissionConsumer

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asFunction((dependencies) => {
return new SnsSqsPermissionConsumer(dependencies, {
creationConfig: {
topic: {
Name: SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME,
},
queue: {
QueueName: SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME,
},
updateAttributesIfExists: true,
},
deletionConfig: {
deleteIfExists: true,
},
concurrentConsumersAmount: 10,
})
}),
})
publisher = diContainer.cradle.permissionPublisher
consumer = diContainer.cradle.permissionConsumer

await consumer.start()
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('process all messages properly', async () => {
const messagesAmount = 50
const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map(
(_, i) => ({
id: `${i}`,
messageType: 'add',
timestamp: new Date().toISOString(),
}),
)

messages.map((m) => publisher.publish(m))
await Promise.all(
messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')),
)

// Verifies that each message is executed only once
expect(consumer.addCounter).toBe(messagesAmount)
// Verifies that no message is lost
expect(consumer.processedMessagesIds).toHaveLength(messagesAmount)
})
})

describe('visibility timeout', () => {
const topicName = 'myTestTopic'
const queueName = 'myTestQueue'
Expand Down
4 changes: 4 additions & 0 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SnsSqsPermissionConsumerOptions = Pick<
| 'consumerOverrides'
| 'maxRetryDuration'
| 'payloadStoreConfig'
| 'concurrentConsumersAmount'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand All @@ -65,6 +66,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
public addBarrierCounter = 0
public removeCounter = 0
public preHandlerCounter = 0
public processedMessagesIds: Set<string> = new Set()

constructor(
dependencies: SNSSQSConsumerDependencies,
Expand Down Expand Up @@ -101,6 +103,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
PERMISSIONS_ADD_MESSAGE_SCHEMA,
(_message, context, _preHandlingOutputs) => {
this.addCounter += context.incrementAmount
this.processedMessagesIds.add(_message.id)
return Promise.resolve({ result: 'success' })
},
{
Expand Down Expand Up @@ -164,6 +167,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
updateAttributesIfExists: false,
},
maxRetryDuration: options.maxRetryDuration,
concurrentConsumersAmount: options.concurrentConsumersAmount,
},
{
incrementAmount: 1,
Expand Down
52 changes: 34 additions & 18 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export type SQSConsumerOptions<
ConsumerOptions,
'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout'
>
concurrentConsumersAmount?: number
}

export abstract class AbstractSqsConsumer<
Expand Down Expand Up @@ -96,7 +97,8 @@ export abstract class AbstractSqsConsumer<
>
implements QueueConsumer
{
private consumer?: Consumer
private consumers: Consumer[]
private readonly concurrentConsumersAmount: number
private readonly transactionObservabilityManager?: TransactionObservabilityManager
private readonly consumerOptionsOverride: Partial<ConsumerOptions>
private readonly handlerContainer: HandlerContainer<
Expand Down Expand Up @@ -129,7 +131,8 @@ export abstract class AbstractSqsConsumer<
this.deadLetterQueueOptions = options.deadLetterQueue
this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION
this.executionContext = executionContext

this.consumers = []
this.concurrentConsumersAmount = options.concurrentConsumersAmount ?? 1
this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options)
this.handlerContainer = new HandlerContainer<
MessagePayloadType,
Expand Down Expand Up @@ -174,14 +177,34 @@ export abstract class AbstractSqsConsumer<

public async start() {
await this.init()
if (this.consumer) this.consumer.stop()
this.stopExistingConsumers()

const visibilityTimeout = await this.getQueueVisibilityTimeout()

this.consumer = Consumer.create({
this.consumers = Array.from({ length: this.concurrentConsumersAmount }).map((_) =>
this.createConsumer({ visibilityTimeout }),
)

for (const consumer of this.consumers) {
consumer.on('error', (err) => {
this.handleError(err, {
queueName: this.queueName,
})
})
consumer.start()
}
}

public override async close(abort?: boolean): Promise<void> {
await super.close()
this.stopExistingConsumers(abort ?? false)
}

private createConsumer(options: { visibilityTimeout: number | undefined }): Consumer {
return Consumer.create({
sqs: this.sqsClient,
queueUrl: this.queueUrl,
visibilityTimeout,
visibilityTimeout: options.visibilityTimeout,
messageAttributeNames: [`${PAYLOAD_OFFLOADING_ATTRIBUTE_PREFIX}*`],
...this.consumerOptionsOverride,
handleMessage: async (message: SQSMessage) => {
Expand Down Expand Up @@ -250,21 +273,14 @@ export abstract class AbstractSqsConsumer<
return Promise.reject(result.error)
},
})

this.consumer.on('error', (err) => {
this.handleError(err, {
queueName: this.queueName,
})
})

this.consumer.start()
}

public override async close(abort?: boolean): Promise<void> {
await super.close()
this.consumer?.stop({
abort: abort ?? false,
})
private stopExistingConsumers(abort?: boolean) {
for (const consumer of this.consumers) {
consumer.stop({
abort,
})
}
}

private async internalProcessMessage(
Expand Down
2 changes: 1 addition & 1 deletion packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "17.2.0",
"version": "17.3.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand Down
61 changes: 61 additions & 0 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { SINGLETON_CONFIG, registerDependencies } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

import { SqsPermissionConsumer } from './SqsPermissionConsumer'
import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas'

describe('SqsPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -596,6 +597,66 @@ describe('SqsPermissionConsumer', () => {
})
})

describe('multiple consumers', () => {
let diContainer: AwilixContainer<Dependencies>
let sqsClient: SQSClient

let publisher: SqsPermissionPublisher
let consumer: SqsPermissionConsumer

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asFunction((dependencies) => {
return new SqsPermissionConsumer(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionConsumer.QUEUE_NAME,
},
},
concurrentConsumersAmount: 5,
})
}),
})
sqsClient = diContainer.cradle.sqsClient
publisher = diContainer.cradle.permissionPublisher
consumer = diContainer.cradle.permissionConsumer

await consumer.start()

const command = new ReceiveMessageCommand({
QueueUrl: publisher.queueProps.url,
})
const reply = await sqsClient.send(command)
expect(reply.Messages).toBeUndefined()
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('process all messages properly', async () => {
const messagesAmount = 100
const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map(
(_, i) => ({
id: `${i}`,
messageType: 'add',
timestamp: new Date().toISOString(),
}),
)

messages.map((m) => publisher.publish(m))
await Promise.all(
messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')),
)

// Verifies that each message is executed only once
expect(consumer.addCounter).toBe(messagesAmount)
// Verifies that no message is lost
expect(consumer.processedMessagesIds).toHaveLength(messagesAmount)
})
})

describe('visibility timeout', () => {
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>
Expand Down
Loading

0 comments on commit 2c9bedf

Please sign in to comment.