Skip to content

Commit

Permalink
Fixes for consumer and writer
Browse files Browse the repository at this point in the history
  • Loading branch information
agamm committed Oct 5, 2023
1 parent a030bf8 commit 910aa04
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@ consumer.zip
producer.zip
output.txt
output/
logs/

*.csv
17 changes: 10 additions & 7 deletions src/utils/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const app = Consumer.create({
queueUrl: process.env.SQS_QUEUE_URL,
batchSize: Math.min(parseInt(process.env.SQS_BATCH_SIZE), 10),
handleMessageBatch: async (messages) => {
await handler(messages);
await handler(messages)
},
sqs: new SQSClient({
region: process.env.AWS_REGION
Expand All @@ -27,12 +27,15 @@ app.on('processing_error', (err) => {

app.start()

export async function fetchWithTimeout(url, timeout) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeout);
export async function fetchWithTimeout(url, timeout, fetchOptions) {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), timeout)

const response = await fetch(url, { signal: controller.signal });
clearTimeout(timeoutId);
const response = await fetch(url, {
signal: controller.signal,
...fetchOptions
})
clearTimeout(timeoutId)

return response;
return response
}
22 changes: 11 additions & 11 deletions src/utils/writeOutput.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import uniqid from 'uniqid';
import uniqid from 'uniqid'

/**
* Writes output to Amazon S3
Expand All @@ -13,27 +13,27 @@ export async function writeOutput(outputs) {
region: process.env.AWS_REGION
})

// Write to S3
// Write to S3
const s3Responses = await Promise.allSettled(
outputs.map(async output => {
outputs.map(async (output) => {
return await client.send(
new PutObjectCommand({
Body: JSON.stringify(output.body),
Body: JSON.stringify(output),
Bucket: process.env.S3_BUCKET_ARN.split(':::')[1],
// Key: `${urlToFileString(output.url)}.json`
Key: `${uniqid()}.json`
})
)
})
)
s3Responses.map((res) => {
if (res.status === "fulfilled") {
console.log(`S3 Response: `, res.value);
if (res.status === 'fulfilled') {
console.log(`S3 Response: `, res.value)
} else {
console.error(res.reason)
}
else { console.error(res.reason) }
});
})
}

function urlToFileString(url) {
return new URL(url).host.replace(/\./g, '_');
}
return new URL(url).host.replace(/\./g, '_')
}

0 comments on commit 910aa04

Please sign in to comment.