Skip to content

Commit

Permalink
fix(slack-notifications): [nan-1164] multiple notifications on refres…
Browse files Browse the repository at this point in the history
…h error (#2433)

## Describe your changes
This annoying issue is hard to reproduce without removing the locking
delay on local for the `tryAcquire` method

```
const ttlInMs = 1;
const acquisitionTimeoutMs = 1;
const { tries } = await this.locking.tryAcquire(lockKey, ttlInMs, acquisitionTimeoutMs);
```
With that multiple slack notifications can be seen which in my testing
was mitigated by:
- Adding in transactions and `forUpdate` to the database lookups and
inserts
- Adding a slight randomized delay for webhook firing to prevent
concurrent firing
- `await` the slack notification call

## Testing
- It seemed the customers issue was stemming from proxy calls and only
with salesforce which is likely caused by the introspection call. So I
tested with a script that made many concurrent proxy calls:

```
Promise.allSettled([
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
  nango.get({
    providerConfigKey: "salesforce",
    connectionId: "sf-2",
    endpoint: `/services/data/v51.0/sobjects/Contacts/describe`,
  }),
```

With this I was seeing multiple slack notifications but with the update
logic now only one is received consistently.

## Issue ticket number and link
NAN-1163

## Checklist before requesting a review (skip if just adding/editing
APIs & templates)
- [ ] I added tests, otherwise the reason is: 
- [ ] I added observability, otherwise the reason is:
- [ ] I added analytics, otherwise the reason is:
  • Loading branch information
khaliqgant authored Jul 1, 2024
1 parent a12ca11 commit 9a19c0d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 90 deletions.
4 changes: 2 additions & 2 deletions packages/server/lib/hooks/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export const connectionRefreshSuccess = async ({

const slackNotificationService = new SlackService({ orchestratorClient: getOrchestratorClient(), logContextGetter });

void slackNotificationService.removeFailingConnection(connection, connection.connection_id, 'auth', null, environment.id, config.provider);
await slackNotificationService.removeFailingConnection(connection, connection.connection_id, 'auth', null, environment.id, config.provider);
};

export const connectionRefreshFailed = async ({
Expand Down Expand Up @@ -185,7 +185,7 @@ export const connectionRefreshFailed = async ({

const slackNotificationService = new SlackService({ orchestratorClient: getOrchestratorClient(), logContextGetter });

void slackNotificationService.reportFailure(connection, connection.connection_id, 'auth', logCtx.id, environment.id, config.provider);
await slackNotificationService.reportFailure(connection, connection.connection_id, 'auth', logCtx.id, environment.id, config.provider);
};

export const connectionTest = async (
Expand Down
191 changes: 103 additions & 88 deletions packages/shared/lib/services/notification/slack.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { SlackNotification } from '../../models/SlackNotification.js';
import type { NangoConnection } from '../../models/Connection.js';
import type { ServiceResponse } from '../../models/Generic.js';
import environmentService from '../environment.service.js';
import { basePublicUrl, getLogger } from '@nangohq/utils';
import { basePublicUrl, getLogger, stringToHash } from '@nangohq/utils';
import connectionService from '../connection.service.js';
import accountService from '../account.service.js';
import type { LogContext, LogContextGetter } from '@nangohq/logs';
Expand Down Expand Up @@ -436,11 +436,13 @@ export class SlackService {
async hasOpenNotification(
nangoConnection: NangoConnection,
name: string,
type: string
type: string,
trx = db.knex
): Promise<Pick<SlackNotification, 'id' | 'connection_list' | 'slack_timestamp' | 'admin_slack_timestamp'> | null> {
const hasOpenNotification = await schema()
const hasOpenNotification = await trx
.select('id', 'connection_list', 'slack_timestamp', 'admin_slack_timestamp')
.from<SlackNotification>(TABLE)
.forUpdate()
.where({
open: true,
environment_id: nangoConnection.environment_id,
Expand All @@ -460,8 +462,8 @@ export class SlackService {
* @desc create a new notification for the given name and environment id
* and return the id of the created notification.
*/
async createNotification(nangoConnection: NangoConnection, name: string, type: string): Promise<Pick<SlackNotification, 'id'> | null> {
const result = await schema()
async createNotification(nangoConnection: NangoConnection, name: string, type: string, trx = db.knex): Promise<Pick<SlackNotification, 'id'> | null> {
const result = await trx
.from<SlackNotification>(TABLE)
.insert({
open: true,
Expand All @@ -485,35 +487,68 @@ export class SlackService {
* and if so add the connection id to the connection list.
*/
async addFailingConnection(nangoConnection: NangoConnection, name: string, type: string): Promise<ServiceResponse<NotificationResponse>> {
const isOpen = await this.hasOpenNotification(nangoConnection, name, type);
return await db.knex.transaction(async (trx) => {
const lockKey = stringToHash(`${nangoConnection.environment_id}-${name}-${type}`);

if (isOpen && type === 'auth') {
return {
success: true,
error: null,
response: null
};
}
const { rows } = await trx.raw<{ rows: { pg_try_advisory_xact_lock: boolean }[] }>(`SELECT pg_try_advisory_xact_lock(?);`, [lockKey]);

logger.info(`Notifying ${nangoConnection.id} type:${type} name:${name}`);
if (!rows?.[0]?.pg_try_advisory_xact_lock) {
logger.info(`${lockKey} could not acquire lock, skipping`);
return { success: true, error: null, response: null };
}

if (!isOpen) {
const created = await this.createNotification(nangoConnection, name, type);
const isOpen = await this.hasOpenNotification(nangoConnection, name, type, trx);

return {
success: true,
error: null,
response: {
id: created?.id as number,
isOpen: false,
connectionCount: 1
}
};
}
if (isOpen && type === 'auth') {
return {
success: true,
error: null,
response: null
};
}

const { id, connection_list } = isOpen;
logger.info(`Notifying ${nangoConnection.id} type:${type} name:${name}`);

if (!isOpen) {
const created = await this.createNotification(nangoConnection, name, type, trx);

return {
success: true,
error: null,
response: {
id: created?.id as number,
isOpen: false,
connectionCount: 1
}
};
}

const { id, connection_list } = isOpen;

if (connection_list.includes(nangoConnection.id as number)) {
return {
success: true,
error: null,
response: {
id: id as number,
isOpen: true,
slack_timestamp: isOpen.slack_timestamp as string,
admin_slack_timestamp: isOpen.admin_slack_timestamp as string,
connectionCount: connection_list.length
}
};
}

connection_list.push(nangoConnection.id as number);

await trx
.from<SlackNotification>(TABLE)
.where({ id: id as number })
.update({
connection_list,
updated_at: new Date()
});

if (connection_list.includes(nangoConnection.id as number)) {
return {
success: true,
error: null,
Expand All @@ -525,29 +560,7 @@ export class SlackService {
connectionCount: connection_list.length
}
};
}

connection_list.push(nangoConnection.id as number);

await schema()
.from<SlackNotification>(TABLE)
.where({ id: id as number })
.update({
connection_list,
updated_at: new Date()
});

return {
success: true,
error: null,
response: {
id: id as number,
isOpen: true,
slack_timestamp: isOpen.slack_timestamp as string,
admin_slack_timestamp: isOpen.admin_slack_timestamp as string,
connectionCount: connection_list.length
}
};
});
}

/**
Expand All @@ -565,52 +578,54 @@ export class SlackService {
environment_id: number,
provider: string
): Promise<void> {
const slackNotificationsEnabled = await environmentService.getSlackNotificationsEnabled(nangoConnection.environment_id);
await db.knex.transaction(async (trx) => {
const slackNotificationsEnabled = await environmentService.getSlackNotificationsEnabled(nangoConnection.environment_id);

if (!slackNotificationsEnabled) {
return;
}
if (!slackNotificationsEnabled) {
return;
}

const isOpen = await this.hasOpenNotification(nangoConnection, name, type);
const isOpen = await this.hasOpenNotification(nangoConnection, name, type, trx);

if (!isOpen) {
return;
}
if (!isOpen) {
return;
}

const { id, connection_list, slack_timestamp, admin_slack_timestamp } = isOpen;
const { id, connection_list, slack_timestamp, admin_slack_timestamp } = isOpen;

const index = connection_list.indexOf(nangoConnection.id as number);
if (index === -1) {
return;
}
const index = connection_list.indexOf(nangoConnection.id as number);
if (index === -1) {
return;
}

logger.info(`Resolving ${nangoConnection.id} type:${type} name:${name}`);
logger.info(`Resolving ${nangoConnection.id} type:${type} name:${name}`);

connection_list.splice(index, 1);
connection_list.splice(index, 1);

await db.knex
.from<SlackNotification>(TABLE)
.where({ id: id as number })
.update({
open: connection_list.length > 0,
connection_list,
updated_at: new Date()
});
await trx
.from<SlackNotification>(TABLE)
.where({ id: id as number })
.update({
open: connection_list.length > 0,
connection_list,
updated_at: new Date()
});

// we report resolution to the slack channel which could be either
// 1) The slack notification is resolved, connection_list === 0
// 2) The list of failing connections has been decremented
await this.reportResolution(
nangoConnection,
name,
type,
originalActivityLogId,
environment_id,
provider,
slack_timestamp as string,
admin_slack_timestamp as string,
connection_list.length
);
// we report resolution to the slack channel which could be either
// 1) The slack notification is resolved, connection_list === 0
// 2) The list of failing connections has been decremented
await this.reportResolution(
nangoConnection,
name,
type,
originalActivityLogId,
environment_id,
provider,
slack_timestamp as string,
admin_slack_timestamp as string,
connection_list.length
);
});
}

async closeAllOpenNotifications(environment_id: number): Promise<void> {
Expand Down

0 comments on commit 9a19c0d

Please sign in to comment.