Add citus_shard_property_set() UDF to allow separating placements of given shard from others in the cluster #7193
base: main
Conversation
Force-pushed from 697908a to fa2b1d9
Force-pushed from 065d879 to 6926cee
@@ -3163,7 +3189,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList
	{
		WorkerNode *workerNode = list_nth(workerNodeList, workerNodeIndex);

-		if (!NodeCanHaveDistTablePlacements(workerNode))
+		if (!NodeCanBeUsedForNonIsolatedPlacements(workerNode))
Inefficient, but not sure if it's worth optimizing.
Force-pushed from e1d7cc6 to 737eb8f
Force-pushed from ad877eb to 0c3bf74
Codecov Report

@@            Coverage Diff             @@
##             main    #7193      +/-   ##
==========================================
+ Coverage   89.60%   89.62%   +0.02%
==========================================
  Files         282      283       +1
  Lines       60314    60634     +320
  Branches     7512     7544      +32
==========================================
+ Hits        54043    54342     +299
- Misses       4116     4136      +20
- Partials     2155     2156       +1
Force-pushed from 3f279c7 to 0d66311
Force-pushed from 0d66311 to 50febd9
src/backend/distributed/operations/rebalancer_placement_isolation.c (outdated review thread, resolved)
Force-pushed from 47b901d to b0fa3d9
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);

	GroupShardPlacement *placement =
		TupleToGroupShardPlacement(tupleDescriptor, heapTuple);

	ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
	Oid citusTableId = shardInterval->relationId;
	if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
	{
		heapTuple = systable_getnext(scanDescriptor);
		continue;
	}

	ShardPlacementGroup *shardPlacementGroup =
		GetShardPlacementGroupForPlacement(placement->shardId,
										   placement->placementId);

	if (nodeShardPlacementGroup &&
		!ShardPlacementGroupsSame(shardPlacementGroup,
								  nodeShardPlacementGroup))
	{
		nodeShardPlacementGroup = NULL;
		break;
	}

	nodeShardPlacementGroup = shardPlacementGroup;
	shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;

	heapTuple = systable_getnext(scanDescriptor);
}
There are a couple of exit points in this loop. To make it easier to write evidently correct code, I would suggest only invoking systable_getnext in the condition of the while loop, like this:
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
	GroupShardPlacement *placement =
		TupleToGroupShardPlacement(tupleDescriptor, heapTuple);
	ShardInterval *shardInterval = LoadShardInterval(placement->shardId);
	Oid citusTableId = shardInterval->relationId;
	if (!IsCitusTableType(citusTableId, DISTRIBUTED_TABLE))
	{
		continue;
	}
	ShardPlacementGroup *shardPlacementGroup =
		GetShardPlacementGroupForPlacement(placement->shardId,
										   placement->placementId);
	if (nodeShardPlacementGroup &&
		!ShardPlacementGroupsSame(shardPlacementGroup,
								  nodeShardPlacementGroup))
	{
		nodeShardPlacementGroup = NULL;
		break;
	}
	nodeShardPlacementGroup = shardPlacementGroup;
	shardPlacementGroupNeedsSeparateNode = shardInterval->needsSeparateNode;
}
This allows easy changes in break/continue with evidently correct behaviour.
if (nodeShardPlacementGroup &&
	!ShardPlacementGroupsSame(shardPlacementGroup,
							  nodeShardPlacementGroup))
{
	nodeShardPlacementGroup = NULL;
	break;
}
A small comment above here would be great to document why we are breaking out and setting nodeShardPlacementGroup to NULL. I assume it is because this function is checking if there is exactly one nodeShardPlacementGroup on this node; if we find a different one, this invariant is broken.
	!ShardPlacementGroupsSame(shardPlacementGroup,
							  nodeShardPlacementGroup))
{
	nodeShardPlacementGroup = NULL;
I think we need to set shardPlacementGroupNeedsSeparateNode to false here. If the first group we find had needsSeparateNode set, this variable could have been left true, causing the early escape to not trigger. It seems that the code would not crash terribly, and might actually work correctly, but the flow doesn't feel natural now, as the idea could arise that nodeShardPlacementGroup is always non-NULL when shardPlacementGroupNeedsSeparateNode is true.
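For illustration, a minimal sketch of the suggested reset, reusing the variable names from the snippet quoted above (the surrounding scan loop is assumed to stay unchanged):

if (nodeShardPlacementGroup &&
	!ShardPlacementGroupsSame(shardPlacementGroup, nodeShardPlacementGroup))
{
	/* this node hosts more than one placement group, so it separates none of them */
	nodeShardPlacementGroup = NULL;
	shardPlacementGroupNeedsSeparateNode = false;
	break;
}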
/*
 * For a new colocation group, needsseparatenode is set to false for
 * all shards.
 */
needsSeparateNodeForPlacementList = GenerateListFromIntElement(false, shardCount);
Any specific reason to use a list of ints to denote a boolean array instead of using a bitmap? Not saying it is wrong, but it feels strange; I wonder if there is a specific reason for this use of a List.
There is indeed no specific reason for doing this, other than that workersForPlacementList is of type List * too.
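For reference, a rough sketch of what the bitmap variant could look like using PostgreSQL's Bitmapset API; the shardIndex loop variable here is only for illustration:

#include "nodes/bitmapset.h"

/* NULL is the empty set, i.e. every shard defaults to "does not need a separate node" */
Bitmapset *needsSeparateNodeForPlacementSet = NULL;

/* flag one shard index as needing a separate node */
needsSeparateNodeForPlacementSet =
	bms_add_member(needsSeparateNodeForPlacementSet, shardIndex);

/* test the flag later */
if (bms_is_member(shardIndex, needsSeparateNodeForPlacementSet))
{
	/* ... place this shard's placements on a dedicated node ... */
}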
	uint32 colocatationId;
	int shardIntervalIndex;
	int32 nodeGroupId;
} ShardPlacementGroup;
I think we need to revisit this name, because it signals a placement group, which is a bit confusing alongside nodegroup. Would it make sense to call this a ShardgroupPlacement and use it to introduce a bit of the naming around Shardgroups (take note of the lowercase "group" here)? I think the first two members uniquely identify what we internally started talking about as Shardgroups. By not writing the group with a capital, it becomes clear what the group refers to, contrary to the current name.
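A sketch of what the suggested rename could look like, with the fields copied verbatim from the struct quoted above (the comments are mine):

typedef struct ShardgroupPlacement
{
	/* these two members uniquely identify a Shardgroup */
	uint32 colocatationId;
	int shardIntervalIndex;

	/* the node group this Shardgroup's placements live on */
	int32 nodeGroupId;
} ShardgroupPlacement;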
/*-------------------------------------------------------------------------
 *
 * rebalancer_placement_isolation.c
 *   Routines to determine which worker node should be used to separate
 *   a colocated set of shard placements that need separate nodes.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
This file needs serious thought about naming and concepts. It is very hard to follow what is going on.
 */
bool
-NodeCanHaveDistTablePlacements(WorkerNode *node)
+NodeCanBeUsedForNonIsolatedPlacements(WorkerNode *node)
Isolated is probably a leftover from the earlier iteration; I think you call it separated throughout the rest of the code now.
@@ -145,6 +146,8 @@ typedef struct RebalanceContext
	FmgrInfo shardCostUDF;
	FmgrInfo nodeCapacityUDF;
	FmgrInfo shardAllowedOnNodeUDF;

+	RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext;
Given we are in a rebalance context, I think we don't have to be that verbose in the member name:
-	RebalancerPlacementIsolationContext *rebalancerPlacementGroupIsolationContext;
+	RebalancerPlacementIsolationContext *shardgroupSeparationContext;
src/backend/distributed/operations/rebalancer_placement_separation.c (outdated review thread, resolved)
After the cleanup of the rebalancer placement separation logic (thanks!) it became clear that there are logical issues in the algorithm used to separate shardgroups to nodes. Please see the elaborate comment in InitRebalancerPlacementSeparationContext for an analysis and a reproducible example of failure to separate due to logical issues.
bool needsSeparateNode = false;
if (expectNeedsSeparateNode)
{
	PG_ENSURE_ARGNOTNULL(5, "needs separate node");
	needsSeparateNode = PG_GETARG_BOOL(5);
}
We do that to support rolling upgrades to some degree: while the coordinator and some workers have Citus version X, some other workers may have Citus version Y, where Y > X.
Do our managed services guarantee all workers are upgraded before the coordinator is?
Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
{
	LockRelationOid(colocatedTableId, ShareUpdateExclusiveLock);
}
Could we comment this section with the reason for locking? What concurrent actions are we guarding against, and why do we need to lock all relations in the colocation group for that, even after already acquiring the PlacementColocationLock before?
I think we at least wouldn't want relations to get dropped when setting a property for the shards. Maybe this should simply be AccessShareLock?
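A sketch of the lighter-weight variant suggested here, reusing the quoted loop; whether AccessShareLock is actually sufficient for the concurrent cases being guarded against is exactly the open question above:

Oid colocatedTableId = InvalidOid;
foreach_oid(colocatedTableId, colocatedTableList)
{
	/* only prevent the colocated relations from being dropped concurrently */
	LockRelationOid(colocatedTableId, AccessShareLock);
}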
foreach_ptr(colocatedShardInterval, colocatedShardIntervalList)
{
	ShardSetNeedsSeparateNode(colocatedShardInterval->shardId,
							  enabled);
}
OK, so the most problematic of all is how many times we increment the command counter here. For an operation like this we should do 1 batch update and 1 command increment.
Don't we already have the logical relation ids earlier that we can use to send the invalidations?
I think there is a big opportunity here to refactor, which we might want to do in its own PR for the rest of the code. For now we could refactor this by inlining, and using the same update loop to build a list of all logical relation ids that need invalidation. Pass that list to a new, specialized function that batch-invalidates the caches for a list of relids.
Later we should go through our code, because I think there might be many places where we are invalidating those caches for all colocated tables at once. We could reuse the introduced function to invalidate a list of relids at once. Ideally this would remove the need to invalidate shard ids at all; if not, we could batch that as well.
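A hedged sketch of what such a batch-invalidation helper might look like; the function name and the use of PostgreSQL's CacheInvalidateRelcacheByRelid() are assumptions for illustration, not existing Citus APIs, and the real helper may need to go through Citus' own metadata-cache invalidation path instead:

#include "postgres.h"

#include "access/xact.h"
#include "nodes/pg_list.h"
#include "utils/inval.h"

/*
 * Hypothetical helper: queue relcache invalidations for a list of relation
 * ids in one go, followed by a single command counter increment.
 */
static void
InvalidateRelationMetadataCaches(List *relationIdList)
{
	ListCell *relationIdCell = NULL;
	foreach(relationIdCell, relationIdList)
	{
		Oid relationId = lfirst_oid(relationIdCell);

		/* register a relcache invalidation for this colocated relation */
		CacheInvalidateRelcacheByRelid(relationId);
	}

	/* make the preceding catalog updates visible to the rest of the command */
	CommandCounterIncrement();
}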
 * that given shard belongs to.
 */
Datum
citus_internal_shard_group_set_needsseparatenode(PG_FUNCTION_ARGS)
In different work I am adding the concept of a shardgroup; given the difference in arguments, I suggest keeping this function named shard and refraining from the group name in APIs while we are still only passing shard ids. Any reason this function breaks with the common pattern where the internal metadata functions are simple wrappers around inserting/updating catalog entries, and performs lookups here to find colocated shards? I would suggest making this function work on a single shardid and calling it for every shard, driving the fanout behaviour from the coordinator.
The function is very specific, possibly too specific. Any reason to have needsseparatenode in the function name, compared to moving it to an argument? The only reason I could think of is to be able to deprecate this 'easily' once shardgroups are introduced. Given this shard-based parameter moves to shardgroups by then, it would scream for an explanation in the comment of the function though.
Lastly, in orthogonal work I am introducing the concept of a shardgroup (one word). The naming of this function, given it works on shard ids, might become confusing given it talks about shard group (two words) with a different identifier.
I would change this function to reflect that it works on shards, takes needsseparatenode as an argument, and fills out all arguments based on the updated state on the coordinator (assuming the functionality might extend over time to add more configuration parameters to shards).
I would suggest making this function work on a single shardid and call it for every shard, driving the fanout behaviour from the coordinator.
But wouldn't this be super inefficient based on our discussion?
#7193 (comment)
The naming of this function, given it works on shardid's might become confusing given it talks about shard group (two words) with a different identifier.
I would suggest making this function work on a single shardid and call it for every shard, driving the fanout behaviour from the coordinator.
I understand all those concerns, but I'm not really sure it's a good idea to send the citus_internal_shard_group_set_needsseparatenode command to workers N times for N colocated shards. At the least, it'd be inconsistent with what we're doing on the local node.
the function is very specific, possibly too specific. Any reason to have needsseparatenode in the function name, compared to move it as an argument?
No particular reason; let me make it a function argument and rename the function to citus_internal_shard_property_set? It'd still perform the updates on all the colocated shards, but this is also the case for citus_shard_property_set. (citus_shard_property_set also takes a shardid but operates on all the colocated shards.)
/*
 * citus_shard_property_set allows setting shard properties for all
 * the shards within the shard group that given shard belongs to.
 */
This assumes all properties will target shardgroups, which will probably not be the case. Once shardgroups are formally introduced we should move needs_separate_node/anti_affinity to a function targeting shardgroups; this function could stick around for shard-specific properties.
Unfortunately we can't introduce the shardgroup-based function yet, as we don't have a way to address shardgroups. It is unclear if we get that before we release. If so, we can refactor this work to move to shardgroups directly.
Once shardgroups are formally introduced we should move needs_separate_node/anti_affinity to a function targetting shardgroups.
👍
						  FmgrInfo *shardAllowedOnNodeUDF)
{
	HTAB *nodePlacementGroupHash =
		CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
The type of the key differs from the type of the first field in the struct:
-		CreateSimpleHashWithNameAndSize(uint32, NodeToPlacementGroupHashEntry,
+		CreateSimpleHashWithNameAndSize(int32, NodeToPlacementGroupHashEntry,
										list_length(activeWorkerNodeList));

	RebalancerPlacementSeparationContext *context =
		palloc(sizeof(RebalancerPlacementSeparationContext));
We like zero-initialized values:
-		palloc(sizeof(RebalancerPlacementSeparationContext));
+		palloc0(sizeof(RebalancerPlacementSeparationContext));
	nodePlacementGroupHashEntry->shouldHaveShards =
		workerNode->shouldHaveShards;
I believe this can be formatted on a single line
-	nodePlacementGroupHashEntry->shouldHaveShards =
-		workerNode->shouldHaveShards;
+	nodePlacementGroupHashEntry->shouldHaveShards = workerNode->shouldHaveShards;
Locally running citus_indent shows that it fits.
/*
 * Lets call set of the nodes that placements in rebalancePlacementList
 * are stored on as D and the others as S. In other words, D is the set
 * of the nodes that we're allowed to move the placements "from" or
 * "to (*)" (* = if we're not draining it) and S is the set of the nodes
 * that we're only allowed to move the placements "to" but not "from".
 *
 * This means that, for a node of type S, the fact that whether the node
 * is used to separate a placement group or not cannot be changed in the
 * runtime.
 *
 * For this reason, below we find out the assigned placement groups for
 * nodes of type S because we want to avoid from moving the placements
 * (if any) from a node of type D to a node that is used to separate a
 * placement group within S. We also set hasPlacementsThatCannotBeMovedAway
 * to true for the nodes that already have some shard placements within S
 * because we want to avoid from moving the placements that need a separate
 * node (if any) from node D to node S.
 *
 * We skip below code for nodes of type D not because optimization purposes
 * but because it would be "incorrect" to assume that "current placement
 * distribution for a node of type D would be the same" after the rebalancer
 * plans the moves.
 */
I still find this comment incomprehensible.
As far as I understand the code:
- Nodes of type S do not have any placements on them and are allowed to have placements on them, eg. an empty node
- Nodes of type D have placements on them, and are allowed to have placements on them
What is encapsulated in the comment above that is not covered by my two lines?
Nodes of type S do not have any placements on them and are allowed to have placements on them, eg. an empty node
S is not necessarily empty. S means a node that doesn't store any of the input placements and hence one that we cannot move shards away "from".
Nodes of type D have placements on them, and are allowed to have placements on them
Yes. Even more, D actually corresponds to the set of nodes that contain input placements.
	nodePlacementGroupHashEntry->hasPlacementsThatCannotBeMovedAway =
		NodeGroupHasDistributedTableShardPlacements(
			nodePlacementGroupHashEntry->nodeGroupId);
It took me quite a while to understand why "has placements that cannot be moved away" was chosen as this name. I now think I understand it a bit.
Given we didn't find the groupId in rebalancePlacementList, which contains the list of shards to rebalance, the view of our node is 'empty'. However, due to the stupidity of the Citus rebalancer, you are allowed to call the rebalancer on a subset of the shards in the system, limiting the rebalancing to a single set of colocated tables. Meaning, us not seeing the nodeGroupId in the list of placements up for rebalancing doesn't mean the node is empty.
This variable therefore does not indicate placements that cannot be moved; instead it indicates that the node contains shards for colocation groups that we are currently not rebalancing. I guess this means that, since colocation groups having placements on this node are not rebalanced, we currently cannot use this node as a node to place a separated shardgroup on.
I think we need to dumb down the comments on this function to describe the above, instead of defining nodes as D or S.
Also the field hasPlacementsThatCannotBeMovedAway should be renamed, or at least have a more descriptive comment on it. A name that works better would be hasPlacementsOutsideCurrentRebalance.
Lastly, we never set this parameter to true if we found this node in the list of placements we are rebalancing. Being found in that list doesn't mean the node contains no shards that are outside of this rebalance; i.e., I think not setting this value for nodes you define as type D is incorrect. This seems to be corroborated by the checks we perform in TryAssignPlacementGroupsToNodegroups.
Let's say we have two colocation groups with 4 shards each:
C1 => SG1, SG2, SG3, SG4
C2 => SG5, SG6, SG7, SG8
3 Nodes
N1 => SG1, SG3, SG5, SG7
N2 => SG2, SG4, SG6, SG8
N3 => newnode
If we are rebalancing C1 we will find nodes N1 and N2 in rebalancePlacementList, designating them as D; only N3 is now designated as S.
N3 doesn't contain any shards that 'can't be moved', or as I describe them, 'shards outside of the rebalance'. However, N1 and N2 do have them.
Now assume we are separating SG1: we call TryAssignPlacementGroupsToNodeGroups, which iterates over SG1. We call TryAssignPlacementGroupToNode for SG1 on its current node N1. Since we assumed N1 to be of type D, we have never set hasPlacementsThatCannotBeMovedAway to true, even though SG5 and SG7 are on the node. This allows TryAssignPlacementGroupToNode to assign SG1 to this node, instead of moving it to the newly added node N3.
Experimenting with the above scenario with citus_dev shows that this is indeed the case:
citus_dev make citus --size 3
-- create 2 tables with 4 shards each in 2 colocation groups balanced over 2 nodes in a 3 node cluster
SELECT citus_set_node_property('localhost', 9703, 'shouldhaveshards', false);
CREATE TABLE foo( a bigserial primary key, b text);
CREATE TABLE bar( a bigserial primary key, b text);
SELECT create_distributed_table('foo','a',shard_count=>4,colocate_with=>'none');
SELECT create_distributed_table('bar','a',shard_count=>4,colocate_with=>'none');
SELECT citus_set_node_property('localhost', 9703, 'shouldhaveshards', true);
-- separate the shard for tenant 5
SELECT get_shard_id_for_distribution_column('foo', 5); -- 102104 in my case (shard index 1, or SG1 from the above scenario)
SELECT citus_shard_property_set(get_shard_id_for_distribution_column('foo', 5), anti_affinity => true);
-- rebalance the colocation group where foo is in
SELECT rebalance_table_shards('foo');
-- NOTICE: Moving shard 102106 from localhost:9701 to localhost:9703 ...
-- (sharindex 3 or SG3 from the above scenario)
SELECT * FROM citus_shards;
-- shows that no shard has `has_separate_node` set to true
As you will see, a shard from foo that doesn't need isolation gets isolated to the newly added node, instead of the one that actually needs isolation, leaving the one that we want isolated left over on a node with shardgroups from different colocation groups.
Lastly, we never set this parameter to true if we found this node in the list of placements we are rebalancing. It doesn't mean that if the node is found in that list it doesn't contain shards that are outside of this rebalance
Right, this is an outcome of the algorithm that I realized last week and was wishing to talk to you about as well. However, I'm not sure this implies an important problem. Maybe we can tell users that shard separation rules may not become effective unless the rebalancer is invoked for the whole cluster?
I had a stab at getting this data structure correctly populated: 6a64c23
Force-pushed from c5fd89b to f6b9b41
@@ -0,0 +1,21 @@
SET client_min_messages TO WARNING;
I think this PR should also add tests to src/test/regress/sql/shard_rebalancer_unit.sql, to show how the rebalancer plans moves in relation to isolation of placements.
Force-pushed from f6b9b41 to 6b9ba04
Is this the correct way of updating the add_shard_metadata UDF? In other words, should we update its definitions in both schemas?
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
#include "udfs/citus_internal_add_shard_metadata/12.2-1.sql"
regarding https://github.com/citusdata/citus/pull/7193/files#r1484111841, is this correct?
DROP FUNCTION citus_internal.add_shard_metadata(regclass, bigint, "char", text, text, boolean);
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text, boolean);
#include "../udfs/citus_internal_add_shard_metadata/10.2-1.sql"
regarding https://github.com/citusdata/citus/pull/7193/files#r1484111841, is this correct?
Force-pushed from 2dc12d2 to 0fcd726
Force-pushed from 0fcd726 to f1f72f3
citus_shard_property_set() accepts an optional "anti_affinity" parameter: when "anti_affinity" is set to true, it marks all shards that are colocated with the given shard as "needs to be separated" in pg_dist_shard via the new "needsseparatenode" column. Likewise, setting "anti_affinity" to false undoes the mentioned effect.
When the "needsseparatenode" column is set for a shard, then upon the next invocation of the shard rebalancer the placements of that shard, and the other shard placements colocated with them, will be placed on nodes in a way that no other shard placement ends up on those nodes. Say the shard group has two shard placement groups S1 & S2; then while S1 is stored on node NX and S2 on node NY, no other shard placement will be placed on NX or NY.
The rebalancer will try to place the placements that need to be separated on the nodes where they currently stay. If that's not possible, it will try to place them on other worker nodes.
This feature would be useful to separate distributed schemas onto individual nodes. We can add very simple wrappers around citus_shard_property_set() to mark the shard-group that corresponds to a given distributed schema as "needs to be separated" or as "doesn't need to be separated".
When citus_shard_property_set() is invoked with "anti_affinity=>true", we mark all the shards within the related shard-group as "needs to be separated", just to avoid forgetting whether a shard-group needs a separate node. If we did so only for a random shard within the shard-group, then dropping the distributed table that owns that shard would cause us to forget about the separation requirement. If we had a shard-group concept in the code, we wouldn't need to do so. Likewise, whenever a distributed table is created in a way that a new shard is added into an existing shard-group that needs a separate node, we mark that new shard as "needs to be separated" too.
Note that citus_shard_property_set() cannot be run concurrently with distributed table creation, shard transfer or any other operation that targets the same colocation group and needs to acquire the placement colocation lock (via AcquirePlacementColocationLock()).
This PR also adds a "has_separate_node" column to "citus_shards", such that if "has_separate_node" is true, then the node that contains the shard only contains shards that belong to the same shard group.
Design choices:
DESCRIPTION: Adds citus_shard_property_set() UDF to allow separating placements of a given shard from others in the cluster based on the anti_affinity parameter.