From 9a19c0df88376d4d4b074174ae1161670a65d871 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 1 Jul 2024 23:13:08 +0300 Subject: [PATCH] fix(slack-notifications): [nan-1164] multiple notifications on refresh error (#2433) ## Describe your changes This annoying issue is hard to reproduce locally without removing the locking delay for the `tryAcquire` method: ``` const ttlInMs = 1; const acquisitionTimeoutMs = 1; const { tries } = await this.locking.tryAcquire(lockKey, ttlInMs, acquisitionTimeoutMs); ``` With that change, multiple Slack notifications can be seen, which in my testing was mitigated by: - Adding transactions and `forUpdate` to the database lookups and inserts - Adding a slight randomized delay for webhook firing to prevent concurrent firing - Awaiting the Slack notification call ## Testing - It seemed the customer's issue was stemming from proxy calls, and only with Salesforce, which is likely caused by the introspection call. So I tested with a script that made many concurrent proxy calls: ``` Promise.allSettled([ nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: 
"salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), nango.get({ providerConfigKey: "salesforce", 
connectionId: "sf-2", endpoint: `/services/data/v51.0/sobjects/Contacts/describe`, }), ]); ``` With this script I was seeing multiple Slack notifications, but with the updated logic only one is now received consistently. ## Issue ticket number and link NAN-1163 ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is: --- packages/server/lib/hooks/hooks.ts | 4 +- .../services/notification/slack.service.ts | 191 ++++++++++-------- 2 files changed, 105 insertions(+), 90 deletions(-) diff --git a/packages/server/lib/hooks/hooks.ts b/packages/server/lib/hooks/hooks.ts index a716804d69..3cc4ab8924 100644 --- a/packages/server/lib/hooks/hooks.ts +++ b/packages/server/lib/hooks/hooks.ts @@ -142,7 +142,7 @@ export const connectionRefreshSuccess = async ({ const slackNotificationService = new SlackService({ orchestratorClient: getOrchestratorClient(), logContextGetter }); - void slackNotificationService.removeFailingConnection(connection, connection.connection_id, 'auth', null, environment.id, config.provider); + await slackNotificationService.removeFailingConnection(connection, connection.connection_id, 'auth', null, environment.id, config.provider); }; export const connectionRefreshFailed = async ({ @@ -185,7 +185,7 @@ export const connectionRefreshFailed = async ({ const slackNotificationService = new SlackService({ orchestratorClient: getOrchestratorClient(), logContextGetter }); - void slackNotificationService.reportFailure(connection, connection.connection_id, 'auth', logCtx.id, environment.id, config.provider); + await slackNotificationService.reportFailure(connection, connection.connection_id, 'auth', logCtx.id, environment.id, config.provider); }; export const connectionTest = async ( diff --git a/packages/shared/lib/services/notification/slack.service.ts 
b/packages/shared/lib/services/notification/slack.service.ts index 6ed9ca13f0..c3124c8a99 100644 --- a/packages/shared/lib/services/notification/slack.service.ts +++ b/packages/shared/lib/services/notification/slack.service.ts @@ -3,7 +3,7 @@ import type { SlackNotification } from '../../models/SlackNotification.js'; import type { NangoConnection } from '../../models/Connection.js'; import type { ServiceResponse } from '../../models/Generic.js'; import environmentService from '../environment.service.js'; -import { basePublicUrl, getLogger } from '@nangohq/utils'; +import { basePublicUrl, getLogger, stringToHash } from '@nangohq/utils'; import connectionService from '../connection.service.js'; import accountService from '../account.service.js'; import type { LogContext, LogContextGetter } from '@nangohq/logs'; @@ -436,11 +436,13 @@ export class SlackService { async hasOpenNotification( nangoConnection: NangoConnection, name: string, - type: string + type: string, + trx = db.knex ): Promise | null> { - const hasOpenNotification = await schema() + const hasOpenNotification = await trx .select('id', 'connection_list', 'slack_timestamp', 'admin_slack_timestamp') .from(TABLE) + .forUpdate() .where({ open: true, environment_id: nangoConnection.environment_id, @@ -460,8 +462,8 @@ export class SlackService { * @desc create a new notification for the given name and environment id * and return the id of the created notification. */ - async createNotification(nangoConnection: NangoConnection, name: string, type: string): Promise | null> { - const result = await schema() + async createNotification(nangoConnection: NangoConnection, name: string, type: string, trx = db.knex): Promise | null> { + const result = await trx .from(TABLE) .insert({ open: true, @@ -485,35 +487,68 @@ export class SlackService { * and if so add the connection id to the connection list. 
*/ async addFailingConnection(nangoConnection: NangoConnection, name: string, type: string): Promise> { - const isOpen = await this.hasOpenNotification(nangoConnection, name, type); + return await db.knex.transaction(async (trx) => { + const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}`); - if (isOpen && type === 'auth') { - return { - success: true, - error: null, - response: null - }; - } + const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]); - logger.info(`Notifying ${nangoConnection.id} type:${type} name:${name}`); + if (!rows?.[0]?.pg_try_advisory_xact_lock) { + logger.info(`${lockKey} could not acquire lock, skipping`); + return { success: true, error: null, response: null }; + } - if (!isOpen) { - const created = await this.createNotification(nangoConnection, name, type); + const isOpen = await this.hasOpenNotification(nangoConnection, name, type, trx); - return { - success: true, - error: null, - response: { - id: created?.id as number, - isOpen: false, - connectionCount: 1 - } - }; - } + if (isOpen && type === 'auth') { + return { + success: true, + error: null, + response: null + }; + } - const { id, connection_list } = isOpen; + logger.info(`Notifying ${nangoConnection.id} type:${type} name:${name}`); + + if (!isOpen) { + const created = await this.createNotification(nangoConnection, name, type, trx); + + return { + success: true, + error: null, + response: { + id: created?.id as number, + isOpen: false, + connectionCount: 1 + } + }; + } + + const { id, connection_list } = isOpen; + + if (connection_list.includes(nangoConnection.id as number)) { + return { + success: true, + error: null, + response: { + id: id as number, + isOpen: true, + slack_timestamp: isOpen.slack_timestamp as string, + admin_slack_timestamp: isOpen.admin_slack_timestamp as string, + connectionCount: connection_list.length + } + }; + } + + 
connection_list.push(nangoConnection.id as number); + + await trx + .from(TABLE) + .where({ id: id as number }) + .update({ + connection_list, + updated_at: new Date() + }); - if (connection_list.includes(nangoConnection.id as number)) { return { success: true, error: null, @@ -525,29 +560,7 @@ export class SlackService { connectionCount: connection_list.length } }; - } - - connection_list.push(nangoConnection.id as number); - - await schema() - .from(TABLE) - .where({ id: id as number }) - .update({ - connection_list, - updated_at: new Date() - }); - - return { - success: true, - error: null, - response: { - id: id as number, - isOpen: true, - slack_timestamp: isOpen.slack_timestamp as string, - admin_slack_timestamp: isOpen.admin_slack_timestamp as string, - connectionCount: connection_list.length - } - }; + }); } /** @@ -565,52 +578,54 @@ export class SlackService { environment_id: number, provider: string ): Promise { - const slackNotificationsEnabled = await environmentService.getSlackNotificationsEnabled(nangoConnection.environment_id); + await db.knex.transaction(async (trx) => { + const slackNotificationsEnabled = await environmentService.getSlackNotificationsEnabled(nangoConnection.environment_id); - if (!slackNotificationsEnabled) { - return; - } + if (!slackNotificationsEnabled) { + return; + } - const isOpen = await this.hasOpenNotification(nangoConnection, name, type); + const isOpen = await this.hasOpenNotification(nangoConnection, name, type, trx); - if (!isOpen) { - return; - } + if (!isOpen) { + return; + } - const { id, connection_list, slack_timestamp, admin_slack_timestamp } = isOpen; + const { id, connection_list, slack_timestamp, admin_slack_timestamp } = isOpen; - const index = connection_list.indexOf(nangoConnection.id as number); - if (index === -1) { - return; - } + const index = connection_list.indexOf(nangoConnection.id as number); + if (index === -1) { + return; + } - logger.info(`Resolving ${nangoConnection.id} type:${type} 
name:${name}`); + logger.info(`Resolving ${nangoConnection.id} type:${type} name:${name}`); - connection_list.splice(index, 1); + connection_list.splice(index, 1); - await db.knex - .from(TABLE) - .where({ id: id as number }) - .update({ - open: connection_list.length > 0, - connection_list, - updated_at: new Date() - }); + await trx + .from(TABLE) + .where({ id: id as number }) + .update({ + open: connection_list.length > 0, + connection_list, + updated_at: new Date() + }); - // we report resolution to the slack channel which could be either - // 1) The slack notification is resolved, connection_list === 0 - // 2) The list of failing connections has been decremented - await this.reportResolution( - nangoConnection, - name, - type, - originalActivityLogId, - environment_id, - provider, - slack_timestamp as string, - admin_slack_timestamp as string, - connection_list.length - ); + // we report resolution to the slack channel which could be either + // 1) The slack notification is resolved, connection_list === 0 + // 2) The list of failing connections has been decremented + await this.reportResolution( + nangoConnection, + name, + type, + originalActivityLogId, + environment_id, + provider, + slack_timestamp as string, + admin_slack_timestamp as string, + connection_list.length + ); + }); } async closeAllOpenNotifications(environment_id: number): Promise {