Commit 6926cee

wip

onurctirtir committed Sep 18, 2023
1 parent bcb6a8b commit 6926cee
Showing 8 changed files with 143 additions and 112 deletions.
64 changes: 45 additions & 19 deletions src/backend/distributed/metadata/metadata_utility.c
@@ -1515,15 +1515,15 @@ ShardLength(uint64 shardId)


/*
* NodeGroupIsolatesAShardPlacementGroup returns true if given node group is
* used to isolate a shard placement group. This means that the node group
* has a shard placement group that needs an isolated node and no other shard
* placements.
* NodeGroupGetIsolatedShardPlacementGroup returns the shard placement group
* that the given node group is used to isolate, if any. Returns NULL if the
* node group is not used to isolate a shard placement group.
*/
bool
NodeGroupIsolatesAShardPlacementGroup(int32 groupId)
ShardPlacementGroup *
NodeGroupGetIsolatedShardPlacementGroup(int32 groupId)
{
ShardPlacementGroup *isolatedShardPlacementGroup = NULL;
ShardPlacementGroup *nodeShardPlacementGroup = NULL;
bool shardPlacementGroupNeedsIsolatedNode = false;

bool indexOK = false;
int scanKeyCount = 1;
@@ -1551,27 +1551,29 @@ NodeGroupIsolatesAShardPlacementGroup(int32 groupId)
GetShardPlacementGroupForPlacement(placement->shardId,
placement->placementId);

if (isolatedShardPlacementGroup)
{
if (!ShardPlacementGroupsSame(shardPlacementGroup,
isolatedShardPlacementGroup))
{
isolatedShardPlacementGroup = NULL;
break;
}
}
else if (placement->needsIsolatedNode)
if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
isolatedShardPlacementGroup = shardPlacementGroup;
nodeShardPlacementGroup = NULL;
break;
}

nodeShardPlacementGroup = shardPlacementGroup;
shardPlacementGroupNeedsIsolatedNode = placement->needsIsolatedNode;

heapTuple = systable_getnext(scanDescriptor);
}

systable_endscan(scanDescriptor);
table_close(pgDistPlacement, NoLock);

return isolatedShardPlacementGroup != NULL;
if (!shardPlacementGroupNeedsIsolatedNode)
{
return NULL;
}

return nodeShardPlacementGroup;
}
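
A minimal caller sketch (not part of this commit) of the new contract: the function now returns the isolated ShardPlacementGroup, or NULL, instead of a bool, so a caller can both test whether a node group is dedicated to isolation and compare the result against a candidate group. The helper name below is illustrative; only NodeGroupGetIsolatedShardPlacementGroup() and ShardPlacementGroupsSame() come from the changed headers.

/*
 * Sketch: returns true if the given node group already isolates exactly
 * candidateGroup. Illustrative helper, not part of this commit.
 */
static bool
NodeGroupAlreadyIsolates(int32 groupId, ShardPlacementGroup *candidateGroup)
{
	ShardPlacementGroup *isolatedGroup =
		NodeGroupGetIsolatedShardPlacementGroup(groupId);

	/* NULL means the node group is not used to isolate any shard placement group */
	if (isolatedGroup == NULL)
	{
		return false;
	}

	return ShardPlacementGroupsSame(isolatedGroup, candidateGroup);
}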


@@ -1710,6 +1712,30 @@ FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement
}


/*
* FilterActiveShardPlacementListByNode filters the given shard placement list
* down to the active placements that are on the given worker node.
*/
List *
FilterActiveShardPlacementListByNode(List *shardPlacementList, WorkerNode *workerNode)
{
List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveShardPlacement);
List *filteredShardPlacementList = NIL;
ShardPlacement *shardPlacement = NULL;

foreach_ptr(shardPlacement, activeShardPlacementList)
{
if (IsPlacementOnWorkerNode(shardPlacement, workerNode))
{
filteredShardPlacementList = lappend(filteredShardPlacementList,
shardPlacement);
}
}

return filteredShardPlacementList;
}
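
A short usage sketch, with the caveat that the variables are assumed to be set up by the caller (as GetRebalanceSteps() does later in this commit): given a relation's full placement list and the worker node being drained, keep only the placements that are both active and located on that node.

/* Usage sketch; shardPlacementList and workerNode are assumed to come from the caller. */
List *placementsOnDrainedNode =
	FilterActiveShardPlacementListByNode(shardPlacementList, workerNode);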


/*
* ActiveShardPlacementListOnGroup returns a list of active shard placements
* that are sitting on group with groupId for given shardId.
107 changes: 77 additions & 30 deletions src/backend/distributed/operations/rebalancer_placement_isolation.c
@@ -15,6 +15,7 @@
#include "utils/lsyscache.h"

#include "distributed/colocation_utils.h"
#include "distributed/hash_helpers.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
@@ -45,6 +46,19 @@ typedef struct
*/
bool shouldHaveShards;

/*
* Whether the given node is allowed to isolate any shard placement groups.
*
* This is only restricted if we're draining a single node because otherwise
* we have control over isolating shard placement groups on any node.
*
* However, if we're draining a single node, we cannot isolate shard
* placement groups on a node that already has some placements because
* we cannot move the existing placements away from a node that we're
* not draining.
*/
bool allowedToIsolate;

/*
* Shard placement group that is assigned to this node to be isolated.
*
@@ -57,7 +71,6 @@
* Routines to prepare a hash table where each entry is of type
* NodePlacementGroupHashEntry.
*/
static HTAB * CreateNodePlacementGroupHash(int workerCount);
static void NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash,
List *workerNodeList,
WorkerNode *drainWorkerNode);
@@ -79,16 +92,18 @@ static int WorkerNodeListGetNodeWithGroupId(List *workerNodeList, int32 nodeGrou

/*
* PrepareRebalancerPlacementIsolationContext creates RebalancerPlacementIsolationContext
* that keeps track of which worker nodes are used to isolate which nodes store which
* shard placement groups that need an isolated node.
* that keeps track of which worker nodes are used to isolate which shard placement groups
* that need an isolated node.
*/
RebalancerPlacementIsolationContext *
PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
List *activeShardPlacementListList,
WorkerNode *drainWorkerNode)
{
HTAB *nodePlacementGroupHash =
CreateNodePlacementGroupHash(list_length(activeWorkerNodeList));
CreateSimpleHashWithNameAndSize(uint32, NodePlacementGroupHashEntry,
"NodePlacementGroupHash",
list_length(activeWorkerNodeList));
NodePlacementGroupHashInit(nodePlacementGroupHash, activeWorkerNodeList,
drainWorkerNode);

@@ -106,26 +121,6 @@ PrepareRebalancerPlacementIsolationContext(List *activeWorkerNodeList,
}


/*
* CreateNodePlacementGroupHash creates a hash table where each entry
* is of type NodePlacementGroupHashEntry.
*/
static HTAB *
CreateNodePlacementGroupHash(int workerCount)
{
HASHCTL info;
memset(&info, 0, sizeof(info));

info.keysize = sizeof(uint32);
info.entrysize = sizeof(NodePlacementGroupHashEntry);
info.hcxt = CurrentMemoryContext;
int hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

return hash_create("NodePlacementGroupHash Hash",
workerCount, &info, hashFlags);
}


/*
* NodePlacementGroupHashInit initializes given hash table where each
* entry is of type NodePlacementGroupHashEntry by using given list
@@ -135,6 +130,8 @@ static void
NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,
WorkerNode *drainWorkerNode)
{
bool drainSingleNode = drainWorkerNode != NULL;

WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
@@ -144,17 +141,61 @@ NodePlacementGroupHashInit(HTAB *nodePlacementGroupHash, List *workerNodeList,

nodePlacementGroupHashEntry->nodeGroupId = workerNode->groupId;

bool shouldHaveShards = workerNode->shouldHaveShards;
if (drainWorkerNode && drainWorkerNode->groupId == workerNode->groupId)
{
nodePlacementGroupHashEntry->shouldHaveShards = false;
shouldHaveShards = false;
}
else

nodePlacementGroupHashEntry->shouldHaveShards = shouldHaveShards;
nodePlacementGroupHashEntry->allowedToIsolate = shouldHaveShards;
nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;

/*
* For the rest of this comment, assume that:
* Node D: the node we're draining
* Node I: a node that is not D and that has a shard placement group
* that needs an isolated node
* Node R: a node that is not D and that has some regular shard
* placements
*
* If we're draining a single node, then we don't know whether other
* nodes have any regular shard placements or any that need an isolated
* node because in that case GetRebalanceSteps() only provides the list of
* shard placements that are stored on D, not a list that contains all
* the placements across the cluster (because we want to limit node
* draining to that node in that case). Note that when all shard
* placements in the cluster are provided, NodePlacementGroupHashAssignNodes()
* would already be aware of which node is used to isolate which shard
* placement group or which node is used to store some regular shard
* placements. That is why we skip the code below if we're not draining a
* single node.
*
* Below we find the assigned placement groups for nodes of type
* I because we want to avoid moving the placements (if any) from
* node D to node I. We also set allowedToIsolate to false for the nodes
* that already have some shard placements because we want to avoid
* moving the placements that need an isolated node (if any) from node D
* to node R.
*/
if (!(shouldHaveShards && drainSingleNode))
{
nodePlacementGroupHashEntry->shouldHaveShards =
workerNode->shouldHaveShards;
continue;
}

nodePlacementGroupHashEntry->assignedPlacementGroup = NULL;
ShardPlacementGroup *isolatedShardPlacementGroup =
NodeGroupGetIsolatedShardPlacementGroup(
nodePlacementGroupHashEntry->nodeGroupId);
if (isolatedShardPlacementGroup)
{
nodePlacementGroupHashEntry->assignedPlacementGroup =
isolatedShardPlacementGroup;
}
else
{
nodePlacementGroupHashEntry->allowedToIsolate =
!NodeGroupHasShardPlacements(nodePlacementGroupHashEntry->nodeGroupId);
}
}
}
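
The comment above, together with the new allowedToIsolate field, boils down to a small per-node decision when draining a single node. The standalone sketch below is not part of the commit; hasIsolatedGroup and hasOtherPlacements stand in for the catalog lookups NodeGroupGetIsolatedShardPlacementGroup() != NULL and NodeGroupHasShardPlacements().

/*
 * Sketch of the single-node-drain decision in NodePlacementGroupHashInit():
 * a node that already isolates a group stays dedicated to it (type I), a node
 * that stores other placements must not be picked for isolation (type R), and
 * only an empty node remains a candidate for isolating a new group.
 */
static bool
AllowedToIsolateWhenDrainingSingleNode(bool hasIsolatedGroup, bool hasOtherPlacements)
{
	if (hasIsolatedGroup)
	{
		/* type I: keeps its assigned placement group and stays allowed */
		return true;
	}

	/* type R already stores regular placements; only empty nodes may isolate */
	return !hasOtherPlacements;
}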

@@ -274,6 +315,11 @@ NodePlacementGroupHashAssignNode(HTAB *nodePlacementGroupHash,
return false;
}

if (!nodePlacementGroupHashEntry->allowedToIsolate)
{
return false;
}

nodePlacementGroupHashEntry->assignedPlacementGroup = placementGroup;

return true;
@@ -314,7 +360,8 @@ RebalancerPlacementIsolationContextPlacementIsAllowedOnWorker(
*/
ShardPlacementGroup *placementGroup =
GetShardPlacementGroupForPlacement(shardId, placementId);
return ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
return nodePlacementGroupHashEntry->assignedPlacementGroup != NULL &&
ShardPlacementGroupsSame(nodePlacementGroupHashEntry->assignedPlacementGroup,
placementGroup);
}

48 changes: 2 additions & 46 deletions src/backend/distributed/operations/shard_rebalancer.c
@@ -484,36 +484,6 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
}


/*
* ExcludeShardPlacementsFromNodesNotAllowedToHaveShards excludes shard
* placements from nodes that are not allowed to have shards and returns
* the filtered list.
*
* As an excecption, the shard placements on specified worker node are
* not excluded even if the node has been marked as not allowed to have
* placements.
*/
static List *
ExcludeShardPlacementsFromNodesNotAllowedToHaveShards(List *shardPlacementList,
WorkerNode *workerNode)
{
List *filteredShardPlacementList = NIL;

ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
WorkerNode *node = LookupNodeByNodeIdOrError(shardPlacement->nodeId);
if (node->shouldHaveShards || workerNode->nodeId == node->nodeId)
{
filteredShardPlacementList = lappend(filteredShardPlacementList,
shardPlacement);
}
}

return filteredShardPlacementList;
}


/*
* SortedActiveWorkers returns all the active workers like
* ActiveReadableNodeList, but sorted.
@@ -583,22 +553,8 @@ GetRebalanceSteps(RebalanceOptions *options)

if (options->workerNode != NULL)
{
/*
* Exclude the shard placements from the nodes that are marked
* as "not allowed to have shards", except for the specified
* worker node.
*
* We do not only include the shards from the specified worker
* but also the shards from the nodes that are allowed to have
* shards because draining the specified worker node might require
* moving the existing shards away from some other nodes,
* e.g., when the node we're draining causes placing a placement
* that needs an isolated node to another node that already had
* some shard placements.
*/
activeShardPlacementListForRelation =
ExcludeShardPlacementsFromNodesNotAllowedToHaveShards(
shardPlacementList, options->workerNode);
activeShardPlacementListForRelation = FilterActiveShardPlacementListByNode(
shardPlacementList, options->workerNode);
}

if (list_length(activeShardPlacementListForRelation) >= shardAllowedNodeCount)
2 changes: 1 addition & 1 deletion src/backend/distributed/operations/worker_node_manager.c
@@ -300,7 +300,7 @@ NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node)
return false;
}

return !NodeGroupIsolatesAShardPlacementGroup(node->groupId);
return NodeGroupGetIsolatedShardPlacementGroup(node->groupId) == NULL;
}


4 changes: 3 additions & 1 deletion src/include/distributed/metadata_utility.h
@@ -332,7 +332,7 @@ extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);
extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupIsolatesAShardPlacementGroup(int32 groupId);
extern ShardPlacementGroup * NodeGroupGetIsolatedShardPlacementGroup(int32 groupId);
extern bool ShardPlacementGroupsSame(const ShardPlacementGroup *leftGroup,
const ShardPlacementGroup *rightGroup);
extern bool NodeGroupHasShardPlacements(int32 groupId);
@@ -343,6 +343,8 @@ extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement);
extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
ShardPlacement *));
extern List * FilterActiveShardPlacementListByNode(List *shardPlacementList,
WorkerNode *workerNode);
extern List * ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List * ActiveShardPlacementList(uint64 shardId);
extern List * ShardPlacementListSortedByWorker(uint64 shardId);
2 changes: 1 addition & 1 deletion src/test/regress/bin/normalize.sed
@@ -232,7 +232,7 @@ s/CREATE TABLESPACE test_tablespace LOCATION.*/CREATE TABLESPACE test_tablespace
s/(.*absolute correlation \()([0,1]\.[0-9]+)(\) of var attribute [0-9]+ is smaller than.*)/\1X\.YZ\3/g

# normalize differences in multi_fix_partition_shard_index_names test
s/NOTICE: issuing WITH placement_data\(shardid, shardlength, groupid, placementid, needsisolatednode\) AS \(VALUES \([0-9]+, [0-9]+, [0-9]+, [0-9]+\)\)/NOTICE: issuing WITH placement_data\(shardid, shardlength, groupid, placementid\) AS \(VALUES \(xxxxxx, xxxxxx, xxxxxx, xxxxxx\)\)/g
s/NOTICE: issuing WITH placement_data\(shardid, shardlength, groupid, placementid, needsisolatednode\) AS \(VALUES \([0-9]+, [0-9]+, [0-9]+, [0-9]+, (true|false)\)\)/NOTICE: issuing WITH placement_data\(shardid, shardlength, groupid, placementid, needsisolatednode\) AS \(VALUES \(xxxxxx, xxxxxx, xxxxxx, xxxxxx, xxxxxx\)\)/g

# global_pid when pg_cancel_backend is sent to workers
s/pg_cancel_backend\('[0-9]+'::bigint\)/pg_cancel_backend('xxxxx'::bigint)/g
@@ -687,9 +687,9 @@ NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminval
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('fix_idx_names.p2'::regclass, 915002, 't'::"char", '-2147483648', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardlength, groupid, placementid, needsisolatednode) AS (VALUES (915002, 0, 14, 920004, false)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, needsisolatednode) FROM placement_data;
NOTICE: issuing WITH placement_data(shardid, shardlength, groupid, placementid, needsisolatednode) AS (VALUES (xxxxxx, xxxxxx, xxxxxx, xxxxxx, xxxxxx)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, needsisolatednode) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing WITH placement_data(shardid, shardlength, groupid, placementid, needsisolatednode) AS (VALUES (915002, 0, 14, 920004, false)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, needsisolatednode) FROM placement_data;
NOTICE: issuing WITH placement_data(shardid, shardlength, groupid, placementid, needsisolatednode) AS (VALUES (xxxxxx, xxxxxx, xxxxxx, xxxxxx, xxxxxx)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid, needsisolatednode) FROM placement_data;
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SET citus.enable_ddl_propagation TO 'off'
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx