Skip to content

Commit

Permalink
FAI-13888 - Start creating sync message table and add warning when no…
Browse files Browse the repository at this point in the history
… records emitted for stream (#1861)
  • Loading branch information
cjwooo authored Dec 18, 2024
1 parent 6fea213 commit 2da3d3e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 1 deletion.
14 changes: 13 additions & 1 deletion destinations/airbyte-faros-destination/src/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
minimizeSpec,
parseAirbyteMessage,
SpecLoader,
SYNC_MESSAGE_TABLE,
SyncMessage,
SyncMode,
wrapApiError,
Expand All @@ -28,7 +29,7 @@ import {ConnectorVersion} from 'faros-airbyte-cdk/lib/runner';
import {FarosClientConfig, HasuraSchemaLoader, Schema} from 'faros-js-client';
import http from 'http';
import https from 'https';
import {difference, isEmpty, keyBy, pickBy, sortBy, uniq} from 'lodash';
import {difference, isEmpty, keyBy, pickBy, uniq} from 'lodash';
import path from 'path';
import readline from 'readline';
import {Writable} from 'stream';
Expand Down Expand Up @@ -729,6 +730,7 @@ export class FarosDestination extends AirbyteDestination<DestinationConfig> {
try {
let isBackfillSync = false;
let sourceConfigReceived = false;
let sourceModeOrType: string;
let sourceSucceeded = false;
let stateReset = false;
let streamStatusReceived = false;
Expand Down Expand Up @@ -768,6 +770,12 @@ export class FarosDestination extends AirbyteDestination<DestinationConfig> {
`No records emitted for ${streamName} stream.` +
' Will not reset non-incremental models.'
);
syncErrors.src.warnings.push(
SYNC_MESSAGE_TABLE.NO_RECORDS_FOR_STREAM(
streamName,
sourceModeOrType || 'source'
)
);
} else if (failedStreams.has(streamName)) {
this.logger.warn(
`Error previously occurred for ${streamName} stream. Will not reset its models.`
Expand All @@ -793,6 +801,10 @@ export class FarosDestination extends AirbyteDestination<DestinationConfig> {
isBackfillSync = true;
}
sourceConfigReceived = true;
sourceModeOrType =
msg.sourceType === 'faros-feeds'
? msg.sourceMode
: msg.sourceType;
await updateLocalAccount?.(msg);
sourceVersion.version = msg.sourceVersion;
ctx.setSourceConfig(msg.redactedConfig);
Expand Down
1 change: 1 addition & 0 deletions faros-airbyte-cdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ export * from './sources/source-logger';
export * from './sources/source-runner';
export * from './sources/streams/stream-base';
export * from './spec-loader';
export * from './sync-message-tables';
export * from './testing-tools';
export * from './utils';
40 changes: 40 additions & 0 deletions faros-airbyte-cdk/src/sync-message-tables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import {SyncMessage} from './protocol';

// TODO: more entries
export const SYNC_MESSAGE_TABLE = {
FEATURE_NOT_ENABLED: (
featureName: string,
sourceType: string
): SyncMessage => ({
code: 1000,
summary: `${featureName} not enabled`,
details: `It seems this functionality, ${featureName}, is not enabled for the ${sourceType} source.`,
action: 'Contact support to enable this feature',
type: 'ERROR',
}),
CONNECTION_UNAVAILABLE: (sourceType: string): SyncMessage => ({
code: 1001,
summary: 'Connection unavailable',
details: `Unable to establish a connection with your ${sourceType} source.`,
action:
'Check if your source is configured to accept requests from our system.',
type: 'ERROR',
}),
INVALID_CREDENTIALS: (sourceType: string): SyncMessage => ({
code: 1002,
summary: 'Invalid credentials',
details: `We are unable to retrieve data from your ${sourceType} source.`,
action: `Check your ${sourceType} credentials. Ensure they are correct and up-to-date.`,
type: 'ERROR',
}),
NO_RECORDS_FOR_STREAM: (
streamName: string,
sourceType: string
): SyncMessage => ({
code: 1003,
summary: `No records emitted for ${streamName} stream`,
entity: streamName,
action: `Check your ${sourceType} credentials. Ensure they have the required permissions.`,
type: 'WARNING',
}),
};

0 comments on commit 2da3d3e

Please sign in to comment.