
Add citus_shard_property_set() UDF to allow separating placements of given shard from others in the cluster #7193

Open
onurctirtir wants to merge 1 commit into main from tenant-schema-isolation

Conversation

Member

@onurctirtir onurctirtir commented Sep 11, 2023

citus_shard_property_set() accepts an optional "anti_affinity" parameter. When "anti_affinity" is set to true, it marks all shards that are colocated with the given shard as "needs to be separated" in pg_dist_shard via the new "needsseparatenode" column. Likewise, setting "anti_affinity" to false undoes that effect.

And when "needsseparatenode" column is set for a shard, then the placements of the shard and the other shard placements that are colocated with them will be placed on nodes in a way that no other shard placement will be placed on those nodes upon the next invocation of shard rebalancer. Say, if the shard group has two shard placement groups S1 & S2, then while S1 will be stored on node NX and S2 will be stored on node NY, no other shard placement will be placed on NX or NY.

The rebalancer will try to keep the placements that need to be separated on the nodes where they currently reside. If that's not possible, it will try to place them on other worker nodes.

This feature would be useful for separating distributed schemas onto individual nodes. We can add very simple wrappers around citus_shard_property_set() to mark the shard-group that corresponds to a given distributed schema as "needs to be separated" or as "doesn't need to be separated".
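
For illustration, a minimal sketch of such a wrapper; the function name isolate_schema_shard_group and the catalog lookup are assumptions for the example, not part of this PR. It picks an arbitrary shard of a distributed table in the given schema and forwards the flag to citus_shard_property_set(), which marks all colocated shards anyway:

CREATE OR REPLACE FUNCTION isolate_schema_shard_group(schema_name text, isolate bool)
RETURNS void AS $$
DECLARE
    any_shard_id bigint;
BEGIN
    -- Hypothetical helper: pick any shard of a distributed table in the schema.
    SELECT s.shardid INTO any_shard_id
    FROM pg_dist_shard s
    JOIN pg_class c ON c.oid = s.logicalrelid
    WHERE c.relnamespace = schema_name::regnamespace
    LIMIT 1;

    -- All shards colocated with this shard get marked accordingly.
    PERFORM citus_shard_property_set(any_shard_id, anti_affinity => isolate);
END;
$$ LANGUAGE plpgsql;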

When citus_shard_property_set() is invoked with "anti_affinity=>true", we mark all the shards within the related shard-group as "needs to be separated", to avoid losing track of whether a shard-group needs a separate node. If we instead did so for a single (random) shard within the shard-group, then dropping the distributed table that owns that shard would make us forget about the separation requirement. If we had a shard-group concept in the code, we wouldn't need to do this. Likewise, whenever a distributed table is created in a way that adds a new shard into an existing shard-group that needs a separate node, we mark that new shard as "needs to be separated" too.

Note that citus_shard_property_set() cannot be run concurrently with distributed table creation, shard transfers or any other operation that targets the same colocation group and needs to acquire the placement colocation lock (via AcquirePlacementColocationLock()).

This PR also adds a "has_separate_node" column to "citus_shards", such that if "has_separate_node" is true, then the node that contains the shard contains only the shards that belong to the same shard group.
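
A minimal usage sketch of the flow described above; the table name dist and the distribution value 42 are illustrative, everything else is from this PR or existing Citus UDFs:

-- Mark the shard group holding distribution value 42 as "needs to be separated".
SELECT citus_shard_property_set(
           get_shard_id_for_distribution_column('dist', 42),
           anti_affinity => true);

-- The separation requirement is enforced on the next rebalancer invocation.
SELECT rebalance_table_shards();

-- The new citus_shards column shows whether a shard ended up on its own node.
SELECT shardid, nodename, nodeport, has_separate_node FROM citus_shards;

-- Undo the requirement.
SELECT citus_shard_property_set(
           get_shard_id_for_distribution_column('dist', 42),
           anti_affinity => false);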

Design choices:

  • Even if the input shard is marked as "needsseparatenode", citus_split_shard_by_split_points() / isolate_tenant_to_new_shard() ignore this setting for the shards created as a result of the split.
  • citus_move_shard_placement() / citus_copy_shard_placement() don't check whether placement separation rules allow the target node to store the placement.
  • Similarly, citus_split_shard_by_split_points() doesn't check whether placement separation rules allow the target node to store the split shards.
  • citus_drain_node() takes shard separation rules into account when draining a node. However, it might fail to enforce some separation requirements defined for the shards on the node being drained and might emit a warning for some of the required shard-move operations, as before. Likewise, if the node being drained has some placements that don't need to be separated but there are no nodes where we can place them, then we emit warnings for those placements as well.
  • When creating placements that don't need a separate node, create_distributed_table() / create_distributed_table_concurrently() / replicate_table_shards() take shard separation rules into account when creating / replicating a distributed table, such that they don't place new placements on the nodes that are used to separate shard placement groups.
  • When creating placements that need a separate node via replicate_table_shards(), it might put the new placements on nodes that already store some shards. This is not the case for create_distributed_table() / create_distributed_table_concurrently(), because new placements have to be co-located with the existing ones, which prevents the behavior seen with replicate_table_shards().
  • create_distributed_table() / create_distributed_table_concurrently() inherit shard separation rules from the table that the input table is requested to be co-located with (see the sketch below).
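
A sketch of the last point; foo is assumed to be a distributed table with a separated shard group (as in the citus_dev example later in this thread) and foo_details is an illustrative table name:

-- foo has a shard group that is marked as "needs to be separated".
SELECT citus_shard_property_set(
           get_shard_id_for_distribution_column('foo', 5),
           anti_affinity => true);

-- A table co-located with foo inherits the separation rules: the new shard that
-- lands in the separated shard group is marked "needs to be separated" too.
CREATE TABLE foo_details (a bigserial PRIMARY KEY, b text);
SELECT create_distributed_table('foo_details', 'a', colocate_with => 'foo');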

DESCRIPTION: Adds citus_shard_property_set() UDF to allow separating placements of a given shard from others in the cluster based on the anti_affinity parameter.

@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 5 times, most recently from 697908a to fa2b1d9 Compare September 13, 2023 14:32
@onurctirtir onurctirtir changed the title from "Allow isolating shard placement groups on individual nodes" to "Allow isolating shard groups on individual nodes" Sep 14, 2023
@onurctirtir onurctirtir changed the title from "Allow isolating shard groups on individual nodes" to "Add citus_shard_set_isolated() UDF to allow isolating placements of given shard from others in the cluster" Sep 14, 2023
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 8 times, most recently from 065d879 to 6926cee Compare September 18, 2023 14:42
@@ -3163,7 +3189,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList
{
WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex);

if (!NodeCanHaveDistTablePlacements(workerNode))
if (!NodeCanBeUsedForNonIsolatedPlacements(workerNode))
Member Author

inefficient, but not sure if it's worth optimizing

@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 13 times, most recently from e1d7cc6 to 737eb8f Compare September 25, 2023 11:33
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 3 times, most recently from ad877eb to 0c3bf74 Compare September 26, 2023 14:54

codecov bot commented Sep 26, 2023

Codecov Report

Merging #7193 (f1f72f3) into main (2fae91c) will increase coverage by 0.02%.
The diff coverage is 91.25%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #7193      +/-   ##
==========================================
+ Coverage   89.60%   89.62%   +0.02%     
==========================================
  Files         282      283       +1     
  Lines       60314    60634     +320     
  Branches     7512     7544      +32     
==========================================
+ Hits        54043    54342     +299     
- Misses       4116     4136      +20     
- Partials     2155     2156       +1     

@onurctirtir onurctirtir changed the title from "Add citus_shard_property_set() UDF to allow isolating placements of given shard from others in the cluster" to "Add citus_shard_property_set() UDF to allow separating placements of given shard from others in the cluster" Sep 29, 2023
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch from 3f279c7 to 0d66311 Compare September 29, 2023 13:09
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch from 0d66311 to 50febd9 Compare September 29, 2023 14:15
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 2 times, most recently from 47b901d to b0fa3d9 Compare October 13, 2023 12:01
Comment on lines 1529 to 1627
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);

GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);

ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
Oid citusTableId = shardInterval->relationId;
if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}

ShardPlacementGroup *shardPlacementGroup =
GetShardPlacementGroupForPlacement(placement->shardId,
placement->placementId);

if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
nodeShardPlacementGroup = NULL;
break;
}

nodeShardPlacementGroup = shardPlacementGroup;
shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;

heapTuple = systable_getnext(scanDescriptor);
}
Member

There are a couple of exit points in this loop; to make it easier to write evidently correct code, I would suggest only invoking systable_getnext in the condition of the while loop, like this:

Suggested change (the loop as currently written, followed by the proposed restructuring):
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
Oid citusTableId = shardInterval->relationId;
if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
ShardPlacementGroup *shardPlacementGroup =
GetShardPlacementGroupForPlacement(placement->shardId,
placement->placementId);
if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
nodeShardPlacementGroup = NULL;
break;
}
nodeShardPlacementGroup = shardPlacementGroup;
shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;
heapTuple = systable_getnext(scanDescriptor);
}

The proposed restructuring:

HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
GroupShardPlacement *placement =
TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
Oid citusTableId = shardInterval->relationId;
if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
{
continue;
}
ShardPlacementGroup *shardPlacementGroup =
GetShardPlacementGroupForPlacement(placement->shardId,
placement->placementId);
if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
nodeShardPlacementGroup = NULL;
break;
}
nodeShardPlacementGroup = shardPlacementGroup;
shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;
}

This allows easy changes in break/continue with evidently correct behaviour

Comment on lines 1549 to 1623
if (nodeShardPlacementGroup &&
!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
nodeShardPlacementGroup = NULL;
break;
}
Member

A small comment above here would be great to document why we are breaking out and setting nodeShardPlacementGroup to NULL.

I assume it is because this function is checking if there is exactly one nodeShardPlacementGroup on this node, if we find a different one this invariant is broken.

!ShardPlacementGroupsSame(shardPlacementGroup,
nodeShardPlacementGroup))
{
nodeShardPlacementGroup = NULL;
Member

I think we need to set shardPlacementGroupNeedsSeparateNode to false here.

If the first group we find had needsSeparateNode set this variable could have been left true causing the early escape to not trigger.

It seems that the code would not crash terribly, and might actually work correctly, but the flow doesn't feel natural now, as the idea could arise that nodeShardPlacementGroup is always non-NULL when shardPlacementGroupNeedsSeparateNode is true.

Comment on lines +619 to +625

/*
* For a new colocation group, needsseparatenode is set to false for
* all shards.
*/
needsSeparateNodeForPlacementList = GenerateListFromIntElement(false, shardCount);
Member

Any specific reason to use a list of int's to denote a boolean array instead of using a bitmap? Not saying it is wrong, but it feels strange. Wonder if there is any specific reason for this use of a List?

Member Author

There is no specific reason for doing this indeed, just because workersForPlacementList is of type List * too.

uint32 colocatationId;
int shardIntervalIndex;
int32 nodeGroupId;
} ShardPlacementGroup;
Member

I think we need to revisit this name. Because it signals a placement group, which is a bit confusing with nodegroup.

Would it make sense to call this a ShardgroupPlacement and use this to introduce a bit of the naming around Shardgroups (take note of the lowercase group here).

I think the first two members uniquely identify what we internally started talking about as Shardgroups.

By not writing the group part with a capital letter it becomes clear which group the name is talking about, contrary to its current name.

Comment on lines 1 to 10
/*-------------------------------------------------------------------------
*
* rebalancer_placement_isolation.c
* Routines to determine which worker node should be used to separate
* a colocated set of shard placements that need separate nodes.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
Member

This file needs serious thoughts about naming and concepts. It is very hard to follow what is going on.

*/
bool
NodeCanHaveDistTablePlacements(WorkerNode *node)
NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node)
Member

Isolated is probably a leftover from an earlier iteration; I think you call it separated throughout the rest of the code now.

@@ -145,6 +146,8 @@ typedef struct RebalanceContext
FmgrInfo shardCostUDF;
FmgrInfo nodeCapacityUDF;
FmgrInfo shardAllowedOnNodeUDF;

RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext;
Member

given we are in a rebalance context I think we don't have to be that verbose in the member name

Suggested change
RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext;
RebalancerPlacementIsolationContext *shardgroupSeparationContext;

@onurctirtir onurctirtir requested a review from thanodnl October 17, 2023 22:34
Member

@thanodnl thanodnl left a comment

After the cleanup of the rebalancer placement separation logic (thanks!) it became clear that there are logical issues in the algorithm used to separate shardgroups onto nodes. Please see the elaborate comment in InitRebalancerPlacementSeparationContext for an analysis and a reproducible example of failure to separate due to logical issues.

Comment on lines 3279 to 3285

bool needsSeparateNode = false;
if (expectNeedsSeparateNode)
{
PG_ENSURE_ARGNOTNULL(5, "needs separate node");
needsSeparateNode = PG_GETARG_BOOL(5);
}
Member

We do that to support rolling upgrades to some degree, where the coordinator and some workers have Citus version X while some other workers have Citus version Y, where Y > X.

Do our managed services guarantee all workers are upgraded before the coordinator is?

Comment on lines 386 to 399
Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
{
LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
}
Member

Could we comment this section with the reason for locking?
What concurrent actions are we guarding against, and why do we need to lock all relations in the colocation group for that, even after already acquiring the `PlacementColocationLock` before?

Member Author

I think we at least wouldn't want relations to get dropped when setting a property for the shards. Maybe this should simply be AccessShareLock?

Comment on lines 451 to 456
foreach_ptr(colocatedShardInterval, colocatedShardIntervalList)
{
ShardSetNeedsSeparateNode(colocatedShardInterval->shardId,
enabled);
}
Member

Ok, so the most problematic of all is the amount we increment the command counter here. For an operation like this we should do 1 batch update and 1 command increment.

Don't we already have the logical relation id's earlier that we can use to send the invalidations?

I think there is a big opportunity here to refactor, which we might want to do in its own PR for the rest of the code. For now we could refactor this by inlining, and using the same update loop to create a list of all logical rel id's that need invalidation. Pass that list to a new, specialized function to batch invalidate the caches for a list of relid's.

Later we should go through our code, because I think there might be many places where we are invalidating those caches for all colocated tables at once. We could reuse the introduced function to invalidate a list of relid's at once. Ideally this would remove the need to invalidate shardid's at all, if not we could batchify that as well.

* that given shard belongs to.
*/
Datum
citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS)
Member

In different work I am adding the concept of a shardgroup; given the difference in arguments, I suggest keeping this function named shard and refraining from the group name in APIs if we are still only passing shardids. Any reason this function breaks with the common pattern where the internal metadata functions are simple wrappers around inserting/updating catalog entries, and performs lookups here to find colocated shards? I would suggest making this function work on a single shardid and calling it for every shard, driving the fanout behaviour from the coordinator.

The function is very specific, possibly too specific. Any reason to have needsseparatenode in the function name, compared to moving it to an argument? The only reason I could think of is to be able to deprecate this 'easily' once shardgroups are introduced. Given this shard-based parameter moves to shardgroups by then, it would scream for an explanation in the comment of the function though.

Lastly, in orthogonal work I am introducing the concept of a shardgroup (one word). The naming of this function, given it works on shardids, might become confusing given it talks about shard group (two words) with a different identifier.

I would change this function to reflect that it works on shards, take needsseparatenode as an argument, and fill out all arguments based on the updated state on the coordinator (assuming the functionality might extend over time to add more configuration parameters to shards).

Member Author

@onurctirtir onurctirtir Oct 31, 2023

I would suggest making this function work on a single shardid and call it for every shard, driving the fanout behaviour from the coordinator.

But wouldn't this be super inefficient based on our discussion?
#7193 (comment)

The naming of this function, given it works on shardid's might become confusing given it talks about shard group (two words) with a different identifier.

I would suggest making this function work on a single shardid and call it for every shard, driving the fanout behaviour from the coordinator.

I understand all those concerns, but I'm not really sure if it's a good idea to send the citus_internal_shard_group_set_needsseparatenode command to workers N times for N colocated shards. At least, it'd be inconsistent with what we're doing on the local node.

the function is very specific, possibly too specific. Any reason to have needsseparatenode in the function name, compared to move it as an argument?

No particular reasons, let me make it a function argument and rename the function to citus_internal_shard_property_set? It'd still perform the updates on all the colocated shards but this is also the case for citus_shard_property_set. (citus_shard_property_set also takes a shardid but operates on all the colocated shards).

Comment on lines +363 to +371
/*
* citus_shard_property_set allows setting shard properties for all
* the shards within the shard group that given shard belongs to.
*/
Member

This assumes all properties will target shardgroups, which will probably not be the case. Once shardgroups are formally introduced we should move needs_separate_node/anti_affinity to a function targeting shardgroups. This function could stick around for shard-specific properties.

Unfortunately we can't introduce the shardgroup based function yet as we don't have a way to address shardgroups. It is unclear if we get that before we release. If so we can refactor this work to move to shardgroups directly.

Member Author

Once shardgroups are formally introduced we should move needs_separate_node/anti_affinity to a function targetting shardgroups.

👍

FmgrInfo *shardAllowedOnNodeUDF)
{
HTAB *nodePlacementGroupHash =
CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
Member

the type of the key differs from the type of the first field in the struct.

Suggested change
CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
CreateSimpleHashWithNameAndSize(int32, NodeToPlacementGroupHashEntry,

list_length(activeWorkerNodeList));

RebalancerPlacementSeparationContext *context =
palloc(sizeof(RebalancerPlacementSeparationContext));
Member

We like zero initialized values.

Suggested change
palloc(sizeof(RebalancerPlacementSeparationContext));
palloc0(sizeof(RebalancerPlacementSeparationContext));

Comment on lines 161 to 162
nodePlacementGroupHashEntry->shouldHaveShards =
workerNode->shouldHaveShards;
Member

I believe this can be formatted on a single line

Suggested change
nodePlacementGroupHashEntry->shouldHaveShards =
workerNode->shouldHaveShards;
nodePlacementGroupHashEntry->shouldHaveShards = workerNode->shouldHaveShards;

locally running citus_indent shows that it fits.

Comment on lines 166 to 189
/*
* Lets call set of the nodes that placements in rebalancePlacementList
* are stored on as D and the others as S. In other words, D is the set
* of the nodes that we're allowed to move the placements "from" or
* "to (*)" (* = if we're not draining it) and S is the set of the nodes
* that we're only allowed to move the placements "to" but not "from".
*
* This means that, for a node of type S, the fact that whether the node
* is used to separate a placement group or not cannot be changed in the
* runtime.
*
* For this reason, below we find out the assigned placement groups for
* nodes of type S because we want to avoid from moving the placements
* (if any) from a node of type D to a node that is used to separate a
* placement group within S. We also set hasPlacementsThatCannotBeMovedAway
* to true for the nodes that already have some shard placements within S
* because we want to avoid from moving the placements that need a separate
* node (if any) from node D to node S.
*
* We skip below code for nodes of type D not because optimization purposes
* but because it would be "incorrect" to assume that "current placement
* distribution for a node of type D would be the same" after the rebalancer
* plans the moves.
*/
Member

I still find this comment incomprehensible.

As far as I understand the code:

  • Nodes of type S do not have any placements on them and are allowed to have placements on them, eg. an empty node
  • Nodes of type D have placements on them, and are allowed to have placements on them

What is encapsulated in the comment above that is not covered by my two lines?

Member Author

@onurctirtir onurctirtir Oct 31, 2023

Nodes of type S do not have any placements on them and are allowed to have placements on them, eg. an empty node

S is not necessarily empty. S means a node that doesn't store any of the input placements and hence is one of the nodes that we cannot move shards away "from".

Nodes of type D have placements on them, and are allowed to have placements on them

Yes. Even more, D actually corresponds to the set of nodes that contain input placements.

Comment on lines 204 to 206
nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway =
NodeGroupHasDistributedTableShardPlacements(
nodePlacementGroupHashEntry->nodeGroupId);
Member

It took me quite a while to understand why "has placements that cannot be moved away" was chosen as this name.

I now think I understand it a bit.

Given we didn't find the groupId in rebalancePlacementList, which contains the list of shards to rebalance, the view of our node is 'empty'. However, due to the stupidity of the Citus rebalancer, you are allowed to call the rebalancer on a subset of the shards in the system, limiting the rebalancing to a single set of colocated tables. Meaning, us not seeing the nodeGroupId in the list of placements up for rebalancing doesn't mean the node is empty.

This variable therefore does not indicate placements that cannot be moved; instead, the node contains shards for colocation groups that we are currently not rebalancing.

I guess this means that, since colocation groups having placements on this node are not rebalanced, we can currently not use this node as a node to place a separated shardgroup on.

I think we need to dumb down the comments on this function to describe the above, instead of defining nodes as D or S.

Also, the field hasPlacementsThatCannotBeMovedAway should be renamed or at least have a more descriptive comment on it. A name that works better would be hasPlacementsOutsideCurrentRebalance.

Lastly, we never set this parameter to true if we found this node in the list of placements we are rebalancing. But a node being found in that list doesn't mean it contains no shards that are outside of this rebalance; i.e. I think not setting this value for nodes you define as type D is incorrect. This seems to be corroborated by the checks we perform in TryAssignPlacementGroupsToNodegroups.

Lets say we have two colocation groups with 4 shards respectively:

C1 => SG1, SG2, SG3, SG4
C2 => SG5, SG6, SG7, SG8

3 Nodes
N1 => SG1, SG3, SG5, SG7
N2 => SG2, SG4, SG6, SG8
N3 => newnode

if we are rebalancing C1 we will find nodes N1 and N2 in rebalancePlacementList, designating them as D; only N3 is now designated as S.
N3 doesn't contain any shards that 'can't be moved', or as I describe them, 'shards outside of the rebalance'. However, N1 and N2 do have them.

Now assume we are separating SG1, we call TryAssignPlacementGroupsToNodeGroups which iterates over SG1. We call TryAssignPlacementGroupToNode for SG1 on its current node N1. Since we assumed N1 to be of type D we have never set hasPlacementsThatCannotBeMovedAway to true, even though SG5 and SG7 are on the node. This allows TryAssignPlacementGroupToNode to assign SG1 to this node, instead of moving it to the newly added node N3.

Experimenting with the above scenario using citus_dev shows that this is indeed the case:

citus_dev make citus --size 3
-- create 2 tables with 4 shards each in 2 colocation groups balanced over 2 nodes in a 3 node cluster
SELECT citus_set_node_property('localhost', 9703, 'shouldhaveshards', false);
CREATE TABLE foo( a bigserial primary key, b text);
CREATE TABLE bar( a bigserial primary key, b text);
SELECT create_distributed_table('foo','a',shard_count=>4,colocate_with=>'none');
SELECT create_distributed_table('bar','a',shard_count=>4,colocate_with=>'none');
SELECT citus_set_node_property('localhost', 9703, 'shouldhaveshards', true);

-- separate the shard for tenant 5
SELECT get_shard_id_for_distribution_column('foo', 5); -- 102104 in my case (shardindex 1 or SG1 from the above scenario)
SELECT citus_shard_property_set(get_shard_id_for_distribution_column('foo', 5), anti_affinity => true);

-- rebalance the colocation group where foo is in
SELECT rebalance_table_shards('foo');
-- NOTICE:  Moving shard 102106 from localhost:9701 to localhost:9703 ...
-- (shardindex 3 or SG3 from the above scenario)

SELECT * FROM citus_shards;
-- shows that no shard has `has_separate_node` set to true

As you will see, a shard from foo that doesn't need isolation gets isolated to the newly added node, instead of the one that actually needs isolation, leaving the one that we want isolated on a node with shardgroups from different colocation groups.

Member Author

Lastly, we never set this parameter to true if we found this node in the list of placements we are rebalancing. It doesn't mean that if the node is found in that list it doesn't contain shards that are outside of this rebalance

Right, this is an outcome of the algorithm that I realized last week and was wishing to talk to you about as well. However, I'm not sure if this implies an important problem. Maybe we can tell users that shard separation rules may not become effective unless the rebalancer is invoked for the whole cluster?
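
For reference, a sketch of the difference being discussed, assuming (as in current Citus) that rebalance_table_shards() with no argument considers all colocation groups:

-- Limited to foo's colocation group, as in the reproduction above; separation
-- rules involving other colocation groups may not be enforced.
SELECT rebalance_table_shards('foo');

-- Invoked for the whole cluster, so all colocation groups are considered.
SELECT rebalance_table_shards();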

Member

I had a stab at getting this data structure correctly populated: 6a64c23

@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 2 times, most recently from c5fd89b to f6b9b41 Compare November 6, 2023 22:55
@@ -0,0 +1,21 @@
SET client_min_messages TO WARNING;
Contributor

@JelteF JelteF Nov 9, 2023

I think this PR should also add tests to src/test/regress/sql/shard_rebalancer_unit.sql, to show how the rebalancer plans moves in relation to the isolation of placements.

@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch from f6b9b41 to 6b9ba04 Compare February 9, 2024 10:03
Member Author

Is this the correct way of updating the add_shard_metadata UDF? In other words, should we update its definitions in both schemas?

Comment on lines +57 to +58
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "udfs/citus_internal_add_shard_metadata/12.2-1.sql"
Member Author

Comment on lines +57 to +59
DROP FUNCTION citus_internal.add_shard_metadata(regclass, bigint, "char", text, text, boolean);
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean);
#include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
Member Author

@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch 3 times, most recently from 2dc12d2 to 0fcd726 Compare February 9, 2024 12:16
@onurctirtir onurctirtir force-pushed the tenant-schema-isolation branch from 0fcd726 to f1f72f3 Compare February 9, 2024 12:16