Skip to content

Commit

Permalink
Merge branch 'master' into velioglu/function_expand
Browse files Browse the repository at this point in the history
  • Loading branch information
Burak Velioglu committed Feb 28, 2022
2 parents 0700dbb + b825232 commit aee452a
Show file tree
Hide file tree
Showing 26 changed files with 338 additions and 190 deletions.
5 changes: 3 additions & 2 deletions src/backend/distributed/commands/citus_global_signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ CitusSignalBackend(uint64 globalPID, uint64 timeout, int sig)
}
#endif

int nodeId = ExtractNodeIdFromGlobalPID(globalPID);
bool missingOk = false;
int nodeId = ExtractNodeIdFromGlobalPID(globalPID, missingOk);
int processId = ExtractProcessIdFromGlobalPID(globalPID);

WorkerNode *workerNode = FindNodeWithNodeId(nodeId);
WorkerNode *workerNode = FindNodeWithNodeId(nodeId, missingOk);

StringInfo cancelQuery = makeStringInfo();

Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
else if (IsA(parsetree, DoStmt))
{
/*
* All statements in a DO block are executed in a single transaciton,
* All statements in a DO block are executed in a single transaction,
* so we need to keep track of whether we are inside a DO block.
*/
DoBlockLevel += 1;
Expand Down
2 changes: 0 additions & 2 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ RegisterCitusCustomScanMethods(void)
static void
CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
{
MarkCitusInitiatedCoordinatorBackend();

CitusScanState *scanState = (CitusScanState *) node;

/*
Expand Down
90 changes: 61 additions & 29 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,52 @@ NodeGroupHasShardPlacements(int32 groupId, bool onlyConsiderActivePlacements)
}


/*
 * IsActiveShardPlacement returns true only when the given shard placement
 * is in SHARD_STATE_ACTIVE state and the worker node hosting it is marked
 * as active.
 *
 * The hosting node is looked up via the placement's nodeName and nodePort.
 * If that lookup yields no node, the function raises an ERROR instead of
 * returning.
 */
bool
IsActiveShardPlacement(ShardPlacement *shardPlacement)
{
	WorkerNode *placementNode = FindWorkerNode(shardPlacement->nodeName,
											   shardPlacement->nodePort);

	if (placementNode == NULL)
	{
		ereport(ERROR, (errmsg("There is a shard placement on node %s:%d but "
							   "could not find the node.", shardPlacement->nodeName,
							   shardPlacement->nodePort)));
	}

	/* a placement that is not labelled active can never qualify */
	if (shardPlacement->shardState != SHARD_STATE_ACTIVE)
	{
		return false;
	}

	return placementNode->isActive;
}


/*
 * FilterShardPlacementList builds a new list containing only those shard
 * placements of shardPlacementList for which the given filter function
 * returns true. The order of the kept placements follows the input list,
 * and the result is NIL when no placement passes the filter.
 */
List *
FilterShardPlacementList(List *shardPlacementList, bool (*filter)(ShardPlacement *))
{
	List *keptPlacementList = NIL;
	ShardPlacement *candidatePlacement = NULL;

	foreach_ptr(candidatePlacement, shardPlacementList)
	{
		/* skip placements rejected by the filter */
		if (!filter(candidatePlacement))
		{
			continue;
		}

		keptPlacementList = lappend(keptPlacementList, candidatePlacement);
	}

	return keptPlacementList;
}


/*
* ActiveShardPlacementListOnGroup returns a list of active shard placements
* that are sitting on group with groupId for given shardId.
Expand Down Expand Up @@ -1323,53 +1369,39 @@ ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId)
List *
ActiveShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);

ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
WorkerNode *workerNode =
FindWorkerNode(shardPlacement->nodeName, shardPlacement->nodePort);
List *activePlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveShardPlacement);

/*
* We have already resolved the placement to node, so would have
* errored out earlier.
*/
Assert(workerNode != NULL);
return SortList(activePlacementList, CompareShardPlacementsByWorker);
}

if (shardPlacement->shardState == SHARD_STATE_ACTIVE &&
workerNode->isActive)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}

return SortList(activePlacementList, CompareShardPlacementsByWorker);
/*
 * IsShardPlacementNotOrphaned returns true if the given shard placement is
 * not orphaned. Orphaned shard placements are the ones marked to be deleted
 * at a later point (shardstate = 4).
 */
static inline bool
IsShardPlacementNotOrphaned(ShardPlacement *shardPlacement)
{
	bool isOrphaned = (shardPlacement->shardState == SHARD_STATE_TO_DELETE);

	return !isOrphaned;
}


/*
* ShardPlacementListWithoutOrphanedPlacements returns shard placements excluding
* the ones that are orphaned, because they are marked to be deleted at a later
* point (shardstate = 4).
* the ones that are orphaned.
*/
List *
ShardPlacementListWithoutOrphanedPlacements(uint64 shardId)
{
List *activePlacementList = NIL;
List *shardPlacementList =
ShardPlacementListIncludingOrphanedPlacements(shardId);

ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
if (shardPlacement->shardState != SHARD_STATE_TO_DELETE)
{
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
List *activePlacementList = FilterShardPlacementList(shardPlacementList,
IsShardPlacementNotOrphaned);

return SortList(activePlacementList, CompareShardPlacementsByWorker);
}
Expand Down
7 changes: 5 additions & 2 deletions src/backend/distributed/metadata/node_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -1541,7 +1541,7 @@ FindWorkerNodeAnyCluster(const char *nodeName, int32 nodePort)
* If the node cannot be found this function errors, unless missingOk is
* true, in which case NULL is returned.
*/
WorkerNode *
FindNodeWithNodeId(int nodeId)
FindNodeWithNodeId(int nodeId, bool missingOk)
{
List *workerList = ActiveReadableNodeList();
WorkerNode *workerNode = NULL;
Expand All @@ -1555,7 +1555,10 @@ FindNodeWithNodeId(int nodeId)
}

/* there isn't any node with nodeId in pg_dist_node */
elog(ERROR, "worker node with node id %d could not be found", nodeId);
if (!missingOk)
{
elog(ERROR, "worker node with node id %d could not be found", nodeId);
}

return NULL;
}
Expand Down
53 changes: 26 additions & 27 deletions src/backend/distributed/operations/shard_rebalancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "distributed/lock_graph.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_progress.h"
#include "distributed/multi_server_executor.h"
Expand Down Expand Up @@ -190,7 +191,7 @@ static void UpdateShardPlacement(PlacementUpdateEvent *placementUpdateEvent,
List *responsiveNodeList, Oid shardReplicationModeOid);

/* static declarations for main logic's utility functions */
static HTAB * ActivePlacementsHash(List *shardPlacementList);
static HTAB * ShardPlacementsListToHash(List *shardPlacementList);
static bool PlacementsHashFind(HTAB *placementsHash, uint64 shardId,
WorkerNode *workerNode);
static void PlacementsHashEnter(HTAB *placementsHash, uint64 shardId,
Expand Down Expand Up @@ -396,6 +397,7 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
placement->shardId = groupPlacement->shardId;
placement->shardLength = groupPlacement->shardLength;
placement->shardState = groupPlacement->shardState;
placement->nodeId = worker->nodeId;
placement->nodeName = pstrdup(worker->workerName);
placement->nodePort = worker->workerPort;
placement->placementId = groupPlacement->placementId;
Expand Down Expand Up @@ -446,14 +448,17 @@ GetRebalanceSteps(RebalanceOptions *options)

/* sort the lists to make the function more deterministic */
List *activeWorkerList = SortedActiveWorkers();
List *shardPlacementListList = NIL;
List *activeShardPlacementListList = NIL;

Oid relationId = InvalidOid;
foreach_oid(relationId, options->relationIdList)
{
List *shardPlacementList = FullShardPlacementList(relationId,
options->excludedShardArray);
shardPlacementListList = lappend(shardPlacementListList, shardPlacementList);
List *activeShardPlacementListForRelation =
FilterShardPlacementList(shardPlacementList, IsActiveShardPlacement);
activeShardPlacementListList =
lappend(activeShardPlacementListList, activeShardPlacementListForRelation);
}

if (options->threshold < options->rebalanceStrategy->minimumThreshold)
Expand All @@ -471,7 +476,7 @@ GetRebalanceSteps(RebalanceOptions *options)
}

return RebalancePlacementUpdates(activeWorkerList,
shardPlacementListList,
activeShardPlacementListList,
options->threshold,
options->maxShardMoves,
options->drainOnly,
Expand Down Expand Up @@ -795,7 +800,6 @@ rebalance_table_shards(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
ErrorIfMoveUnsupportedTableType(relationId);

relationIdList = list_make1_oid(relationId);
}
else
Expand Down Expand Up @@ -951,9 +955,11 @@ replicate_table_shards(PG_FUNCTION_ARGS)

List *activeWorkerList = SortedActiveWorkers();
List *shardPlacementList = FullShardPlacementList(relationId, excludedShardArray);
List *activeShardPlacementList = FilterShardPlacementList(shardPlacementList,
IsActiveShardPlacement);

List *placementUpdateList = ReplicationPlacementUpdates(activeWorkerList,
shardPlacementList,
activeShardPlacementList,
shardReplicationFactor);
placementUpdateList = list_truncate(placementUpdateList, maxShardCopies);

Expand Down Expand Up @@ -1737,13 +1743,13 @@ ExecuteRebalancerCommandInSeparateTransaction(char *command)
* which is placed in the source node but not in the target node as the shard to
* move.
*
* The shardPlacementListList argument contains a list of lists of shard
* The activeShardPlacementListList argument contains a list of lists of active shard
* placements. Each of these lists is balanced independently. This is used to
* make sure different colocation groups are balanced separately, so each list
* contains the placements of a colocation group.
*/
List *
RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
RebalancePlacementUpdates(List *workerNodeList, List *activeShardPlacementListList,
double threshold,
int32 maxShardMoves,
bool drainOnly,
Expand All @@ -1755,7 +1761,7 @@ RebalancePlacementUpdates(List *workerNodeList, List *shardPlacementListList,
List *shardPlacementList = NIL;
List *placementUpdateList = NIL;

foreach_ptr(shardPlacementList, shardPlacementListList)
foreach_ptr(shardPlacementList, activeShardPlacementListList)
{
state = InitRebalanceState(workerNodeList, shardPlacementList,
functions);
Expand Down Expand Up @@ -1861,7 +1867,7 @@ InitRebalanceState(List *workerNodeList, List *shardPlacementList,

RebalanceState *state = palloc0(sizeof(RebalanceState));
state->functions = functions;
state->placementsHash = ActivePlacementsHash(shardPlacementList);
state->placementsHash = ShardPlacementsListToHash(shardPlacementList);

/* create empty fill state for all of the worker nodes */
foreach_ptr(workerNode, workerNodeList)
Expand Down Expand Up @@ -2413,29 +2419,25 @@ FindAndMoveShardCost(float4 utilizationLowerBound,
/*
* ReplicationPlacementUpdates returns a list of placement updates which
* replicates shard placements that need re-replication. To do this, the
* function loops over the shard placements, and for each shard placement
* function loops over the active shard placements, and for each shard placement
* which needs to be re-replicated, it chooses an active worker node with
* smallest number of shards as the target node.
*/
List *
ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
ReplicationPlacementUpdates(List *workerNodeList, List *activeShardPlacementList,
int shardReplicationFactor)
{
List *placementUpdateList = NIL;
ListCell *shardPlacementCell = NULL;
uint32 workerNodeIndex = 0;
HTAB *placementsHash = ActivePlacementsHash(shardPlacementList);
HTAB *placementsHash = ShardPlacementsListToHash(activeShardPlacementList);
uint32 workerNodeCount = list_length(workerNodeList);

/* get number of shards per node */
uint32 *shardCountArray = palloc0(workerNodeCount * sizeof(uint32));
foreach(shardPlacementCell, shardPlacementList)
foreach(shardPlacementCell, activeShardPlacementList)
{
ShardPlacement *placement = lfirst(shardPlacementCell);
if (placement->shardState != SHARD_STATE_ACTIVE)
{
continue;
}

for (workerNodeIndex = 0; workerNodeIndex < workerNodeCount; workerNodeIndex++)
{
Expand All @@ -2449,7 +2451,7 @@ ReplicationPlacementUpdates(List *workerNodeList, List *shardPlacementList,
}
}

foreach(shardPlacementCell, shardPlacementList)
foreach(shardPlacementCell, activeShardPlacementList)
{
WorkerNode *sourceNode = NULL;
WorkerNode *targetNode = NULL;
Expand Down Expand Up @@ -2586,11 +2588,11 @@ ShardActivePlacementCount(HTAB *activePlacementsHash, uint64 shardId,


/*
* ActivePlacementsHash creates and returns a hash set for the placements in
* the given list of shard placements which are in active state.
* ShardPlacementsListToHash creates and returns a hash set from a shard
* placement list.
*/
static HTAB *
ActivePlacementsHash(List *shardPlacementList)
ShardPlacementsListToHash(List *shardPlacementList)
{
ListCell *shardPlacementCell = NULL;
HASHCTL info;
Expand All @@ -2609,11 +2611,8 @@ ActivePlacementsHash(List *shardPlacementList)
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
if (shardPlacement->shardState == SHARD_STATE_ACTIVE)
{
void *hashKey = (void *) shardPlacement;
hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
}
void *hashKey = (void *) shardPlacement;
hash_search(shardPlacementsHash, hashKey, HASH_ENTER, NULL);
}

return shardPlacementsHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

DROP FUNCTION pg_catalog.create_distributed_function(regprocedure, text, text, bool);

DROP FUNCTION pg_catalog.worker_partition_query_result(text, text, int, citus.distribution_type, text[], text[], boolean, boolean, boolean);
#include "../udfs/worker_partition_query_result/9.2-1.sql"

CREATE FUNCTION pg_catalog.master_apply_delete_command(text)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ CREATE OR REPLACE FUNCTION pg_catalog.worker_create_or_replace_object(statements
AS 'MODULE_PATHNAME', $$worker_create_or_replace_object_array$$;

COMMENT ON FUNCTION pg_catalog.worker_create_or_replace_object(statements text[])
IS 'takes a lost of sql statements, before executing these it will check if the object already exists in that exact state otherwise replaces that named object with the new object';
IS 'takes an array of sql statements, before executing these it will check if the object already exists in that exact state otherwise replaces that named object with the new object';
Loading

0 comments on commit aee452a

Please sign in to comment.