From 697908ad115c089564573aabca2f990c1e4cdf95 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 8 Sep 2023 19:36:04 +0300 Subject: [PATCH] Allow isolating shard placement groups on individual nodes --- .../commands/create_distributed_table.c | 2 +- .../distributed/metadata/metadata_sync.c | 36 +- .../distributed/metadata/metadata_utility.c | 212 ++++++-- .../distributed/metadata/node_metadata.c | 2 - .../distributed/operations/create_shards.c | 12 +- .../rebalancer_placement_isolation.c | 473 ++++++++++++++++++ .../replicate_none_dist_table_shard.c | 31 +- .../distributed/operations/shard_rebalancer.c | 94 +++- .../distributed/operations/shard_split.c | 5 +- .../distributed/operations/shard_transfer.c | 56 ++- .../distributed/operations/stage_protocol.c | 23 +- .../operations/worker_node_manager.c | 20 +- .../distributed/sql/citus--12.0-1--12.1-1.sql | 13 + .../sql/downgrades/citus--12.1-1--12.0-1.sql | 10 + .../12.1-1.sql | 24 + .../latest.sql | 14 +- .../distributed/test/shard_rebalancer.c | 11 +- .../distributed/utils/colocation_utils.c | 2 +- src/include/distributed/metadata_sync.h | 13 +- src/include/distributed/metadata_utility.h | 23 +- src/include/distributed/pg_dist_placement.h | 4 +- .../rebalancer_placement_isolation.h | 37 ++ src/include/distributed/shard_rebalancer.h | 6 +- src/include/distributed/worker_manager.h | 5 +- src/test/regress/citus_tests/run_test.py | 7 + .../expected/metadata_sync_helpers.out | 20 +- src/test/regress/expected/multi_extension.out | 20 +- .../expected/multi_metadata_attributes.out | 3 +- .../regress/expected/shard_rebalancer.out | 20 +- .../expected/upgrade_list_citus_objects.out | 2 +- .../regress/sql/metadata_sync_helpers.sql | 18 +- .../regress/sql/multi_metadata_attributes.sql | 3 +- 32 files changed, 1068 insertions(+), 153 deletions(-) create mode 100644 src/backend/distributed/operations/rebalancer_placement_isolation.c create mode 100644 src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/12.1-1.sql create mode 100644 src/include/distributed/rebalancer_placement_isolation.h diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index dc06692b366..1787402fe06 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -558,7 +558,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, colocatedTableId = ColocatedTableId(colocationId); } - List *workerNodeList = DistributedTablePlacementNodeList(NoLock); + List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock); if (workerNodeList == NIL) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 40bdae0eaf2..63540a708b0 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1415,12 +1415,13 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) */ char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId) + uint64 shardLength, int32 groupId, + bool needsIsolatedNode) { StringInfo command = makeStringInfo(); appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardLength, - groupId, placementId); + groupId, placementId, needsIsolatedNode ? 
"true" : "false"); return command->data; } @@ -3444,16 +3445,18 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS) int64 shardLength = PG_GETARG_INT64(1); int32 groupId = PG_GETARG_INT32(2); int64 placementId = PG_GETARG_INT64(3); + bool needsIsolatedNode = PG_GETARG_BOOL(4); citus_internal_add_placement_metadata_internal(shardId, shardLength, - groupId, placementId); + groupId, placementId, + needsIsolatedNode); PG_RETURN_VOID(); } /* - * citus_internal_add_placement_metadata is an internal UDF to + * citus_internal_delete_placement_metadata is an internal UDF to * delete a row from pg_dist_placement. */ Datum @@ -3483,12 +3486,15 @@ citus_internal_add_placement_metadata_legacy(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); int64 shardId = PG_GETARG_INT64(0); - int64 shardLength = PG_GETARG_INT64(2); - int32 groupId = PG_GETARG_INT32(3); - int64 placementId = PG_GETARG_INT64(4); + int64 shardLength = PG_GETARG_INT64(1); + int32 groupId = PG_GETARG_INT32(2); + int64 placementId = PG_GETARG_INT64(3); + bool needsIsolatedNode = false; citus_internal_add_placement_metadata_internal(shardId, shardLength, - groupId, placementId); + groupId, placementId, + needsIsolatedNode); + PG_RETURN_VOID(); } @@ -3499,7 +3505,8 @@ citus_internal_add_placement_metadata_legacy(PG_FUNCTION_ARGS) */ void citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength, - int32 groupId, int64 placementId) + int32 groupId, int64 placementId, + bool needsIsolatedNode) { bool missingOk = false; Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk); @@ -3524,7 +3531,8 @@ citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength, shardLength, groupId); } - InsertShardPlacementRow(shardId, placementId, shardLength, groupId); + InsertShardPlacementRow(shardId, placementId, shardLength, groupId, + needsIsolatedNode); } @@ -4100,12 +4108,14 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, */ char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId) + uint64 shardLength, int32 groupId, + bool needsIsolatedNode) { StringInfo command = makeStringInfo(); appendStringInfo(command, - "SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld)", - shardId, shardLength, groupId, placementId); + "SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld, %s)", + shardId, shardLength, groupId, placementId, + needsIsolatedNode ? "true" : "false"); return command->data; } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index ae0f6589a2b..c2f217769b9 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1344,6 +1344,108 @@ ShardLength(uint64 shardId) } +/* + * NodeGroupIsolatesAShardPlacementGroup returns true if given node group is + * used to isolate a shard placement group. This means that the node group + * has a shard placement group that needs an isolated node and no other shard + * placements. 
+ */ +bool +NodeGroupIsolatesAShardPlacementGroup(int32 groupId) +{ + ShardPlacementGroup *isolatedShardPlacementGroup = NULL; + + bool indexOK = false; + int scanKeyCount = 1; + ScanKeyData scanKey[1]; + + Relation pgDistPlacement = table_open(DistPlacementRelationId(), + AccessShareLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId)); + SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement, + DistPlacementGroupidIndexId(), + indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); + + GroupShardPlacement *placement = + TupleToGroupShardPlacement(tupleDescriptor, heapTuple); + + ShardPlacementGroup *shardPlacementGroup = + GetShardPlacementGroupForPlacement(placement->shardId, + placement->placementId); + + if (isolatedShardPlacementGroup) + { + if (ShardPlacementGroupCompare(shardPlacementGroup, + isolatedShardPlacementGroup) != 0) + { + isolatedShardPlacementGroup = NULL; + break; + } + } + else if (placement->needsIsolatedNode) + { + isolatedShardPlacementGroup = shardPlacementGroup; + } + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgDistPlacement, NoLock); + + return isolatedShardPlacementGroup != NULL; +} + + +/* + * ShardPlacementGroupCompare is a comparator function for + * ShardPlacementGroup. + */ +int +ShardPlacementGroupCompare(const void *left, const void *right) +{ + const ShardPlacementGroup *leftGroup = left; + const ShardPlacementGroup *rightGroup = right; + + if (leftGroup->colocatationId < rightGroup->colocatationId) + { + return -1; + } + else if (leftGroup->colocatationId > rightGroup->colocatationId) + { + return 1; + } + + if (leftGroup->shardIntervalIndex < rightGroup->shardIntervalIndex) + { + return -1; + } + else if (leftGroup->shardIntervalIndex > rightGroup->shardIntervalIndex) + { + return 1; + } + + if (leftGroup->nodeGroupId < rightGroup->nodeGroupId) + { + return -1; + } + else if (leftGroup->nodeGroupId > rightGroup->nodeGroupId) + { + return 1; + } + + return 0; +} + + /* * NodeGroupHasShardPlacements returns whether any active shards are placed on the group */ @@ -1376,6 +1478,25 @@ NodeGroupHasShardPlacements(int32 groupId) } +/* + * GetShardPlacementGroupForPlacement returns ShardPlacementGroup that placement + * with given shardId & placementId belongs to. + */ +ShardPlacementGroup * +GetShardPlacementGroupForPlacement(uint64 shardId, uint64 placementId) +{ + ShardPlacement *shardPlacement = LoadShardPlacement(shardId, placementId); + ShardInterval *shardInterval = LoadShardInterval(shardId); + + ShardPlacementGroup *placementGroup = palloc(sizeof(ShardPlacementGroup)); + placementGroup->colocatationId = shardPlacement->colocationGroupId; + placementGroup->shardIntervalIndex = shardInterval->shardIndex; + placementGroup->nodeGroupId = shardPlacement->groupId; + + return placementGroup; +} + + /* * IsActiveShardPlacement checks if the shard placement is labelled as * active, and that it is placed in an active worker. @@ -1447,30 +1568,6 @@ FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement } -/* - * FilterActiveShardPlacementListByNode filters a list of active shard placements based on given nodeName and nodePort. 
- */ -List * -FilterActiveShardPlacementListByNode(List *shardPlacementList, WorkerNode *workerNode) -{ - List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList, - IsActiveShardPlacement); - List *filteredShardPlacementList = NIL; - ShardPlacement *shardPlacement = NULL; - - foreach_ptr(shardPlacement, activeShardPlacementList) - { - if (IsPlacementOnWorkerNode(shardPlacement, workerNode)) - { - filteredShardPlacementList = lappend(filteredShardPlacementList, - shardPlacement); - } - } - - return filteredShardPlacementList; -} - - /* * ActiveShardPlacementListOnGroup returns a list of active shard placements * that are sitting on group with groupId for given shardId. @@ -1664,6 +1761,49 @@ AllShardPlacementsOnNodeGroup(int32 groupId) } +/* + * AllGroupShardPlacementsThatNeedIsolatedNode returns a list of group shard + * placements that need an isolated node. + */ +List * +AllGroupShardPlacementsThatNeedIsolatedNode(void) +{ + List *groupShardPlacementList = NIL; + + Relation pgDistPlacement = table_open(DistPlacementRelationId(), AccessShareLock); + + int scanKeyCount = 0; + ScanKeyData *scanKey = NULL; + bool indexOK = false; + Oid indexId = InvalidOid; + SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement, + indexId, indexOK, NULL, + scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) + { + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); + + GroupShardPlacement *groupShardPlacement = + TupleToGroupShardPlacement(tupleDescriptor, heapTuple); + + if (groupShardPlacement->needsIsolatedNode) + { + groupShardPlacementList = lappend(groupShardPlacementList, + groupShardPlacement); + } + + heapTuple = systable_getnext(scanDescriptor); + } + + systable_endscan(scanDescriptor); + table_close(pgDistPlacement, NoLock); + + return groupShardPlacementList; +} + + /* * TupleToGroupShardPlacement takes in a heap tuple from pg_dist_placement, * and converts this tuple to in-memory struct. 
The function assumes the @@ -1674,9 +1814,10 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) { bool isNullArray[Natts_pg_dist_placement]; Datum datumArray[Natts_pg_dist_placement]; + memset(datumArray, 0, sizeof(datumArray)); + memset(isNullArray, false, sizeof(isNullArray)); - if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement || - HeapTupleHasNulls(heapTuple)) + if (HeapTupleHasNulls(heapTuple)) { ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple"))); } @@ -1696,6 +1837,8 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) datumArray[Anum_pg_dist_placement_shardlength - 1]); shardPlacement->groupId = DatumGetInt32( datumArray[Anum_pg_dist_placement_groupid - 1]); + shardPlacement->needsIsolatedNode = DatumGetBool( + datumArray[Anum_pg_dist_placement_needsisolatednode - 1]); return shardPlacement; } @@ -1800,12 +1943,15 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, */ ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId) + uint64 shardLength, int32 groupId, + bool needsIsolatedNode) { - InsertShardPlacementRow(shardId, placementId, shardLength, groupId); + InsertShardPlacementRow(shardId, placementId, shardLength, groupId, + needsIsolatedNode); char *insertPlacementCommand = - AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId); + AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId, + needsIsolatedNode); SendCommandToWorkersWithMetadata(insertPlacementCommand); return LoadShardPlacement(shardId, placementId); @@ -1820,7 +1966,8 @@ InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId, */ uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId) + uint64 shardLength, int32 groupId, + bool needsIsolatedNode) { Datum values[Natts_pg_dist_placement]; bool isNulls[Natts_pg_dist_placement]; @@ -1838,6 +1985,8 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, values[Anum_pg_dist_placement_shardstate - 1] = Int32GetDatum(1); values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength); values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId); + values[Anum_pg_dist_placement_needsisolatednode - 1] = + BoolGetDatum(needsIsolatedNode); /* open shard placement relation and insert new tuple */ Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock); @@ -2075,8 +2224,7 @@ DeleteShardPlacementRow(uint64 placementId) uint64 shardId = heap_getattr(heapTuple, Anum_pg_dist_placement_shardid, tupleDescriptor, &isNull); - if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement || - HeapTupleHasNulls(heapTuple)) + if (HeapTupleHasNulls(heapTuple)) { ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple"))); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index a73f2e9d2cb..78b5a5d1788 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -61,8 +61,6 @@ #include "utils/rel.h" #include "utils/relcache.h" -#define INVALID_GROUP_ID -1 - /* default group size */ int GroupSize = 1; diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c index d0fcc961256..51881e30c44 100644 --- a/src/backend/distributed/operations/create_shards.c +++ 
b/src/backend/distributed/operations/create_shards.c @@ -145,7 +145,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, LockRelationOid(DistNodeRelationId(), RowShareLock); /* load and sort the worker node list for deterministic placement */ - List *workerNodeList = DistributedTablePlacementNodeList(NoLock); + List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); int32 workerNodeCount = list_length(workerNodeList); @@ -296,10 +296,12 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool int32 groupId = sourcePlacement->groupId; const uint64 shardSize = 0; + bool needsIsolatedNode = false; InsertShardPlacementRow(*newShardIdPtr, INVALID_PLACEMENT_ID, shardSize, - groupId); + groupId, + needsIsolatedNode); } } @@ -416,7 +418,7 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio * Also take a RowShareLock on pg_dist_node to disallow concurrent * node list changes that require an exclusive lock. */ - List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); + List *workerNodeList = NewDistributedTablePlacementNodeList(RowShareLock); workerNodeList = SortList(workerNodeList, CompareWorkerNodes); int roundRobinNodeIdx = @@ -459,12 +461,12 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio * group" should be placed on. * * This is determined by modulo of the colocation id by the length of the - * list returned by DistributedTablePlacementNodeList(). + * list returned by NewDistributedTablePlacementNodeList(). */ int EmptySingleShardTableColocationDecideNodeId(uint32 colocationId) { - List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); + List *workerNodeList = NewDistributedTablePlacementNodeList(RowShareLock); int32 workerNodeCount = list_length(workerNodeList); if (workerNodeCount == 0) { diff --git a/src/backend/distributed/operations/rebalancer_placement_isolation.c b/src/backend/distributed/operations/rebalancer_placement_isolation.c new file mode 100644 index 00000000000..f929cd2869d --- /dev/null +++ b/src/backend/distributed/operations/rebalancer_placement_isolation.c @@ -0,0 +1,473 @@ +/*------------------------------------------------------------------------- + * + * rebalancer_placement_isolation.c + * Routines to determine which worker node should be used to isolate + * a colocated set of shard placements that needs isolation. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "nodes/pg_list.h" +#include "utils/hsearch.h" +#include "utils/lsyscache.h" + +#include "distributed/colocation_utils.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/metadata_utility.h" +#include "distributed/rebalancer_placement_isolation.h" +#include "distributed/shard_rebalancer.h" + + +struct RebalancerPlacementIsolationContext +{ + HTAB *placementGroupNodeHash; + HTAB *nodePlacementGroupHash; +}; + + +/* + * Entry of the hash table that maps each shard placement group to a primary + * worker node that is determined to be used to isolate that shard placement. + */ +typedef struct +{ + /* hash key -- shard placement group */ + ShardPlacementGroup placementGroup; + + /* + * Group id of the primary worker node that is assigned to isolate this + * shard placement. + * + * INVALID_GROUP_ID if no node is assigned yet. 
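+ *
+ * Once assigned, the rebalancer treats that node as reserved: the shard
+ * placement group identified by the key above is only allowed on that node,
+ * and other placements are not allowed on it.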
+ */ + int32 assignedNodeGroupId; +} PlacementGroupNodeHashEntry; + +/* + * Routines to prepare a hash table where each entry is of type + * PlacementGroupNodeHashEntry. + */ +static HTAB * CreatePlacementGroupNodeHash(int + groupShardPlacementsThatNeedIsolatedNodeCount); +static void PlacementGroupNodeHashInit(HTAB *placementGroupNodeHash, + List *groupShardPlacementsThatNeedIsolatedNode); +static void PlacementGroupNodeHashAssignNodes(HTAB *placementGroupNodeHash, + HTAB *nodePlacementGroupHash, + List *workerNodeList); +static bool PlacementGroupNodeHashEntryAssignNode( + PlacementGroupNodeHashEntry *placementGroupNodeHashEntry, + HTAB *nodePlacementGroupHash, + int32 nodeGroupId); +static List * PlacementGroupNodeHashGetSortedEntryList(HTAB *placementGroupNodeHash); +static int PlacementGroupNodeHashEntryCompare(const void *left, const void *right); + + +/* + * Entry of the hash table that maps each primary worker node to a shard + * placement group that is determined to be isolated on that node. + */ +typedef struct +{ + /* hash key -- group id of the node */ + int32 nodeGroupId; + + /* + * Whether given node is allowed to have any shards. + * + * This is not just WorkerNode->shouldHaveShards but also takes into account + * whether the node is being drained. + */ + bool shouldHaveShards; + + /* + * Shard placement group that is assigned to this node to be isolated. + * + * NULL if no shard placement group is assigned yet. + */ + ShardPlacementGroup *assignedPlacementGroup; +} NodePlacementGroupHashEntry; + +/* + * Routines to prepare a hash table where each entry is of type + * NodePlacementGroupHashEntry. + */ +static HTAB * CreateNodePlacementGroupHash(int workerCount); +static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, + List *workerNodeList, + WorkerNode *drainWorkerNode); +static NodePlacementGroupHashEntry * NodePlacementGroupHashGetNodeWithGroupId( + HTAB *nodePlacementGroupHash, + int32 + nodeGroupId); + + +/* other helpers */ +static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGroupId); + + +/* + * PrepareRebalancerPlacementIsolationContext creates RebalancerPlacementIsolationContext + * that stores two hash tables to keep track of which worker nodes are used to + * isolate which shard placement groups that need isolated node and vice versa. + */ +RebalancerPlacementIsolationContext * +PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList, + WorkerNode *drainWorkerNode) +{ + HTAB *nodePlacementGroupHash = + CreateNodePlacementGroupHash(list_length(activeWorkerNodeList)); + NodePlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList, + drainWorkerNode); + + List *groupShardPlacementsThatNeedIsolatedNode = + AllGroupShardPlacementsThatNeedIsolatedNode(); + HTAB *placementGroupNodeHash = CreatePlacementGroupNodeHash( + list_length(groupShardPlacementsThatNeedIsolatedNode) + ); + PlacementGroupNodeHashInit(placementGroupNodeHash, + groupShardPlacementsThatNeedIsolatedNode); + + PlacementGroupNodeHashAssignNodes(placementGroupNodeHash, + nodePlacementGroupHash, + activeWorkerNodeList); + + RebalancerPlacementIsolationContext *context = + palloc(sizeof(RebalancerPlacementIsolationContext)); + context->placementGroupNodeHash = placementGroupNodeHash; + context->nodePlacementGroupHash = nodePlacementGroupHash; + + return context; +} + + +/* + * CreatePlacementGroupNodeHash creates a hash table where each entry + * is of type PlacementGroupNodeHashEntry. 
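+ *
+ * The key is the ShardPlacementGroup struct itself and the hash is created
+ * with HASH_BLOBS, so entries are matched by comparing the raw bytes of the
+ * key; callers should fully initialize every field of the key they look up.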
+ */ +static HTAB * +CreatePlacementGroupNodeHash(int groupShardPlacementsThatNeedIsolatedNodeCount) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + + info.keysize = sizeof(ShardPlacementGroup); + info.entrysize = sizeof(PlacementGroupNodeHashEntry); + info.hcxt = CurrentMemoryContext; + int hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + return hash_create("PlacementGroupNode Hash", + groupShardPlacementsThatNeedIsolatedNodeCount, + &info, hashFlags); +} + + +/* + * PlacementGroupNodeHashInit initializes given hash table where each + * entry is of type PlacementGroupNodeHashEntry by using given list + * of group shard placement that are known to need an isolated node. + */ +static void +PlacementGroupNodeHashInit(HTAB *placementGroupNodeHash, + List *groupShardPlacementsThatNeedIsolatedNode) +{ + GroupShardPlacement *groupShardPlacement = NULL; + foreach_ptr(groupShardPlacement, groupShardPlacementsThatNeedIsolatedNode) + { + ShardPlacementGroup *placementGroup = + GetShardPlacementGroupForPlacement(groupShardPlacement->shardId, + groupShardPlacement->placementId); + + PlacementGroupNodeHashEntry *hashEntry = + hash_search(placementGroupNodeHash, placementGroup, HASH_ENTER, NULL); + + hashEntry->assignedNodeGroupId = INVALID_GROUP_ID; + } +} + + +/* + * PlacementGroupNodeHashAssignNodes assigns all active shard placements in + * the cluster that need to be isolated to individual worker nodes. + */ +static void +PlacementGroupNodeHashAssignNodes(HTAB *placementGroupNodeHash, + HTAB *nodePlacementGroupHash, + List *workerNodeList) +{ + List *availableWorkerList = list_copy(workerNodeList); + List *unassignedPlacementGroups = NIL; + + /* + * Assign as much as possible shard placement groups to worker nodes where + * they are stored already. + */ + PlacementGroupNodeHashEntry *placementGroupNodeHashEntry = NULL; + List *sortedPlacementGroupNodeHashEntryList = + PlacementGroupNodeHashGetSortedEntryList(placementGroupNodeHash); + foreach_ptr(placementGroupNodeHashEntry, sortedPlacementGroupNodeHashEntryList) + { + int32 assignGroupId = placementGroupNodeHashEntry->placementGroup.nodeGroupId; + if (PlacementGroupNodeHashEntryAssignNode(placementGroupNodeHashEntry, + nodePlacementGroupHash, + assignGroupId)) + { + int currentPlacementNodeIdx = + WorkerNodeListGetNodeWithGroupId(availableWorkerList, + assignGroupId); + availableWorkerList = list_delete_nth_cell(availableWorkerList, + currentPlacementNodeIdx); + } + else + { + unassignedPlacementGroups = + lappend(unassignedPlacementGroups, placementGroupNodeHashEntry); + } + } + + /* + * For the shard placement groups that could not be assigned to their + * current node, assign them to any other node that is available. 
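+ *
+ * Since the entries were sorted above and availableWorkerList is walked in
+ * order, the fallback assignment below is expected to be deterministic for a
+ * given set of worker nodes and placement groups.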
+ */
+ int availableNodeIdx = 0;
+ PlacementGroupNodeHashEntry *unassignedPlacementGroupHashEntry = NULL;
+ foreach_ptr(unassignedPlacementGroupHashEntry, unassignedPlacementGroups)
+ {
+ bool isolated = false;
+ while (!isolated && availableNodeIdx < list_length(availableWorkerList))
+ {
+ WorkerNode *availableWorkerNode =
+ (WorkerNode *) list_nth(availableWorkerList, availableNodeIdx);
+ availableNodeIdx++;
+
+ if (PlacementGroupNodeHashEntryAssignNode(unassignedPlacementGroupHashEntry,
+ nodePlacementGroupHash,
+ availableWorkerNode->groupId))
+ {
+ isolated = true;
+ break;
+ }
+ }
+
+ if (!isolated)
+ {
+ ereport(WARNING, (errmsg("could not isolate all shard placements "
+ "that need isolation")));
+ return;
+ }
+ }
+}
+
+
+/*
+ * PlacementGroupNodeHashEntryAssignNode is a helper for
+ * PlacementGroupNodeHashAssignNodes that tries to assign the given
+ * shard placement group to the given node and returns true if it succeeds.
+ */
+static bool
+PlacementGroupNodeHashEntryAssignNode(
+ PlacementGroupNodeHashEntry *placementGroupNodeHashEntry,
+ HTAB *nodePlacementGroupHash,
+ int32 nodeGroupId)
+{
+ NodePlacementGroupHashEntry *nodePlacementGroupHashEntry =
+ NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, nodeGroupId);
+
+ if (nodePlacementGroupHashEntry->assignedPlacementGroup)
+ {
+ return false;
+ }
+
+ if (!nodePlacementGroupHashEntry->shouldHaveShards)
+ {
+ return false;
+ }
+
+ nodePlacementGroupHashEntry->assignedPlacementGroup =
+ palloc(sizeof(ShardPlacementGroup));
+ *nodePlacementGroupHashEntry->assignedPlacementGroup =
+ placementGroupNodeHashEntry->placementGroup;
+
+ placementGroupNodeHashEntry->assignedNodeGroupId = nodeGroupId;
+
+ return true;
+}
+
+
+/*
+ * PlacementGroupNodeHashGetSortedEntryList scans the given placementGroupNodeHash
+ * and returns a list of hash entries sorted by colocatationId, shardIntervalIndex
+ * and nodeGroupId.
+ */
+static List *
+PlacementGroupNodeHashGetSortedEntryList(HTAB *placementGroupNodeHash)
+{
+ List *entryList = NIL;
+
+ HASH_SEQ_STATUS status;
+ PlacementGroupNodeHashEntry *placementGroupNodeHashEntry = NULL;
+ hash_seq_init(&status, placementGroupNodeHash);
+ while ((placementGroupNodeHashEntry = hash_seq_search(&status)) != 0)
+ {
+ entryList = lappend(entryList, placementGroupNodeHashEntry);
+ }
+
+ SortList(entryList, PlacementGroupNodeHashEntryCompare);
+ return entryList;
+}
+
+
+/*
+ * PlacementGroupNodeHashEntryCompare is a comparator function for
+ * PlacementGroupNodeHashEntry.
+ */
+static int
+PlacementGroupNodeHashEntryCompare(const void *left, const void *right)
+{
+ PlacementGroupNodeHashEntry *leftEntry = (PlacementGroupNodeHashEntry *) left;
+ PlacementGroupNodeHashEntry *rightEntry = (PlacementGroupNodeHashEntry *) right;
+ return ShardPlacementGroupCompare(&leftEntry->placementGroup,
+ &rightEntry->placementGroup);
+}
+
+
+/*
+ * RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker returns true
+ * if the shard placement with the given shardId & placementId is allowed to be
+ * stored on the given worker node.
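+ *
+ * Roughly: a placement that needs isolation is only allowed on the node that
+ * its placement group was assigned to, and any other placement is only
+ * allowed on nodes that should have shards and were not reserved for an
+ * isolated placement group.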
+ */ +bool +RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( + RebalancerPlacementIsolationContext *context, + uint64 shardId, + uint64 placementId, + WorkerNode *workerNode) +{ + ShardPlacementGroup *placementGroup = + GetShardPlacementGroupForPlacement(shardId, placementId); + + HTAB *placementGroupNodeHash = context->placementGroupNodeHash; + PlacementGroupNodeHashEntry *placementGroupNodeHashEntry = + hash_search(placementGroupNodeHash, placementGroup, HASH_FIND, NULL); + if (placementGroupNodeHashEntry == NULL) + { + /* + * It doesn't need an isolated node, but is the node used to isolate + * a shard placement group? If so, we cannot store it on this node. + */ + HTAB *nodePlacementGroupHash = context->nodePlacementGroupHash; + NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = + NodePlacementGroupHashGetNodeWithGroupId(nodePlacementGroupHash, + workerNode->groupId); + return nodePlacementGroupHashEntry->shouldHaveShards && + nodePlacementGroupHashEntry->assignedPlacementGroup == NULL; + } + + /* + * Given shard placement needs an isolated node. + * Check if given worker node is the one that is assigned to isolate it. + */ + return placementGroupNodeHashEntry->assignedNodeGroupId == workerNode->groupId; +} + + +/* + * CreateNodePlacementGroupHash creates a hash table where each entry + * is of type NodePlacementGroupHashEntry. + */ +static HTAB * +CreateNodePlacementGroupHash(int workerCount) +{ + HASHCTL info; + memset(&info, 0, sizeof(info)); + + info.keysize = sizeof(uint32); + info.entrysize = sizeof(NodePlacementGroupHashEntry); + info.hcxt = CurrentMemoryContext; + int hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + return hash_create("NodePlacementGroupHash Hash", + workerCount, &info, hashFlags); +} + + +/* + * NodePlacementGroupHashInit initializes given hash table where each + * entry is of type NodePlacementGroupHashEntry by using given list + * of worker nodes and the worker node that is being drained, if specified. + */ +static void +NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList, + WorkerNode *drainWorkerNode) +{ + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = + hash_search(nodePlacementGroupHash, &workerNode->groupId, HASH_ENTER, + NULL); + + nodePlacementGroupHashEntry->nodeGroupId = workerNode->groupId; + + if (drainWorkerNode && drainWorkerNode->groupId == workerNode->groupId) + { + nodePlacementGroupHashEntry->shouldHaveShards = false; + } + else + { + nodePlacementGroupHashEntry->shouldHaveShards = + workerNode->shouldHaveShards; + } + + nodePlacementGroupHashEntry->assignedPlacementGroup = NULL; + } +} + + +/* + * NodePlacementGroupHashGetNodeWithGroupId searches given hash table for + * NodePlacementGroupHashEntry with given node id and returns it. + * + * Throws an error if no such entry is found. + */ +static NodePlacementGroupHashEntry * +NodePlacementGroupHashGetNodeWithGroupId(HTAB *nodePlacementGroupHash, int32 nodeGroupId) +{ + NodePlacementGroupHashEntry *nodePlacementGroupHashEntry = + hash_search(nodePlacementGroupHash, &nodeGroupId, HASH_FIND, NULL); + + if (nodePlacementGroupHashEntry == NULL) + { + ereport(ERROR, (errmsg("no such node found"))); + } + + return nodePlacementGroupHashEntry; +} + + +/* + * WorkerNodeListGetNodeWithGroupId returns the index of worker node with given id + * in given worker node list. + * + * Throws an error if no such node is found. 
+ */ +static int +WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGroupId) +{ + int workerNodeIndex = 0; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) + { + if (workerNode->groupId == nodeGroupId) + { + return workerNodeIndex; + } + + workerNodeIndex++; + } + + ereport(ERROR, (errmsg("no such node is found"))); +} diff --git a/src/backend/distributed/operations/replicate_none_dist_table_shard.c b/src/backend/distributed/operations/replicate_none_dist_table_shard.c index c28490367db..667b4f3c693 100644 --- a/src/backend/distributed/operations/replicate_none_dist_table_shard.c +++ b/src/backend/distributed/operations/replicate_none_dist_table_shard.c @@ -4,6 +4,11 @@ * Routines to replicate shard of none-distributed table to * a remote node. * + * Procedures defined in this file assume that given none-distributed + * table was a Citus local table, caller updated the metadata to convert + * it to another none-distributed table type, and now wants to replicate + * the shard of the table to a remote node as part of the conversion. + * * Copyright (c) Citus Data, Inc. * *------------------------------------------------------------------------- @@ -58,14 +63,25 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId, uint64 shardLength = ShardLength(shardId); + /* we've already verified that table has a coordinator placement */ + ShardPlacement *coordinatorPlacement = + linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID)); + /* insert new placements to pg_dist_placement */ List *insertedPlacementList = NIL; WorkerNode *targetNode = NULL; foreach_ptr(targetNode, targetNodeList) { + /* + * needsIsolatedNode cannot be true because the input table was + * originally a Citus local table. + */ + Assert(!coordinatorPlacement->needsIsolatedNode); + ShardPlacement *shardPlacement = InsertShardPlacementRowGlobally(shardId, GetNextPlacementId(), - shardLength, targetNode->groupId); + shardLength, targetNode->groupId, + coordinatorPlacement->needsIsolatedNode); /* and save the placement for shard creation on workers */ insertedPlacementList = lappend(insertedPlacementList, shardPlacement); @@ -78,8 +94,6 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId, /* fetch coordinator placement before deleting it */ Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId); - ShardPlacement *coordinatorPlacement = - linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID)); /* * CreateForeignKeysFromReferenceTablesOnShards and CopyFromLocalTableIntoDistTable @@ -106,9 +120,16 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId, */ CreateForeignKeysFromReferenceTablesOnShards(noneDistTableId); - /* using the same placement id, re-insert the deleted placement */ + /* + * Using the same placement id, re-insert the deleted placement. + * + * needsIsolatedNode cannot be true because the input table was originally + * a Citus local table. 
+ */ + Assert(!coordinatorPlacement->needsIsolatedNode); InsertShardPlacementRowGlobally(shardId, coordinatorPlacement->placementId, - shardLength, COORDINATOR_GROUP_ID); + shardLength, COORDINATOR_GROUP_ID, + coordinatorPlacement->needsIsolatedNode); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index e3ee4aa4d0b..09d43718509 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -44,6 +44,7 @@ #include "distributed/pg_dist_rebalance_strategy.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" +#include "distributed/rebalancer_placement_isolation.h" #include "distributed/resource_lock.h" #include "distributed/shard_rebalancer.h" #include "distributed/shard_cleaner.h" @@ -145,6 +146,8 @@ typedef struct RebalanceContext FmgrInfo shardCostUDF; FmgrInfo nodeCapacityUDF; FmgrInfo shardAllowedOnNodeUDF; + + RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext; } RebalanceContext; /* WorkerHashKey contains hostname and port to be used as a key in a hash */ @@ -253,7 +256,8 @@ static bool FindAndMoveShardCost(float4 utilizationLowerBound, float4 utilizationUpperBound, float4 improvementThreshold, RebalanceState *state); -static NodeFillState * FindAllowedTargetFillState(RebalanceState *state, uint64 shardId); +static NodeFillState * FindAllowedTargetFillState(RebalanceState *state, uint64 shardId, + uint64 placementId); static void MoveShardCost(NodeFillState *sourceFillState, NodeFillState *targetFillState, ShardCost *shardCost, RebalanceState *state); static int CompareNodeFillStateAsc(const void *void1, const void *void2); @@ -262,9 +266,10 @@ static int CompareShardCostAsc(const void *void1, const void *void2); static int CompareShardCostDesc(const void *void1, const void *void2); static int CompareDisallowedPlacementAsc(const void *void1, const void *void2); static int CompareDisallowedPlacementDesc(const void *void1, const void *void2); -static bool ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *context); +static bool ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode, + void *context); static float4 NodeCapacity(WorkerNode *workerNode, void *context); -static ShardCost GetShardCost(uint64 shardId, void *context); +static ShardCost GetShardCost(uint64 shardId, uint64 placementId, void *context); static List * NonColocatedDistRelationIdList(void); static void RebalanceTableShards(RebalanceOptions *options, Oid shardReplicationModeOid); static int64 RebalanceTableShardsBackground(RebalanceOptions *options, Oid @@ -465,7 +470,9 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray) ShardPlacement *placement = CitusMakeNode(ShardPlacement); placement->shardId = groupPlacement->shardId; placement->shardLength = groupPlacement->shardLength; + placement->groupId = groupPlacement->groupId; placement->nodeId = worker->nodeId; + placement->needsIsolatedNode = groupPlacement->needsIsolatedNode; placement->nodeName = pstrdup(worker->workerName); placement->nodePort = worker->workerPort; placement->placementId = groupPlacement->placementId; @@ -477,6 +484,36 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray) } +/* + * ExcludeShardPlacementsFromNodesNotAllowedToHaveShards excludes shard + * placements from nodes that are not allowed to have shards and returns + * the filtered list. 
+ *
+ * As an exception, the shard placements on the specified worker node are
+ * not excluded even if the node has been marked as not allowed to have
+ * placements.
+ */
+static List *
+ExcludeShardPlacementsFromNodesNotAllowedToHaveShards(List *shardPlacementList,
+ WorkerNode *workerNode)
+{
+ List *filteredShardPlacementList = NIL;
+
+ ShardPlacement *shardPlacement = NULL;
+ foreach_ptr(shardPlacement, shardPlacementList)
+ {
+ WorkerNode *node = LookupNodeByNodeIdOrError(shardPlacement->nodeId);
+ if (node->shouldHaveShards || workerNode->nodeId == node->nodeId)
+ {
+ filteredShardPlacementList = lappend(filteredShardPlacementList,
+ shardPlacement);
+ }
+ }
+
+ return filteredShardPlacementList;
+}
+
+
 /*
  * SortedActiveWorkers returns all the active workers like
  * ActiveReadableNodeList, but sorted.
@@ -546,8 +583,22 @@ GetRebalanceSteps(RebalanceOptions *options)
 if (options->workerNode != NULL)
 {
- activeShardPlacementListForRelation = FilterActiveShardPlacementListByNode(
- shardPlacementList, options->workerNode);
+ /*
+ * Exclude the shard placements from the nodes that are marked
+ * as "not allowed to have shards", except for the specified
+ * worker node.
+ *
+ * We include not only the shards from the specified worker
+ * but also the shards from the nodes that are allowed to have
+ * shards, because draining the specified worker node might require
+ * moving existing shards away from some other nodes too,
+ * e.g., when draining the node forces a placement that needs an
+ * isolated node onto another node that already has some shard
+ * placements.
+ */
+ activeShardPlacementListForRelation =
+ ExcludeShardPlacementsFromNodesNotAllowedToHaveShards(
+ shardPlacementList, options->workerNode);
 }
 if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
 {
@@ -588,6 +639,9 @@ GetRebalanceSteps(RebalanceOptions *options)
 options->threshold = options->rebalanceStrategy->minimumThreshold;
 }
+ context.rebalancerPlacementGroupIsolationContext =
+ PrepareRebalancerPlacementIsolationContext(activeWorkerList, options->workerNode);
+
 return RebalancePlacementUpdates(activeWorkerList,
 activeShardPlacementListList,
 options->threshold,
@@ -602,7 +656,8 @@ GetRebalanceSteps(RebalanceOptions *options)
 * ShardAllowedOnNode determines if shard is allowed on a specific worker node.
 */
 static bool
-ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *voidContext)
+ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode,
+ void *voidContext)
 {
 if (!workerNode->shouldHaveShards)
 {
@@ -610,6 +665,14 @@ ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *voidContext)
 }
 RebalanceContext *context = voidContext;
+
+ if (!RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
+ context->rebalancerPlacementGroupIsolationContext,
+ shardId, placementId, workerNode))
+ {
+ return false;
+ }
+
 Datum allowed = FunctionCall2(&context->shardAllowedOnNodeUDF, shardId,
 workerNode->nodeId);
 return DatumGetBool(allowed);
@@ -643,10 +706,11 @@ NodeCapacity(WorkerNode *workerNode, void *voidContext)
 * to be.
*/ static ShardCost -GetShardCost(uint64 shardId, void *voidContext) +GetShardCost(uint64 shardId, uint64 placementId, void *voidContext) { ShardCost shardCost = { 0 }; shardCost.shardId = shardId; + shardCost.placementId = placementId; RebalanceContext *context = voidContext; Datum shardCostDatum = FunctionCall1(&context->shardCostUDF, UInt64GetDatum(shardId)); shardCost.cost = DatumGetFloat4(shardCostDatum); @@ -2560,7 +2624,8 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList, Assert(fillState != NULL); - *shardCost = functions->shardCost(placement->shardId, functions->context); + *shardCost = functions->shardCost(placement->shardId, placement->placementId, + functions->context); fillState->totalCost += shardCost->cost; fillState->utilization = CalculateUtilization(fillState->totalCost, @@ -2572,8 +2637,8 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList, state->totalCost += shardCost->cost; - if (!functions->shardAllowedOnNode(placement->shardId, fillState->node, - functions->context)) + if (!functions->shardAllowedOnNode(placement->shardId, placement->placementId, + fillState->node, functions->context)) { DisallowedPlacement *disallowed = palloc0(sizeof(DisallowedPlacement)); disallowed->shardCost = shardCost; @@ -2733,7 +2798,8 @@ MoveShardsAwayFromDisallowedNodes(RebalanceState *state) foreach_ptr(disallowedPlacement, state->disallowedPlacementList) { NodeFillState *targetFillState = FindAllowedTargetFillState( - state, disallowedPlacement->shardCost->shardId); + state, disallowedPlacement->shardCost->shardId, + disallowedPlacement->shardCost->placementId); if (targetFillState == NULL) { ereport(WARNING, (errmsg( @@ -2782,7 +2848,7 @@ CompareDisallowedPlacementDesc(const void *a, const void *b) * where the shard can be moved to. 
*/ static NodeFillState * -FindAllowedTargetFillState(RebalanceState *state, uint64 shardId) +FindAllowedTargetFillState(RebalanceState *state, uint64 shardId, uint64 placementId) { NodeFillState *targetFillState = NULL; foreach_ptr(targetFillState, state->fillStateListAsc) @@ -2793,6 +2859,7 @@ FindAllowedTargetFillState(RebalanceState *state, uint64 shardId) targetFillState->node); if (!hasShard && state->functions->shardAllowedOnNode( shardId, + placementId, targetFillState->node, state->functions->context)) { @@ -2967,6 +3034,7 @@ FindAndMoveShardCost(float4 utilizationLowerBound, /* Skip shards that already are not allowed on the node */ if (!state->functions->shardAllowedOnNode(shardCost->shardId, + shardCost->placementId, targetFillState->node, state->functions->context)) { @@ -3163,7 +3231,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList { WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex); - if (!NodeCanHaveDistTablePlacements(workerNode)) + if (!NodeCanBeUsedForNonIsolatedPlacements(workerNode)) { /* never replicate placements to nodes that should not have placements */ continue; diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 0772b03b488..e44bd0b9f8f 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -1177,11 +1177,14 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, IntegerToText(DatumGetInt32(shardInterval->minValue)), IntegerToText(DatumGetInt32(shardInterval->maxValue))); + /* we don't presert needsIsolatedNode when splitting a shard */ + bool needsIsolatedNode = false; InsertShardPlacementRow( shardInterval->shardId, INVALID_PLACEMENT_ID, /* triggers generation of new id */ 0, /* shard length (zero for HashDistributed Table) */ - workerPlacementNode->groupId); + workerPlacementNode->groupId, + needsIsolatedNode); if (ShouldSyncTableMetadata(shardInterval->relationId)) { diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 23925a3153a..c305e90f934 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -122,7 +122,10 @@ static void EnsureShardCanBeCopied(int64 shardId, const char *sourceNodeName, static List * RecreateTableDDLCommandList(Oid relationId); static void EnsureTableListOwner(List *tableIdList); static void ErrorIfReplicatingDistributedTableWithFKeys(List *tableIdList); - +static bool NewPlacementNeedsIsolatedNode(uint64 shardId, + char *sourceNodeName, + int32 sourceNodePort, + ShardTransferType transferType); static void DropShardPlacementsFromMetadata(List *shardList, char *nodeName, int32 nodePort); @@ -524,15 +527,35 @@ TransferShards(int64 shardId, char *sourceNodeName, uint32 groupId = GroupForNode(targetNodeName, targetNodePort); uint64 placementId = GetNextPlacementId(); + /* + * Decide whether the new placement needs isolated node or not. + * + * Note that even if the new placement needs isolated node, we don't + * enforce it here because we assume that user is aware of what they're + * doing if this shard transfer operation is initiated by the user. 
+ * The consequence of this assumption is that if the user moves a placement
+ * that needs an isolated node to a node that has some other placements,
+ * then the next call made to the rebalancer would override this operation
+ * (by moving the placement to an appropriate node).
+ *
+ * Otherwise, i.e., if this is initiated by the rebalancer, the rebalancer
+ * enforces isolation anyway by choosing an appropriate node.
+ */
+ bool newPlacementNeedsIsolatedNode = NewPlacementNeedsIsolatedNode(
+ colocatedShardId,
+ sourceNodeName,
+ sourceNodePort,
+ transferType);
 InsertShardPlacementRow(colocatedShardId, placementId,
 ShardLength(colocatedShardId),
- groupId);
+ groupId, newPlacementNeedsIsolatedNode);
 if (transferType == SHARD_TRANSFER_COPY &&
 ShouldSyncTableMetadata(colocatedShard->relationId))
 {
 char *placementCommand = PlacementUpsertCommand(colocatedShardId, placementId,
- 0, groupId);
+ 0, groupId,
+ newPlacementNeedsIsolatedNode);
 SendCommandToWorkersWithMetadata(placementCommand);
 }
@@ -1979,6 +2002,33 @@ RecreateTableDDLCommandList(Oid relationId)
 }
+/*
+ * NewPlacementNeedsIsolatedNode returns whether the placement we're creating,
+ * based on the placement we're replicating from sourceNodeName/sourceNodePort,
+ * needs isolation.
+ *
+ * For SHARD_TRANSFER_COPY, we don't preserve needsisolatednode. However, for
+ * SHARD_TRANSFER_MOVE, we preserve it.
+ */
+static bool
+NewPlacementNeedsIsolatedNode(uint64 shardId, char *sourceNodeName, int32 sourceNodePort,
+ ShardTransferType transferType)
+{
+ if (transferType == SHARD_TRANSFER_COPY)
+ {
+ return false;
+ }
+
+ /* assume we're transferring the first placement */
+ uint32 groupId = GroupForNode(sourceNodeName, sourceNodePort);
+
+ List *activeShardPlacementListOnGroup = ActiveShardPlacementListOnGroup(shardId,
+ groupId);
+ ShardPlacement *firstPlacementOnGroup = linitial(activeShardPlacementListOnGroup);
+ return firstPlacementOnGroup->needsIsolatedNode;
+}
+
+
 /*
  * DropShardPlacementsFromMetadata drops the shard placement metadata for
  * the shard placements of given shard interval list from pg_dist_placement.
diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c
index 421593c662c..6660b260aa2 100644
--- a/src/backend/distributed/operations/stage_protocol.c
+++ b/src/backend/distributed/operations/stage_protocol.c
@@ -164,7 +164,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
 uint64 shardId = GetNextShardId();
 /* if enough live groups, add an extra candidate node as backup */
- List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
+ List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock);
+
+ if (workerNodeList == NIL)
+ {
+ ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("no worker nodes are available for placing shards"),
+ errhint("Add more worker nodes.")));
+ }
 if (list_length(workerNodeList) > ShardReplicationFactor)
 {
@@ -362,8 +369,9 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
 ExecuteCriticalRemoteCommandList(connection, commandList);
+ bool needsIsolatedNode = false;
 InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardSize,
- nodeGroupId);
+ nodeGroupId, needsIsolatedNode);
 placementsCreated++;
 if (placementsCreated >= replicationFactor)
 {
@@ -384,6 +392,9 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
 /*
  * InsertShardPlacementRows inserts shard placements to the metadata table on
  * the coordinator node.
+ * + * This function assumes that the caller is inserting the placements for a + * newly created shard. As a result, always sets needsisolatednode to false. */ void InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, @@ -398,10 +409,12 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList, uint32 nodeGroupId = workerNode->groupId; const uint64 shardSize = 0; + bool needsIsolatedNode = false; InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardSize, - nodeGroupId); + nodeGroupId, + needsIsolatedNode); } } @@ -811,9 +824,11 @@ UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, Oid relationId, { uint64 placementId = placement->placementId; int32 groupId = placement->groupId; + bool needsIsolatedNode = placement->needsIsolatedNode; DeleteShardPlacementRow(placementId); - InsertShardPlacementRow(shardId, placementId, shardSize, groupId); + InsertShardPlacementRow(shardId, placementId, shardSize, groupId, + needsIsolatedNode); } } diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 76f2732bae2..06ab3adab20 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -271,30 +271,36 @@ ErrorIfCoordinatorNotAddedAsWorkerNode() /* - * DistributedTablePlacementNodeList returns a list of all active, primary + * NewDistributedTablePlacementNodeList returns a list of all active, primary * worker nodes that can store new data, i.e shouldstoreshards is 'true' + * and that is not used to isolate a shard placement group. */ List * -DistributedTablePlacementNodeList(LOCKMODE lockMode) +NewDistributedTablePlacementNodeList(LOCKMODE lockMode) { EnsureModificationsCanRun(); - return FilterActiveNodeListFunc(lockMode, NodeCanHaveDistTablePlacements); + return FilterActiveNodeListFunc(lockMode, NodeCanBeUsedForNonIsolatedPlacements); } /* - * NodeCanHaveDistTablePlacements returns true if the given node can have - * shards of a distributed table. + * NodeCanBeUsedForNonIsolatedPlacements returns true if given node can be + * used to store shard placements that don't need isolation. */ bool -NodeCanHaveDistTablePlacements(WorkerNode *node) +NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node) { if (!NodeIsPrimary(node)) { return false; } - return node->shouldHaveShards; + if (!node->shouldHaveShards) + { + return false; + } + + return !NodeGroupIsolatesAShardPlacementGroup(node->groupId); } diff --git a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql index 4e2a515a357..47126555b84 100644 --- a/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql +++ b/src/backend/distributed/sql/citus--12.0-1--12.1-1.sql @@ -10,3 +10,16 @@ #include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql" #include "udfs/citus_schema_move/12.1-1.sql" + +ALTER TABLE pg_dist_placement ADD COLUMN needsisolatednode boolean NOT NULL DEFAULT false; + +-- Drop the legacy one that survived from 10.2-1, not the one created in 11.2-1. +-- +-- And as we did when upgrading from 10.2-1 to 11.2-1, citus_internal_add_placement_metadata/12.1-1.sql +-- preserves the one created in 11.2-1 as the "new legacy" one. 
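+--
+-- After this upgrade script runs, metadata sync is expected to use the new
+-- 5-argument form added below via citus_internal_add_placement_metadata/12.1-1.sql,
+-- roughly like the following (the id values here are purely illustrative):
+--
+--   SELECT citus_internal_add_placement_metadata(
+--            102008,   -- shard_id
+--            0,        -- shard_length
+--            14,       -- group_id
+--            100008,   -- placement_id
+--            false);   -- needs_isolation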
+DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata( + shard_id bigint, shard_state integer, + shard_length bigint, group_id integer, + placement_id bigint); + +#include "udfs/citus_internal_add_placement_metadata/12.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql index 6f58b2f54b1..f1a8b3adb58 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql @@ -22,3 +22,13 @@ DROP FUNCTION pg_catalog.citus_schema_move( schema_id regnamespace, target_node_id integer, shard_transfer_mode citus.shard_transfer_mode ); + +ALTER TABLE pg_dist_placement DROP COLUMN needsisolatednode; + +DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata( + shard_id bigint, + shard_length bigint, group_id integer, + placement_id bigint, + needs_isolation boolean); + +#include "../udfs/citus_internal_add_placement_metadata/11.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/12.1-1.sql b/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/12.1-1.sql new file mode 100644 index 00000000000..45acb3f2dfc --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/12.1-1.sql @@ -0,0 +1,24 @@ +-- create a new function, with needs_isolation +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_placement_metadata( + shard_id bigint, + shard_length bigint, group_id integer, + placement_id bigint, + needs_isolation boolean) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_internal_add_placement_metadata$$; + +COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, bigint, integer, bigint, boolean) IS + 'Inserts into pg_dist_shard_placement with user checks'; + +-- replace the old one so it would call the old C function without needs_isolation +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_placement_metadata( + shard_id bigint, + shard_length bigint, group_id integer, + placement_id bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_internal_add_placement_metadata_legacy$$; + +COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, bigint, integer, bigint) IS + 'Inserts into pg_dist_shard_placement with user checks'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/latest.sql index 9d1dd4ffa36..45acb3f2dfc 100644 --- a/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_internal_add_placement_metadata/latest.sql @@ -1,24 +1,24 @@ --- create a new function, without shardstate +-- create a new function, with needs_isolation CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_placement_metadata( shard_id bigint, shard_length bigint, group_id integer, - placement_id bigint) + placement_id bigint, + needs_isolation boolean) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_internal_add_placement_metadata$$; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, bigint, integer, bigint) IS +COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, bigint, integer, bigint, boolean) IS 'Inserts into pg_dist_shard_placement with user checks'; --- replace the old one so it would call the 
old C function with shard_state +-- replace the old one so it would call the old C function without needs_isolation CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_placement_metadata( - shard_id bigint, shard_state integer, + shard_id bigint, shard_length bigint, group_id integer, placement_id bigint) RETURNS void LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$citus_internal_add_placement_metadata_legacy$$; -COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint) IS +COMMENT ON FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, bigint, integer, bigint) IS 'Inserts into pg_dist_shard_placement with user checks'; - diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index 56a06398232..3174d5fa91e 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -44,9 +44,10 @@ static uint64 JsonFieldValueUInt64Default(Datum jsonDocument, const char *key, uint64 defaultValue); static char * JsonFieldValueString(Datum jsonDocument, const char *key); static ArrayType * PlacementUpdateListToJsonArray(List *placementUpdateList); -static bool ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *context); +static bool ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode, + void *context); static float NodeCapacity(WorkerNode *workerNode, void *context); -static ShardCost GetShardCost(uint64 shardId, void *context); +static ShardCost GetShardCost(uint64 shardId, uint64 placementId, void *context); PG_FUNCTION_INFO_V1(shard_placement_rebalance_array); @@ -191,7 +192,8 @@ shard_placement_rebalance_array(PG_FUNCTION_ARGS) * a worker when running the shard rebalancer unit tests. */ static bool -ShardAllowedOnNode(uint64 shardId, WorkerNode *workerNode, void *voidContext) +ShardAllowedOnNode(uint64 shardId, uint64 placementId, WorkerNode *workerNode, + void *voidContext) { RebalancePlacementContext *context = voidContext; WorkerTestInfo *workerTestInfo = NULL; @@ -242,12 +244,13 @@ NodeCapacity(WorkerNode *workerNode, void *voidContext) * the shard rebalancer unit tests. 
*/ static ShardCost -GetShardCost(uint64 shardId, void *voidContext) +GetShardCost(uint64 shardId, uint64 placementId, void *voidContext) { RebalancePlacementContext *context = voidContext; ShardCost shardCost; memset_struct_0(shardCost); shardCost.shardId = shardId; + shardCost.placementId = placementId; ShardPlacementTestInfo *shardPlacementTestInfo = NULL; foreach_ptr(shardPlacementTestInfo, context->shardPlacementTestInfoList) diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index e7007874bb3..8ab43acf793 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -1245,7 +1245,7 @@ SingleShardTableColocationNodeId(uint32 colocationId) { int workerNodeIndex = EmptySingleShardTableColocationDecideNodeId(colocationId); - List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock); + List *workerNodeList = NewDistributedTablePlacementNodeList(RowShareLock); WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); return workerNode->nodeId; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 237df363a13..334a67f4743 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -73,7 +73,8 @@ typedef struct SequenceInfo extern void citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength, int32 groupId, - int64 placementId); + int64 placementId, + bool needsIsolatedNode); extern void SyncCitusTableMetadata(Oid relationId); extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); @@ -112,7 +113,8 @@ extern List * GenerateGrantOnForeignServerQueriesFromAclItem(Oid serverId, AclItem *aclItem); extern List * GenerateGrantOnFDWQueriesFromAclItem(Oid serverId, AclItem *aclItem); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId); + uint64 shardLength, int32 groupId, + bool needsIsolatedNode); extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); @@ -142,7 +144,8 @@ extern char * TenantSchemaDeleteCommand(char *schemaName); extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, uint32 colocationId, bool autoConverted); extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId); + uint64 shardLength, int32 groupId, + bool needsIsolatedNode); extern char * DeletePlacementMetadataCommand(uint64 placementId); extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, @@ -203,10 +206,10 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context); #define UPSERT_PLACEMENT \ "INSERT INTO pg_dist_placement " \ "(shardid, shardstate, shardlength, " \ - "groupid, placementid) " \ + "groupid, placementid, needsisolatednode) " \ "VALUES (" UINT64_FORMAT ", 1, " UINT64_FORMAT \ ", %d, " UINT64_FORMAT \ - ") " \ + ", %s) " \ "ON CONFLICT (shardid, groupid) DO UPDATE SET " \ "shardstate = EXCLUDED.shardstate, " \ "shardlength = EXCLUDED.shardlength, " \ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 9234adc7648..3f838a2772d 100644 --- a/src/include/distributed/metadata_utility.h +++ 
b/src/include/distributed/metadata_utility.h @@ -78,6 +78,7 @@ typedef struct GroupShardPlacement uint64 shardId; uint64 shardLength; int32 groupId; + bool needsIsolatedNode; } GroupShardPlacement; @@ -92,6 +93,7 @@ typedef struct ShardPlacement uint64 shardId; uint64 shardLength; int32 groupId; + bool needsIsolatedNode; /* the rest of the fields aren't from pg_dist_placement */ char *nodeName; @@ -103,6 +105,14 @@ typedef struct ShardPlacement } ShardPlacement; +typedef struct +{ + uint32 colocatationId; + int shardIntervalIndex; + int32 nodeGroupId; +} ShardPlacementGroup; + + typedef enum CascadeToColocatedOption { CASCADE_TO_COLOCATED_UNSPECIFIED, @@ -322,14 +332,16 @@ extern int ShardIntervalCount(Oid relationId); extern List * LoadShardList(Oid relationId); extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); extern uint64 ShardLength(uint64 shardId); +extern bool NodeGroupIsolatesAShardPlacementGroup(int32 groupId); +extern int ShardPlacementGroupCompare(const void *left, const void *right); extern bool NodeGroupHasShardPlacements(int32 groupId); +extern ShardPlacementGroup * GetShardPlacementGroupForPlacement(uint64 shardId, + uint64 placementId); extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement); extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement); extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)( ShardPlacement *)); -extern List * FilterActiveShardPlacementListByNode(List *shardPlacementList, - WorkerNode *workerNode); extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId); extern List * ActiveShardPlacementList(uint64 shardId); extern List * ShardPlacementListSortedByWorker(uint64 shardId); @@ -337,6 +349,7 @@ extern ShardPlacement * ActiveShardPlacement(uint64 shardId, bool missingOk); extern WorkerNode * ActiveShardPlacementWorkerNode(uint64 shardId); extern List * BuildShardPlacementList(int64 shardId); extern List * AllShardPlacementsOnNodeGroup(int32 groupId); +extern List * AllGroupShardPlacementsThatNeedIsolatedNode(void); extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId); extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName, int *nodePort); @@ -353,9 +366,11 @@ extern void DeleteShardRow(uint64 shardId); extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId, uint64 shardLength, - int32 groupId); + int32 groupId, + bool needsIsolatedNode); extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, - uint64 shardLength, int32 groupId); + uint64 shardLength, int32 groupId, + bool needsIsolatedNode); extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId, char replicationModel, bool autoConverted); diff --git a/src/include/distributed/pg_dist_placement.h b/src/include/distributed/pg_dist_placement.h index 6aecdbf2194..5affde1a532 100644 --- a/src/include/distributed/pg_dist_placement.h +++ b/src/include/distributed/pg_dist_placement.h @@ -28,6 +28,7 @@ typedef struct FormData_pg_dist_placement int32 shardstate; /* shard state on remote node; see ShardState */ int64 shardlength; /* shard length on remote node; stored as bigint */ int32 groupid; /* the group the shard is placed on */ + bool needsisolatednode; /* whether the placement group needs to be isolated from others */ } FormData_pg_dist_placement; 
/* ---------------- @@ -41,12 +42,13 @@ typedef FormData_pg_dist_placement *Form_pg_dist_placement; * compiler constants for pg_dist_placement * ---------------- */ -#define Natts_pg_dist_placement 5 +#define Natts_pg_dist_placement 6 #define Anum_pg_dist_placement_placementid 1 #define Anum_pg_dist_placement_shardid 2 #define Anum_pg_dist_placement_shardstate 3 #define Anum_pg_dist_placement_shardlength 4 #define Anum_pg_dist_placement_groupid 5 +#define Anum_pg_dist_placement_needsisolatednode 6 #endif /* PG_DIST_PLACEMENT_H */ diff --git a/src/include/distributed/rebalancer_placement_isolation.h b/src/include/distributed/rebalancer_placement_isolation.h new file mode 100644 index 00000000000..81b7d8df33e --- /dev/null +++ b/src/include/distributed/rebalancer_placement_isolation.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * rebalancer_placement_isolation.h + * Routines to determine which worker node should be used to isolate + * a colocated set of shard placements that needs isolation. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PLACEMENT_ISOLATION_H +#define PLACEMENT_ISOLATION_H + +#include "postgres.h" +#include "nodes/pg_list.h" +#include "utils/hsearch.h" + +#include "distributed/metadata_utility.h" + +struct RebalancerPlacementIsolationContext; +typedef struct RebalancerPlacementIsolationContext RebalancerPlacementIsolationContext; + +extern RebalancerPlacementIsolationContext * PrepareRebalancerPlacementIsolationContext( + List *activeWorkerNodeList, + WorkerNode + * + drainWorkerNode); +extern bool RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker( + RebalancerPlacementIsolationContext *context, + uint64 shardId, + uint64 + placementId, + WorkerNode * + workerNode); + +#endif /* PLACEMENT_ISOLATION_H */ diff --git a/src/include/distributed/shard_rebalancer.h b/src/include/distributed/shard_rebalancer.h index 38ce4f48562..8245bef5307 100644 --- a/src/include/distributed/shard_rebalancer.h +++ b/src/include/distributed/shard_rebalancer.h @@ -162,6 +162,7 @@ typedef struct NodeFillState typedef struct ShardCost { uint64 shardId; + uint64 placementId; /* * cost is the cost of the shard. This doesn't have a unit. 
@@ -180,9 +181,10 @@ typedef struct DisallowedPlacement typedef struct RebalancePlanFunctions { - bool (*shardAllowedOnNode)(uint64 shardId, WorkerNode *workerNode, void *context); + bool (*shardAllowedOnNode)(uint64 shardId, uint64 placementId, WorkerNode *workerNode, + void *context); float4 (*nodeCapacity)(WorkerNode *workerNode, void *context); - ShardCost (*shardCost)(uint64 shardId, void *context); + ShardCost (*shardCost)(uint64 shardId, uint64 placementId, void *context); void *context; } RebalancePlanFunctions; diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 5ad7f496294..d35b8e0a29a 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -33,6 +33,7 @@ #define WORKER_DEFAULT_CLUSTER "default" +#define INVALID_GROUP_ID -1 #define COORDINATOR_GROUP_ID 0 /* @@ -76,8 +77,8 @@ extern bool CoordinatorAddedAsWorkerNode(void); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void); -extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); -extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); +extern List * NewDistributedTablePlacementNodeList(LOCKMODE lockMode); +extern bool NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node); extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py index 2b71f5e1b6c..ce4ce8326fb 100755 --- a/src/test/regress/citus_tests/run_test.py +++ b/src/test/regress/citus_tests/run_test.py @@ -169,6 +169,13 @@ def extra_tests(self): ), "grant_on_schema_propagation": TestDeps("minimal_schedule"), "propagate_extension_commands": TestDeps("minimal_schedule"), + "metadata_sync_helpers": TestDeps( + None, + [ + "multi_test_helpers", + "multi_cluster_management", + ], + ), } diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index 29d62c46aa7..85eff3c8680 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -963,9 +963,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse - WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS - (VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint)) - SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; + WITH placement_data(shardid, shardlength, groupid, placementid) AS + (VALUES (-10, 0::bigint, 1::int, 1500000::bigint)) + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; ERROR: could not find valid entry for shard xxxxx ROLLBACK; -- invalid placementid @@ -980,7 +980,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420000, 0::bigint, 1::int, -10)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ERROR: Shard placement 
has invalid placement id (-10) for shard(1420000) ROLLBACK; -- non-existing shard @@ -995,7 +995,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1430100, 0::bigint, 1::int, 10)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ERROR: could not find valid entry for shard xxxxx ROLLBACK; -- non-existing node with non-existing node-id 123123123 @@ -1010,7 +1010,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES ( 1420000, 0::bigint, 123123123::int, 1500000)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ERROR: Node with group id 123123123 for shard placement xxxxx does not exist ROLLBACK; -- create a volatile function that returns the local node id @@ -1041,7 +1041,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420000, 0::bigint, get_node_id(), 1500000), (1420000, 0::bigint, get_node_id(), 1500001)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ERROR: duplicate key value violates unique constraint "placement_shardid_groupid_unique_index" ROLLBACK; -- shard is not owned by us @@ -1056,7 +1056,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420007, 0::bigint, get_node_id(), 1500000)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ERROR: must be owner of table super_user_table ROLLBACK; -- sucessfully add placements @@ -1082,7 +1082,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1420011, 0::bigint, get_node_id(), 1500009), (1420012, 0::bigint, get_node_id(), 1500010), (1420013, 0::bigint, get_node_id(), 1500011)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; citus_internal_add_placement_metadata --------------------------------------------------------------------- @@ -1284,7 +1284,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SET application_name to 'citus_internal gpid=10000000001'; -- with an ugly trick, update the vartype of table from int to bigint -- so that making two tables colocated fails - -- include varnullingrels for PG16 + -- include varnullingrels for PG16 SHOW server_version \gset SELECT substring(:'server_version', '\d+')::int >= 16 AS server_version_ge_16 \gset diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index fe203efb585..427059082fd 100644 --- a/src/test/regress/expected/multi_extension.out +++ 
b/src/test/regress/expected/multi_extension.out @@ -60,7 +60,7 @@ BEGIN FROM current_objects c FULL JOIN prev_objects p ON p.description = c.description WHERE (p.description is null OR c.description is null) - AND c.description IS DISTINCT FROM 'function any_value(anyelement) anyelement' + AND c.description IS DISTINCT FROM 'function any_value(anyelement) anyelement' AND c.description IS DISTINCT FROM 'function any_value_agg(anyelement,anyelement) anyelement'; DROP TABLE prev_objects; @@ -1399,14 +1399,16 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 12.1-1 ALTER EXTENSION citus UPDATE TO '12.1-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- - | function citus_internal_delete_placement_metadata(bigint) void - | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void - | function citus_pause_node_within_txn(integer,boolean,integer) void - | function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void - | function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void -(5 rows) + previous_object | current_object +--------------------------------------------------------------------- + function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) void | + | function citus_internal_add_placement_metadata(bigint,bigint,integer,bigint,boolean) void + | function citus_internal_delete_placement_metadata(bigint) void + | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void + | function citus_pause_node_within_txn(integer,boolean,integer) void + | function citus_schema_move(regnamespace,integer,citus.shard_transfer_mode) void + | function citus_schema_move(regnamespace,text,integer,citus.shard_transfer_mode) void +(7 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_attributes.out b/src/test/regress/expected/multi_metadata_attributes.out index b54946d3f01..0e290cc6840 100644 --- a/src/test/regress/expected/multi_metadata_attributes.out +++ b/src/test/regress/expected/multi_metadata_attributes.out @@ -10,7 +10,8 @@ WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, 'pg_dist_object'::regclass, - 'pg_dist_background_task'::regclass) + 'pg_dist_background_task'::regclass, + 'pg_dist_placement'::regclass) ORDER BY attrelid, attname; attrelid | attname | atthasmissing | attmissingval --------------------------------------------------------------------- diff --git a/src/test/regress/expected/shard_rebalancer.out b/src/test/regress/expected/shard_rebalancer.out index 7997b5e28b5..f273f3bfc9b 100644 --- a/src/test/regress/expected/shard_rebalancer.out +++ b/src/test/regress/expected/shard_rebalancer.out @@ -1153,16 +1153,16 @@ SELECT * FROM public.table_placements_per_node; CALL citus_cleanup_orphaned_resources(); select * from pg_dist_placement ORDER BY placementid; - placementid | shardid | shardstate | shardlength | groupid ---------------------------------------------------------------------- - 138 | 123023 | 1 | 0 | 14 - 141 | 123024 | 1 | 0 | 14 - 144 | 123027 | 1 | 0 | 14 - 145 | 123028 | 1 | 0 | 14 - 146 | 123021 | 1 | 0 | 16 - 147 | 123025 | 1 | 0 | 16 - 148 | 123022 | 1 | 0 | 16 - 149 | 123026 | 1 | 0 | 16 + placementid | 
shardid | shardstate | shardlength | groupid | needsisolatednode +--------------------------------------------------------------------- + 138 | 123023 | 1 | 0 | 14 | f + 141 | 123024 | 1 | 0 | 14 | f + 144 | 123027 | 1 | 0 | 14 | f + 145 | 123028 | 1 | 0 | 14 | f + 146 | 123021 | 1 | 0 | 16 | f + 147 | 123025 | 1 | 0 | 16 | f + 148 | 123022 | 1 | 0 | 16 | f + 149 | 123026 | 1 | 0 | 16 | f (8 rows) -- Move all shards to worker1 again diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 36bd504e88d..59c31fc70f5 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -67,7 +67,7 @@ ORDER BY 1; function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") function citus_internal_add_placement_metadata(bigint,bigint,integer,bigint) - function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) + function citus_internal_add_placement_metadata(bigint,bigint,integer,bigint,boolean) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) function citus_internal_add_tenant_schema(oid,integer) function citus_internal_adjust_local_clock_to_remote(cluster_clock) diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index a4044bab3de..c2009fd2cbb 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -597,9 +597,9 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); SET application_name to 'citus_internal gpid=10000000001'; \set VERBOSITY terse - WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS - (VALUES (-10, 1, 0::bigint, 1::int, 1500000::bigint)) - SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; + WITH placement_data(shardid, shardlength, groupid, placementid) AS + (VALUES (-10, 0::bigint, 1::int, 1500000::bigint)) + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; ROLLBACK; -- invalid placementid @@ -609,7 +609,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420000, 0::bigint, 1::int, -10)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ROLLBACK; -- non-existing shard @@ -619,7 +619,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1430100, 0::bigint, 1::int, 10)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ROLLBACK; -- non-existing node with non-existing node-id 123123123 @@ -629,7 +629,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES ( 1420000, 0::bigint, 
123123123::int, 1500000)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ROLLBACK; -- create a volatile function that returns the local node id @@ -656,7 +656,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420000, 0::bigint, get_node_id(), 1500000), (1420000, 0::bigint, get_node_id(), 1500001)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ROLLBACK; -- shard is not owned by us @@ -666,7 +666,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; \set VERBOSITY terse WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1420007, 0::bigint, get_node_id(), 1500000)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; ROLLBACK; -- sucessfully add placements @@ -687,7 +687,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; (1420011, 0::bigint, get_node_id(), 1500009), (1420012, 0::bigint, get_node_id(), 1500010), (1420013, 0::bigint, get_node_id(), 1500011)) - SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data; + SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, false) FROM placement_data; COMMIT; -- we should be able to colocate both tables now diff --git a/src/test/regress/sql/multi_metadata_attributes.sql b/src/test/regress/sql/multi_metadata_attributes.sql index 1a592d858ef..cdb9fee3b99 100644 --- a/src/test/regress/sql/multi_metadata_attributes.sql +++ b/src/test/regress/sql/multi_metadata_attributes.sql @@ -11,5 +11,6 @@ WHERE atthasmissing AND attrelid NOT IN ('pg_dist_node'::regclass, 'pg_dist_rebalance_strategy'::regclass, 'pg_dist_partition'::regclass, 'pg_dist_object'::regclass, - 'pg_dist_background_task'::regclass) + 'pg_dist_background_task'::regclass, + 'pg_dist_placement'::regclass) ORDER BY attrelid, attname;
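
For reviewers, a small usage sketch tying the pieces above together. This is not part of the patch: the pg_dist_placement.needsisolatednode column and the two citus_internal_add_placement_metadata signatures are taken from the diff itself, while the literal shard, group and placement ids below are invented for illustration.

-- After ALTER EXTENSION citus UPDATE TO '12.1-1', the isolation flag is
-- visible directly in the placement catalog and defaults to false:
SELECT placementid, shardid, groupid, needsisolatednode
FROM pg_dist_placement
ORDER BY placementid;

-- Metadata sync propagates the flag through the new 5-argument signature.
-- Like the other citus_internal_* UDFs, it is meant to be called by Citus
-- itself; the metadata_sync_helpers tests above exercise it by first setting
-- application_name to 'citus_internal gpid=...'. All ids here are hypothetical.
SELECT citus_internal_add_placement_metadata(
    1420000,            -- shard_id
    0::bigint,          -- shard_length
    1::int,             -- group_id
    1500000::bigint,    -- placement_id
    false);             -- needs_isolation

-- The 4-argument signature survives as the legacy entry point; it takes no
-- needs_isolation argument, so placements added through it keep the column
-- default of false.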