Skip to content

Commit

Permalink
Allow citus_*_size on index related to a distributed table (#7271)
Browse files Browse the repository at this point in the history
I just enhanced the existing code to check if the relation is an index
belonging to a distributed table.
If so the shardId is appended to relation (index) name and the *_size
function are executed as before.

There is a change in an extern function:
  `extern StringInfo GenerateSizeQueryOnMultiplePlacements(...)`
It's possible to create a new function and deprecate this one later if
compatibility is an issue.

Fixes #6496.

DESCRIPTION: Allows using Citus size functions on distributed tables
indexes.

---------

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
  • Loading branch information
c2main and onurctirtir authored Nov 1, 2023
1 parent a76a832 commit 37415ef
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 68 deletions.
144 changes: 97 additions & 47 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_constraint.h"
Expand Down Expand Up @@ -88,11 +89,11 @@ static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize);
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize);
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *relationSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);
Expand Down Expand Up @@ -282,7 +283,7 @@ citus_shard_sizes(PG_FUNCTION_ARGS)


/*
* citus_total_relation_size accepts a table name and returns a distributed table
* citus_total_relation_size accepts a distributed table name and returns a distributed table
* and its indexes' total relation size.
*/
Datum
Expand All @@ -294,20 +295,20 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
bool failOnError = PG_GETARG_BOOL(1);

SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}


/*
* citus_table_size accepts a table name and returns a distributed table's total
* citus_table_size accepts a distributed table name and returns a distributed table's total
* relation size.
*/
Datum
Expand All @@ -318,21 +319,24 @@ citus_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
SizeQueryType sizeQueryType = TABLE_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
/* We do not check if relation is really a table, like PostgreSQL is doing. */
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}


/*
* citus_relation_size accept a table name and returns a relation's 'main'
* citus_relation_size accept a distributed relation name and returns a relation's 'main'
* fork's size.
*
* Input relation is allowed to be an index on a distributed table too.
*/
Datum
citus_relation_size(PG_FUNCTION_ARGS)
Expand All @@ -344,7 +348,7 @@ citus_relation_size(PG_FUNCTION_ARGS)
SizeQueryType sizeQueryType = RELATION_SIZE;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &relationSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
Expand Down Expand Up @@ -506,13 +510,16 @@ ReceiveShardIdAndSizeResults(List *connectionList, Tuplestorestate *tupleStore,


/*
* DistributedTableSize is helper function for each kind of citus size functions.
* It first checks whether the table is distributed and size query can be run on
* it. Connection to each node has to be established to get the size of the table.
* DistributedRelationSize is helper function for each kind of citus size
* functions. It first checks whether the relation is a distributed table or an
* index belonging to a distributed table and size query can be run on it.
* Connection to each node has to be established to get the size of the
* relation.
* Input relation is allowed to be an index on a distributed table too.
*/
static bool
DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize)
DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

Expand All @@ -538,7 +545,7 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
if (relation == NULL)
{
ereport(logLevel,
(errmsg("could not compute table size: relation does not exist")));
(errmsg("could not compute relation size: relation does not exist")));

return false;
}
Expand All @@ -553,8 +560,9 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
{
uint64 relationSizeOnNode = 0;

bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQueryType,
failOnError, &relationSizeOnNode);
bool gotSize = DistributedRelationSizeOnWorker(workerNode, relationId,
sizeQueryType,
failOnError, &relationSizeOnNode);
if (!gotSize)
{
return false;
Expand All @@ -563,21 +571,22 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
sumOfSizes += relationSizeOnNode;
}

*tableSize = sumOfSizes;
*relationSize = sumOfSizes;

return true;
}


/*
* DistributedTableSizeOnWorker gets the workerNode and relationId to calculate
* DistributedRelationSizeOnWorker gets the workerNode and relationId to calculate
* size of that relation on the given workerNode by summing up the size of each
* shard placement.
* Input relation is allowed to be an index on a distributed table too.
*/
static bool
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize)
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

Expand All @@ -591,28 +600,40 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
uint32 connectionFlag = 0;
PGresult *result = NULL;

/* if the relation is an index, update relationId and define indexId */
Oid indexId = InvalidOid;
Oid relKind = get_rel_relkind(relationId);
if (relKind == RELKIND_INDEX || relKind == RELKIND_PARTITIONED_INDEX)
{
indexId = relationId;

bool missingOk = false;
relationId = IndexGetRelation(indexId, missingOk);
}

List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);

/*
* We pass false here, because if we optimize this, we would include child tables.
* But citus size functions shouldn't include them, like PG.
*/
bool optimizePartitionCalculations = false;
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
StringInfo relationSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode,
indexId,
sizeQueryType,
optimizePartitionCalculations);

MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
&result);

if (queryResult != 0)
{
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get size of "
"table \"%s\"",
"relation \"%s\"",
workerNodeName, workerNodePort,
get_rel_name(relationId))));

Expand All @@ -626,19 +647,19 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
ClearResults(connection, failOnError);

ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse size of table \"%s\" from %s:%d",
errmsg("cannot parse size of relation \"%s\" from %s:%d",
get_rel_name(relationId), workerNodeName,
workerNodePort)));

return false;
}

StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
char *tableSizeString = tableSizeStringInfo->data;
StringInfo relationSizeStringInfo = (StringInfo) linitial(sizeList);
char *relationSizeString = relationSizeStringInfo->data;

if (strlen(tableSizeString) > 0)
if (strlen(relationSizeString) > 0)
{
*tableSize = SafeStringToUint64(tableSizeString);
*relationSize = SafeStringToUint64(relationSizeString);
}
else
{
Expand All @@ -647,7 +668,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
* being executed. For this case we get an empty string as table size.
* We can take that as zero to prevent any unnecessary errors.
*/
*tableSize = 0;
*relationSize = 0;
}

PQclear(result);
Expand Down Expand Up @@ -732,17 +753,21 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)

/*
* GenerateSizeQueryOnMultiplePlacements generates a select size query to get
* size of multiple tables. Note that, different size functions supported by PG
* size of multiple relations. Note that, different size functions supported by PG
* are also supported by this function changing the size query type given as the
* last parameter to function. Depending on the sizeQueryType enum parameter, the
* generated query will call one of the functions: pg_relation_size,
* pg_total_relation_size, pg_table_size and cstore_table_size.
* This function uses UDFs named worker_partitioned_*_size for partitioned tables,
* if the parameter optimizePartitionCalculations is true. The UDF to be called is
* determined by the parameter sizeQueryType.
*
* indexId is provided if we're interested in the size of an index, not the whole
* table.
*/
StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations)
{
Expand All @@ -766,16 +791,20 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
*/
continue;
}

/* we need to build the shard relation name, being an index or table */
Oid objectId = OidIsValid(indexId) ? indexId : shardInterval->relationId;

uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Oid schemaId = get_rel_namespace(objectId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
char *shardName = get_rel_name(objectId);
AppendShardIdToName(&shardName, shardId);

char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

/* for partitoned tables, we will call worker_partitioned_... size functions */
/* for partitioned tables, we will call worker_partitioned_... size functions */
if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
Expand Down Expand Up @@ -1010,19 +1039,40 @@ AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)


/*
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
* ErrorIfNotSuitableToGetSize determines whether the relation is suitable to find
* its' size with internal functions.
*/
static void
ErrorIfNotSuitableToGetSize(Oid relationId)
{
if (!IsCitusTable(relationId))
{
char *relationName = get_rel_name(relationId);
char *escapedQueryString = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot calculate the size because relation %s is not "
"distributed", escapedQueryString)));
Oid relKind = get_rel_relkind(relationId);
if (relKind != RELKIND_INDEX && relKind != RELKIND_PARTITIONED_INDEX)
{
char *relationName = get_rel_name(relationId);
char *escapedRelationName = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg(
"cannot calculate the size because relation %s "
"is not distributed",
escapedRelationName)));
}
bool missingOk = false;
Oid indexId = relationId;
relationId = IndexGetRelation(relationId, missingOk);
if (!IsCitusTable(relationId))
{
char *tableName = get_rel_name(relationId);
char *escapedTableName = quote_literal_cstr(tableName);
char *indexName = get_rel_name(indexId);
char *escapedIndexName = quote_literal_cstr(indexName);
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg(
"cannot calculate the size because table %s for "
"index %s is not distributed",
escapedTableName, escapedIndexName)));
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/operations/shard_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,12 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32

/* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true;

/* we're interested in whole table, not a particular index */
Oid indexId = InvalidOid;

StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
indexId,
TOTAL_RELATION_SIZE,
optimizePartitionCalculations);

Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char *
int *nodePort);
extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
Expand Down
1 change: 1 addition & 0 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def extra_tests(self):
),
"grant_on_schema_propagation": TestDeps("minimal_schedule"),
"propagate_extension_commands": TestDeps("minimal_schedule"),
"multi_size_queries": TestDeps("base_schedule", ["multi_copy"]),
}


Expand Down
2 changes: 1 addition & 1 deletion src/test/regress/expected/isolation_drop_vs_all.out
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ step s1-drop: DROP TABLE drop_hash;
step s2-table-size: SELECT citus_total_relation_size('drop_hash'); <waiting ...>
step s1-commit: COMMIT;
step s2-table-size: <... completed>
ERROR: could not compute table size: relation does not exist
ERROR: could not compute relation size: relation does not exist
step s2-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM drop_hash;
ERROR: relation "drop_hash" does not exist
Expand Down
6 changes: 6 additions & 0 deletions src/test/regress/expected/multi_cluster_management.out
Original file line number Diff line number Diff line change
Expand Up @@ -1258,3 +1258,9 @@ SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHER
t
(1 row)

-- Grant all on public schema to public
--
-- That's the default on Postgres versions < 15 and we want to
-- keep permissions compatible accross versions, in regression
-- tests.
GRANT ALL ON SCHEMA public TO PUBLIC;
Loading

0 comments on commit 37415ef

Please sign in to comment.