Skip to content

Commit

Permalink
DEV-16950 - Ability to send batch messages, get attributes (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
unusorin authored Nov 16, 2020
1 parent 63e9404 commit 9c38b72
Showing 1 changed file with 77 additions and 1 deletion.
78 changes: 77 additions & 1 deletion Client/Aws/SqsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,24 @@
use Aws\Sns\MessageValidator;
use GuzzleHttp\Promise\Promise;
use Psr\Log\LoggerInterface;
use Ramsey\Uuid\Uuid;

class SqsClient extends AbstractAwsClient
{
private const MAX_BATCH_SIZE = 10;

protected const RESOURCE_NAME = 'QueueUrl';

private const MESSAGE_BODY = 'MessageBody';
private const ID = 'Id';
private const MESSAGES = 'Messages';
private const MESSAGE = 'Message';
private const BODY = 'Body';
private const RECEIPT_HANDLE = 'ReceiptHandle';
private const ENTRIES = 'Entries';
private const ATTRIBUTES = 'Attributes';
private const ATTRIBUTE_NAMES = 'AttributeNames';
private const APPROXIMATE_NUMBER_OF_MESSAGES = 'ApproximateNumberOfMessages';

/**
* @var MessageValidator
Expand Down Expand Up @@ -99,6 +106,75 @@ public function send($message)
return $this->sendMessage($message);
}

/**
* @param array $messages
*/
public function sendBatch(array $messages)
{
if (count($messages) > self::MAX_BATCH_SIZE) {
throw new \RuntimeException('SQS batch size is hard limited to ' . self::MAX_BATCH_SIZE);
}
$messages = array_map(
function ($message) {
if (is_array($message)) {
$message = $this->prepareMessageFromArrayMessage($message);
} else {
$message = $this->prepareMessageFromNonArrayMessage($message);
}

if (
$this->largePayloadMessageExtension !== null
&& $this->largePayloadMessageExtension->isMessageLarge($message)
) {
$messageS3Pointer = $this->largePayloadMessageExtension->storeMessageInS3($message);
$message[self::MESSAGE_BODY] = json_encode($messageS3Pointer);
}

$message[self::ID] = Uuid::uuid4()->toString();

return $message;
},
$messages
);

/** @noinspection PhpUndefinedMethodInspection */
return $this->sendMessageBatch([self::ENTRIES => $messages]);
}

/**
* @param array $messages
*/
public function sendBufferedBatch(array $messages)
{
while (count($messages) > 0) {
$this->sendBatch(array_splice($messages, 0, 10));
}
}

/**
* @param array $options
* @return mixed|null
*/
public function getQueueAttributes(array $options = [])
{
$options[self::RESOURCE_NAME] = $this->resource;

/** @var Result $result */
$result = parent::getQueueAttributes($options);

return $result->get(self::ATTRIBUTES);
}

/**
* @return int
*/
public function getApproximateNumberOfMessages(): int
{
$attributes = $this->getQueueAttributes([self::ATTRIBUTE_NAMES => [self::APPROXIMATE_NUMBER_OF_MESSAGES]]);

return $attributes[self::APPROXIMATE_NUMBER_OF_MESSAGES];
}

/**
* @param array $args
*
Expand Down Expand Up @@ -232,7 +308,7 @@ private function prepareMessageFromArrayMessage(array $message): array
{
if (!array_key_exists(self::MESSAGE_BODY, $message)) {
$message = [self::MESSAGE_BODY => json_encode($message)];
} else if (is_array($message[self::MESSAGE_BODY]) || is_object($message[self::MESSAGE_BODY])) {
} elseif (is_array($message[self::MESSAGE_BODY]) || is_object($message[self::MESSAGE_BODY])) {
$message[self::MESSAGE_BODY] = json_encode($message[self::MESSAGE_BODY]);
}

Expand Down

0 comments on commit 9c38b72

Please sign in to comment.