From 57bd33f6e7369d6f7fb896f9446dde11d7496311 Mon Sep 17 00:00:00 2001 From: Amir Blum Date: Thu, 23 Jul 2020 13:10:29 +0300 Subject: [PATCH] fix: no sleep between batch receive calls --- README.md | 2 +- src/sqs-consumer.ts | 36 ++++++++++++++---------------------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 4e59029..c02aec4 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ const snsProducer = SnsProducer.create({ region: 'us-east-1', // to enable sending large payloads (>256KiB) though S3 largePayloadThoughS3: true, - s3Bucket: '...', + s3EndpointUrl: '...', }); await snsProducer.sendJSON({ diff --git a/src/sqs-consumer.ts b/src/sqs-consumer.ts index 4cdb32f..f544378 100644 --- a/src/sqs-consumer.ts +++ b/src/sqs-consumer.ts @@ -46,7 +46,6 @@ export class SqsConsumer { private waitTimeSeconds: number; private started = false; private events = new EventEmitter(); - private pollingInterval = 1000; private connErrorTimeout = 10000; private handleMessage?: (message: SqsMessage) => Promise; private parsePayload?: (payload: any) => any; @@ -101,33 +100,26 @@ export class SqsConsumer { this.events.on(event, handler); } - private poll(): void { - if (!this.started) return; - let currentPollingInterval = this.pollingInterval; - - this.receiveMessages({ - QueueUrl: this.queueUrl, - MaxNumberOfMessages: this.batchSize, - WaitTimeSeconds: this.waitTimeSeconds, - }) - .then((response) => { + private async poll() { + while(this.started) { + try { + const response = await this.receiveMessages({ + QueueUrl: this.queueUrl, + MaxNumberOfMessages: this.batchSize, + WaitTimeSeconds: this.waitTimeSeconds, + }); if (!this.started) return; - return this.handleSqsResponse(response); - }) - .catch((err) => { + await this.handleSqsResponse(response); + } + catch (err) { if (this.isConnError(err)) { this.events.emit(SqsConsumerEvents.connectionError, err); - currentPollingInterval = this.connErrorTimeout; + await new Promise(resolve => setTimeout(resolve, this.connErrorTimeout)); } else { this.events.emit(SqsConsumerEvents.error, err); } - }) - .then(() => { - setTimeout(this.poll.bind(this), currentPollingInterval); - }) - .catch((err) => { - this.events.emit('error', err); - }); + } + } } private isConnError(err: AWSError): Boolean {