Merge + gitignore changes
agamm committed Sep 23, 2023
2 parents 0e99d36 + d3948a2 commit f1e5417
Showing 16 changed files with 206 additions and 10,090 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/codeql.yml
@@ -28,7 +28,7 @@ jobs:
steps:
- name: Checkout
id: checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Initialize CodeQL
id: initialize
2 changes: 1 addition & 1 deletion .gitignore
@@ -46,4 +46,4 @@ output.txt
output/

*.csv
script/download
script/download
16 changes: 9 additions & 7 deletions README.md
@@ -28,6 +28,8 @@ queue.
**Consumers**: These are AWS Spot Instances that retrieve tasks from the queue,
execute the tasks, and then store the results in S3 buckets.

![Flow Chart](https://i.imgur.com/qpGFq5x.png)

With **aws-shotgun**, you don't need to worry about setting up or cleaning up
the infrastructure. Your main focus remains on defining the business logic that
you want to execute at scale. So, let's dive in and start using **aws-shotgun**!
@@ -83,13 +85,13 @@ Initially, the response body is written as-is to S3.
The [`settings.json`](./settings.json) file defines the following configuration
values:

| Name | Description | Default Value |
| --------------------------- | ------------------------------------- | ------------- |
| `aws_region` | AWS region to deploy to | `us-east-1` |
| `aws_spot_instance_bid_usd` | Spot instance bid price (USD) | `0.015` |
| `aws_spot_instance_type` | Spot instance type | `t2.micro` |
| `aws_spot_instance_count` | Number of spot instances | `2` |
| `aws_sqs_batch_size` | Batch size for receiving SQS messages | `10` |
| Name | Description | Default Value |
| --------------------------- | ------------------------------------- | ------------- |
| `aws_region` | AWS region to deploy to | `us-east-1` |
| `aws_spot_instance_bid_usd` | Spot instance bid price (USD) | `0.015` |
| `aws_spot_instance_type` | Spot instance type | `t2.micro` |
| `aws_spot_instance_count` | Number of spot instances | `2` |
| `aws_sqs_batch_size` | Batch size for receiving SQS messages (Max: 10) | `10` |
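
For reference, this is roughly how a batch size from `settings.json` could be wired into the `sqs-consumer` library listed in `src/package.json`. It is a sketch only, not the repository's actual consumer wiring (which is not shown in this diff); the environment variable names and the handler import path are assumptions.

```js
import { Consumer } from 'sqs-consumer'
import { handler } from './consumer/index.js' // assumed import path for the batch handler

// aws_sqs_batch_size comes from settings.json; SQS allows at most 10 messages per receive call.
const BATCH_SIZE = Math.min(Number(process.env.AWS_SQS_BATCH_SIZE ?? 10), 10)

const consumer = Consumer.create({
  queueUrl: process.env.SQS_QUEUE_URL, // assumed to be injected into the instance environment
  batchSize: BATCH_SIZE,               // 1-10; SQS rejects values above 10
  handleMessageBatch: handler,         // receives an array of SQS messages
})

consumer.on('error', (err) => console.error(err.message))
consumer.start()
```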

### Step 4: Update `src/producer/index.js`

18 changes: 0 additions & 18 deletions script/cleanup

This file was deleted.

51 changes: 51 additions & 0 deletions script/logs
@@ -0,0 +1,51 @@
#!/bin/bash

LOG_DIR="logs"

mkdir -p "$LOG_DIR"


LOG_GROUP="/aws/lambda/producer"

# Get the latest log stream for the specified log group
LOG_STREAM=$(aws logs describe-log-streams \
--log-group-name "$LOG_GROUP" \
--max-items 1 \
--order-by LastEventTime \
--descending \
--query "logStreams[].logStreamName" \
--output text | head -n 1)

if [ -n "$LOG_STREAM" ]; then
LOG_FILE="$LOG_DIR/producer.log"

# Download the log events from the specified log stream
aws logs get-log-events \
--log-group-name "$LOG_GROUP" \
--log-stream-name "$LOG_STREAM" \
--query "events[].message" \
--output text > "$LOG_FILE"

echo "Downloaded producer log for $LOG_STREAM to $LOG_FILE"
else
echo "No log streams found for $LOG_GROUP"
fi

# Get a list of EC2 instance IDs
INSTANCE_IDS=$(aws ec2 describe-instances --query "Reservations[*].Instances[*].InstanceId" --filters Name=instance-state-name,Values=running --output text)

# Loop through the instance IDs and download logs for each
for INSTANCE_ID in $INSTANCE_IDS; do
LOG_FILE="$LOG_DIR/$INSTANCE_ID.log"

# Capture the system logs for the instance
output=$(aws ec2 get-console-output --instance-id "$INSTANCE_ID" --query "Output" --output text)

# Check if the output is not None before redirecting it to the log file
if [ "$output" != "None" ]; then
echo "$output" > "$LOG_FILE"
echo "Downloaded consumer log for $INSTANCE_ID to $LOG_FILE"
else
echo "Empty consumer log for $INSTANCE_ID"
fi
done
32 changes: 28 additions & 4 deletions script/start
@@ -2,10 +2,34 @@
WORKSPACE=$(pwd)
DEBUG=false

# Check if there is an existing Terraform state
if test -f "$WORKSPACE/tf/terraform.tfstate"; then
echo "Terraform state exists. Remove any .tfstate, .lock.hcl, and .terraform data."
exit 1
# Check if there is an existing Terraform state file
if [ -f "$WORKSPACE/tf/terraform.tfstate" ]; then
# Get the list of resources in the Terraform state
resource_list=$(terraform state list -state="tf/terraform.tfstate")

# Check if the resource list is empty
if [ -z "$resource_list" ]; then
echo "Terraform state contains no resources."
read -p "Remove the empty Terraform state? (y/n): " confirm
if [ "$confirm" == "y" ]; then
rm -rf "$WORKSPACE/tf/.terraform"
rm -f "$WORKSPACE/tf/terraform.tfstate"
rm -f "$WORKSPACE/tf/terraform.tfstate.backup"
rm -f "$WORKSPACE/tf/terraform.tfvars"
rm -f "$WORKSPACE/tf/.terraform.lock.hcl"
rm -f "$WORKSPACE/tf/*.backup"
echo "Terraform state removed."
else
echo "Terraform state removal canceled."
echo "Please remove any .tfstate, .lock.hcl, and .terraform data."
exit 1
fi
else
echo "Terraform state contains the following resources:"
echo "$resource_list"
echo "Please remove your existing resources."
exit 1
fi
fi

# Get the user data for the consumer instances
2 changes: 2 additions & 0 deletions script/status
@@ -1,6 +1,8 @@
#!/bin/bash
WORKSPACE=$(pwd)

echo "Time: $(date +"%H:%M:%S")"

# Get the SQS queue and S3 bucket information from the Terraform outputs
cd "$WORKSPACE/tf/" || (echo "Please run from the root of the repository" && exit)
QUEUE_URL=$(terraform output -raw sqs_queue_url)
62 changes: 36 additions & 26 deletions src/consumer/index.js
@@ -1,37 +1,47 @@
import fetch from 'node-fetch'
import { writeOutput } from '../utils/writeOutput.js'
import { fetchWithTimeout } from '../utils/consumer.js'

const FETCH_TIMEOUT = 10000;

/**
* This script is the handler that performs any manipulation of the message
* before it is uploaded to Amazon S3. It must return the string data that will
* be written to the S3 bucket.
* This script is an example of a handler function that sends an HTTP request to each URL
* in the batch and uploads the results to Amazon S3. You can change it to anything you want ;)
*
* @param {object} message The SQS message body as a JSON object
* @param {object[]} messages The SQS message batch, an array of messages whose Body is a JSON string
* @return {void}
*/
export async function handler(message) {

export async function handler(messages) {
try {
// Make the request
await timeout(200 * Math.random())
const response = await fetch(`https://${message.url}`)
// Run the fetches for all messages in the batch concurrently
const responses = await Promise.allSettled(
// Map each SQS message from the producer to a fetch promise
messages.map(async message => {
const parsedMessage = JSON.parse(message.Body);
const url = parsedMessage.url;
const urlWithProtocol = url.startsWith("http") ? url : `http://${url}`;

console.log("Handling url: ", urlWithProtocol);
return await fetchWithTimeout(urlWithProtocol, FETCH_TIMEOUT);
})
)

// Handle the settled results: keep fulfilled responses, log rejected ones
console.log("Responses are back!");
const okResponses = responses
.filter((res) => {
if (res.status === "fulfilled") {
console.log('Response OK from ', res.value.url);
return res;
}
else { console.error(res.reason) }
})
.map(res => res.value);

// Upload the successful responses to the S3 bucket
await writeOutput(okResponses);

// Write the output to S3
if (response.ok) {
await writeOutput(urlToFileString(message.url), message.url)
}
} catch (error) {
console.error('An error occurred:', error)
}
}

function urlToFileString(url) {
let urlWithoutProtocol = url.replace('https://', '').replace('http://', '')
let fileCompatibleString = urlWithoutProtocol
.replace(/\//g, '-')
.replace(/\./g, '_')
return fileCompatibleString
}

function timeout(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
}
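
The new handler relies on `fetchWithTimeout` from `../utils/consumer.js`, whose implementation is not among the files rendered above. Below is a minimal sketch of what such a helper usually looks like with `node-fetch` (already a dependency) and an `AbortController`; the repository's actual implementation may differ.

```js
import fetch from 'node-fetch'

/**
 * Fetch a URL and abort the request if it takes longer than timeoutMs.
 * Sketch only: the real utils/consumer.js is not shown in this diff.
 *
 * @param {string} url        Fully qualified URL, protocol included
 * @param {number} timeoutMs  Timeout in milliseconds (FETCH_TIMEOUT above is 10000)
 * @return {Promise<Response>} Resolves with the response, rejects on error or timeout
 */
export async function fetchWithTimeout(url, timeoutMs) {
  const controller = new AbortController()
  const timer = setTimeout(() => controller.abort(), timeoutMs)
  try {
    // The handler later reads res.value.url, and node-fetch responses expose .url
    return await fetch(url, { signal: controller.signal })
  } finally {
    clearTimeout(timer) // always clear the timer so it cannot keep the process alive
  }
}
```

A failed or timed-out request surfaces as a rejected promise in the handler's `Promise.allSettled` results and is logged via `console.error(res.reason)`.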
8 changes: 7 additions & 1 deletion src/package-lock.json

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/package.json
@@ -27,6 +27,7 @@
"@aws-sdk/client-s3": "^3.395.0",
"@aws-sdk/client-sqs": "^3.395.0",
"node-fetch": "^3.3.2",
"sqs-consumer": "^7.2.2"
"sqs-consumer": "^7.2.2",
"uniqid": "^5.4.0"
}
}
19 changes: 16 additions & 3 deletions src/producer/index.js
@@ -1,5 +1,6 @@
import { sendMessage } from '../utils/sendMessage.js'
import data from './urls.json' assert { type: 'json' }
import uniqid from 'uniqid';

/**
* Producer script for AWS Lambda functions.
@@ -9,12 +10,24 @@ import data from './urls.json' assert { type: 'json' }
* @return {Object} The response object
*/
export async function handler(event, context) {
// Send the urls to the SQS queue
for (const url of data) {
await sendMessage({ url: url })
// Send the URLs to the SQS queue in batches of 10 for higher throughput
let msgCounter = 0;
for (let i = 0; i < data.length; i += 10) {
const dataBatch = data.slice(i, i + 10);
const urlsBatch = [];
for (let j = 0; j < dataBatch.length; j++) {
urlsBatch.push({
MessageBody: JSON.stringify({url: dataBatch[j]}),
Id: uniqid()
})
}

await sendMessage(urlsBatch)
msgCounter++;
}

// Return success response
console.log(`Request complete! Sent ${msgCounter} message batches for ${data.length} urls.`)
return {
statusCode: 200,
body: JSON.stringify('Request complete!')
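
The entries pushed into `urlsBatch` already have the `Id`/`MessageBody` shape used by SQS batch sends, and the loop slices the data into groups of 10, the maximum batch size SQS accepts. This suggests `sendMessage` wraps `SendMessageBatchCommand` from `@aws-sdk/client-sqs`. A hedged sketch of what `../utils/sendMessage.js` might look like; the queue URL environment variable is an assumption, and the repository's real helper is not shown in this diff.

```js
import { SQSClient, SendMessageBatchCommand } from '@aws-sdk/client-sqs'

const sqs = new SQSClient({ region: process.env.AWS_REGION })

/**
 * Send up to 10 pre-built batch entries ({ Id, MessageBody }) to the queue.
 * Sketch only: the actual utils/sendMessage.js may differ.
 */
export async function sendMessage(entries) {
  const result = await sqs.send(
    new SendMessageBatchCommand({
      QueueUrl: process.env.SQS_QUEUE_URL, // assumed to be provided to the Lambda
      Entries: entries, // SendMessageBatch accepts at most 10 entries per request
    })
  )

  if (result.Failed?.length) {
    console.error('Some messages failed to enqueue:', result.Failed)
  }
}
```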

