Skip to content

Commit

Permalink
Add an option specify number of tasks to process for AsyncTaskCommand…
Browse files Browse the repository at this point in the history
…. Docker container for local tests.
  • Loading branch information
bazilio91 committed Nov 18, 2015
1 parent 06c4ed1 commit 7579d01
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 112 deletions.
8 changes: 8 additions & 0 deletions AsyncComponent.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ public function purge($queueName)
{
return $this->transport->purge($queueName);
}

/**
* @return Transport
*/
public function getTransport()
{
return $this->transport;
}
}
11 changes: 11 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM php:cli

RUN apt-get update && apt-get install -y redis-server \
rabbitmq-server librabbitmq-dev
RUN php -r "readfile('https://getcomposer.org/installer');" | php
RUN docker-php-ext-install pcntl shmop mbstring
RUN pecl install amqp && echo "extension=amqp.so" >> /usr/local/etc/php/conf.d/amqp.ini

ADD . /var/code

CMD /var/code/tests/docker-test.sh
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,7 @@ For more code examples look into tests:
~~~
vendor/bin/codecept run
~~~
Or in Docker:
~~~
./test.sh
~~~
59 changes: 53 additions & 6 deletions commands/AsyncWorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,77 @@
namespace bazilio\async\commands;

use bazilio\async\models\AsyncTask;
use Spork\ProcessManager;


class AsyncWorkerCommand extends \yii\console\Controller
{
static $state = 1;
protected $child;

/**
* @param string|null $queueName
* @param int|null $count tasks to process
*/
public function actionExecute($queueName = null)
public function actionExecute($queueName = null, $count = null)
{
$this->handleSignal();
/** @var AsyncTask $task */
while ($task = \Yii::$app->async->receiveTask($queueName ?: AsyncTask::$queueName)) {
$task->execute();
\Yii::$app->async->acknowledgeTask($task);
$this->checkSignal();

$this->processTask($task);

if (($count !== null && !--$count) || $this->checkSignal()) {
break;
}
}
}

/**
* @param string|null $queueName
* @param int|null $count tasks to process
*/
public function actionDaemon($queueName = null)
public function actionDaemon($queueName = null, $count = null)
{
$this->handleSignal();

/** @var AsyncTask $task */
while ($task = \Yii::$app->async->receiveTask($queueName ?: AsyncTask::$queueName, true)) {
$task->execute();
\Yii::$app->async->acknowledgeTask($task);
$this->checkSignal();

$task::$queueName = $queueName ?: AsyncTask::$queueName;
$this->processTask($task);

if (($count !== null && !--$count) || $this->checkSignal()) {
break;
}
}
}

protected function processTask(AsyncTask $task)
{
$task->execute();
\Yii::$app->async->acknowledgeTask($task);

}

private function handleSignal()
{
pcntl_signal(
SIGTERM,
function ($signo) {
echo "This signal is called. [$signo] \n";
static::$state = -1;
}
);
}

private function checkSignal()
{
pcntl_signal_dispatch();
if (AsyncWorkerCommand::$state == -1) {
return true;
}
}
}
Loading

0 comments on commit 7579d01

Please sign in to comment.