Skip to content

Commit

Permalink
Allow isolating shard placement groups on individual nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Sep 12, 2023
1 parent e0683aa commit 697908a
Show file tree
Hide file tree
Showing 32 changed files with 1,068 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId);
}

List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
Expand Down
36 changes: 23 additions & 13 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -1415,12 +1415,13 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
*/
char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
uint64 shardLength, int32 groupId,
bool needsIsolatedNode)
{
StringInfo command = makeStringInfo();

appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardLength,
groupId, placementId);
groupId, placementId, needsIsolatedNode ? "true" : "false");

return command->data;
}
Expand Down Expand Up @@ -3444,16 +3445,18 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
int64 shardLength = PG_GETARG_INT64(1);
int32 groupId = PG_GETARG_INT32(2);
int64 placementId = PG_GETARG_INT64(3);
bool needsIsolatedNode = PG_GETARG_BOOL(4);

citus_internal_add_placement_metadata_internal(shardId, shardLength,
groupId, placementId);
groupId, placementId,
needsIsolatedNode);

PG_RETURN_VOID();
}


/*
* citus_internal_add_placement_metadata is an internal UDF to
* citus_internal_delete_placement_metadata is an internal UDF to
* delete a row from pg_dist_placement.
*/
Datum
Expand Down Expand Up @@ -3483,12 +3486,15 @@ citus_internal_add_placement_metadata_legacy(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);

int64 shardId = PG_GETARG_INT64(0);
int64 shardLength = PG_GETARG_INT64(2);
int32 groupId = PG_GETARG_INT32(3);
int64 placementId = PG_GETARG_INT64(4);
int64 shardLength = PG_GETARG_INT64(1);
int32 groupId = PG_GETARG_INT32(2);
int64 placementId = PG_GETARG_INT64(3);

bool needsIsolatedNode = false;
citus_internal_add_placement_metadata_internal(shardId, shardLength,
groupId, placementId);
groupId, placementId,
needsIsolatedNode);

PG_RETURN_VOID();
}

Expand All @@ -3499,7 +3505,8 @@ citus_internal_add_placement_metadata_legacy(PG_FUNCTION_ARGS)
*/
void
citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength,
int32 groupId, int64 placementId)
int32 groupId, int64 placementId,
bool needsIsolatedNode)
{
bool missingOk = false;
Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk);
Expand All @@ -3524,7 +3531,8 @@ citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength,
shardLength, groupId);
}

InsertShardPlacementRow(shardId, placementId, shardLength, groupId);
InsertShardPlacementRow(shardId, placementId, shardLength, groupId,
needsIsolatedNode);
}


Expand Down Expand Up @@ -4100,12 +4108,14 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
*/
char *
AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
uint64 shardLength, int32 groupId,
bool needsIsolatedNode)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld)",
shardId, shardLength, groupId, placementId);
"SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld, %s)",
shardId, shardLength, groupId, placementId,
needsIsolatedNode ? "true" : "false");
return command->data;
}

Expand Down
212 changes: 180 additions & 32 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,108 @@ ShardLength(uint64 shardId)
}


/*
 * NodeGroupIsolatesAShardPlacementGroup returns true if given node group is
 * used to isolate a shard placement group. This means that every shard
 * placement recorded in pg_dist_placement for the node group belongs to one
 * and the same shard placement group, and at least one of those placements
 * needs an isolated node.
 *
 * Returns false if the node group has no placements, if none of its
 * placements needs an isolated node, or if it stores placements that belong
 * to more than one shard placement group.
 */
bool
NodeGroupIsolatesAShardPlacementGroup(int32 groupId)
{
	ShardPlacementGroup *firstShardPlacementGroup = NULL;
	bool foundPlacementThatNeedsIsolatedNode = false;
	bool foundOtherShardPlacementGroup = false;

	bool indexOK = false;
	int scanKeyCount = 1;
	ScanKeyData scanKey[1];

	Relation pgDistPlacement = table_open(DistPlacementRelationId(),
										  AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);

	ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_groupid,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
	SysScanDesc scanDescriptor = systable_beginscan(pgDistPlacement,
													DistPlacementGroupidIndexId(),
													indexOK,
													NULL, scanKeyCount, scanKey);

	HeapTuple heapTuple = systable_getnext(scanDescriptor);
	while (HeapTupleIsValid(heapTuple))
	{
		GroupShardPlacement *placement =
			TupleToGroupShardPlacement(tupleDescriptor, heapTuple);

		ShardPlacementGroup *shardPlacementGroup =
			GetShardPlacementGroupForPlacement(placement->shardId,
											   placement->placementId);

		if (firstShardPlacementGroup == NULL)
		{
			/*
			 * Remember the group of the first placement regardless of whether
			 * that placement needs an isolated node. Otherwise a placement
			 * that does not need isolation and is scanned before the isolated
			 * one would go unnoticed even if it belongs to a different group.
			 */
			firstShardPlacementGroup = shardPlacementGroup;
		}
		else if (ShardPlacementGroupCompare(shardPlacementGroup,
											firstShardPlacementGroup) != 0)
		{
			/* the node group stores more than one shard placement group */
			foundOtherShardPlacementGroup = true;
			break;
		}

		if (placement->needsIsolatedNode)
		{
			foundPlacementThatNeedsIsolatedNode = true;
		}

		heapTuple = systable_getnext(scanDescriptor);
	}

	/* always end the scan and close the relation, also after an early break */
	systable_endscan(scanDescriptor);
	table_close(pgDistPlacement, NoLock);

	return foundPlacementThatNeedsIsolatedNode && !foundOtherShardPlacementGroup;
}


/*
 * ShardPlacementGroupCompare orders two ShardPlacementGroup values
 * lexicographically by colocatationId, then shardIntervalIndex, then
 * nodeGroupId.
 *
 * Both arguments must point to ShardPlacementGroup structs. Returns -1 if
 * left sorts before right, 1 if it sorts after, and 0 if all three fields
 * are equal.
 */
int
ShardPlacementGroupCompare(const void *left, const void *right)
{
	const ShardPlacementGroup *lhs = left;
	const ShardPlacementGroup *rhs = right;

	/* most significant key: colocation group */
	if (lhs->colocatationId != rhs->colocatationId)
	{
		return (lhs->colocatationId < rhs->colocatationId) ? -1 : 1;
	}

	/* then the index of the shard interval */
	if (lhs->shardIntervalIndex != rhs->shardIntervalIndex)
	{
		return (lhs->shardIntervalIndex < rhs->shardIntervalIndex) ? -1 : 1;
	}

	/* least significant key: node group */
	if (lhs->nodeGroupId != rhs->nodeGroupId)
	{
		return (lhs->nodeGroupId < rhs->nodeGroupId) ? -1 : 1;
	}

	return 0;
}


/*
* NodeGroupHasShardPlacements returns whether any active shards are placed on the group
*/
Expand Down Expand Up @@ -1376,6 +1478,25 @@ NodeGroupHasShardPlacements(int32 groupId)
}


/*
 * GetShardPlacementGroupForPlacement builds the ShardPlacementGroup that the
 * placement identified by given shardId & placementId belongs to.
 *
 * The result is a freshly palloc'd struct filled from the loaded shard
 * placement (colocation group id and node group id) and the loaded shard
 * interval (shard index).
 */
ShardPlacementGroup *
GetShardPlacementGroupForPlacement(uint64 shardId, uint64 placementId)
{
	ShardPlacement *placement = LoadShardPlacement(shardId, placementId);
	ShardInterval *interval = LoadShardInterval(shardId);

	ShardPlacementGroup *result = palloc(sizeof(ShardPlacementGroup));

	result->nodeGroupId = placement->groupId;
	result->shardIntervalIndex = interval->shardIndex;
	result->colocatationId = placement->colocationGroupId;

	return result;
}


/*
* IsActiveShardPlacement checks if the shard placement is labelled as
* active, and that it is placed in an active worker.
Expand Down Expand Up @@ -1447,30 +1568,6 @@ FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement
}


/*
 * FilterActiveShardPlacementListByNode returns the placements from
 * shardPlacementList that pass IsActiveShardPlacement and for which
 * IsPlacementOnWorkerNode holds with the given workerNode.
 *
 * The returned list is newly built; it is NIL when no placement qualifies.
 */
List *
FilterActiveShardPlacementListByNode(List *shardPlacementList, WorkerNode *workerNode)
{
	List *placementsOnNode = NIL;

	/* first narrow the input down to active placements only */
	List *activePlacements = FilterShardPlacementList(shardPlacementList,
													  IsActiveShardPlacement);

	ShardPlacement *candidate = NULL;
	foreach_ptr(candidate, activePlacements)
	{
		if (!IsPlacementOnWorkerNode(candidate, workerNode))
		{
			continue;
		}

		placementsOnNode = lappend(placementsOnNode, candidate);
	}

	return placementsOnNode;
}


/*
* ActiveShardPlacementListOnGroup returns a list of active shard placements
* that are sitting on group with groupId for given shardId.
Expand Down Expand Up @@ -1664,6 +1761,49 @@ AllShardPlacementsOnNodeGroup(int32 groupId)
}


/*
 * AllGroupShardPlacementsThatNeedIsolatedNode scans all of pg_dist_placement
 * and returns a list of the GroupShardPlacement entries whose
 * needsIsolatedNode flag is set.
 *
 * Returns NIL when no such placement exists.
 */
List *
AllGroupShardPlacementsThatNeedIsolatedNode(void)
{
	/* no scan keys and no index: every tuple of the catalog is visited */
	ScanKeyData *noScanKeys = NULL;
	int noScanKeyCount = 0;
	Oid noIndexId = InvalidOid;
	bool useIndex = false;

	Relation placementRelation = table_open(DistPlacementRelationId(),
											AccessShareLock);
	TupleDesc placementTupleDesc = RelationGetDescr(placementRelation);

	SysScanDesc scan = systable_beginscan(placementRelation,
										  noIndexId, useIndex, NULL,
										  noScanKeyCount, noScanKeys);

	List *isolatedPlacementList = NIL;
	HeapTuple tuple = NULL;
	while (HeapTupleIsValid(tuple = systable_getnext(scan)))
	{
		GroupShardPlacement *candidate =
			TupleToGroupShardPlacement(placementTupleDesc, tuple);

		if (!candidate->needsIsolatedNode)
		{
			continue;
		}

		isolatedPlacementList = lappend(isolatedPlacementList, candidate);
	}

	systable_endscan(scan);
	table_close(placementRelation, NoLock);

	return isolatedPlacementList;
}


/*
* TupleToGroupShardPlacement takes in a heap tuple from pg_dist_placement,
* and converts this tuple to in-memory struct. The function assumes the
Expand All @@ -1674,9 +1814,10 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
{
bool isNullArray[Natts_pg_dist_placement];
Datum datumArray[Natts_pg_dist_placement];
memset(datumArray, 0, sizeof(datumArray));
memset(isNullArray, false, sizeof(isNullArray));

if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
HeapTupleHasNulls(heapTuple))
if (HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
}
Expand All @@ -1696,6 +1837,8 @@ TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
datumArray[Anum_pg_dist_placement_shardlength - 1]);
shardPlacement->groupId = DatumGetInt32(
datumArray[Anum_pg_dist_placement_groupid - 1]);
shardPlacement->needsIsolatedNode = DatumGetBool(
datumArray[Anum_pg_dist_placement_needsisolatednode - 1]);

return shardPlacement;
}
Expand Down Expand Up @@ -1800,12 +1943,15 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
*/
ShardPlacement *
InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
uint64 shardLength, int32 groupId,
bool needsIsolatedNode)
{
InsertShardPlacementRow(shardId, placementId, shardLength, groupId);
InsertShardPlacementRow(shardId, placementId, shardLength, groupId,
needsIsolatedNode);

char *insertPlacementCommand =
AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);
AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId,
needsIsolatedNode);
SendCommandToWorkersWithMetadata(insertPlacementCommand);

return LoadShardPlacement(shardId, placementId);
Expand All @@ -1820,7 +1966,8 @@ InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
*/
uint64
InsertShardPlacementRow(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
uint64 shardLength, int32 groupId,
bool needsIsolatedNode)
{
Datum values[Natts_pg_dist_placement];
bool isNulls[Natts_pg_dist_placement];
Expand All @@ -1838,6 +1985,8 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
values[Anum_pg_dist_placement_shardstate - 1] = Int32GetDatum(1);
values[Anum_pg_dist_placement_shardlength - 1] = Int64GetDatum(shardLength);
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
values[Anum_pg_dist_placement_needsisolatednode - 1] =
BoolGetDatum(needsIsolatedNode);

/* open shard placement relation and insert new tuple */
Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock);
Expand Down Expand Up @@ -2075,8 +2224,7 @@ DeleteShardPlacementRow(uint64 placementId)

uint64 shardId = heap_getattr(heapTuple, Anum_pg_dist_placement_shardid,
tupleDescriptor, &isNull);
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
HeapTupleHasNulls(heapTuple))
if (HeapTupleHasNulls(heapTuple))
{
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
}
Expand Down
2 changes: 0 additions & 2 deletions src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
#include "utils/rel.h"
#include "utils/relcache.h"

#define INVALID_GROUP_ID -1

/* default group size */
int GroupSize = 1;

Expand Down
Loading

0 comments on commit 697908a

Please sign in to comment.