Skip to content

Commit

Permalink
Adds 2PC distributed commands to pools
Browse files Browse the repository at this point in the history
  • Loading branch information
halilozanakgul committed Dec 1, 2023
1 parent dbdde11 commit b5606fa
Show file tree
Hide file tree
Showing 39 changed files with 689 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/backend/distributed/commands/function.c
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,

char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
NIL,
distArgumentIndexList,
colocationIdList,
forceDelegationList);
Expand Down
57 changes: 57 additions & 0 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
Expand All @@ -44,6 +45,7 @@
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "postmaster/postmaster.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
Expand Down Expand Up @@ -77,6 +79,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_management.h"
Expand Down Expand Up @@ -112,6 +115,8 @@ static void PostStandardProcessUtility(Node *parsetree);
static void DecrementUtilityHookCountersIfNecessary(Node *parsetree);
static bool IsDropSchemaOrDB(Node *parsetree);
static bool ShouldCheckUndistributeCitusLocalTables(void);
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString);
static void RunPostprocessMainDBCommand(Node *parsetree);

/*
* ProcessUtilityParseTree is a convenience method to create a PlannedStmt out of
Expand Down Expand Up @@ -243,13 +248,23 @@ citus_ProcessUtility(PlannedStmt *pstmt,

if (!CitusHasBeenLoaded())
{
if (!IsCitusMainDB())
{
RunPreprocessMainDBCommand(parsetree, queryString);
}

/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
*/
PrevProcessUtility(pstmt, queryString, false, context,
params, queryEnv, dest, completionTag);

if (!IsCitusMainDB())
{
RunPostprocessMainDBCommand(parsetree);
}

return;
}
else if (IsA(parsetree, CallStmt))
Expand Down Expand Up @@ -1572,3 +1587,45 @@ DropSchemaOrDBInProgress(void)
{
return activeDropSchemaOrDBs > 0;
}


/*
* RunPreprocessMainDBCommand runs the necessary commands for a query, in main
* database before query is run on the local node with PrevProcessUtility
*/
static void RunPreprocessMainDBCommand(Node *parsetree, const char *queryString)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,
"SELECT citus_internal_start_management_transaction('%lu')",
GetCurrentFullTransactionId().value);
RunCitusMainDBQuery(mainDBQuery->data);
mainDBQuery = makeStringInfo();
appendStringInfo(mainDBQuery,

Check warning on line 1606 in src/backend/distributed/commands/utility_hook.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/utility_hook.c#L1605-L1606

Added lines #L1605 - L1606 were not covered by tests
"SELECT execute_command_on_other_nodes(%s)",
quote_literal_cstr(queryString));
RunCitusMainDBQuery(mainDBQuery->data);

Check warning on line 1609 in src/backend/distributed/commands/utility_hook.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/utility_hook.c#L1609

Added line #L1609 was not covered by tests
}
}

/*
* RunPostprocessMainDBCommand runs the necessary commands for a query, in main
* database after query is run on the local node with PrevProcessUtility
*/
static void RunPostprocessMainDBCommand(Node *parsetree)
{
if (IsA(parsetree, CreateRoleStmt))
{
StringInfo mainDBQuery = makeStringInfo();
CreateRoleStmt *createRoleStmt = castNode(CreateRoleStmt, parsetree);
Oid roleOid = get_role_oid(createRoleStmt->role, false);
appendStringInfo(mainDBQuery,

Check warning on line 1624 in src/backend/distributed/commands/utility_hook.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/utility_hook.c#L1621-L1624

Added lines #L1621 - L1624 were not covered by tests
"SELECT citus_mark_object_distributed(%d, %s, %d)",
AuthIdRelationId,
quote_literal_cstr(createRoleStmt->role),

Check warning on line 1627 in src/backend/distributed/commands/utility_hook.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/utility_hook.c#L1627

Added line #L1627 was not covered by tests
roleOid);
RunCitusMainDBQuery(mainDBQuery->data);

Check warning on line 1629 in src/backend/distributed/commands/utility_hook.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/commands/utility_hook.c#L1629

Added line #L1629 was not covered by tests
}
}
14 changes: 13 additions & 1 deletion src/backend/distributed/connection/connection_configuration.c
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,19 @@ GetEffectiveConnKey(ConnectionHashKey *key)
return key;
}

WorkerNode *worker = FindWorkerNode(key->hostname, key->port);
WorkerNode *worker = NULL;

if (!CitusHasBeenLoaded())
{
/*
* This happens when we connect to main database over localhost
* from some non Citus database.
*/
return key;
}

worker = FindWorkerNode(key->hostname, key->port);

if (worker == NULL)
{
/* this can be hit when the key references an unknown node */
Expand Down
48 changes: 44 additions & 4 deletions src/backend/distributed/metadata/distobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,39 @@
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"

static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress,
char *objectName);
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
static bool IsObjectDistributed(const ObjectAddress *address);

PG_FUNCTION_INFO_V1(citus_mark_object_distributed);
PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);


/*
* citus_mark_object_distributed adds an object to pg_dist_object
* in all of the nodes.
*/
Datum
citus_mark_object_distributed(PG_FUNCTION_ARGS)

Check warning on line 72 in src/backend/distributed/metadata/distobject.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/metadata/distobject.c#L72

Added line #L72 was not covered by tests
{
Oid classId = PG_GETARG_OID(0);
text *objectNameText = PG_GETARG_TEXT_P(1);
char *objectName = text_to_cstring(objectNameText);
Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);
PG_RETURN_VOID();

Check warning on line 81 in src/backend/distributed/metadata/distobject.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/metadata/distobject.c#L74-L81

Added lines #L74 - L81 were not covered by tests
}


/*
* citus_unmark_object_distributed(classid oid, objid oid, objsubid int)
*
Expand Down Expand Up @@ -160,12 +181,28 @@ ObjectExists(const ObjectAddress *address)
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
MarkObjectDistributedWithName(distAddress, "");
}


/*
* MarkObjectDistributedWithName marks an object as a distributed object.
* Same as MarkObjectDistributed but this function also allows passing an objectName
* that is used in case the object does not exists for the current transaction.
*/
void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char* objectName)
{
if (!CitusHasBeenLoaded())
{
return;
}
MarkObjectDistributedLocally(distAddress);

if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, objectName);
SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
}
}
Expand All @@ -188,7 +225,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
if (EnableMetadataSync)
{
char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress);
CreatePgDistObjectEntryCommand(distAddress, "");
SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
}
}
Expand Down Expand Up @@ -279,17 +316,20 @@ ShouldMarkRelationDistributed(Oid relationId)
* for the given object address.
*/
static char *
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress)
CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress, char *objectName)
{
/* create a list by adding the address of value to not to have warning */
List *objectAddressList =
list_make1((ObjectAddress *) objectAddress);
/* names also require a list so we create a nested list here */
List *objectNameList = list_make1(list_make1((char *)objectName));
List *distArgumetIndexList = list_make1_int(INVALID_DISTRIBUTION_ARGUMENT_INDEX);
List *colocationIdList = list_make1_int(INVALID_COLOCATION_ID);
List *forceDelegationList = list_make1_int(NO_FORCE_PUSHDOWN);

char *workerPgDistObjectUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
objectNameList,
distArgumetIndexList,
colocationIdList,
forceDelegationList);
Expand Down
5 changes: 5 additions & 0 deletions src/backend/distributed/metadata/metadata_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_placement.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/utils/array_type.h"
Expand Down Expand Up @@ -5722,6 +5723,10 @@ GetPoolinfoViaCatalog(int32 nodeId)
char *
GetAuthinfoViaCatalog(const char *roleName, int64 nodeId)
{
if (!CitusHasBeenLoaded())
{
return "";
}
char *authinfo = "";
Datum nodeIdDatumArray[2] = {
Int32GetDatum(nodeId),
Expand Down
23 changes: 21 additions & 2 deletions src/backend/distributed/metadata/metadata_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/resource_lock.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
Expand Down Expand Up @@ -900,6 +901,7 @@ NodeListIdempotentInsertCommand(List *workerNodeList)
*/
char *
MarkObjectsDistributedCreateCommand(List *addresses,
List *namesArg,
List *distributionArgumentIndexes,
List *colocationIds,
List *forceDelegations)
Expand All @@ -924,9 +926,25 @@ MarkObjectsDistributedCreateCommand(List *addresses,
int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter);
List *names = NIL;
List *args = NIL;
char *objectType = NULL;

char *objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, false);
if (IsMainDBCommand)
{
/*
* When we try to distribute an object that's being created in a non Citus
* main database, we cannot find the name, since the object is not visible
* in Citus main database.
* Because of that we need to pass the name to this function.
*/
names = list_nth(namesArg, currentObjectCounter);
bool missingOk = false;
objectType = getObjectTypeDescription(address, missingOk);

Check warning on line 941 in src/backend/distributed/metadata/metadata_sync.c

View check run for this annotation

Codecov / codecov/patch

src/backend/distributed/metadata/metadata_sync.c#L939-L941

Added lines #L939 - L941 were not covered by tests
}
else
{
objectType = getObjectTypeDescription(address, false);
getObjectIdentityParts(address, &names, &args, IsMainDBCommand);
}

if (!isFirstObject)
{
Expand Down Expand Up @@ -5148,6 +5166,7 @@ SendDistObjectCommands(MetadataSyncContext *context)

char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(list_make1(address),
NIL,
list_make1_int(distributionArgumentIndex),
list_make1_int(colocationId),
list_make1_int(forceDelegation));
Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/repartition_executor.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
Expand Down Expand Up @@ -3149,6 +3150,8 @@ CitusAuthHook(Port *port, int status)
*/
InitializeBackendData(port->application_name);

IsMainDB = (strncmp(MainDb, "", NAMEDATALEN) == 0 ||
strncmp(MainDb, port->database_name, NAMEDATALEN) == 0);

/* let other authentication hooks to kick in first */
if (original_client_auth_hook)
Expand Down
8 changes: 8 additions & 0 deletions src/backend/distributed/sql/citus--12.0-1--12.1-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,11 @@
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"

#include "udfs/citus_schema_move/12.1-1.sql"


#include "udfs/citus_internal_start_management_transaction/12.1-1.sql"
#include "udfs/execute_command_on_other_nodes/12.1-1.sql"
#include "udfs/citus_mark_object_distributed/12.1-1.sql"
#include "udfs/commit_management_command_2pc/12.1-1.sql"

ALTER TABLE pg_catalog.pg_dist_transaction ADD COLUMN outer_xid xid8;
17 changes: 17 additions & 0 deletions src/backend/distributed/sql/downgrades/citus--12.1-1--12.0-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@ DROP FUNCTION pg_catalog.citus_schema_move(
schema_id regnamespace, target_node_id integer,
shard_transfer_mode citus.shard_transfer_mode
);


DROP FUNCTION pg_catalog.citus_internal_start_management_transaction(
outer_xid xid8
);

DROP FUNCTION pg_catalog.execute_command_on_other_nodes(
query text
);

DROP FUNCTION pg_catalog.citus_mark_object_distributed(
classId Oid, objectName text, objectId Oid
);

DROP FUNCTION pg_catalog.commit_management_command_2pc();

ALTER TABLE pg_catalog.pg_dist_transaction DROP COLUMN outer_xid;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_internal_start_management_transaction$$;

COMMENT ON FUNCTION pg_catalog.citus_internal_start_management_transaction(outer_xid xid8)
IS 'internal Citus function that starts a management transaction in the main database';

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid)
RETURNS VOID
LANGUAGE C
AS 'MODULE_PATHNAME', $$citus_mark_object_distributed$$;

COMMENT ON FUNCTION pg_catalog.citus_mark_object_distributed(classId Oid, objectName text, objectId Oid)
IS 'adds an object to pg_dist_object on all nodes';

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b5606fa

Please sign in to comment.