Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:rudderlabs/rudder-transformer in…
Browse files Browse the repository at this point in the history
…to chore.integrate-allure-ci
  • Loading branch information
utsabc committed Jan 6, 2025
2 parents 479b1a4 + c47488d commit d01fb4b
Show file tree
Hide file tree
Showing 21 changed files with 4,767 additions and 986 deletions.
7 changes: 4 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"ajv-draft-04": "^1.0.0",
"ajv-formats": "^2.1.1",
"amazon-dsp-formatter": "^1.0.2",
"axios": "^1.7.3",
"axios": "^1.7.9",
"btoa": "^1.2.1",
"component-each": "^0.2.6",
"crypto-js": "^4.2.0",
Expand Down
23 changes: 23 additions & 0 deletions src/v0/destinations/iterable/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,39 @@ const constructEndpoint = (dataCenter, category) => {
return `${baseUrl}${category.endpoint}`;
};

const BULK_ENDPOINTS = ['/api/users/bulkUpdate', '/api/events/trackBulk'];

const IDENTIFY_MAX_BATCH_SIZE = 1000;
const IDENTIFY_MAX_BODY_SIZE_IN_BYTES = 4000000;

const TRACK_MAX_BATCH_SIZE = 8000;

const ITERABLE_RESPONSE_USER_ID_PATHS = [
'invalidUserIds',
'failedUpdates.invalidUserIds',
'failedUpdates.notFoundUserIds',
'failedUpdates.forgottenUserIds',
'failedUpdates.conflictUserIds',
'failedUpdates.invalidDataUserIds',
];

const ITERABLE_RESPONSE_EMAIL_PATHS = [
'invalidEmails',
'failedUpdates.invalidEmails',
'failedUpdates.notFoundEmails',
'failedUpdates.forgottenEmails',
'failedUpdates.conflictEmails',
'failedUpdates.invalidDataEmails',
];

module.exports = {
mappingConfig,
ConfigCategory,
constructEndpoint,
TRACK_MAX_BATCH_SIZE,
IDENTIFY_MAX_BATCH_SIZE,
IDENTIFY_MAX_BODY_SIZE_IN_BYTES,
ITERABLE_RESPONSE_USER_ID_PATHS,
ITERABLE_RESPONSE_EMAIL_PATHS,
BULK_ENDPOINTS,
};
13 changes: 7 additions & 6 deletions src/v0/destinations/iterable/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ const batchUpdateUserEvents = (updateUserEvents, registerDeviceOrBrowserTokenEve

/**
* Processes chunks of catalog events, extracts the necessary data, and prepares batched requests for further processing
* ref : https://api.iterable.com/api/docs#catalogs_bulkUpdateCatalogItems
* @param {*} catalogEventsChunks
* @returns
*/
Expand Down Expand Up @@ -600,12 +601,12 @@ const batchTrackEvents = (trackEvents) => {
*/
const prepareBatchRequests = (filteredEvents) => {
const {
trackEvents,
catalogEvents,
errorRespList,
updateUserEvents,
eventResponseList,
registerDeviceOrBrowserTokenEvents,
trackEvents, // track
catalogEvents, // identify
errorRespList, // track
updateUserEvents, // identify
eventResponseList, // track
registerDeviceOrBrowserTokenEvents, // identify
} = filteredEvents;

const updateUserBatchedResponseList =
Expand Down
1 change: 0 additions & 1 deletion src/v0/destinations/iterable/util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const {
registerDeviceTokenEventPayloadBuilder,
registerBrowserTokenEventPayloadBuilder,
} = require('./util');

const { ConfigCategory } = require('./config');

const getTestMessage = () => {
Expand Down
33 changes: 33 additions & 0 deletions src/v1/destinations/iterable/networkHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { prepareProxyRequest, proxyRequest } from '../../../adapters/network';
import { processAxiosResponse } from '../../../adapters/utils/networkUtils';
import { BULK_ENDPOINTS } from '../../../v0/destinations/iterable/config';
import { GenericStrategy } from './strategies/generic';
import { TrackIdentifyStrategy } from './strategies/track-identify';
import { GenericProxyHandlerInput } from './types';

const strategyRegistry: { [key: string]: any } = {
[TrackIdentifyStrategy.name]: new TrackIdentifyStrategy(),
[GenericStrategy.name]: new GenericStrategy(),
};

const getResponseStrategy = (endpoint: string) => {
if (BULK_ENDPOINTS.some((path) => endpoint.includes(path))) {
return strategyRegistry[TrackIdentifyStrategy.name];
}
return strategyRegistry[GenericStrategy.name];
};

const responseHandler = (responseParams: GenericProxyHandlerInput) => {
const { destinationRequest } = responseParams;
const strategy = getResponseStrategy(destinationRequest.endpoint);
return strategy.handleResponse(responseParams);
};

function networkHandler(this: any) {
this.prepareProxy = prepareProxyRequest;
this.proxy = proxyRequest;
this.processAxiosResponse = processAxiosResponse;
this.responseHandler = responseHandler;
}

export { networkHandler };
22 changes: 22 additions & 0 deletions src/v1/destinations/iterable/strategies/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { isHttpStatusSuccess } from '../../../../v0/util';
import { GenericProxyHandlerInput } from '../types';

// Base strategy is the base class for all strategies in Iterable destination
abstract class BaseStrategy {
handleResponse(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse } = responseParams;
const { status } = destinationResponse;

if (!isHttpStatusSuccess(status)) {
return this.handleError(responseParams);
}

return this.handleSuccess(responseParams);
}

abstract handleError(responseParams: GenericProxyHandlerInput): void;

abstract handleSuccess(responseParams: any): void;
}

export { BaseStrategy };
57 changes: 57 additions & 0 deletions src/v1/destinations/iterable/strategies/generic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { BaseStrategy } from './base';
import {
GenericProxyHandlerInput,
IterableBulkApiResponse,
IterableSuccessResponse,
} from '../types';
import { ProxyMetdata } from '../../../../types';
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
import { TAG_NAMES } from '../../../../v0/util/tags';
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';

class GenericStrategy extends BaseStrategy {
handleSuccess(responseParams: {
destinationResponse: IterableBulkApiResponse;
rudderJobMetadata: ProxyMetdata[];
}): IterableSuccessResponse {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { status } = destinationResponse;

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: 'success',
}));

return {
status,
message: '[ITERABLE Response Handler] - Request Processed Successfully',
destinationResponse,
response: responseWithIndividualEvents,
};
}

handleError(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { response, status } = destinationResponse;
const responseMessage = response.params || response.msg || response.message;
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: errorMessage,
}));

throw new TransformerProxyError(
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
status,
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
destinationResponse,
'',
responseWithIndividualEvents,
);
}
}

export { GenericStrategy };
69 changes: 69 additions & 0 deletions src/v1/destinations/iterable/strategies/track-identify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { BaseStrategy } from './base';
import { GenericProxyHandlerInput, IterableBulkProxyInput } from '../types';
import { checkIfEventIsAbortableAndExtractErrorMessage } from '../utils';
import { DeliveryJobState, DeliveryV1Response } from '../../../../types';
import { TransformerProxyError } from '../../../../v0/util/errorTypes';
import { getDynamicErrorType } from '../../../../adapters/utils/networkUtils';
import { TAG_NAMES } from '../../../../v0/util/tags';

class TrackIdentifyStrategy extends BaseStrategy {
handleSuccess(responseParams: IterableBulkProxyInput): DeliveryV1Response {
const { destinationResponse, rudderJobMetadata, destinationRequest } = responseParams;
const { status } = destinationResponse;
const responseWithIndividualEvents: DeliveryJobState[] = [];

const { events, users } = destinationRequest?.body.JSON || {};
const finalData = events || users;

if (finalData) {
finalData.forEach((event, idx) => {
const parsedOutput = {
statusCode: 200,
metadata: rudderJobMetadata[idx],
error: 'success',
};

const { isAbortable, errorMsg } = checkIfEventIsAbortableAndExtractErrorMessage(
event,
destinationResponse,
);
if (isAbortable) {
parsedOutput.statusCode = 400;
parsedOutput.error = errorMsg;
}
responseWithIndividualEvents.push(parsedOutput);
});
}

return {
status,
message: '[ITERABLE Response Handler] - Request Processed Successfully',
destinationResponse,
response: responseWithIndividualEvents,
};
}

handleError(responseParams: GenericProxyHandlerInput): void {
const { destinationResponse, rudderJobMetadata } = responseParams;
const { response, status } = destinationResponse;
const responseMessage = response.params || response.msg || response.message;
const errorMessage = JSON.stringify(responseMessage) || 'unknown error format';

const responseWithIndividualEvents = rudderJobMetadata.map((metadata) => ({
statusCode: status,
metadata,
error: errorMessage,
}));

throw new TransformerProxyError(
`ITERABLE: Error transformer proxy during ITERABLE response transformation. ${errorMessage}`,
status,
{ [TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status) },
destinationResponse,
'',
responseWithIndividualEvents,
);
}
}

export { TrackIdentifyStrategy };
69 changes: 69 additions & 0 deletions src/v1/destinations/iterable/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { ProxyMetdata, ProxyV1Request } from '../../../types';

type FailedUpdates = {
invalidEmails?: string[];
invalidUserIds?: string[];
notFoundEmails?: string[];
notFoundUserIds?: string[];
invalidDataEmails?: string[];
invalidDataUserIds?: string[];
conflictEmails?: string[];
conflictUserIds?: string[];
forgottenEmails?: string[];
forgottenUserIds?: string[];
};

export type GeneralApiResponse = {
msg?: string;
code?: string;
params?: Record<string, unknown>;
successCount?: number;
failCount?: number;
invalidEmails?: string[];
invalidUserIds?: string[];
filteredOutFields?: string[];
createdFields?: string[];
disallowedEventNames?: string[];
failedUpdates?: FailedUpdates;
};

export type IterableBulkApiResponse = {
status: number;
response: GeneralApiResponse;
};

type IterableBulkRequestBody = {
events?: any[];
users?: any[];
};

export type IterableBulkProxyInput = {
destinationResponse: IterableBulkApiResponse;
rudderJobMetadata: ProxyMetdata[];
destType: string;
destinationRequest?: {
body: {
JSON: IterableBulkRequestBody;
};
};
};

export type GenericProxyHandlerInput = {
destinationResponse: any;
rudderJobMetadata: ProxyMetdata[];
destType: string;
destinationRequest: ProxyV1Request;
};

export type Response = {
statusCode: number;
metadata: any;
error: string;
};

export type IterableSuccessResponse = {
status: number;
message: string;
destinationResponse: IterableBulkApiResponse;
response: Response[];
};
Loading

0 comments on commit d01fb4b

Please sign in to comment.