From 7f59d448a4cb3befb2889608eb4b253b4e85766d Mon Sep 17 00:00:00 2001 From: ypc-faros <99700024+ypc-faros@users.noreply.github.com> Date: Thu, 5 Dec 2024 17:10:56 -0500 Subject: [PATCH] FAI-14238 circleci bucketing + clean ups (#1836) --- package-lock.json | 1 + sources/circleci-source/package.json | 1 + sources/circleci-source/resources/spec.json | 50 ++++++-- .../circleci-source/src/circleci/circleci.ts | 41 +++++-- sources/circleci-source/src/index.ts | 9 +- sources/circleci-source/src/streams/common.ts | 10 +- .../circleci-source/src/streams/pipelines.ts | 10 +- .../circleci-source/src/streams/projects.ts | 10 +- sources/circleci-source/src/streams/tests.ts | 4 +- sources/circleci-source/test/index.test.ts | 116 +++++++----------- 10 files changed, 145 insertions(+), 107 deletions(-) diff --git a/package-lock.json b/package-lock.json index c3e3467b4..73177e35e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19079,6 +19079,7 @@ "axios": "^1.7.4", "commander": "^9.3.0", "faros-airbyte-cdk": "*", + "faros-airbyte-common": "*", "faros-js-client": "^0.5.2", "glob-to-regexp": "0.4.1", "typescript-memoize": "^1.1.0", diff --git a/sources/circleci-source/package.json b/sources/circleci-source/package.json index 2f8003ad9..bbe1cf4ce 100644 --- a/sources/circleci-source/package.json +++ b/sources/circleci-source/package.json @@ -34,6 +34,7 @@ "axios": "^1.7.4", "commander": "^9.3.0", "faros-airbyte-cdk": "*", + "faros-airbyte-common": "*", "faros-js-client": "^0.5.2", "glob-to-regexp": "0.4.1", "typescript-memoize": "^1.1.0", diff --git a/sources/circleci-source/resources/spec.json b/sources/circleci-source/resources/spec.json index 0df328383..4a0bbd5b8 100644 --- a/sources/circleci-source/resources/spec.json +++ b/sources/circleci-source/resources/spec.json @@ -4,40 +4,49 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "CircleCI Spec", "type": "object", - "required": ["token", "project_slugs"], + "required": [ + "token", + "project_slugs" + ], "additionalProperties": true, "properties": { "token": { + "order": 1, "type": "string", "title": "token", "description": "CircleCI personal API token. See https://circleci.com/docs/2.0/managing-api-tokens/#creating-a-personal-api-token", "airbyte_secret": true }, "url": { + "order": 2, "type": "string", "title": "API URL", "default": "https://circleci.com/api/v2", "description": "CircleCI API URL" }, "reject_unauthorized": { + "order": 3, "type": "boolean", "title": "Enforce Authorized Requests", "default": true, "description": "Enable certificate validation for the CircleCI server" }, "request_timeout": { + "order": 4, "type": "integer", "title": "Request Timeout", "description": "The max time in milliseconds to wait for a request to CircleCI complete (0 - no timeout).", "default": 120000 }, "max_retries": { + "order": 5, "type": "integer", "title": "Max Number of Retries", "description": "The max number of retries before giving up on retrying requests to CircleCI API", "default": 3 }, "project_slugs": { + "order": 6, "type": "array", "items": { "type": "string" @@ -52,6 +61,7 @@ ] }, "project_block_list": { + "order": 7, "type": "array", "items": { "type": "string" @@ -66,34 +76,60 @@ ] }, "pull_blocklist_from_graph": { + "order": 8, "type": "boolean", "title": "Pull Projects Blocklist from Faros Graph", "default": false, "description": "Should pull projects blocklist from Faros" }, - "cutoff_days": { - "type": "integer", - "title": "Cutoff Days", - "default": 90, - "description": "Only fetch data updated after cutoff" - }, "faros_api_url": { + "order": 9, "type": "string", "title": "Faros API URL", "default": "https://prod.api.faros.ai", "description": "Faros API URL" }, "faros_api_key": { + "order": 10, "type": "string", "title": "Faros API Key", "description": "Faros API Key", "airbyte_secret": true }, "faros_graph_name": { + "order": 11, "type": "string", "title": "Faros Graph", "description": "Faros Graph Name", "default": "default" + }, + "bucket_id": { + "order": 12, + "type": "integer", + "title": "Bucket Number", + "description": "Bucket number for this source to determine which portion of projects to pull. Use it when distributing the load between multiple sources.", + "default": 1 + }, + "bucket_total": { + "order": 13, + "type": "integer", + "title": "Total Number of Buckets", + "description": "Total number of buckets to distribute projects across. Use it when distributing the load between multiple sources", + "default": 1 + }, + "round_robin_bucket_execution": { + "order": 14, + "type": "boolean", + "title": "Round Robin Bucket Execution", + "description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.", + "default": false + }, + "cutoff_days": { + "order": 15, + "type": "integer", + "title": "Cutoff Days", + "default": 90, + "description": "Only fetch data updated after cutoff" } } } diff --git a/sources/circleci-source/src/circleci/circleci.ts b/sources/circleci-source/src/circleci/circleci.ts index cee517666..3dcb1271f 100644 --- a/sources/circleci-source/src/circleci/circleci.ts +++ b/sources/circleci-source/src/circleci/circleci.ts @@ -5,6 +5,7 @@ import axios, { AxiosResponse, } from 'axios'; import {AirbyteLogger, wrapApiError} from 'faros-airbyte-cdk'; +import {bucket, validateBucketingConfig} from 'faros-airbyte-common/common'; import https from 'https'; import {maxBy, toLower} from 'lodash'; import {Memoize} from 'typescript-memoize'; @@ -16,7 +17,8 @@ const DEFAULT_API_URL = 'https://circleci.com/api/v2'; const DEFAULT_MAX_RETRIES = 3; const DEFAULT_CUTOFF_DAYS = 90; const DEFAULT_REQUEST_TIMEOUT = 120000; - +export const DEFAULT_BUCKET_ID = 1; +export const DEFAULT_BUCKET_TOTAL = 1; export interface CircleCIConfig { readonly token: string; readonly url?: string; @@ -30,22 +32,35 @@ export interface CircleCIConfig { readonly cutoff_days?: number; readonly request_timeout?: number; readonly max_retries?: number; + readonly bucket_id?: number; + readonly bucket_total?: number; + readonly round_robin_bucket_execution?: boolean; } export class CircleCI { private static circleCI: CircleCI = undefined; + private readonly cutoffDays: number; + private readonly maxRetries: number; + private readonly bucketId: number; + private readonly bucketTotal: number; constructor( + config: CircleCIConfig, private readonly logger: AirbyteLogger, - readonly v1: AxiosInstance, - readonly v2: AxiosInstance, - readonly cutoffDays: number, - private readonly maxRetries: number - ) {} + private readonly v1: AxiosInstance, + private readonly v2: AxiosInstance + ) { + this.cutoffDays = config.cutoff_days ?? DEFAULT_CUTOFF_DAYS; + this.maxRetries = config.max_retries ?? DEFAULT_MAX_RETRIES; + this.bucketId = config.bucket_id ?? DEFAULT_BUCKET_ID; + this.bucketTotal = config.bucket_total ?? DEFAULT_BUCKET_TOTAL; + } static instance(config: CircleCIConfig, logger: AirbyteLogger): CircleCI { if (CircleCI.circleCI) return CircleCI.circleCI; + validateBucketingConfig(config.bucket_id, config.bucket_total); + if (!config.token) { throw new VError('No token provided'); } @@ -70,17 +85,16 @@ export class CircleCI { } } - const cutoffDays = config.cutoff_days ?? DEFAULT_CUTOFF_DAYS; const axios_v1_instance = this.getAxiosInstance(config, logger, 'v1.1'); const axios_v2_instance = this.getAxiosInstance(config, logger, 'v2'); CircleCI.circleCI = new CircleCI( + config, logger, axios_v1_instance, - axios_v2_instance, - cutoffDays, - config.max_retries ?? DEFAULT_MAX_RETRIES + axios_v2_instance ); + return CircleCI.circleCI; } @@ -389,6 +403,13 @@ export class CircleCI { (item: any) => item ); } + + isProjectInBucket(project: string): boolean { + return ( + bucket('farosai/airbyte-circleci-source', project, this.bucketTotal) === + this.bucketId + ); + } } // CircleCI API returns 404 if no workflows or jobs exist for a given pipeline diff --git a/sources/circleci-source/src/index.ts b/sources/circleci-source/src/index.ts index 01e4d9ab4..c91d4de1b 100644 --- a/sources/circleci-source/src/index.ts +++ b/sources/circleci-source/src/index.ts @@ -8,6 +8,7 @@ import { AirbyteState, AirbyteStreamBase, } from 'faros-airbyte-cdk'; +import {applyRoundRobinBucketing} from 'faros-airbyte-common/common'; import GlobToRegExp from 'glob-to-regexp'; import {toLower} from 'lodash'; import VError from 'verror'; @@ -105,7 +106,13 @@ export class CircleCISource extends AirbyteSourceBase { `Will sync ${config.project_slugs.length} project slugs: ${config.project_slugs}` ); - return {config, catalog, state}; + const {config: newConfig, state: newState} = applyRoundRobinBucketing( + config, + state, + this.logger.info.bind(this.logger) + ); + + return {config: newConfig as CircleCIConfig, catalog, state: newState}; } streams(config: CircleCIConfig): AirbyteStreamBase[] { diff --git a/sources/circleci-source/src/streams/common.ts b/sources/circleci-source/src/streams/common.ts index 90d65de56..fb46ad68b 100644 --- a/sources/circleci-source/src/streams/common.ts +++ b/sources/circleci-source/src/streams/common.ts @@ -2,7 +2,7 @@ import {AirbyteLogger, AirbyteStreamBase} from 'faros-airbyte-cdk'; import {CircleCI, CircleCIConfig} from '../circleci/circleci'; -export abstract class CircleCIStreamBase extends AirbyteStreamBase { +export abstract class StreamWithProjectSlices extends AirbyteStreamBase { constructor( protected readonly circleCI: CircleCI, protected readonly cfg: CircleCIConfig, @@ -10,6 +10,14 @@ export abstract class CircleCIStreamBase extends AirbyteStreamBase { ) { super(logger); } + + async *streamSlices(): AsyncGenerator { + for (const projectSlug of this.cfg.project_slugs) { + if (this.circleCI.isProjectInBucket(projectSlug)) { + yield {projectSlug}; + } + } + } } export type StreamSlice = { diff --git a/sources/circleci-source/src/streams/pipelines.ts b/sources/circleci-source/src/streams/pipelines.ts index a9b63caa9..05f34783a 100644 --- a/sources/circleci-source/src/streams/pipelines.ts +++ b/sources/circleci-source/src/streams/pipelines.ts @@ -2,11 +2,11 @@ import {SyncMode} from 'faros-airbyte-cdk'; import {Dictionary} from 'ts-essentials'; import {Pipeline} from '../circleci/types'; -import {CircleCIStreamBase, StreamSlice} from './common'; +import {StreamSlice, StreamWithProjectSlices} from './common'; type PipelineState = Dictionary<{lastUpdatedAt?: string}>; -export class Pipelines extends CircleCIStreamBase { +export class Pipelines extends StreamWithProjectSlices { getJsonSchema(): Dictionary { return require('../../resources/schemas/pipelines.json'); } @@ -19,12 +19,6 @@ export class Pipelines extends CircleCIStreamBase { return ['computedProperties', 'updatedAt']; } - async *streamSlices(): AsyncGenerator { - for (const projectSlug of this.cfg.project_slugs) { - yield {projectSlug}; - } - } - async *readRecords( syncMode: SyncMode, cursorField?: string[], diff --git a/sources/circleci-source/src/streams/projects.ts b/sources/circleci-source/src/streams/projects.ts index 73a1a06dc..81e3f40ae 100644 --- a/sources/circleci-source/src/streams/projects.ts +++ b/sources/circleci-source/src/streams/projects.ts @@ -2,9 +2,9 @@ import {SyncMode} from 'faros-airbyte-cdk'; import {Dictionary} from 'ts-essentials'; import {Project} from '../circleci/types'; -import {CircleCIStreamBase, StreamSlice} from './common'; +import {StreamSlice, StreamWithProjectSlices} from './common'; -export class Projects extends CircleCIStreamBase { +export class Projects extends StreamWithProjectSlices { getJsonSchema(): Dictionary { return require('../../resources/schemas/projects.json'); } @@ -13,12 +13,6 @@ export class Projects extends CircleCIStreamBase { return 'id'; } - async *streamSlices(): AsyncGenerator { - for (const projectSlug of this.cfg.project_slugs) { - yield {projectSlug}; - } - } - async *readRecords( syncMode: SyncMode, cursorField?: string[], diff --git a/sources/circleci-source/src/streams/tests.ts b/sources/circleci-source/src/streams/tests.ts index 14e1596fc..e3fd40884 100644 --- a/sources/circleci-source/src/streams/tests.ts +++ b/sources/circleci-source/src/streams/tests.ts @@ -2,11 +2,11 @@ import {SyncMode} from 'faros-airbyte-cdk'; import {Dictionary} from 'ts-essentials'; import {TestMetadata} from '../circleci/types'; -import {CircleCIStreamBase, StreamSlice} from './common'; +import {StreamSlice, StreamWithProjectSlices} from './common'; type TestsState = Dictionary<{lastUpdatedAt?: string}>; -export class Tests extends CircleCIStreamBase { +export class Tests extends StreamWithProjectSlices { getJsonSchema(): Dictionary { return require('../../resources/schemas/tests.json'); } diff --git a/sources/circleci-source/test/index.test.ts b/sources/circleci-source/test/index.test.ts index 6dbbd118c..40bdbaacd 100644 --- a/sources/circleci-source/test/index.test.ts +++ b/sources/circleci-source/test/index.test.ts @@ -43,7 +43,7 @@ describe('index', () => { 'gh/faros-test/test-project2', ], project_block_list: ['gh/faros-test/*'], - cutoff_days: 90, + cutoff_days: 9999, reject_unauthorized: true, }; @@ -56,15 +56,9 @@ describe('index', () => { test('check connection', async () => { CircleCI.instance = jest.fn().mockImplementation(() => { - return new CircleCI( - logger, - null, - { - get: jest.fn().mockResolvedValue({}), - } as unknown as AxiosInstance, - 10000, - 1 - ); + return new CircleCI(sourceConfig, logger, null, { + get: jest.fn().mockResolvedValue({}), + } as unknown as AxiosInstance); }); const source = new sut.CircleCISource(logger); @@ -76,7 +70,7 @@ describe('index', () => { test('check connection - incorrect config', async () => { CircleCI.instance = jest.fn().mockImplementation(() => { - return new CircleCI(logger, null, null, 10000, 1); + return new CircleCI(sourceConfig, logger, null, null); }); const source = new sut.CircleCISource(logger); const res = await source.checkConnection(sourceConfig); @@ -89,18 +83,12 @@ describe('index', () => { test('streams - projects, use full_refresh sync mode', async () => { const fnProjectsList = jest.fn(); CircleCI.instance = jest.fn().mockImplementation(() => { - return new CircleCI( - logger, - null, - { - get: fnProjectsList.mockResolvedValue({ - data: readTestResourceFile('projects.json'), - status: 200, - }), - } as any, - 10000, - 1 - ); + return new CircleCI(sourceConfig, logger, null, { + get: fnProjectsList.mockResolvedValue({ + data: readTestResourceFile('projects.json'), + status: 200, + }), + } as any); }); const source = new sut.CircleCISource(logger); const streams = source.streams(sourceConfig); @@ -137,7 +125,7 @@ describe('index', () => { status: 200, }), } as any; - const circleCI = new CircleCI(logger, null, v2, 10000, 1); + const circleCI = new CircleCI(sourceConfig, logger, null, v2); CircleCI.instance = jest.fn().mockReturnValue(circleCI); const source = new sut.CircleCISource(logger); const streams = source.streams(sourceConfig); @@ -162,43 +150,37 @@ describe('index', () => { test('streams - tests, use full_refresh sync mode', async () => { const fnTestsList = jest.fn(); CircleCI.instance = jest.fn().mockImplementation(() => { - return new CircleCI( - logger, - null, - { - get: fnTestsList - .mockResolvedValueOnce({ - data: { - items: readTestResourceFile('pipeline_input.json'), - next_page_token: null, - }, - status: 200, - }) - .mockResolvedValueOnce({ - data: { - items: readTestResourceFile('workflows_input.json'), - next_page_token: null, - }, - status: 200, - }) - .mockResolvedValueOnce({ - data: { - items: readTestResourceFile('jobs_input.json'), - next_page_token: null, - }, - status: 200, - }) - .mockResolvedValueOnce({ - data: { - items: readTestResourceFile('tests_input.json'), - next_page_token: null, - }, - status: 200, - }), - } as any, - 10000, - 1 - ); + return new CircleCI(sourceConfig, logger, null, { + get: fnTestsList + .mockResolvedValueOnce({ + data: { + items: readTestResourceFile('pipeline_input.json'), + next_page_token: null, + }, + status: 200, + }) + .mockResolvedValueOnce({ + data: { + items: readTestResourceFile('workflows_input.json'), + next_page_token: null, + }, + status: 200, + }) + .mockResolvedValueOnce({ + data: { + items: readTestResourceFile('jobs_input.json'), + next_page_token: null, + }, + status: 200, + }) + .mockResolvedValueOnce({ + data: { + items: readTestResourceFile('tests_input.json'), + next_page_token: null, + }, + status: 200, + }), + } as any); }); const source = new sut.CircleCISource(logger); const streams = source.streams(sourceConfig); @@ -218,15 +200,9 @@ describe('index', () => { test('filter projects', () => { CircleCI.instance = jest.fn().mockImplementation(() => { - return new CircleCI( - logger, - null, - { - get: jest.fn().mockResolvedValue({}), - } as unknown as AxiosInstance, - 10000, - 1 - ); + return new CircleCI(sourceConfig, logger, null, { + get: jest.fn().mockResolvedValue({}), + } as unknown as AxiosInstance); }); const allProjectSlugs = [