diff --git a/composer.json b/composer.json index adfe6f0..d578b6e 100644 --- a/composer.json +++ b/composer.json @@ -13,7 +13,7 @@ ], "require": { "php": "^8.1", - "amphp/parallel": "^1.4", + "amphp/parallel": "^2.2", "chevere/action": "^1.0.0", "chevere/data-structure": "^1.0.1", "chevere/parameter": "^1.0.x-dev", diff --git a/src/CallableTask.php b/src/CallableTask.php new file mode 100644 index 0000000..2d8a4f0 --- /dev/null +++ b/src/CallableTask.php @@ -0,0 +1,45 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Chevere\Workflow; + +use Amp\Cancellation; +use Amp\Parallel\Worker\Task; +use Amp\Sync\Channel; + +/** + * @template-implements Task + */ +final class CallableTask implements Task +{ + private string $callable; + + /** + * @var array + */ + private array $arguments; + + public function __construct( + string $callable, + mixed ...$arguments + ) { + $this->callable = $callable; + $this->arguments = $arguments; + } + + public function run(Channel $channel, Cancellation $cancellation): mixed + { + // @phpstan-ignore-next-line + return ($this->callable)(...$this->arguments); + } +} diff --git a/src/Runner.php b/src/Runner.php index 50deb32..bbec73d 100644 --- a/src/Runner.php +++ b/src/Runner.php @@ -13,7 +13,7 @@ namespace Chevere\Workflow; -use Amp\Promise; +use Amp\Parallel\Worker\Execution; use Chevere\Action\Interfaces\ActionInterface; use Chevere\Parameter\Interfaces\CastInterface; use Chevere\Workflow\Interfaces\JobInterface; @@ -24,9 +24,8 @@ use InvalidArgumentException; use OutOfBoundsException; use Throwable; -use function Amp\Parallel\Worker\enqueueCallable; -use function Amp\Promise\all; -use function Amp\Promise\wait; +use function Amp\Future\await; +use function Amp\Parallel\Worker\submit; use function Chevere\Message\message; use function Chevere\Parameter\cast; @@ -54,9 +53,12 @@ public function withRun(): RunnerInterface continue; } - $promises = $new->getPromises($node); + $executions = $new->getExecutions($node); /** @var RunnerInterface[] $responses */ - $responses = wait(all($promises)); + $responses = await(array_map( + fn (Execution $e) => $e->getFuture(), + $executions, + )); foreach ($responses as $runner) { $new->merge($new, $runner); } @@ -178,20 +180,22 @@ private function addJobSkip(string $name): void /** * @param array $queue - * @return array> + * @return array> */ - private function getPromises(array $queue): array + private function getExecutions(array $queue): array { - $promises = []; + $return = []; foreach ($queue as $job) { - $promises[] = enqueueCallable( - 'Chevere\\Workflow\\runnerForJob', - $this, - $job, + $return[] = submit( + new CallableTask( + 'Chevere\\Workflow\\runnerForJob', + $this, + $job, + ) ); } - return $promises; + return $return; } private function merge(self $self, RunnerInterface $runner): void