Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swoole Backend #312

Merged
merged 18 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
with:
php-version: ${{ matrix.php-versions }}
tools: composer, pecl
extensions: svm, mbstring, gd, fileinfo
extensions: svm, mbstring, gd, fileinfo, swoole
ini-values: memory_limit=-1

- name: Validate composer.json
Expand Down
10 changes: 9 additions & 1 deletion benchmarks/Classifiers/OneVsRestBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

namespace Rubix\ML\Benchmarks\Classifiers;

use Rubix\ML\Backends\Backend;
use Rubix\ML\Classifiers\OneVsRest;
use Rubix\ML\Datasets\Generators\Blob;
use Rubix\ML\Classifiers\LogisticRegression;
use Rubix\ML\NeuralNet\Optimizers\Stochastic;
use Rubix\ML\Datasets\Generators\Agglomerate;
use Rubix\ML\Tests\DataProvider\BackendProviderTrait;

/**
* @Groups({"Classifiers"})
* @BeforeMethods({"setUp"})
*/
class OneVsRestBench
{
use BackendProviderTrait;

protected const TRAINING_SIZE = 10000;

protected const TESTING_SIZE = 10000;
Expand Down Expand Up @@ -52,9 +56,13 @@ public function setUp() : void
* @Subject
* @Iterations(5)
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function trainPredict() : void
public function trainPredict(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
Expand Down
16 changes: 14 additions & 2 deletions benchmarks/Classifiers/RandomForestBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@

namespace Rubix\ML\Benchmarks\Classifiers;

use Rubix\ML\Backends\Backend;
use Rubix\ML\Classifiers\RandomForest;
use Rubix\ML\Datasets\Generators\Blob;
use Rubix\ML\Classifiers\ClassificationTree;
use Rubix\ML\Datasets\Generators\Agglomerate;
use Rubix\ML\Tests\DataProvider\BackendProviderTrait;
use Rubix\ML\Transformers\IntervalDiscretizer;

/**
* @Groups({"Classifiers"})
*/
class RandomForestBench
{
use BackendProviderTrait;

protected const TRAINING_SIZE = 10000;

protected const TESTING_SIZE = 10000;
Expand Down Expand Up @@ -70,9 +74,13 @@ public function setUpCategorical() : void
* @Iterations(5)
* @BeforeMethods({"setUpContinuous"})
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function continuous() : void
public function continuous(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
Expand All @@ -83,9 +91,13 @@ public function continuous() : void
* @Iterations(5)
* @BeforeMethods({"setUpCategorical"})
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function categorical() : void
public function categorical(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
"phpstan/extension-installer": "^1.0",
"phpstan/phpstan": "^1.0",
"phpstan/phpstan-phpunit": "^1.0",
"phpunit/phpunit": "^9.0"
"phpunit/phpunit": "^9.0",
"swoole/ide-helper": "^5.1"
},
"suggest": {
"ext-tensor": "For fast Matrix/Vector computing",
Expand Down
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ parameters:
- 'benchmarks'
excludePaths:
- src/Backends/Amp.php
- src/Backends/Swoole.php
15 changes: 14 additions & 1 deletion phpunit.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" backupStaticAttributes="false" bootstrap="vendor/autoload.php" colors="true" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" forceCoversAnnotation="true" processIsolation="false" stopOnFailure="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="vendor/autoload.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
forceCoversAnnotation="true"
processIsolation="true"
stopOnFailure="false"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd"
>
<coverage processUncoveredFiles="true">
<include>
<directory suffix=".php">src</directory>
Expand Down
2 changes: 1 addition & 1 deletion src/AnomalyDetectors/LocalOutlierFactor.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class LocalOutlierFactor implements Estimator, Learner, Scoring, Persistable
*
* @var Spatial
*/
protected \Rubix\ML\Graph\Trees\Spatial $tree;
protected Spatial $tree;

/**
* The precomputed k distances between each training sample and its k'th nearest neighbor.
Expand Down
2 changes: 1 addition & 1 deletion src/AnomalyDetectors/Loda.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class Loda implements Estimator, Learner, Online, Scoring, Persistable
*
* @var \Tensor\Matrix|null
*/
protected ?\Tensor\Matrix $r = null;
protected ?Matrix $r = null;

/**
* The edges and bin counts of each histogram.
Expand Down
4 changes: 2 additions & 2 deletions src/AnomalyDetectors/OneClassSVM.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class OneClassSVM implements Estimator, Learner
*
* @var svm
*/
protected \svm $svm;
protected svm $svm;

/**
* The hyper-parameters of the model.
Expand All @@ -58,7 +58,7 @@ class OneClassSVM implements Estimator, Learner
*
* @var \svmmodel|null
*/
protected ?\svmmodel $model = null;
protected ?svmmodel $model = null;

/**
* @param float $nu
Expand Down
173 changes: 173 additions & 0 deletions src/Backends/Swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
<?php

namespace Rubix\ML\Backends;

use Rubix\ML\Backends\Tasks\Task;
use Rubix\ML\Specifications\ExtensionIsLoaded;
use Rubix\ML\Specifications\SwooleExtensionIsLoaded;
use RuntimeException;
use Swoole\Atomic;
use Swoole\Process;

use function Swoole\Coroutine\run;

/**
* Swoole
*
* Works both with Swoole and OpenSwoole.
*
* @category Machine Learning
* @package Rubix/ML
*/
class Swoole implements Backend
{
/**
* The queue of tasks to be processed in parallel.
*/
protected array $queue = [];

private int $cpus;

private int $hasIgbinary;

public function __construct()
{
SwooleExtensionIsLoaded::create()->check();

$this->cpus = swoole_cpu_num();
$this->hasIgbinary = ExtensionIsLoaded::with('igbinary')->passes();
}

/**
* Queue up a deferred task for backend processing.
*
* @internal
*
* @param Task $task
* @param callable(mixed,mixed):void $after
* @param mixed $context
*/
public function enqueue(Task $task, ?callable $after = null, $context = null) : void
{
$this->queue[] = function () use ($task, $after, $context) {
$result = $task();

if ($after) {
$after($result, $context);
}

return $result;
};
}

/**
* Process the queue and return the results.
*
* @internal
*
* @return mixed[]
*/
public function process() : array
{
$results = [];

$maxMessageLength = new Atomic(0);
$workerProcesses = [];

$currentCpu = 0;

foreach ($this->queue as $index => $queueItem) {
$workerProcess = new Process(
function (Process $worker) use ($maxMessageLength, $queueItem) {
$serialized = $this->serialize($queueItem());

$serializedLength = strlen($serialized);
$currentMaxSerializedLength = $maxMessageLength->get();

if ($serializedLength > $currentMaxSerializedLength) {
$maxMessageLength->set($serializedLength);
}

$worker->exportSocket()->send($serialized);
},
// redirect_stdin_and_stdout
false,
// pipe_type
SOCK_DGRAM,
// enable_coroutine
true,
);

$workerProcess->setAffinity([$currentCpu]);
$workerProcess->setBlocking(false);
$workerProcess->start();

$workerProcesses[$index] = $workerProcess;

$currentCpu = ($currentCpu + 1) % $this->cpus;
}

run(function () use ($maxMessageLength, &$results, $workerProcesses) {
foreach ($workerProcesses as $index => $workerProcess) {
$status = $workerProcess->wait();

if (0 !== $status['code']) {
throw new RuntimeException('Worker process exited with an error');
}

$socket = $workerProcess->exportSocket();

if ($socket->isClosed()) {
throw new RuntimeException('Coroutine socket is closed');
}

$maxMessageLengthValue = $maxMessageLength->get();

$receivedData = $socket->recv($maxMessageLengthValue);
$unserialized = $this->unserialize($receivedData);

$results[] = $unserialized;
}
});

return $results;
}

/**
* Flush the queue
*/
public function flush() : void
{
$this->queue = [];
}

private function serialize(mixed $data) : string
{
if ($this->hasIgbinary) {
return igbinary_serialize($data);
}

return serialize($data);
}

private function unserialize(string $serialized) : mixed
{
if ($this->hasIgbinary) {
return igbinary_unserialize($serialized);
}

return unserialize($serialized);
}

/**
* Return the string representation of the object.
*
* @internal
*
* @return string
*/
public function __toString() : string
{
return 'Swoole';
}
}
2 changes: 1 addition & 1 deletion src/BootstrapAggregator.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BootstrapAggregator implements Estimator, Learner, Parallel, Persistable
*
* @var Learner
*/
protected \Rubix\ML\Learner $base;
protected Learner $base;

/**
* The number of base learners to train in the ensemble.
Expand Down
2 changes: 1 addition & 1 deletion src/Classifiers/AdaBoost.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class AdaBoost implements Estimator, Learner, Probabilistic, Verbose, Persistabl
*
* @var Learner
*/
protected \Rubix\ML\Learner $base;
protected Learner $base;

/**
* The learning rate of the ensemble i.e. the *shrinkage* applied to each step.
Expand Down
2 changes: 1 addition & 1 deletion src/Classifiers/KDNeighbors.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class KDNeighbors implements Estimator, Learner, Probabilistic, Persistable
*
* @var Spatial
*/
protected \Rubix\ML\Graph\Trees\Spatial $tree;
protected Spatial $tree;

/**
* The zero vector for the possible class outcomes.
Expand Down
2 changes: 1 addition & 1 deletion src/Classifiers/KNearestNeighbors.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class KNearestNeighbors implements Estimator, Learner, Online, Probabilistic, Pe
*
* @var Distance
*/
protected \Rubix\ML\Kernels\Distance\Distance $kernel;
protected Distance $kernel;

/**
* The zero vector for the possible class outcomes.
Expand Down
Loading
Loading