Skip to content

Commit

Permalink
FAI-14018 Move bucket related functions to its own file + function to…
Browse files Browse the repository at this point in the history
… apply round robin to state (#1834)
  • Loading branch information
ypc-faros authored Dec 5, 2024
1 parent 728821b commit 9750e6c
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 107 deletions.
72 changes: 72 additions & 0 deletions faros-airbyte-common/src/common/bucketing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {createHmac} from 'crypto';
import VError from 'verror';

export interface BucketExecutionState {
__bucket_execution_state?: {
last_executed_bucket_id: number;
};
[key: string]: any;
}

export interface RoundRobinConfig {
round_robin_bucket_execution?: boolean;
bucket_id?: number;
bucket_total?: number;
[key: string]: any;
}

export function bucket(key: string, data: string, bucketTotal: number): number {
const md5 = createHmac('md5', key);
md5.update(data);
const hex = md5.digest('hex').substring(0, 8);
return (parseInt(hex, 16) % bucketTotal) + 1; // 1-index for readability
}

export function validateBucketingConfig(
bucketId: number = 1,
bucketTotal: number = 1
): void {
if (bucketTotal < 1) {
throw new VError('bucket_total must be a positive integer');
}
if (bucketId < 1 || bucketId > bucketTotal) {
throw new VError(`bucket_id must be between 1 and ${bucketTotal}`);
}
}

export function nextBucketId(
config: RoundRobinConfig,
state?: BucketExecutionState
): number {
const bucketTotal = config.bucket_total ?? 1;
const lastExecutedBucketId =
state?.__bucket_execution_state?.last_executed_bucket_id ?? bucketTotal;

return (lastExecutedBucketId % bucketTotal) + 1;
}

export function applyRoundRobinBucketing(
config: RoundRobinConfig,
state?: BucketExecutionState,
logger?: (message: string) => void
): {config: RoundRobinConfig; state: BucketExecutionState} {
if (!config.round_robin_bucket_execution) {
return {config, state};
}

const next = nextBucketId(config, state);
logger?.(`Using round robin bucket execution. Bucket id: ${next}`);

return {
config: {
...config,
bucket_id: next,
},
state: {
...state,
__bucket_execution_state: {
last_executed_bucket_id: next,
},
},
};
}
38 changes: 7 additions & 31 deletions faros-airbyte-common/src/common/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import {createHmac} from 'crypto';
import {FarosClient, paginatedQueryV2} from 'faros-js-client';
import fs from 'fs';
import {toLower} from 'lodash';
import {DateTime} from 'luxon';
import path from 'path';
import {VError} from 'verror';

export {
bucket,
validateBucketingConfig,
nextBucketId,
applyRoundRobinBucketing,
} from './bucketing';

// TODO: Try https://www.npmjs.com/package/diff
export interface FileDiff {
deletions: number;
Expand All @@ -16,36 +22,6 @@ export interface FileDiff {
new?: boolean;
}

export function bucket(key: string, data: string, bucketTotal: number): number {
const md5 = createHmac('md5', key);
md5.update(data);
const hex = md5.digest('hex').substring(0, 8);
return (parseInt(hex, 16) % bucketTotal) + 1; // 1-index for readability
}

export function validateBucketingConfig(
bucketId: number = 1,
bucketTotal: number = 1
): void {
if (bucketTotal < 1) {
throw new VError('bucket_total must be a positive integer');
}
if (bucketId < 1 || bucketId > bucketTotal) {
throw new VError(`bucket_id must be between 1 and ${bucketTotal}`);
}
}

export function nextBucketId(
config: {bucket_total?: number},
state?: {__bucket_execution_state?: {last_executed_bucket_id?: number}}
): number {
const bucketTotal = config.bucket_total ?? 1;
const lastExecutedBucketId =
state?.__bucket_execution_state?.last_executed_bucket_id ?? bucketTotal;

return (lastExecutedBucketId % bucketTotal) + 1;
}

export function normalizeString(str: string): string {
return str.replace(/\s/g, '').toLowerCase();
}
Expand Down
148 changes: 148 additions & 0 deletions faros-airbyte-common/test/common/bucketing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import {
applyRoundRobinBucketing,
bucket,
nextBucketId,
validateBucketingConfig,
} from '../../src/common';

describe('validateBucketingConfig', () => {
test('should not throw for valid config', () => {
expect(() => validateBucketingConfig(1, 1)).not.toThrow();
expect(() => validateBucketingConfig(2, 3)).not.toThrow();
expect(() => validateBucketingConfig(5, 5)).not.toThrow();
});

test('should throw for invalid bucket_total', () => {
expect(() => validateBucketingConfig(1, 0)).toThrow(
'bucket_total must be a positive integer'
);
expect(() => validateBucketingConfig(1, -1)).toThrow(
'bucket_total must be a positive integer'
);
});

test('should throw for invalid bucket_id', () => {
expect(() => validateBucketingConfig(0, 5)).toThrow(
'bucket_id must be between 1 and 5'
);
expect(() => validateBucketingConfig(6, 5)).toThrow(
'bucket_id must be between 1 and 5'
);
});
});

describe('getNextBucketId', () => {
test('should return 1 when bucket_total is 1', () => {
const config = {bucket_total: 1};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});

test('should return next bucket id when last_executed_bucket_id is provided', () => {
const config = {bucket_total: 3};
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 1},
})
).toBe(2);
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 2},
})
).toBe(3);
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 3},
})
).toBe(1);
});

test('should wrap around to 1 when reaching the last bucket', () => {
const config = {bucket_total: 3};
const state = {__bucket_execution_state: {last_executed_bucket_id: 3}};
expect(nextBucketId(config, state)).toBe(1);
});

test('should use default bucket_total of 1 when not provided', () => {
const config = {};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});

test('should use bucket_total as last_executed_bucket_id when state is undefined', () => {
const config = {bucket_total: 5};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});
});

describe('bucket', () => {
test('should return value within bucket range', () => {
const key = 'test-key';
const data = 'test-data';
const bucketTotal = 5;

const result = bucket(key, data, bucketTotal);

expect(result).toBeGreaterThanOrEqual(1);
expect(result).toBeLessThanOrEqual(bucketTotal);
});
});

describe('applyRoundRobinBucketing', () => {
test('should return unchanged config and state when round robin is disabled', () => {
const config = {round_robin_bucket_execution: false, another_field: 'test'};
const state = {someField: 'test-value'};

const result = applyRoundRobinBucketing(config, state);

expect(result.config).toEqual(config);
expect(result.state).toEqual(state);
});

test('should update config and state with next bucket when round robin is enabled', () => {
const config = {
round_robin_bucket_execution: true,
bucket_total: 3,
};
const state = {
__bucket_execution_state: {
last_executed_bucket_id: 2,
},
};

const result = applyRoundRobinBucketing(config, state);

expect(result.config.bucket_id).toBe(3);
expect(result.state.__bucket_execution_state.last_executed_bucket_id).toBe(
3
);
});

test('should start from bucket 1 when state is empty', () => {
const config = {
round_robin_bucket_execution: true,
bucket_total: 3,
};

const result = applyRoundRobinBucketing(config, {});

expect(result.config.bucket_id).toBe(1);
expect(result.state.__bucket_execution_state.last_executed_bucket_id).toBe(
1
);
});

test('should preserve other config properties', () => {
const config = {
round_robin_bucket_execution: true,
bucket_total: 3,
other_prop: 'value',
};

const result = applyRoundRobinBucketing(config, undefined);

expect(result.config.other_prop).toBe('value');
expect(result.config.round_robin_bucket_execution).toBe(true);
});
});
51 changes: 1 addition & 50 deletions faros-airbyte-common/test/common/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import {
calculateDateRange,
collectReposByOrg,
nextBucketId,
} from '../../src/common';
import {calculateDateRange, collectReposByOrg} from '../../src/common';

describe('calculateDateRange', () => {
const logger = jest.fn();
Expand Down Expand Up @@ -135,48 +131,3 @@ describe('collectReposByOrg', () => {
expect(reposByOrg).toMatchSnapshot();
});
});

describe('getNextBucketId', () => {
test('should return 1 when bucket_total is 1', () => {
const config = {bucket_total: 1};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});

test('should return next bucket id when last_executed_bucket_id is provided', () => {
const config = {bucket_total: 3};
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 1},
})
).toBe(2);
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 2},
})
).toBe(3);
expect(
nextBucketId(config, {
__bucket_execution_state: {last_executed_bucket_id: 3},
})
).toBe(1);
});

test('should wrap around to 1 when reaching the last bucket', () => {
const config = {bucket_total: 3};
const state = {__bucket_execution_state: {last_executed_bucket_id: 3}};
expect(nextBucketId(config, state)).toBe(1);
});

test('should use default bucket_total of 1 when not provided', () => {
const config = {};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});

test('should use bucket_total as last_executed_bucket_id when state is undefined', () => {
const config = {bucket_total: 5};
const state = undefined;
expect(nextBucketId(config, state)).toBe(1);
});
});
38 changes: 12 additions & 26 deletions sources/github-source/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import {
AirbyteState,
AirbyteStreamBase,
} from 'faros-airbyte-cdk';
import {calculateDateRange, nextBucketId} from 'faros-airbyte-common/common';
import {
applyRoundRobinBucketing,
calculateDateRange,
} from 'faros-airbyte-common/common';
import {FarosClient} from 'faros-js-client';
import VError from 'verror';

Expand Down Expand Up @@ -154,36 +157,19 @@ export class GitHubSource extends AirbyteSourceBase<GitHubConfig> {
logger: this.logger.info.bind(this.logger),
});

if (config.round_robin_bucket_execution) {
const next = nextBucketId(config, state);
this.logger.info(
`Using round robin bucket execution. Bucket id: ${next}`
);
return {
config: {
...config,
startDate,
endDate,
bucket_id: next,
},
catalog: {streams},
state: {
...state,
__bucket_execution_state: {
last_executed_bucket_id: next,
},
},
};
}

const {config: newConfig, state: newState} = applyRoundRobinBucketing(
config,
state,
this.logger.info.bind(this.logger)
);
return {
config: {
...config,
...newConfig,
startDate,
endDate,
},
} as GitHubConfig,
catalog: {streams},
state,
state: newState,
};
}
}

0 comments on commit 9750e6c

Please sign in to comment.