Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
Merge pull request #5 from blumamir/no-sleep
Browse files Browse the repository at this point in the history
fix: no sleep between batch receive calls
  • Loading branch information
Amir Blum authored Jul 23, 2020
2 parents 4eb3aa1 + 57bd33f commit 4dec99c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const snsProducer = SnsProducer.create({
region: 'us-east-1',
// to enable sending large payloads (>256KiB) though S3
largePayloadThoughS3: true,
s3Bucket: '...',
s3EndpointUrl: '...',
});

await snsProducer.sendJSON({
Expand Down
36 changes: 14 additions & 22 deletions src/sqs-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export class SqsConsumer {
private waitTimeSeconds: number;
private started = false;
private events = new EventEmitter();
private pollingInterval = 1000;
private connErrorTimeout = 10000;
private handleMessage?: (message: SqsMessage) => Promise<void>;
private parsePayload?: (payload: any) => any;
Expand Down Expand Up @@ -101,33 +100,26 @@ export class SqsConsumer {
this.events.on(event, handler);
}

private poll(): void {
if (!this.started) return;
let currentPollingInterval = this.pollingInterval;

this.receiveMessages({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
})
.then((response) => {
private async poll() {
while(this.started) {
try {
const response = await this.receiveMessages({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: this.batchSize,
WaitTimeSeconds: this.waitTimeSeconds,
});
if (!this.started) return;
return this.handleSqsResponse(response);
})
.catch((err) => {
await this.handleSqsResponse(response);
}
catch (err) {
if (this.isConnError(err)) {
this.events.emit(SqsConsumerEvents.connectionError, err);
currentPollingInterval = this.connErrorTimeout;
await new Promise(resolve => setTimeout(resolve, this.connErrorTimeout));
} else {
this.events.emit(SqsConsumerEvents.error, err);
}
})
.then(() => {
setTimeout(this.poll.bind(this), currentPollingInterval);
})
.catch((err) => {
this.events.emit('error', err);
});
}
}
}

private isConnError(err: AWSError): Boolean {
Expand Down

0 comments on commit 4dec99c

Please sign in to comment.