diff --git a/src/consumer/index.js b/src/consumer/index.js index 0baea38..b273cd8 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -1,23 +1,47 @@ -import fetch from 'node-fetch' import { writeOutput } from '../utils/writeOutput.js' +import { fetchWithTimeout } from '../utils/consumer.js' + +const FETCH_TIMEOUT = 10000; /** - * This script is the handler that performs any manipulation of the message - * before it is uploaded to Amazon S3. It must return the string data that will - * be written to the S3 bucket. + * This script is an example of a handler function that sends an HTTP request to each url + * and uploads the results to Amazon S3. You can replace it with any processing logic you need. * - * @param {object} message The SQS message body as a JSON object + * @param {object[]} messages The batch of SQS messages as an array of JSON objects * @return {void} */ -export async function handler(message) { - // Make the request - const response = await fetch(message.url) - // Get the response body as JSON - const body = await response.json() +export async function handler(messages) { + try { + // running multiple asynchronous tasks + const responses = await Promise.allSettled( + // mapping messages from producer to return promises array + messages.map(async message => { + const parsedMessage = JSON.parse(message.Body); + const url = parsedMessage.url; + const urlWithProtocol = url.startsWith("http") ? url : `http://${url}`; + + console.log("Handling url: ", urlWithProtocol); + return await fetchWithTimeout(urlWithProtocol, FETCH_TIMEOUT); + }) + ) + + // handling aggregated results + console.log("Responses are back!"); + const okResponses = responses + .filter((res) => { + if (res.status === "fulfilled") { + console.log('Response OK from ', res.value.url); + return res; + } + else { console.error(res.reason) } + }) + .map(res => res.value); - // (TODO) Add your processing logic here... 
+ // upload to s3 bucket + await writeOutput(okResponses); - // Write the output to S3 - await writeOutput(message, body) -} + } catch (error) { + console.error('An error occurred:', error) + } +} \ No newline at end of file diff --git a/src/package-lock.json b/src/package-lock.json index 010a02b..5b9fb59 100644 --- a/src/package-lock.json +++ b/src/package-lock.json @@ -12,7 +12,8 @@ "@aws-sdk/client-s3": "^3.395.0", "@aws-sdk/client-sqs": "^3.395.0", "node-fetch": "^3.3.2", - "sqs-consumer": "^7.2.2" + "sqs-consumer": "^7.2.2", + "uniqid": "^5.4.0" }, "devDependencies": { "@aws-sdk/types": "^3.391.0", @@ -3184,6 +3185,11 @@ "node": ">=4.2.0" } }, + "node_modules/uniqid": { + "version": "5.4.0", + "resolved": "https://registry.npmjs.org/uniqid/-/uniqid-5.4.0.tgz", + "integrity": "sha512-38JRbJ4Fj94VmnC7G/J/5n5SC7Ab46OM5iNtSstB/ko3l1b5g7ALt4qzHFgGciFkyiRNtDXtLNb+VsxtMSE77A==" + }, "node_modules/uri-js": { "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", diff --git a/src/package.json b/src/package.json index 3d3b5bc..81a9ab3 100644 --- a/src/package.json +++ b/src/package.json @@ -27,6 +27,7 @@ "@aws-sdk/client-s3": "^3.395.0", "@aws-sdk/client-sqs": "^3.395.0", "node-fetch": "^3.3.2", - "sqs-consumer": "^7.2.2" + "sqs-consumer": "^7.2.2", + "uniqid": "^5.4.0" } } diff --git a/src/producer/index.js b/src/producer/index.js index 3780f5c..24944bb 100644 --- a/src/producer/index.js +++ b/src/producer/index.js @@ -1,5 +1,6 @@ import { sendMessage } from '../utils/sendMessage.js' import data from './urls.json' assert { type: 'json' } +import uniqid from 'uniqid'; /** * Producer script for AWS Lambda functions. 
@@ -9,12 +10,24 @@ import data from './urls.json' assert { type: 'json' } * @return {Object} The response object */ export async function handler(event, context) { - // Send the urls to the SQS queue - for (const url of data) { - await sendMessage({ url: url }) + // Send the urls to the SQS queue in batches of 10 for quicker performance + let msgCounter = 0; + for (let i = 0; i < data.length; i += 10) { + const dataBatch = data.slice(i, i + 10); + const urlsBatch = []; + for (let j = 0; j < dataBatch.length; j++) { + urlsBatch.push({ + MessageBody: JSON.stringify({url: dataBatch[j]}), + Id: uniqid() + }) + } + + await sendMessage(urlsBatch) + msgCounter++; } // Return success response + console.log(`Request complete! Sent ${msgCounter} message batches for ${data.length} urls.`) return { statusCode: 200, body: JSON.stringify('Request complete!') diff --git a/src/utils/consumer.js b/src/utils/consumer.js index 24296d0..044d2bd 100644 --- a/src/utils/consumer.js +++ b/src/utils/consumer.js @@ -1,25 +1,16 @@ import { Consumer } from 'sqs-consumer' import { SQSClient } from '@aws-sdk/client-sqs' import { handler } from '../consumer/index.js' +import fetch from 'node-fetch' /** * Consumer app for Amazon EC2 Spot Instances. 
*/ const app = Consumer.create({ queueUrl: process.env.SQS_QUEUE_URL, - batchSize: parseInt(process.env.SQS_BATCH_SIZE, 1), + batchSize: Math.min(parseInt(process.env.SQS_BATCH_SIZE), 10), handleMessageBatch: async (messages) => { - const processed = [] - - for (const message of messages) { - // Call the user-defined handler - await handler(JSON.parse(message.Body)) - - // Add the message to the processed list - processed.push(message) - } - - return processed + await handler(messages); }, sqs: new SQSClient({ region: process.env.AWS_REGION @@ -35,3 +26,13 @@ app.on('processing_error', (err) => { }) app.start() + +export async function fetchWithTimeout(url, timeout) { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeout); + + const response = await fetch(url, { signal: controller.signal }); + clearTimeout(timeoutId); + + return response; +} diff --git a/src/utils/sendMessage.js b/src/utils/sendMessage.js index d552f79..bb5b170 100644 --- a/src/utils/sendMessage.js +++ b/src/utils/sendMessage.js @@ -1,21 +1,21 @@ -import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs' +import { SQSClient, SendMessageBatchCommand } from '@aws-sdk/client-sqs' /** * Sends a message to the SQS queue `count` times. 
* - * @param {object} message The message to send + * @param {object} messageBatch The message batch to send * @return {Promise} */ -export async function sendMessage(message) { +export async function sendMessage(messageBatch) { // Build the SQS client const client = new SQSClient({ region: process.env.AWS_REGION }) const sqsResponse = await client.send( - new SendMessageCommand({ + new SendMessageBatchCommand({ QueueUrl: process.env.MESSAGE_QUEUE_URL, - MessageBody: JSON.stringify(message) + Entries: messageBatch }) ) diff --git a/src/utils/writeOutput.js b/src/utils/writeOutput.js index 317a375..583ca96 100644 --- a/src/utils/writeOutput.js +++ b/src/utils/writeOutput.js @@ -1,26 +1,39 @@ import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3' +import uniqid from 'uniqid'; /** * Writes output to Amazon S3 * - * @param {import('@aws-sdk/types').Message} message The SQS message - * @param {object} output The output to write + * @param {object[]} outputs The output to write * @return {void} */ -export async function writeOutput(message, output) { +export async function writeOutput(outputs) { // Build the S3 client const client = new S3Client({ region: process.env.AWS_REGION }) - // Write to S3 - const s3Response = await client.send( - new PutObjectCommand({ - Body: JSON.stringify(output), - Bucket: process.env.S3_BUCKET_ARN.split(':::')[1], - Key: `${message.MessageId}.json` + // Write to S3 + const s3Responses = await Promise.allSettled( + outputs.map(async output => { + return await client.send( + new PutObjectCommand({ + Body: JSON.stringify(output.body), + Bucket: process.env.S3_BUCKET_ARN.split(':::')[1], + // Key: `${urlToFileString(output.url)}.json` + Key: `${uniqid()}.json` + }) + ) }) ) - - console.log(`S3 Response: ${JSON.stringify(s3Response)}`) + s3Responses.map((res) => { + if (res.status === "fulfilled") { + console.log(`S3 Response: `, res.value); + } + else { console.error(res.reason) } + }); } + +function urlToFileString(url) { + return new 
URL(url).host.replace(/\./g, '_'); +} \ No newline at end of file