SQS polling, fetching, and uploading to S3 are done in batch (#3)
chrome99 authored Sep 19, 2023
1 parent 76ae5e0 commit d3948a2
Showing 7 changed files with 105 additions and 47 deletions.
52 changes: 38 additions & 14 deletions src/consumer/index.js
@@ -1,23 +1,47 @@
-import fetch from 'node-fetch'
import { writeOutput } from '../utils/writeOutput.js'
+import { fetchWithTimeout } from '../utils/consumer.js'

+const FETCH_TIMEOUT = 10000;
+
/**
- * This script is the handler that performs any manipulation of the message
- * before it is uploaded to Amazon S3. It must return the string data that will
- * be written to the S3 bucket.
+ * This script is an example of a handler function that sends an HTTP request to the url
+ * and uploads the result to Amazon S3. You can change it to anything you want ;)
*
- * @param {object} message The SQS message body as a JSON object
+ * @param {object} messages The SQS messages batch as an array with JSON objects
* @return {void}
*/
-export async function handler(message) {
-  // Make the request
-  const response = await fetch(message.url)
-
-  // Get the response body as JSON
-  const body = await response.json()
+export async function handler(messages) {
+  try {
+    // running multiple asynchronous tasks
+    const responses = await Promise.allSettled(
+      // mapping messages from producer to return promises array
+      messages.map(async message => {
+        const parsedMessage = JSON.parse(message.Body);
+        const url = parsedMessage.url;
+        const urlWithProtocol = url.startsWith("http") ? url : `http://${url}`;
+
+        console.log("Handling url: ", urlWithProtocol);
+        return await fetchWithTimeout(urlWithProtocol, FETCH_TIMEOUT);
+      })
+    )
+
+    // handling aggregated results
+    console.log("Responses are back!");
+    const okResponses = responses
+      .filter((res) => {
+        if (res.status === "fulfilled") {
+          console.log('Response OK from ', res.value.url);
+          return res;
+        }
+        else { console.error(res.reason) }
+      })
+      .map(res => res.value);

    // (TODO) Add your processing logic here...
+    // upload to s3 bucket
+    await writeOutput(okResponses);

-  // Write the output to S3
-  await writeOutput(message, body)
-}
+  } catch (error) {
+    console.error('An error occurred:', error)
+  }
+}
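For reference, the batch handler above receives raw SQS messages whose Body field is a JSON string carrying a url property, as written by the producer. A sketch of the shape of one element of the messages array (values are hypothetical; MessageId is assigned by SQS):

    // Shape of one element of the `messages` array passed to handler()
    const exampleMessage = {
      MessageId: '5fea7756-0ea4-451a-a703-a558b933e274', // set by SQS, hypothetical value
      Body: JSON.stringify({ url: 'example.com' })       // written by the producer
    }

Only fulfilled fetches survive the filter; their Response objects are passed on to writeOutput.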
8 changes: 7 additions & 1 deletion src/package-lock.json

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/package.json
@@ -27,6 +27,7 @@
"@aws-sdk/client-s3": "^3.395.0",
"@aws-sdk/client-sqs": "^3.395.0",
"node-fetch": "^3.3.2",
"sqs-consumer": "^7.2.2"
"sqs-consumer": "^7.2.2",
"uniqid": "^5.4.0"
}
}
19 changes: 16 additions & 3 deletions src/producer/index.js
@@ -1,5 +1,6 @@
import { sendMessage } from '../utils/sendMessage.js'
import data from './urls.json' assert { type: 'json' }
+import uniqid from 'uniqid';

/**
* Producer script for AWS Lambda functions.
@@ -9,12 +10,24 @@ import data from './urls.json' assert { type: 'json' }
* @return {Object} The response object
*/
export async function handler(event, context) {
-  // Send the urls to the SQS queue
-  for (const url of data) {
-    await sendMessage({ url: url })
+  // Sends 10 urls batches to the SQS queue for quicker performance
+  let msgCounter = 0;
+  for (let i = 0; i < data.length; i += 10) {
+    const dataBatch = data.slice(i, i + 10);
+    const urlsBatch = [];
+    for (let j = 0; j < dataBatch.length; j++) {
+      urlsBatch.push({
+        MessageBody: JSON.stringify({url: dataBatch[j]}),
+        Id: uniqid()
+      })
+    }
+
+    await sendMessage(urlsBatch)
+    msgCounter++;
  }

  // Return success response
+  console.log(`Request complete! Sent ${msgCounter} message batches for ${data.length} urls.`)
return {
statusCode: 200,
body: JSON.stringify('Request complete!')
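Side note: SQS's SendMessageBatch accepts at most 10 entries per call, which is why the loop above slices data into chunks of 10. The code assumes src/producer/urls.json is a flat JSON array of URL strings, roughly like this (hypothetical values):

    [
      "example.com",
      "https://example.org",
      "https://example.net/status"
    ]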
25 changes: 13 additions & 12 deletions src/utils/consumer.js
@@ -1,25 +1,16 @@
import { Consumer } from 'sqs-consumer'
import { SQSClient } from '@aws-sdk/client-sqs'
import { handler } from '../consumer/index.js'
+import fetch from 'node-fetch'

/**
* Consumer app for Amazon EC2 Spot Instances.
*/
const app = Consumer.create({
queueUrl: process.env.SQS_QUEUE_URL,
-  batchSize: parseInt(process.env.SQS_BATCH_SIZE, 1),
+  batchSize: Math.min(parseInt(process.env.SQS_BATCH_SIZE), 10),
handleMessageBatch: async (messages) => {
-    const processed = []
-
-    for (const message of messages) {
-      // Call the user-defined handler
-      await handler(JSON.parse(message.Body))
-
-      // Add the message to the processed list
-      processed.push(message)
-    }
-
-    return processed
+    await handler(messages);
},
sqs: new SQSClient({
region: process.env.AWS_REGION
@@ -35,3 +26,13 @@ app.on('processing_error', (err) => {
})

app.start()

+export async function fetchWithTimeout(url, timeout) {
+  const controller = new AbortController();
+  const timeoutId = setTimeout(() => controller.abort(), timeout);
+
+  const response = await fetch(url, { signal: controller.signal });
+  clearTimeout(timeoutId);
+
+  return response;
+}
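fetchWithTimeout cancels the request via an AbortController once the timeout fires; the aborted fetch rejects with an AbortError, which then surfaces as a rejected entry in the handler's Promise.allSettled results. A minimal usage sketch (URL and timeout are arbitrary examples):

    // assuming fetchWithTimeout is imported from src/utils/consumer.js
    try {
      const response = await fetchWithTimeout('http://example.com', 5000)
      console.log(response.status)
    } catch (err) {
      // err.name === 'AbortError' when the 5 second budget is exceeded
      console.error(err.name, err.message)
    }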
10 changes: 5 additions & 5 deletions src/utils/sendMessage.js
@@ -1,21 +1,21 @@
-import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs'
+import { SQSClient, SendMessageBatchCommand } from '@aws-sdk/client-sqs'

/**
* Sends a message to the SQS queue `count` times.
*
- * @param {object} message The message to send
+ * @param {object} messageBatch The message batch to send
* @return {Promise<void>}
*/
-export async function sendMessage(message) {
+export async function sendMessage(messageBatch) {
// Build the SQS client
const client = new SQSClient({
region: process.env.AWS_REGION
})

const sqsResponse = await client.send(
-    new SendMessageCommand({
+    new SendMessageBatchCommand({
QueueUrl: process.env.MESSAGE_QUEUE_URL,
-      MessageBody: JSON.stringify(message)
+      Entries: messageBatch
})
)

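Each entry handed to SendMessageBatchCommand must carry a batch-unique Id alongside its MessageBody, which is why the producer attaches uniqid() values. A sketch of a valid messageBatch argument (values are hypothetical):

    // assuming sendMessage is imported from src/utils/sendMessage.js
    const messageBatch = [
      { Id: 'lkx1a2b3', MessageBody: JSON.stringify({ url: 'example.com' }) },
      { Id: 'lkx4c5d6', MessageBody: JSON.stringify({ url: 'https://example.org' }) }
    ]

    await sendMessage(messageBatch)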
35 changes: 24 additions & 11 deletions src/utils/writeOutput.js
@@ -1,26 +1,39 @@
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
+import uniqid from 'uniqid';

/**
* Writes output to Amazon S3
*
- * @param {import('@aws-sdk/types').Message} message The SQS message
- * @param {object} output The output to write
+ * @param {object[]} outputs The output to write
* @return {void}
*/
-export async function writeOutput(message, output) {
+export async function writeOutput(outputs) {
// Build the S3 client
const client = new S3Client({
region: process.env.AWS_REGION
})

-  // Write to S3
-  const s3Response = await client.send(
-    new PutObjectCommand({
-      Body: JSON.stringify(output),
-      Bucket: process.env.S3_BUCKET_ARN.split(':::')[1],
-      Key: `${message.MessageId}.json`
+  // Write to S3
+  const s3Responses = await Promise.allSettled(
+    outputs.map(async output => {
+      return await client.send(
+        new PutObjectCommand({
+          Body: JSON.stringify(output.body),
+          Bucket: process.env.S3_BUCKET_ARN.split(':::')[1],
+          // Key: `${urlToFileString(output.url)}.json`
+          Key: `${uniqid()}.json`
+        })
+      )
})
)

-  console.log(`S3 Response: ${JSON.stringify(s3Response)}`)
+  s3Responses.map((res) => {
+    if (res.status === "fulfilled") {
+      console.log(`S3 Response: `, res.value);
+    }
+    else { console.error(res.reason) }
+  });
}

+function urlToFileString(url) {
+  return new URL(url).host.replace(/\./g, '_');
+}
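The bucket name passed to PutObjectCommand is derived from the bucket ARN: an S3 ARN has the form arn:aws:s3:::bucket-name, so splitting on ':::' leaves the name in the second element. For example (hypothetical ARN):

    const bucketArn = 'arn:aws:s3:::my-output-bucket' // hypothetical S3_BUCKET_ARN value
    const bucketName = bucketArn.split(':::')[1]      // 'my-output-bucket'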
