Skip to content

Commit

Permalink
reflect on feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
onurctirtir committed Sep 18, 2023
1 parent 3724336 commit bcb6a8b
Show file tree
Hide file tree
Showing 26 changed files with 450 additions and 368 deletions.
53 changes: 49 additions & 4 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
PG_FUNCTION_INFO_V1(citus_internal_shard_group_set_needsisolatednode);


static bool got_SIGTERM = false;
Expand Down Expand Up @@ -1202,7 +1203,7 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertPlacementCommand = makeStringInfo();
appendStringInfo(insertPlacementCommand,
"WITH placement_data(shardid, "
"shardlength, groupid, placementid) AS (VALUES ");
"shardlength, groupid, placementid, needsisolatednode) AS (VALUES ");

ShardInterval *shardInterval = NULL;
bool firstPlacementProcessed = false;
Expand All @@ -1225,19 +1226,20 @@ ShardListInsertCommand(List *shardIntervalList)
firstPlacementProcessed = true;

appendStringInfo(insertPlacementCommand,
"(%ld, %ld, %d, %ld)",
"(%ld, %ld, %d, %ld, %s)",
shardId,
placement->shardLength,
placement->groupId,
placement->placementId);
placement->placementId,
placement->needsIsolatedNode ? "true" : "false");
}
}

appendStringInfo(insertPlacementCommand, ") ");

appendStringInfo(insertPlacementCommand,
"SELECT citus_internal_add_placement_metadata("
"shardid, shardlength, groupid, placementid) "
"shardid, shardlength, groupid, placementid, needsisolatednode) "
"FROM placement_data;");

/* now add shards to insertShardCommand */
Expand Down Expand Up @@ -3903,6 +3905,33 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
}


/*
* citus_internal_shard_group_set_needsisolatednode is an internal UDF to
* set needsisolatednode flag for all the placements within the shard group
* that given shard belongs to.
*/
Datum
citus_internal_shard_group_set_needsisolatednode(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);

PG_ENSURE_ARGNOTNULL(0, "shard_id");
uint64 shardId = PG_GETARG_INT64(0);

PG_ENSURE_ARGNOTNULL(1, "enabled");
bool enabled = PG_GETARG_BOOL(1);

if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}

ShardGroupSetNeedsIsolatedNode(shardId, enabled);

PG_RETURN_VOID();
}


/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
Expand Down Expand Up @@ -4102,6 +4131,22 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
}


/*
* ShardGroupSetNeedsIsolatedNodeCommand returns a command to call
* citus_internal_shard_group_set_needsisolatednode().
*/
char *
ShardGroupSetNeedsIsolatedNodeCommand(uint64 shardId, bool enabled)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_shard_group_set_needsisolatednode(%lu, %s)",
shardId, enabled ? "true" : "false");

return command->data;
}


/*
* AddPlacementMetadataCommand returns a command to call
* citus_internal_add_placement_metadata().
Expand Down
Loading

0 comments on commit bcb6a8b

Please sign in to comment.