Allow isolating shard placement groups on individual nodes
onurctirtir committed Nov 6, 2023
1 parent e535f53 commit f6b9b41
Showing 71 changed files with 5,333 additions and 428 deletions.
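At the catalog level, the change threads a new needsseparatenode flag through pg_dist_shard, the cached shard intervals, and metadata sync, so that all shards of a shard group can be flagged as requiring their own worker node. As a hedged sketch (not part of the diff below; the SQL migration itself sits in files outside this excerpt), the new flag should be inspectable straight from the catalog after the upgrade:

-- Illustrative query only; assumes the needsseparatenode column added by this
-- commit is visible on pg_catalog.pg_dist_shard on the coordinator.
SELECT shardid, needsseparatenode
FROM pg_catalog.pg_dist_shard
ORDER BY shardid;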
@@ -1456,8 +1456,9 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId,

text *shardMinValue = NULL;
text *shardMaxValue = NULL;
bool needsSeparateNode = false;
InsertShardRow(citusLocalTableId, shardId, shardStorageType,
shardMinValue, shardMaxValue);
shardMinValue, shardMaxValue, needsSeparateNode);

List *nodeList = list_make1(CoordinatorNodeIfAddedAsWorkerOrError());

52 changes: 44 additions & 8 deletions src/backend/distributed/commands/create_distributed_table.c
@@ -134,6 +134,7 @@ static char DecideDistTableReplicationModel(char distributionMethod,
static List * HashSplitPointsForShardList(List *shardList);
static List * HashSplitPointsForShardCount(int shardCount);
static List * WorkerNodesForShardList(List *shardList);
static List * NeedsSeparateNodeForShardList(List *shardList);
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams *
@@ -571,16 +572,10 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
colocatedTableId = ColocatedTableId(colocationId);
}

List *workerNodeList = DistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}

List *workersForPlacementList;
List *shardSplitPointsList;
List *needsSeparateNodeForPlacementList;


if (colocatedTableId != InvalidOid)
{
@@ -595,6 +590,12 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
* Find the node IDs of the shard placements.
*/
workersForPlacementList = WorkerNodesForShardList(colocatedShardList);

/*
* Inherit needsseparatenode from the colocated shards.
*/
needsSeparateNodeForPlacementList =
NeedsSeparateNodeForShardList(colocatedShardList);
}
else
{
@@ -606,7 +607,21 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
/*
* Place shards in a round-robin fashion across all data nodes.
*/
List *workerNodeList = NewDistributedTablePlacementNodeList(NoLock);
if (workerNodeList == NIL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("no worker nodes are available for placing shards"),
errhint("Add more worker nodes.")));
}

workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount);

/*
* For a new colocation group, needsseparatenode is set to false for
* all shards.
*/
needsSeparateNodeForPlacementList = GenerateListFromIntElement(false, shardCount);
}

/*
@@ -645,6 +660,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
shardToSplit->shardId,
shardSplitPointsList,
workersForPlacementList,
needsSeparateNodeForPlacementList,
distributionColumnOverrides,
sourceColocatedShardIntervalList,
colocationId
@@ -897,6 +913,26 @@ WorkerNodesForShardList(List *shardList)
}


/*
* NeedsSeparateNodeForShardList returns a list of booleans reflecting whether
* each shard in the given list needs a separate node.
*/
static List *
NeedsSeparateNodeForShardList(List *shardList)
{
List *needsSeparateNodeList = NIL;

ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, shardList)
{
needsSeparateNodeList = lappend_int(needsSeparateNodeList,
shardInterval->needsSeparateNode);
}

return needsSeparateNodeList;
}


/*
* RoundRobinWorkerNodeList round robins over the workers in the worker node list
* and adds node ids to a list of length listLength.
3 changes: 3 additions & 0 deletions src/backend/distributed/metadata/metadata_cache.c
@@ -5378,6 +5378,8 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
char storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
Datum minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
Datum maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
bool needsSeparateNode = DatumGetBool(
datumArray[Anum_pg_dist_shard_needsseparatenode - 1]);

bool minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
bool maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
@@ -5414,6 +5416,7 @@
shardInterval->minValue = minValue;
shardInterval->maxValue = maxValue;
shardInterval->shardId = shardId;
shardInterval->needsSeparateNode = needsSeparateNode;

return shardInterval;
}
73 changes: 68 additions & 5 deletions src/backend/distributed/metadata/metadata_sync.c
@@ -179,6 +179,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_shard_property_set);


static bool got_SIGTERM = false;
Expand Down Expand Up @@ -1244,7 +1245,7 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo();
appendStringInfo(insertShardCommand,
"WITH shard_data(relationname, shardid, storagetype, "
"shardminvalue, shardmaxvalue) AS (VALUES ");
"shardminvalue, shardmaxvalue, needsseparatenode) AS (VALUES ");

foreach_ptr(shardInterval, shardIntervalList)
{
@@ -1276,12 +1277,13 @@ }
}

appendStringInfo(insertShardCommand,
"(%s::regclass, %ld, '%c'::\"char\", %s, %s)",
"(%s::regclass, %ld, '%c'::\"char\", %s, %s, %s)",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
minHashToken->data,
maxHashToken->data);
maxHashToken->data,
shardInterval->needsSeparateNode ? "true" : "false");

if (llast(shardIntervalList) != shardInterval)
{
@@ -1293,7 +1295,7 @@

appendStringInfo(insertShardCommand,
"SELECT citus_internal_add_shard_metadata(relationname, shardid, "
"storagetype, shardminvalue, shardmaxvalue) "
"storagetype, shardminvalue, shardmaxvalue, needsseparatenode) "
"FROM shard_data;");

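For illustration, a hedged example of the statement this assembles for a single hash shard; the relation name, shard id, and hash bounds below are made up:

-- Sketch of the generated metadata-sync command (all values are hypothetical).
WITH shard_data(relationname, shardid, storagetype, shardminvalue,
                shardmaxvalue, needsseparatenode) AS
    (VALUES ('public.dist_table'::regclass, 102008, 't'::"char",
             '-2147483648', '-1', false))
SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype,
                                         shardminvalue, shardmaxvalue,
                                         needsseparatenode)
FROM shard_data;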
/*
@@ -3245,6 +3247,9 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue = PG_GETARG_TEXT_P(4);
}

PG_ENSURE_ARGNOTNULL(5, "needs separate node");

bool needsSeparateNode = PG_GETARG_BOOL(5);

/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);

@@ -3265,7 +3270,8 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
shardMaxValue);
}

InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue);
InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue,
needsSeparateNode);

PG_RETURN_VOID();
}
@@ -3895,6 +3901,45 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
}


/*
* citus_internal_shard_property_set is an internal UDF to
* set shard properties for all the shards within the shard group
* that the given shard belongs to.
*/
Datum
citus_internal_shard_property_set(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

PG_ENSURE_ARGNOTNULL(0, "shard_id");

uint64 shardId = PG_GETARG_INT64(0);

/* only owner of the table (or superuser) is allowed to modify the Citus metadata */
Oid distributedRelationId = RelationIdForShard(shardId);
EnsureTableOwner(distributedRelationId);

/* we want to serialize all the metadata changes to this table */
LockRelationOid(distributedRelationId, ShareUpdateExclusiveLock);

if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}

bool *needsSeparateNodePtr = NULL;

if (!PG_ARGISNULL(1))
{
needsSeparateNodePtr = palloc(sizeof(bool));
*needsSeparateNodePtr = PG_GETARG_BOOL(1);
}

ShardgroupSetProperty(shardId, needsSeparateNodePtr);

PG_RETURN_VOID();
}

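A hedged example invocation follows; the shard id is made up, and in practice this internal UDF is expected to be issued by coordinator-initiated operations rather than run by hand:

-- Flag every shard in the shard group that shard 102008 belongs to as
-- needing to be isolated on its own node (shard id is hypothetical).
SELECT pg_catalog.citus_internal_shard_property_set(102008, true);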

/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
@@ -4094,6 +4139,24 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
}


/*
* ShardgroupSetPropertyCommand returns a command to call
* citus_internal_shard_property_set().
*/
char *
ShardgroupSetPropertyCommand(uint64 shardId, bool *needsSeparateNodePtr)
{
char *needsSeparateNodeStr = !needsSeparateNodePtr ? "null" :
(*needsSeparateNodePtr ? "true" : "false");
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_shard_property_set(%lu, %s)",
shardId, needsSeparateNodeStr);

return command->data;
}

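For illustration (shard id made up), the commands this returns for a non-NULL and a NULL pointer, respectively; a NULL pointer is simply rendered as a SQL null argument:

SELECT pg_catalog.citus_internal_shard_property_set(102008, false)
SELECT pg_catalog.citus_internal_shard_property_set(102008, null)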

/*
* AddPlacementMetadataCommand returns a command to call
* citus_internal_add_placement_metadata().