Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad committed Sep 27, 2023
1 parent b84477d commit c3d19fc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 50 deletions.
32 changes: 32 additions & 0 deletions packages/amqp/lib/AbstractAmqpBasePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Either } from '@lokalise/node-core'
import type { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core'
import { objectToBuffer } from '@message-queue-toolkit/core'

import { AbstractAmqpService } from './AbstractAmqpService'

export abstract class AbstractAmqpBasePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpService<MessagePayloadType> {
protected sendToQueue(message: MessagePayloadType): void {
try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
// but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here
// @ts-ignore
if (err.message === 'Channel closed') {
this.logger.error(`AMQP channel closed`)
void this.reconnect()
} else {
throw err
}
}
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}
/* c8 ignore stop */
}
27 changes: 3 additions & 24 deletions packages/amqp/lib/AbstractAmqpPublisherMonoSchema.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import type { Either } from '@lokalise/node-core'
import type {
ExistingQueueOptions,
MessageInvalidFormatError,
MessageValidationError,
MonoSchemaQueueOptions,
NewQueueOptions,
SyncPublisher,
} from '@message-queue-toolkit/core'
import { objectToBuffer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer'
import { AbstractAmqpService } from './AbstractAmqpService'
import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher'
import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'

export abstract class AbstractAmqpPublisherMonoSchema<MessagePayloadType extends object>
extends AbstractAmqpService<MessagePayloadType>
extends AbstractAmqpBasePublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
{
private readonly messageSchema: ZodSchema<MessagePayloadType>
Expand All @@ -39,30 +36,12 @@ export abstract class AbstractAmqpPublisherMonoSchema<MessagePayloadType extends
this.logMessage(resolvedLogMessage)
}

try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
// but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here
// @ts-ignore
if (err.message === 'Channel closed') {
this.logger.error(`AMQP channel closed`)
void this.reconnect()
} else {
throw err
}
}
this.sendToQueue(message)
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}

protected override resolveSchema(): Either<Error, ZodSchema<MessagePayloadType>> {
throw new Error('Not implemented for publisher')
}

/* c8 ignore stop */
}
30 changes: 4 additions & 26 deletions packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import type { Either } from '@lokalise/node-core'
import type {
ExistingQueueOptions,
MessageInvalidFormatError,
MessageValidationError,
NewQueueOptions,
SyncPublisher,
MultiSchemaPublisherOptions,
} from '@message-queue-toolkit/core'
import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core'
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer'
import { AbstractAmqpService } from './AbstractAmqpService'
import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher'
import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'

export abstract class AbstractAmqpPublisherMultiSchema<MessagePayloadType extends object>
extends AbstractAmqpService<MessagePayloadType>
extends AbstractAmqpBasePublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
Expand Down Expand Up @@ -47,29 +45,9 @@ export abstract class AbstractAmqpPublisherMultiSchema<MessagePayloadType extend
this.logMessage(resolvedLogMessage)
}

try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
// but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here
// @ts-ignore
if (err.message === 'Channel closed') {
this.logger.error(`AMQP channel closed`)
void this.reconnect()
} else {
throw err
}
}
this.sendToQueue(message)
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}

/* c8 ignore stop */

protected override resolveSchema(
message: MessagePayloadType,
): Either<Error, ZodSchema<MessagePayloadType>> {
Expand Down

0 comments on commit c3d19fc

Please sign in to comment.