Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow citus_*_size on index related to a distributed table #7271

Merged
merged 21 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 91 additions & 45 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_constraint.h"
Expand Down Expand Up @@ -88,11 +89,11 @@ static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize);
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize);
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *relationSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
Expand Down Expand Up @@ -294,15 +295,15 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
bool failOnError = PG_GETARG_BOOL(1);

SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;
c2main marked this conversation as resolved.
Show resolved Hide resolved

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}


Expand All @@ -318,20 +319,21 @@ citus_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
SizeQueryType sizeQueryType = TABLE_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;
c2main marked this conversation as resolved.
Show resolved Hide resolved

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
/* We do not check if relation is really a table, like PostgreSQL is doing. */
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}


/*
* citus_relation_size accept a table name and returns a relation's 'main'
* citus_relation_size accept a relation name and returns a relation's 'main'
c2main marked this conversation as resolved.
Show resolved Hide resolved
* fork's size.
c2main marked this conversation as resolved.
Show resolved Hide resolved
*/
Datum
Expand All @@ -344,7 +346,7 @@ citus_relation_size(PG_FUNCTION_ARGS)
SizeQueryType sizeQueryType = RELATION_SIZE;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &relationSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
Expand Down Expand Up @@ -506,13 +508,15 @@ ReceiveShardIdAndSizeResults(List *connectionList, Tuplestorestate *tupleStore,


/*
* DistributedTableSize is helper function for each kind of citus size functions.
* It first checks whether the table is distributed and size query can be run on
* it. Connection to each node has to be established to get the size of the table.
* DistributedRelationSize is helper function for each kind of citus size
* functions. It first checks whether the relation is a distributed table or an
* index belonging to a distributed table and size query can be run on it.
* Connection to each node has to be established to get the size of the
* relation.
*/
c2main marked this conversation as resolved.
Show resolved Hide resolved
static bool
DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize)
DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

Expand All @@ -538,7 +542,7 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
if (relation == NULL)
{
ereport(logLevel,
(errmsg("could not compute table size: relation does not exist")));
(errmsg("could not compute relation size: relation does not exist")));

return false;
}
Expand All @@ -553,8 +557,9 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
{
uint64 relationSizeOnNode = 0;

bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQueryType,
failOnError, &relationSizeOnNode);
bool gotSize = DistributedRelationSizeOnWorker(workerNode, relationId,
sizeQueryType,
failOnError, &relationSizeOnNode);
if (!gotSize)
{
return false;
Expand All @@ -563,21 +568,21 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
sumOfSizes += relationSizeOnNode;
}

*tableSize = sumOfSizes;
*relationSize = sumOfSizes;

return true;
}


/*
* DistributedTableSizeOnWorker gets the workerNode and relationId to calculate
* DistributedRelationSizeOnWorker gets the workerNode and relationId to calculate
* size of that relation on the given workerNode by summing up the size of each
* shard placement.
*/
c2main marked this conversation as resolved.
Show resolved Hide resolved
static bool
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize)
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

Expand All @@ -591,28 +596,39 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
uint32 connectionFlag = 0;
PGresult *result = NULL;

/* if the relation is an index, update relationId and define indexId */
Oid indexId = InvalidOid;
if (get_rel_relkind(relationId) == RELKIND_INDEX)
{
indexId = relationId;

bool missingOk = false;
relationId = IndexGetRelation(indexId, missingOk);
}

List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);

/*
* We pass false here, because if we optimize this, we would include child tables.
* But citus size functions shouldn't include them, like PG.
*/
bool optimizePartitionCalculations = false;
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
StringInfo relationSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode,
indexId,
sizeQueryType,
optimizePartitionCalculations);

MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
&result);

if (queryResult != 0)
{
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get size of "
"table \"%s\"",
"relation \"%s\"",
workerNodeName, workerNodePort,
get_rel_name(relationId))));

Expand All @@ -626,19 +642,19 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
ClearResults(connection, failOnError);

ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse size of table \"%s\" from %s:%d",
errmsg("cannot parse size of relation \"%s\" from %s:%d",
get_rel_name(relationId), workerNodeName,
workerNodePort)));

return false;
}

StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
char *tableSizeString = tableSizeStringInfo->data;
StringInfo relationSizeStringInfo = (StringInfo) linitial(sizeList);
char *relationSizeString = relationSizeStringInfo->data;

if (strlen(tableSizeString) > 0)
if (strlen(relationSizeString) > 0)
{
*tableSize = SafeStringToUint64(tableSizeString);
*relationSize = SafeStringToUint64(relationSizeString);
}
else
{
Expand All @@ -647,7 +663,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
* being executed. For this case we get an empty string as table size.
* We can take that as zero to prevent any unnecessary errors.
*/
*tableSize = 0;
*relationSize = 0;
}

PQclear(result);
Expand Down Expand Up @@ -732,17 +748,21 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)

/*
* GenerateSizeQueryOnMultiplePlacements generates a select size query to get
* size of multiple tables. Note that, different size functions supported by PG
* size of multiple relations. Note that, different size functions supported by PG
* are also supported by this function changing the size query type given as the
* last parameter to function. Depending on the sizeQueryType enum parameter, the
* generated query will call one of the functions: pg_relation_size,
* pg_total_relation_size, pg_table_size and cstore_table_size.
* This function uses UDFs named worker_partitioned_*_size for partitioned tables,
* if the parameter optimizePartitionCalculations is true. The UDF to be called is
* determined by the parameter sizeQueryType.
*
* indexId is provided if we're interested in the size of an index, not the whole
* table.
*/
StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations)
{
Expand All @@ -766,16 +786,22 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
*/
continue;
}

/*
* We need to build the shard relation name, being an index or table ...
*/
c2main marked this conversation as resolved.
Show resolved Hide resolved
Oid objectId = OidIsValid(indexId) ? indexId : shardInterval->relationId;

uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Oid schemaId = get_rel_namespace(objectId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
char *shardName = get_rel_name(objectId);
AppendShardIdToName(&shardName, shardId);

char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

/* for partitoned tables, we will call worker_partitioned_... size functions */
/* for partitioned tables, we will call worker_partitioned_... size functions */
if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
Expand Down Expand Up @@ -1010,19 +1036,39 @@ AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)


/*
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
* ErrorIfNotSuitableToGetSize determines whether the relation is suitable to find
* its' size with internal functions.
*/
static void
ErrorIfNotSuitableToGetSize(Oid relationId)
{
if (!IsCitusTable(relationId))
{
char *relationName = get_rel_name(relationId);
char *escapedQueryString = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot calculate the size because relation %s is not "
"distributed", escapedQueryString)));
if (get_rel_relkind(relationId) != RELKIND_INDEX)
{
char *relationName = get_rel_name(relationId);
char *escapedQueryString = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg(
"cannot calculate the size because relation %s "
"is not distributed",
escapedQueryString)));
}
bool missingOk = false;
Oid indexId = relationId;
relationId = IndexGetRelation(relationId, missingOk);
if (!IsCitusTable(relationId))
{
char *tableName = get_rel_name(relationId);
char *escapedTableName = quote_literal_cstr(tableName);
char *indexName = get_rel_name(indexId);
char *escapedIndexName = quote_literal_cstr(indexName);
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg(
"cannot calculate the size because table %s for "
"index %s is not distributed",
escapedTableName, escapedIndexName)));
}
c2main marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/operations/shard_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,12 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32

/* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true;

/* we're interested in whole table, not a particular index */
Oid indexId = InvalidOid;

StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
indexId,
TOTAL_RELATION_SIZE,
optimizePartitionCalculations);

Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char *
int *nodePort);
extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
Expand Down
Loading