Skip to content

Commit

Permalink
amp/parallel 2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
rodber committed May 18, 2024
1 parent 57d0678 commit 671fab3
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 15 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
],
"require": {
"php": "^8.1",
"amphp/parallel": "^1.4",
"amphp/parallel": "^2.2",
"chevere/action": "^1.0.0",
"chevere/data-structure": "^1.0.1",
"chevere/parameter": "^1.0.x-dev",
Expand Down
45 changes: 45 additions & 0 deletions src/CallableTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

/*
* This file is part of Chevere.
*
* (c) Rodolfo Berrios <rodolfo@chevere.org>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Chevere\Workflow;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

/**
* @template-implements Task<mixed, never, never>
*/
final class CallableTask implements Task
{
private string $callable;

/**
* @var array<mixed>
*/
private array $arguments;

public function __construct(
string $callable,
mixed ...$arguments
) {
$this->callable = $callable;
$this->arguments = $arguments;
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
// @phpstan-ignore-next-line
return ($this->callable)(...$this->arguments);
}
}
32 changes: 18 additions & 14 deletions src/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Chevere\Workflow;

use Amp\Promise;
use Amp\Parallel\Worker\Execution;
use Chevere\Action\Interfaces\ActionInterface;
use Chevere\Parameter\Interfaces\CastInterface;
use Chevere\Workflow\Interfaces\JobInterface;
Expand All @@ -24,9 +24,8 @@
use InvalidArgumentException;
use OutOfBoundsException;
use Throwable;
use function Amp\Parallel\Worker\enqueueCallable;
use function Amp\Promise\all;
use function Amp\Promise\wait;
use function Amp\Future\await;
use function Amp\Parallel\Worker\submit;
use function Chevere\Message\message;
use function Chevere\Parameter\cast;

Expand Down Expand Up @@ -54,9 +53,12 @@ public function withRun(): RunnerInterface

continue;
}
$promises = $new->getPromises($node);
$executions = $new->getExecutions($node);
/** @var RunnerInterface[] $responses */
$responses = wait(all($promises));
$responses = await(array_map(
fn (Execution $e) => $e->getFuture(),
$executions,
));
foreach ($responses as $runner) {
$new->merge($new, $runner);
}
Expand Down Expand Up @@ -178,20 +180,22 @@ private function addJobSkip(string $name): void

/**
* @param array<string> $queue
* @return array<Promise<mixed>>
* @return array<Execution<mixed, never, never>>
*/
private function getPromises(array $queue): array
private function getExecutions(array $queue): array
{
$promises = [];
$return = [];
foreach ($queue as $job) {
$promises[] = enqueueCallable(
'Chevere\\Workflow\\runnerForJob',
$this,
$job,
$return[] = submit(
new CallableTask(
'Chevere\\Workflow\\runnerForJob',
$this,
$job,
)
);
}

return $promises;
return $return;
}

private function merge(self $self, RunnerInterface $runner): void
Expand Down

0 comments on commit 671fab3

Please sign in to comment.