Skip to content

Commit

Permalink
Start Maintenance Daemon for Main DB at the server start. (#7254)
Browse files Browse the repository at this point in the history
DESCRIPTION: This change starts a maintenance deamon at the time of
server start if there is a designated main database.

This is the code flow:

1. User designates a main database:
   `ALTER SYSTEM SET citus.main_db =  "myadmindb";`

2. When postmaster starts, in _PG_Init, citus calls 
    `InitializeMaintenanceDaemonForMainDb`
  
This function registers a background worker to run
`CitusMaintenanceDaemonMain `with `databaseOid = 0 `

3. `CitusMaintenanceDaemonMain ` takes some special actions when
databaseOid is 0:
     - Gets the citus.main_db  value.
     - Connects to the  citus.main_db
     - Now the `MyDatabaseId `is available, creates a hash entry for it.
     - Then follows the same control flow as for a regular db,
  • Loading branch information
emelsimsek authored Oct 30, 2023
1 parent d0b093c commit ee8f4bb
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 71 deletions.
11 changes: 11 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ _PG_init(void)
#endif

InitializeMaintenanceDaemon();
InitializeMaintenanceDaemonForMainDb();

/* initialize coordinated transaction management */
InitializeTransactionManagement();
Expand Down Expand Up @@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS,
NULL, NULL, NULL);

DefineCustomStringVariable(
"citus.main_db",
gettext_noop("Which database is designated as the main_db"),
NULL,
&MainDb,
"",
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.max_adaptive_executor_pool_size",
gettext_noop("Sets the maximum number of connections per worker node used by "
Expand Down
271 changes: 200 additions & 71 deletions src/backend/distributed/utils/maintenanced.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
char *MainDb = "";

/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
Expand All @@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
* activated.
*/
static HTAB *MaintenanceDaemonDBHash;

static ErrorContextCallback errorCallback = { 0 };
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

Expand All @@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
bool *found);

/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
Expand All @@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void)
}


/*
* GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
* databaseId. It returns the entry if found or creates a new entry and initializes
* the value with zeroes.
*/
MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
found);

if (!dbData)
{
elog(LOG,
"cannot create or find the maintenance deamon hash entry for database %u",
databaseId);
return NULL;
}

if (!*found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

return dbData;
}


/*
* InitializeMaintenanceDaemonForMainDb is called in _PG_Init
* at which stage we are not in a transaction or have databaseOid
*/
void
InitializeMaintenanceDaemonForMainDb(void)
{
if (strcmp(MainDb, "") == 0)
{
elog(LOG, "There is no designated Main database.");
return;
}

BackgroundWorker worker;

memset(&worker, 0, sizeof(worker));


strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
"Citus Maintenance Daemon for Main DB");

/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;

/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");

worker.bgw_main_arg = (Datum) 0;

RegisterBackgroundWorker(&worker);
}


/*
* InitializeMaintenanceDaemonBackend, called at backend start and
* configuration changes, is responsible for starting a per-database
Expand All @@ -148,31 +227,20 @@ void
InitializeMaintenanceDaemonBackend(void)
{
Oid extensionOwner = CitusExtensionOwner();
bool found;
bool found = false;

LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
&found);
MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
&found);

if (dbData == NULL)
{
WarnMaintenanceDaemonNotStarted();
LWLockRelease(&MaintenanceDaemonControl->lock);

return;
}

if (!found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

if (IsMaintenanceDaemon)
{
/*
Expand Down Expand Up @@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)


/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
* ConnectToDatabase connects to the database for the given databaseOid.
* if databaseOid is 0, connects to MainDb and then creates a hash entry.
* If a hash entry cannot be created for MainDb it exits the process requesting a restart.
* However for regular databases, it exits without requesting a restart since another
* subsequent backend is expected to start the Maintenance Daemon.
* If the found hash entry has a valid workerPid, it exits
* without requesting a restart since there is already a daemon running.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
static MaintenanceDaemonDBData *
ConnectToDatabase(Oid databaseOid)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
MaintenanceDaemonDBData *myDbData = NULL;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;

/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
bool isMainDb = false;

/*
* Look up this worker's configuration.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)

if (databaseOid == 0)
{
char *databaseName = MainDb;

/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
* Since we cannot query databaseOid without initializing Postgres
* first, connect to the database by name.
*/
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);

proc_exit(0);
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
* Now we have a valid MyDatabaseId.
* Insert the hash entry for the database to the Maintenance Deamon Hash.
*/
bool found = false;

myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);

if (!myDbData)
{
/*
* If an entry cannot be created,
* return code of 1 requests worker restart
* Since BackgroundWorker for the MainDb is only registered
* once during server startup, we need to retry.
*/
proc_exit(1);
}

if (found && myDbData->workerPid != 0)
{
/* Another maintenance daemon is running.*/

proc_exit(0);
}

proc_exit(0);
databaseOid = MyDatabaseId;
myDbData->userOid = GetSessionUserId();
isMainDb = true;
}
else
{
myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);

before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/

proc_exit(0);
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/

proc_exit(0);
}
}

before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

/*
* Signal that I am the maintenance daemon now.
Expand All @@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg)

LWLockRelease(&MaintenanceDaemonControl->lock);

/*
* Setup error context so log messages can be properly attributed. Some of
* them otherwise sound like they might be from a normal user connection.
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
ErrorContextCallback errorCallback = { 0 };
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;


elog(LOG, "starting maintenance daemon on database %u user %u",
databaseOid, myDbData->userOid);

/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
if (!isMainDb)
{
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
}

return myDbData;
}


/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;


/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");
Expand All @@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Terminate orphaned metadata sync daemons spawned from previously terminated
* or crashed maintenanced instances.
*/
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

/* enter main loop */
while (!got_SIGTERM)
Expand Down Expand Up @@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
}


/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag.*/
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
Expand Down
Loading

0 comments on commit ee8f4bb

Please sign in to comment.