Skip to content

Commit

Permalink
Mono publisher and consumer removal (#112)
Browse files Browse the repository at this point in the history
* Removing sqs mon publisher/consumer

* core preparation for multi publishers only

* SQS Abstract consumer

* SQS publisher rename

* SQS test fixes

* Unused code removed

* Fix lint

* sqs Index fix + lint fix

* Removing SNS mono consumer and publisher

* Minor fix on sqs

* SNS Fixing types

* SNS renaming

* SNS test fixes

* amqp service types fix

* amqp single publisher

* amqp single consumer

* removing unused tests

* AMQP tests fixes

* amqp index fix

* Minor changes

* Solving TODO

* Minor change to not allow changing internal properties

* SQS improving coverage

* Adding tests

* Lint fix

* Removing unused test class

* SQS making props protected

* SNS marking some props as protected

* Minor changes

* AMQP improving tests coverage

* Improving AMQP test coverage

* Fix test

* Trying to fix test

* readme updated

* Major version

* Minor change

* upgrading md

* Lint fixes + solving todo + build fix

* CR comments
  • Loading branch information
CarlosGamero authored Apr 1, 2024
1 parent a47c67c commit a176d75
Show file tree
Hide file tree
Showing 74 changed files with 1,756 additions and 3,618 deletions.
79 changes: 17 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,7 @@ It consists of the following submodules:
### Publishers

`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol.

#### Mono-schema publishers

Mono-schema publishers only support a single message type and are simpler to implement. They expose the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchema` – the `zod` schema for the message;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema;
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `init()`, prepare publisher for use (e. g. establish all necessary connections), it will be called automatically by `publish()` if not called before explicitly (lazy loading).
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
* `message` – a message following a `zod` schema;
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).

> **_NOTE:_** See [SqsPermissionPublisherMonoSchema.ts](./packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts) for a practical example.
> **_NOTE:_** Lazy loading is not supported for AMQP publishers.
#### Multi-schema publishers

Multi-schema publishers support multiple messages types. They implement the following public methods:
They implement the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
Expand All @@ -61,35 +37,14 @@ Multi-schema publishers support multiple messages types. They implement the foll
* `message` – a message following one of the `zod` schemas, supported by the publisher;
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).

> **_NOTE:_** See [SqsPermissionPublisher.ts](./packages/sqs/test/publishers/SqsPermissionPublisher.ts) for a practical example.
> **_NOTE:_** Lazy loading is not supported for AMQP publishers.

### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.

#### Mono-schema consumers

Mono-schema consumers only support a single message type and are simpler to implement. They expose the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchema` – the `zod` schema for the message;
* `messageTypeField` - which field in the message is used for resolving the message type (for observability purposes);
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
* `consumerOverrides` – available only for SQS consumers;
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
* `preHandlerBarrier`, which accepts as a parameter a `message` following a `zod` schema and can be overridden to enable the barrier pattern (see [Barrier pattern](#barrier-pattern))

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts) for a practical example.
#### Multi-schema consumers
They expose the following public methods:

Multi-schema consumers support multiple message types via handler configs. They expose the following public methods:

Expand All @@ -106,17 +61,18 @@ Multi-schema consumers support multiple message types via handler configs. They
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.

* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts) for a practical example.
##### Multi-schema handler definition

##### How to define a handler

You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder.

Here is an example:

```ts
```typescript
type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE
type ExecutionContext = {
userService: UserService
Expand Down Expand Up @@ -176,7 +132,7 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<

#### Error Handling

When implementing message handler in consumer (by overriding the `processMessage()` method), you are expected to return an instance of `Either`, containing either an error `retryLater`, or result `success`. In case of `retryLater`, the abstract consumer is instructed to requeue the message. Otherwise, in case of success, the message is finally removed from the queue. If an error is thrown while processing the message, the abstract consumer will also requeue the message. When overriding the `processMessage()` method, you should leverage the possible types to process the message as you need.
When implementing a handler, you are expected to return an instance of `Either`, containing either an error `retryLater`, or result `success`. In case of `retryLater`, the abstract consumer is instructed to requeue the message. Otherwise, in case of success, the message is finally removed from the queue. If an error is thrown while processing the message, the abstract consumer will also requeue the message.

#### Schema Validation and Deserialization

Expand All @@ -189,23 +145,22 @@ If

Then the message is automatically nacked without requeueing by the abstract consumer and processing fails.

> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumerMonoSchema.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts) for a practical example.
> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumer.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumer.spec.ts) for a practical example.
### Barrier pattern
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived).

To enable this pattern you should implement `preHandlerBarrier` in order to define the conditions for starting to process the message.
To enable this pattern you should define `preHandlerBarrier` on your message handler in order to define the conditions for starting to process the message.
If the barrier method returns `false`, message will be returned into the queue for the later processing. If the barrier method returns `true`, message will be processed.

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example on mono consumers.
> **_NOTE:_** See [SqsPermissionConsumerMultiSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts) for a practical example on multi consumers.
> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumer.ts) for a practical example.

## Fan-out to Multiple Consumers

SQS queues are built in a way that every message is only consumed once, and then deleted. If you want to do fan-out to multiple consumers, you need SNS topic in the middle, which is then propagated to all the SQS queues that have subscribed.

> **_NOTE:_** See [SnsPermissionPublisher.ts](./packages/sns/test/publishers/SnsPermissionPublisherMonoSchema.ts) and [SnsSqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example.
> **_NOTE:_** See [SnsPermissionPublisher.ts](./packages/sns/test/publishers/SnsPermissionPublisher.ts) and [SnsSqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumer.ts) for a practical example.
## Automatic Queue and Topic Creation

Expand All @@ -220,7 +175,7 @@ In certain cases you want to await until certain publisher publishes a message,
In order to enable this functionality, configure spyHandler on the publisher or consumer:

```ts
export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
export class TestConsumerMultiSchema extends AbstractSqsConsumer<
SupportedMessages,
ExecutionContext
> {
Expand All @@ -236,7 +191,7 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
bufferSize: 100, // how many processed messages should be retained in memory for spy lookup. Default is 100
messageIdField: 'id', // which field within a message payload uniquely identifies it. Default is `id`
},
}
})
}
}
```
Expand Down
146 changes: 146 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Upgrading Guide

We have introduced the following breaking changes on version `12.0.0`, please follow the steps below to update your code
from the previous version to the new one.

## Breaking Changes

### Description of Breaking Change
Multi consumers and publishers can accomplish the same tasks as mono ones, but they add extra layer of complexity by
requiring features to be implemented in both.
As a result, we have decided to remove the mono ones to enhance maintainability.

### Migration Steps
#### Multi consumers and publishers
If you are using the multi consumer or consumer, you will only need to rename the class you are extending, and it should
work as before.
- `AbstractSqsMultiConsumer` -> `AbstractSqsConsumer`
- `AbstractSqsMultiPublisher` -> `AbstractSqsPublisher`

#### Mono consumers and publishers
If you are using the mono consumer or publisher, they no longer exist, so you will need to adjust your code to use
the old named multi consumer or publisher (now called just consumer or publisher). Please check the guide below.

##### Publisher
1. Rename the class you are extending from `AbstractSqsPublisherMonoSchema` to `AbstractSqsPublisherSchema`.
2. replace the `messageSchema` property with `messageSchemas`, it is an array of `zod` schemas.
```typescript
// Old code
export class MyPublisher extends AbstractSqsPublisherMonoSchema<MyType> {
public static QUEUE_NAME = 'my-queue-name'

constructor(dependencies: SQSDependencies) {
super(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
},
},
handlerSpy: true,
deletionConfig: {
deleteIfExists: false,
},
logMessages: true,
messageSchema: MY_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
})
}
}

// Updated code
export class MyPublisher extends AbstractSqsPublisher<MyType> {
public static QUEUE_NAME = 'my-queue-name'

constructor(dependencies: SQSDependencies) {
super(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
},
},
handlerSpy: true,
deletionConfig: {
deleteIfExists: false,
},
logMessages: true,
messageSchemas: [MY_MESSAGE_SCHEMA],
messageTypeField: 'messageType',
})
}
}
```

##### Consumer
1. Rename the class you are extending from `AbstractSqsConsumerMonoSchema` to `AbstractSqsConsumer`.
2. Remove the `messageSchema` property.
3. Define a handler (`handlers` property) for your message, specifying the `zod` schema (old `messageSchema`) and the
method to handle the message (old `processMessage` method)
```typescript
// Old code
export class MyConsumer extends AbstractAmqpConsumerMonoSchema<MyType> {
public static QUEUE_NAME = 'my-queue-name'

constructor(dependencies: AMQPConsumerDependencies) {
super(dependencies, {
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
deletionConfig: {
deleteIfExists: true,
},
messageSchema: MY_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
})
}

override async processMessage(
message: MyType,
): Promise<Either<'retryLater', 'success'>> {
// Your handling code
return { result: 'success' }
}
}

// Updated code
export class MyConsumer extends AbstractAmqpConsumer<MyType, undefined> {
public static QUEUE_NAME = 'my-queue-name'

constructor(dependencies: AMQPConsumerDependencies) {
super(
dependencies,
{
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
deletionConfig: {
deleteIfExists: true,
},
messageTypeField: 'messageType',
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
.addConfig(
MY_MESSAGE_SCHEMA,
async (message) => {
// Your handling code
return {
result: 'success',
}
},
)
.build(),
},
undefined
)
}
}
```
> **_NOTE:_** on this example code we are omitting the barrier pattern (`preHandlerBarrier`) and pre handlers (`preHandlers`)
to simplify the example. If you are using them, please check [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts)
to see how to update your code.
8 changes: 2 additions & 6 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
export type { CommonMessage } from './lib/types/MessageTypes'

export type { AMQPQueueConfig } from './lib/AbstractAmqpService'

export { AbstractAmqpConsumerMonoSchema } from './lib/AbstractAmqpConsumerMonoSchema'
export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema'
export { AbstractAmqpConsumer, AMQPConsumerOptions } from './lib/AbstractAmqpConsumer'
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export { AbstractAmqpPublisherMonoSchema } from './lib/AbstractAmqpPublisherMonoSchema'
export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema'
export { AbstractAmqpPublisher, AMQPPublisherOptions } from './lib/AbstractAmqpPublisher'

export type { AmqpConfig } from './lib/amqpConnectionResolver'

Expand Down
53 changes: 0 additions & 53 deletions packages/amqp/lib/AbstractAmqpBasePublisher.ts

This file was deleted.

Loading

0 comments on commit a176d75

Please sign in to comment.