Skip to content

Commit

Permalink
Better heap sync (#180)
Browse files Browse the repository at this point in the history
* Fix test

* Better sync

* hotfix

* Remove SplObjectStorage cloning

* Add heap test
  • Loading branch information
roxblnfk authored May 9, 2021
1 parent 66d04a0 commit 7cfd773
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 29 deletions.
38 changes: 27 additions & 11 deletions src/Heap/Heap.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public function __destruct()
$this->clean();
}

/**
* @return SplObjectStorage
*/
public function getIterator(): SplObjectStorage
{
return $this->storage;
Expand Down Expand Up @@ -95,6 +92,11 @@ public function find(string $role, array $scope)
public function attach($entity, Node $node, array $index = []): void
{
$this->storage->offsetSet($entity, $node);
$role = $node->getRole();

if ($node->hasState()) {
$this->eraseIndexes($role, $node->getInitialData(), $entity);
}

$data = $node->getData();
foreach ($index as $key) {
Expand All @@ -113,8 +115,7 @@ public function attach($entity, Node $node, array $index = []): void
$value = (string)$data[$key];
}

$this->paths[get_class($entity)][$key][$value] = $entity;
$this->paths[$node->getRole()][$key][$value] = $entity;
$this->paths[$role][$key][$value] = $entity;
}
}

Expand All @@ -130,12 +131,9 @@ public function detach($entity): void

$role = $node->getRole();

// erase all the indexes
if (isset($this->paths[$role])) {
$keys = array_keys($this->paths[$role]);
foreach ($keys as $key) {
unset($this->paths[$role][$key][$node->getData()[$key]]);
}
$this->eraseIndexes($role, $node->getData(), $entity);
if ($node->hasState()) {
$this->eraseIndexes($role, $node->getInitialData(), $entity);
}

$this->storage->offsetUnset($entity);
Expand All @@ -149,4 +147,22 @@ public function clean(): void
$this->paths = [];
$this->storage = new \SplObjectStorage();
}

private function eraseIndexes(string $role, array $data, object $entity): void
{
if (!isset($this->paths[$role]) || empty($data)) {
return;
}
$keys = array_keys($this->paths[$role]);
foreach ($keys as $key) {
$value = isset($data[$key]) ? (string)$data[$key] : null;
if ($value === null) {
continue;
}
$current = &$this->paths[$role][$key];
if (isset($current[$value]) && $current[$value] === $entity) {
unset($current[$value]);
}
}
}
}
5 changes: 5 additions & 0 deletions src/Heap/Node.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public function getState(): State
return $this->state;
}

public function hasState(): bool
{
return $this->state !== null;
}

/**
* Set new state value.
*
Expand Down
23 changes: 10 additions & 13 deletions src/Mapper/DatabaseMapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,25 +117,22 @@ public function queueCreate($entity, Node $node, State $state): ContextCarrierIn
*/
public function queueUpdate($entity, Node $node, State $state): ContextCarrierInterface
{
$fromData = $state->getTransactionData();
$data = $this->fetchFields($entity);

// in a future mapper must support solid states
$changes = array_udiff_assoc($data, $state->getTransactionData(), [Node::class, 'compare']);
$changedColumns = $this->mapColumns($changes);
unset($changes[$this->primaryKey]);

$update = new Update($this->source->getDatabase(), $this->source->getTable(), $changedColumns);
$changes = array_udiff_assoc($data, $fromData, [Node::class, 'compare']);
$state->setStatus(Node::SCHEDULED_UPDATE);
$state->setData($changes);

// we are trying to update entity without PK right now
$state->forward(
$this->primaryKey,
$update,
$this->primaryColumn,
true,
ConsumerInterface::SCOPE
);
$update = new Update($this->source->getDatabase(), $this->source->getTable(), $this->mapColumns($changes));
if (isset($fromData[$this->primaryKey])) {
// set update criteria right now
$update->register($this->primaryColumn, $fromData[$this->primaryKey], false, ConsumerInterface::SCOPE);
} else {
// subscribe to PK update
$state->forward($this->primaryKey, $update, $this->primaryColumn, true, ConsumerInterface::SCOPE);
}

return $update;
}
Expand Down
12 changes: 8 additions & 4 deletions src/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,21 @@ protected function syncHeap(): void
// optimize to only scan over affected entities
$node = $heap->get($e);

if (!$node->hasState()) {
continue;
}

// marked as being deleted and has no external claims (GC like approach)
if ($node->getStatus() == Node::SCHEDULED_DELETE && !$node->getState()->hasClaims()) {
if ($node->getStatus() === Node::SCHEDULED_DELETE && !$node->getState()->hasClaims()) {
$heap->detach($e);
continue;
}

// reindex the entity while it has old data
$heap->attach($e, $node, $this->getIndexes($node->getRole()));

// sync the current entity data with newly generated data
$this->orm->getMapper($node->getRole())->hydrate($e, $node->syncState());

// reindex the entity
$heap->attach($e, $node, $this->getIndexes($node->getRole()));
}
}

Expand Down
208 changes: 208 additions & 0 deletions tests/ORM/Heap/HeapTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
<?php

declare(strict_types=1);

namespace Cycle\ORM\Tests\Heap;

use Cycle\ORM\Heap\Heap;
use Cycle\ORM\Heap\HeapInterface;
use Cycle\ORM\Heap\Node;
use Cycle\ORM\Tests\Fixtures\User;
use PHPUnit\Framework\TestCase;

class HeapTest extends TestCase
{
protected const
INDEX_FIELDS_1 = 'id',
INDEX_VALUES_1_1 = 42,
INDEX_VALUES_1_2 = 24,
INDEX_FIND_1_1 = [self::INDEX_FIELDS_1 => self::INDEX_VALUES_1_1],
INDEX_FIND_1_2 = [self::INDEX_FIELDS_1 => self::INDEX_VALUES_1_2],
INDEX_FIND_1_BAD = [self::INDEX_FIELDS_1 => 404],

INDEX_FIELDS_2 = 'email',
INDEX_VALUES_2_1 = 'mail1@spiral',
INDEX_VALUES_2_2 = 'mail2@spiral',
INDEX_FIND_2_1 = [self::INDEX_FIELDS_2 => self::INDEX_VALUES_2_1],
INDEX_FIND_2_2 = [self::INDEX_FIELDS_2 => self::INDEX_VALUES_2_2],
INDEX_FIND_2_BAD = [self::INDEX_FIELDS_2 => 505],

ENTITY_SET_1 = [
self::INDEX_FIELDS_1 => self::INDEX_VALUES_1_1,
self::INDEX_FIELDS_2 => self::INDEX_VALUES_2_1,
],
ENTITY_SET_2 = [
self::INDEX_FIELDS_1 => self::INDEX_VALUES_1_2,
self::INDEX_FIELDS_2 => self::INDEX_VALUES_2_2,
];

public function testAttachAndFind(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity1 = new User();
$node2 = new Node(Node::NEW, self::ENTITY_SET_2, 'user');
$entity2 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

$this->assertSame($entity2, $heap->find('user', self::INDEX_FIND_1_2), 'Found');
$this->assertNull($heap->find('user', self::INDEX_FIND_1_BAD), 'Not found');
$this->assertNull($heap->find('user', []), 'Empty scope');
}

public function testDetach(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, self::INDEX_FIND_1_1, 'user');
$entity1 = new User();
$node2 = new Node(Node::NEW, self::INDEX_FIND_1_2, 'user');
$entity2 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

$this->assertSame($entity1, $heap->find('user', self::INDEX_FIND_1_1), 'Found');
$this->assertSame($entity2, $heap->find('user', self::INDEX_FIND_1_2), 'Found');
$this->assertTrue($heap->has($entity1));
$this->assertTrue($heap->has($entity2));

# Now detach it
$heap->detach($entity1);
$heap->detach($entity2);

$this->assertNull($heap->find('user', self::INDEX_FIND_1_1));
$this->assertNull($heap->find('user', self::INDEX_FIND_1_2));
$this->assertFalse($heap->has($entity1));
$this->assertFalse($heap->has($entity2));
}

public function testGet(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, ['email' => 'test1'], 'user');
$entity1 = new User();
$node2 = new Node(Node::NEW, self::ENTITY_SET_2, 'user');
$entity2 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

$this->assertSame($node1, $heap->get($entity1));
$this->assertSame($node2, $heap->get($entity2));
}

public function testClean(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, ['email' => 'test1'], 'user');
$entity1 = new User();
$node2 = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity2 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

# Now detach it
$heap->clean();

$this->assertNull($heap->find('user', self::INDEX_FIND_1_1));
$this->assertFalse($heap->has($entity1));
$this->assertFalse($heap->has($entity2));

$count = 0;
foreach ($heap as $value) {
++$count;
}
$this->assertSame(0, $count);
}

public function testSyncWhenIndexedValueChanged(): void
{
$heap = $this->createHeap();
$node = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity = new User();
$heap->attach($entity, $node, [self::INDEX_FIELDS_1]);

foreach (self::ENTITY_SET_2 as $key => $value) {
$node->getState()->register($key, $value);
}
$heap->attach($entity, $node, [self::INDEX_FIELDS_1]);

$this->assertNull($heap->find('user', self::INDEX_FIND_1_1));
$this->assertSame($entity, $heap->find('user', self::INDEX_FIND_1_2));
}

public function testSyncWhenEntitiesPKSwitch(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity1 = new User();
$node2 = new Node(Node::NEW, self::ENTITY_SET_2, 'user');
$entity2 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

foreach (self::ENTITY_SET_2 as $key => $value) {
$node1->getState()->register($key, $value);
}
foreach (self::ENTITY_SET_1 as $key => $value) {
$node2->getState()->register($key, $value);
}
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

$this->assertSame($entity1, $heap->find('user', self::INDEX_FIND_1_2));
$this->assertSame($entity2, $heap->find('user', self::INDEX_FIND_1_1));
}

public function testOverwriteEntity(): void
{
$heap = $this->createHeap();
$node1 = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity1 = new User();
$heap->attach($entity1, $node1, [self::INDEX_FIELDS_1]);

$node2 = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity2 = new User();
$heap->attach($entity2, $node2, [self::INDEX_FIELDS_1]);

$this->assertSame($entity2, $heap->find('user', self::INDEX_FIND_1_1));
}

public function testAttachWithNewIndex(): void
{
$heap = $this->createHeap();
$node = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity = new User();
$heap->attach($entity, $node, [self::INDEX_FIELDS_1]);

$this->assertNull($heap->find('user', self::INDEX_FIND_2_1));

$heap->attach($entity, $node, [self::INDEX_FIELDS_2]);

$this->assertSame($entity, $heap->find('user', self::INDEX_FIND_2_1));
// old index was not deleted
$this->assertSame($entity, $heap->find('user', self::INDEX_FIND_1_1));
}

public function testSyncWhenIndexAndValuesChanged(): void
{
$heap = $this->createHeap();
$node = new Node(Node::NEW, self::ENTITY_SET_1, 'user');
$entity = new User();
$heap->attach($entity, $node, [self::INDEX_FIELDS_1]);

foreach (self::ENTITY_SET_2 as $key => $value) {
$node->getState()->register($key, $value);
}
$heap->attach($entity, $node, [self::INDEX_FIELDS_2]);

$this->assertNull($heap->find('user', self::INDEX_FIND_1_1));
$this->assertNull($heap->find('user', self::INDEX_FIND_1_2));
$this->assertNull($heap->find('user', self::INDEX_FIND_2_1));
$this->assertSame($entity, $heap->find('user', self::INDEX_FIND_2_2));
}

protected function createHeap(): HeapInterface
{
return new Heap();
}
}
1 change: 0 additions & 1 deletion tests/ORM/RenamedPKTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public function testChangePK(): void
$this->save($u);
$this->assertNumWrites(1);

$this->orm = $this->orm->withHeap(new Heap());
$data = (new Select($this->orm, Identity::class))
->fetchAll();

Expand Down

0 comments on commit 7cfd773

Please sign in to comment.