diff --git a/dgi_migrate.services.yml b/dgi_migrate.services.yml index c5b428f4..0d72593e 100644 --- a/dgi_migrate.services.yml +++ b/dgi_migrate.services.yml @@ -4,3 +4,6 @@ services: class: Drupal\dgi_migrate\Routing\RouteSubscriber tags: - name: event_subscriber + plugin.manager.dgi_migrate.locker: + class: \Drupal\dgi_migrate\LockerPluginManager + parent: default_plugin_manager diff --git a/scripts/README.md b/scripts/README.md index 6f9962c1..2a7086c5 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -106,6 +106,15 @@ NOTE: In terms of establishing the `PROCESSES` quantity, there are environmental * Is Crayfish on the same machine? Are derivatives enabled? There can be additional load from Crayfish acquiring files from Drupal, or if on the same machine, from the derivatives proper being run. * Is the site being used by others? If so, it is probably a good idea to play nice and to try to avoid saturating the CPUs, perhaps going so far as to `nice` the migration execution. If not, we could target a slight bit of oversaturation, with the expectation that there will be some background I/O overhead on read/write operations that might leave some CPU cycles otherwise unoccupied. +#### Locking plugins + +The means of locking is pluggable. The default implementation uses a directory of locks for various purposes in `temporary://`, using [PHP's `\SplFileInfo::flock()`](https://www.php.net/manual/en/splfileobject.flock.php), in [our `flock` plugin](../src/Plugin/dgi_migrate/locker/Flock.php); this `flock` plugin _does_ requires the ability to open many files (on the orders of tens-of-thousands). We also provide [a `pgsql_advisory_locking` plugin](../src/Plugin/dgi_migrate/locker/PgsqlAdvisoryLocking.php) that might be used to perform locking using [a PostgreSQL database's advisory locking capabilities](https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS); however, it is expected that using such might perform somewhat slower than `flock` due to requiring the round-trips to the DB. Ideally, we should look at implementing native semaphore/mutex functionality. + +The locking plugin can be: +- configured by setting the `DGI_MIGRATE_DEFAULT_LOCKER` to the ID of the desired plugin + - Built-in are `flock` and `pgsql_advisory_locking`; however, other modules might provide other plugins +- configured for on the particular migration lookup plugin definition, via the `locker` property. + ### Rollback If additional parameters/options need to be passed to the `dgi-migrate:rollback` diff --git a/src/Attribute/Locker.php b/src/Attribute/Locker.php new file mode 100644 index 00000000..bd762734 --- /dev/null +++ b/src/Attribute/Locker.php @@ -0,0 +1,13 @@ +alterInfo('dgi_migrate__locker_info'); + $this->setCacheBackend($cacheBackend, 'dgi_migrate__locker_plugins'); + } + + /** + * {@inheritDoc} + */ + public function getFallbackPluginId($plugin_id, array $configuration = []) : string { + return 'flock'; + } + +} diff --git a/src/LockerPluginManagerInterface.php b/src/LockerPluginManagerInterface.php new file mode 100644 index 00000000..3889e07e --- /dev/null +++ b/src/LockerPluginManagerInterface.php @@ -0,0 +1,12 @@ +get('file_system'), + ); + } + + /** + * {@inheritDoc} + */ + public function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_block = FALSE): bool { + return $this->getLockFile($name)->flock($mode, $would_block); + } + + /** + * {@inheritDoc} + */ + public function releaseLock(string $name): bool { + return $this->getLockFile($name)->flock(LOCK_UN); + } + + /** + * Get an \SplFileObject instance to act as the lock. + * + * @param string $name + * The name of the lock to acquire. Should result in a file being created + * under the temporary:// scheme of the same name, against which `flock` + * commands will be issued. + * + * @return \SplFileObject + * The \SplFileObject instance against which to lock. + */ + protected function getLockFile(string $name) : \SplFileObject { + if (!isset($this->lockFiles[$name])) { + $file_uri = "temporary://{$name}"; + $directory = $this->fileSystem->dirname($file_uri); + $basename = $this->fileSystem->basename($file_uri); + $this->fileSystem->prepareDirectory($directory, FileSystemInterface::CREATE_DIRECTORY); + $file_uri = "{$directory}/{$basename}"; + + // XXX: Drupal's LocalStream wrappers presently have a bug in their + // ::stream_lock() method which underlies flock()/\SplFileObject::flock(), + // where they fail to properly report the lock status when non-blockingly + // acquiring locks, so let's side-step the issue by referencing the real + // file path directly. + // + // @see https://www.drupal.org/project/drupal/issues/3493632 + // @see https://github.com/php/doc-en/issues/4299 + $file_path = $this->fileSystem->realpath($file_uri); + + touch($file_path); + $this->lockFiles[$name] = new \SplFileObject($file_path, 'a+'); + } + + return $this->lockFiles[$name]; + } + + /** + * {@inheritDoc} + */ + public function acquireControl(): bool { + return $this->acquireLock(LockingMigrationLookup::CONTROL_LOCK); + } + + /** + * {@inheritDoc} + */ + public function releaseControl(): bool { + return $this->releaseLock(LockingMigrationLookup::CONTROL_LOCK); + } + +} diff --git a/src/Plugin/dgi_migrate/locker/LockerInterface.php b/src/Plugin/dgi_migrate/locker/LockerInterface.php new file mode 100644 index 00000000..eebe38b0 --- /dev/null +++ b/src/Plugin/dgi_migrate/locker/LockerInterface.php @@ -0,0 +1,58 @@ +get('database'), + ); + } + + /** + * {@inheritDoc} + * + * XXX: flock() has the idea of promoting/demoting for which we do not + * presently account; for example: + * - when holding an exclusive lock and a shared lock is acquired, flock() + * would then allow other process to acquire the shared lock. + * - when a process holding a shared lock promotes it to an exclusive lock, + * the shared lock would have been replaced, so it would not be necessary to + * separately release the shared lock. + * PostgreSQL's advisory locks do not act the same way. We _could_ have the + * acquisition release the "other" type of lock; however, there is not really + * any utility in doing so at present. + */ + public function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_block = FALSE): bool { + $lock_id = static::toLockId($name); + if ($mode === LOCK_EX) { + $this->database->query( + 'SELECT pg_advisory_lock(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + ); + $this->exclusiveLocks[$lock_id] = ($this->exclusiveLocks[$lock_id] ?? 0) + 1; + return TRUE; + } + if ($mode === LOCK_SH) { + $this->database->query( + 'SELECT pg_advisory_lock_shared(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + ); + $this->sharedLocks[$lock_id] = ($this->sharedLocks[$lock_id] ?? 0) + 1; + return TRUE; + } + if ($mode === (LOCK_EX | LOCK_NB)) { + $result = $this->database->query( + 'SELECT pg_try_advisory_lock(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + )?->fetchField(); + if ($result) { + $would_block = FALSE; + $this->exclusiveLocks[$lock_id] = ($this->exclusiveLocks[$lock_id] ?? 0) + 1; + } + else { + $would_block = TRUE; + } + return $result; + } + if ($mode === (LOCK_SH | LOCK_NB)) { + $result = $this->database->query( + 'SELECT pg_try_advisory_lock_shared(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + )?->fetchField(); + if ($result) { + $would_block = FALSE; + $this->sharedLocks[$lock_id] = ($this->sharedLocks[$lock_id] ?? 0) + 1; + } + else { + $would_block = TRUE; + } + return $result; + } + throw new \LogicException("Unknown lock mode: {$mode}"); + } + + /** + * {@inheritDoc} + */ + public function releaseLock(string $name): bool { + $lock_id = static::toLockId($name); + + $shared = $this->releaseSharedLocks($lock_id); + $exclusive = $this->releaseExclusiveLocks($lock_id); + + return $shared || $exclusive; + } + + /** + * PostgreSQL deal with (big) integers for its locks, so let's map things. + * + * Somewhat adapted from https://stackoverflow.com/a/9812029. + * + * @param string $name + * The lock name to map. + * + * @return int + * An ID to use. + */ + protected static function toLockId(string $name) : int { + $hash = md5($name); + return ((hexdec($hash[16]) & 1) ? 1 : -1) * hexdec(substr($hash, 0, 15)); + } + + /** + * Release target exclusive lock. + * + * @param int $lock_id + * Mapped ID of the lock to release. + * + * @return bool + * TRUE if we released without issue; otherwise, FALSE if we thought we had + * to release it too many times. + */ + protected function releaseExclusiveLocks(int $lock_id) : bool { + $results = []; + $occurrences = $this->exclusiveLocks[$lock_id] ?? 0; + while ($occurrences > 0) { + $results[] = (bool) $this->database->query( + 'SELECT pg_advisory_unlock(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + )?->fetchField(); + $occurrences--; + } + unset($this->exclusiveLocks[$lock_id]); + return !empty($results) && !in_array(FALSE, $results, TRUE); + } + + /** + * Release target shared lock. + * + * @param int $lock_id + * Mapped ID of the lock to release. + * + * @return bool + * TRUE if we released without issue; otherwise, FALSE if we thought we had + * to release it too many times. + */ + protected function releaseSharedLocks(int $lock_id) : bool { + $results = []; + $occurrences = $this->sharedLocks[$lock_id] ?? 0; + while ($occurrences > 0) { + $results[] = (bool) $this->database->query( + 'SELECT pg_advisory_unlock_shared(:lock_id);', + [ + ':lock_id' => $lock_id, + ], + )?->fetchField(); + $occurrences--; + } + unset($this->sharedLocks[$lock_id]); + return !empty($results) && !in_array(FALSE, $results, TRUE); + } + + /** + * Hashed control lock ID as a (less-than) 32-bit integer. + */ + protected const CONTROL_LOCK_ID = -89904742; + + /** + * {@inheritDoc} + */ + public function acquireControl(): bool { + $this->database->query( + 'SELECT pg_advisory_lock(:lock_id, :lock_id);', + [ + ':lock_id' => static::CONTROL_LOCK_ID, + ], + ); + return TRUE; + } + + /** + * {@inheritDoc} + */ + public function releaseControl(): bool { + return $this->database->query( + 'SELECT pg_advisory_unlock(:lock_id, :lock_id);', + [ + ':lock_id' => static::CONTROL_LOCK_ID, + ], + )?->fetchField(); + } + +} diff --git a/src/Plugin/migrate/process/LockingMigrationLookup.php b/src/Plugin/migrate/process/LockingMigrationLookup.php index bdecd296..08b2b69c 100644 --- a/src/Plugin/migrate/process/LockingMigrationLookup.php +++ b/src/Plugin/migrate/process/LockingMigrationLookup.php @@ -5,8 +5,8 @@ use Drupal\Component\Plugin\Exception\PluginNotFoundException; use Drupal\Component\Utility\NestedArray; use Drupal\Core\Database\Connection; -use Drupal\Core\File\FileSystemInterface; use Drupal\Core\Plugin\ContainerFactoryPluginInterface; +use Drupal\dgi_migrate\Plugin\dgi_migrate\locker\LockerInterface; use Drupal\migrate\MigrateException; use Drupal\migrate\MigrateExecutableInterface; use Drupal\migrate\MigrateLookupInterface; @@ -27,8 +27,8 @@ * * Accepts all the same as the core "migration_lookup" plugin, in addition to: * - "no_lock": Flag to explicitly skip locking, which should only be used when - * it is known that there's a one-to-one mapping between each set of paramters - * and each resultant value. + * it is known that there's a one-to-one mapping between each set of + * parameters and each resultant value. * - "lock_context_keys": A mapping of migrations IDs to arrays of maps, * mapping: * - "offset": An array of offsets indexing into the `$value` passed to the @@ -37,6 +37,8 @@ * - "hash": An optional string representing a pattern. If provided every * '#' found will be replaced with hexit resulting from hashing the value * "offset". + * - "locker": Optional ID of locker plugin to use. Should not be necessary to + * provide, but just to be completionist. */ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcessInterface, ContainerFactoryPluginInterface { @@ -99,12 +101,6 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess */ protected MigrationInterface $migration; - /** - * An array of SplFileObjects, to facilitate locking. - * - * @var \SplFileObject[] - */ - protected array $lockFiles = []; /** * The migration stub service. @@ -135,11 +131,11 @@ class LockingMigrationLookup extends ProcessPluginBase implements MigrateProcess protected $lockContextKeys; /** - * The file system service. + * Locker plugin instance to use to manage locks. * - * @var \Drupal\Core\File\FileSystemInterface + * @var \Drupal\dgi_migrate\Plugin\dgi_migrate\locker\LockerInterface */ - protected FileSystemInterface $fileSystem; + protected LockerInterface $locker; /** * Constructor. @@ -304,37 +300,11 @@ protected function releaseMigrationLocks() : void { */ protected function getControlLock() : bool { if (!$this->hasControl) { - $this->hasControl = $this->acquireLock(static::CONTROL_LOCK); + $this->hasControl = $this->locker->acquireControl(); } return $this->hasControl; } - /** - * Get an \SplFileObject instance to act as the lock. - * - * @param string $name - * The name of the lock to acquire. Should result in a file being created - * under the temporary:// scheme of the same name, against which `flock` - * commands will be issued. - * - * @return \SplFileObject - * The \SplFileObject instance against which to lock. - */ - protected function getLockFile(string $name) : \SplFileObject { - if (!isset($this->lockFiles[$name])) { - $file_name = "temporary://{$name}"; - $directory = $this->fileSystem->dirname($file_name); - $basename = $this->fileSystem->basename($file_name); - $this->fileSystem->prepareDirectory($directory, FileSystemInterface::CREATE_DIRECTORY); - $file_name = "{$directory}/{$basename}"; - - touch($file_name); - $this->lockFiles[$name] = $file = new \SplFileObject($file_name, 'a+'); - } - - return $this->lockFiles[$name]; - } - /** * Helper; acquire the lock. * @@ -347,11 +317,11 @@ protected function getLockFile(string $name) : \SplFileObject { * call _would_ have blocked. * * @return bool - * TRUE on success. Should not be able to return FALSE, as we perform this - * in a blocking manner. + * TRUE on success. Should not be able to return FALSE (except with LOCK_NB) + * as we perform this in a blocking manner. */ protected function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_block = FALSE) : bool { - return $this->getLockFile($name)->flock($mode, $would_block); + return $this->locker->acquireLock($name, $mode, $would_block); } /** @@ -365,7 +335,7 @@ protected function acquireLock(string $name, int $mode = LOCK_EX, bool &$would_b * not hold the lock? */ protected function releaseLock(string $name) : bool { - return $this->getLockFile($name)->flock(LOCK_UN); + return $this->locker->releaseLock($name); } /** @@ -373,7 +343,7 @@ protected function releaseLock(string $name) : bool { */ protected function releaseControlLock() { if ($this->hasControl) { - $this->releaseLock(static::CONTROL_LOCK); + $this->locker->releaseControl(); $this->hasControl = FALSE; } } @@ -588,7 +558,15 @@ public static function create(ContainerInterface $container, array $configuratio $instance->migration = $migration; $instance->migrateStub = $container->get('migrate.stub'); $instance->migrateLookup = $container->get('migrate.lookup'); - $instance->fileSystem = $container->get('file_system'); + + /** @var \Drupal\dgi_migrate\LockerPluginManagerInterface $locker_plugin_manager */ + $locker_plugin_manager = $container->get('plugin.manager.dgi_migrate.locker'); + $locker_plugin_id = match(TRUE) { + isset($plugin_definition['locker']) => $plugin_definition['locker'], + getenv('DGI_MIGRATE_DEFAULT_LOCKER') => getenv('DGI_MIGRATE_DEFAULT_LOCKER'), + default => 'flock', + }; + $instance->locker = $locker_plugin_manager->createInstance($locker_plugin_id); return $instance; }