Skip to content

Commit

Permalink
FAI-14238 circleci bucketing + clean ups (#1836)
Browse files Browse the repository at this point in the history
  • Loading branch information
ypc-faros authored Dec 5, 2024
1 parent 5a4a0a9 commit 7f59d44
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 107 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sources/circleci-source/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"axios": "^1.7.4",
"commander": "^9.3.0",
"faros-airbyte-cdk": "*",
"faros-airbyte-common": "*",
"faros-js-client": "^0.5.2",
"glob-to-regexp": "0.4.1",
"typescript-memoize": "^1.1.0",
Expand Down
50 changes: 43 additions & 7 deletions sources/circleci-source/resources/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,49 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CircleCI Spec",
"type": "object",
"required": ["token", "project_slugs"],
"required": [
"token",
"project_slugs"
],
"additionalProperties": true,
"properties": {
"token": {
"order": 1,
"type": "string",
"title": "token",
"description": "CircleCI personal API token. See https://circleci.com/docs/2.0/managing-api-tokens/#creating-a-personal-api-token",
"airbyte_secret": true
},
"url": {
"order": 2,
"type": "string",
"title": "API URL",
"default": "https://circleci.com/api/v2",
"description": "CircleCI API URL"
},
"reject_unauthorized": {
"order": 3,
"type": "boolean",
"title": "Enforce Authorized Requests",
"default": true,
"description": "Enable certificate validation for the CircleCI server"
},
"request_timeout": {
"order": 4,
"type": "integer",
"title": "Request Timeout",
"description": "The max time in milliseconds to wait for a request to CircleCI complete (0 - no timeout).",
"default": 120000
},
"max_retries": {
"order": 5,
"type": "integer",
"title": "Max Number of Retries",
"description": "The max number of retries before giving up on retrying requests to CircleCI API",
"default": 3
},
"project_slugs": {
"order": 6,
"type": "array",
"items": {
"type": "string"
Expand All @@ -52,6 +61,7 @@
]
},
"project_block_list": {
"order": 7,
"type": "array",
"items": {
"type": "string"
Expand All @@ -66,34 +76,60 @@
]
},
"pull_blocklist_from_graph": {
"order": 8,
"type": "boolean",
"title": "Pull Projects Blocklist from Faros Graph",
"default": false,
"description": "Should pull projects blocklist from Faros"
},
"cutoff_days": {
"type": "integer",
"title": "Cutoff Days",
"default": 90,
"description": "Only fetch data updated after cutoff"
},
"faros_api_url": {
"order": 9,
"type": "string",
"title": "Faros API URL",
"default": "https://prod.api.faros.ai",
"description": "Faros API URL"
},
"faros_api_key": {
"order": 10,
"type": "string",
"title": "Faros API Key",
"description": "Faros API Key",
"airbyte_secret": true
},
"faros_graph_name": {
"order": 11,
"type": "string",
"title": "Faros Graph",
"description": "Faros Graph Name",
"default": "default"
},
"bucket_id": {
"order": 12,
"type": "integer",
"title": "Bucket Number",
"description": "Bucket number for this source to determine which portion of projects to pull. Use it when distributing the load between multiple sources.",
"default": 1
},
"bucket_total": {
"order": 13,
"type": "integer",
"title": "Total Number of Buckets",
"description": "Total number of buckets to distribute projects across. Use it when distributing the load between multiple sources",
"default": 1
},
"round_robin_bucket_execution": {
"order": 14,
"type": "boolean",
"title": "Round Robin Bucket Execution",
"description": "When enabled, syncs rotate through all buckets, processing one bucket per sync. When disabled, only the bucket specified by 'bucket_id' is synced.",
"default": false
},
"cutoff_days": {
"order": 15,
"type": "integer",
"title": "Cutoff Days",
"default": 90,
"description": "Only fetch data updated after cutoff"
}
}
}
Expand Down
41 changes: 31 additions & 10 deletions sources/circleci-source/src/circleci/circleci.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import axios, {
AxiosResponse,
} from 'axios';
import {AirbyteLogger, wrapApiError} from 'faros-airbyte-cdk';
import {bucket, validateBucketingConfig} from 'faros-airbyte-common/common';
import https from 'https';
import {maxBy, toLower} from 'lodash';
import {Memoize} from 'typescript-memoize';
Expand All @@ -16,7 +17,8 @@ const DEFAULT_API_URL = 'https://circleci.com/api/v2';
const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_CUTOFF_DAYS = 90;
const DEFAULT_REQUEST_TIMEOUT = 120000;

export const DEFAULT_BUCKET_ID = 1;
export const DEFAULT_BUCKET_TOTAL = 1;
export interface CircleCIConfig {
readonly token: string;
readonly url?: string;
Expand All @@ -30,22 +32,35 @@ export interface CircleCIConfig {
readonly cutoff_days?: number;
readonly request_timeout?: number;
readonly max_retries?: number;
readonly bucket_id?: number;
readonly bucket_total?: number;
readonly round_robin_bucket_execution?: boolean;
}

export class CircleCI {
private static circleCI: CircleCI = undefined;
private readonly cutoffDays: number;
private readonly maxRetries: number;
private readonly bucketId: number;
private readonly bucketTotal: number;

constructor(
config: CircleCIConfig,
private readonly logger: AirbyteLogger,
readonly v1: AxiosInstance,
readonly v2: AxiosInstance,
readonly cutoffDays: number,
private readonly maxRetries: number
) {}
private readonly v1: AxiosInstance,
private readonly v2: AxiosInstance
) {
this.cutoffDays = config.cutoff_days ?? DEFAULT_CUTOFF_DAYS;
this.maxRetries = config.max_retries ?? DEFAULT_MAX_RETRIES;
this.bucketId = config.bucket_id ?? DEFAULT_BUCKET_ID;
this.bucketTotal = config.bucket_total ?? DEFAULT_BUCKET_TOTAL;
}

static instance(config: CircleCIConfig, logger: AirbyteLogger): CircleCI {
if (CircleCI.circleCI) return CircleCI.circleCI;

validateBucketingConfig(config.bucket_id, config.bucket_total);

if (!config.token) {
throw new VError('No token provided');
}
Expand All @@ -70,17 +85,16 @@ export class CircleCI {
}
}

const cutoffDays = config.cutoff_days ?? DEFAULT_CUTOFF_DAYS;
const axios_v1_instance = this.getAxiosInstance(config, logger, 'v1.1');
const axios_v2_instance = this.getAxiosInstance(config, logger, 'v2');

CircleCI.circleCI = new CircleCI(
config,
logger,
axios_v1_instance,
axios_v2_instance,
cutoffDays,
config.max_retries ?? DEFAULT_MAX_RETRIES
axios_v2_instance
);

return CircleCI.circleCI;
}

Expand Down Expand Up @@ -389,6 +403,13 @@ export class CircleCI {
(item: any) => item
);
}

isProjectInBucket(project: string): boolean {
return (
bucket('farosai/airbyte-circleci-source', project, this.bucketTotal) ===
this.bucketId
);
}
}

// CircleCI API returns 404 if no workflows or jobs exist for a given pipeline
Expand Down
9 changes: 8 additions & 1 deletion sources/circleci-source/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
AirbyteState,
AirbyteStreamBase,
} from 'faros-airbyte-cdk';
import {applyRoundRobinBucketing} from 'faros-airbyte-common/common';
import GlobToRegExp from 'glob-to-regexp';
import {toLower} from 'lodash';
import VError from 'verror';
Expand Down Expand Up @@ -105,7 +106,13 @@ export class CircleCISource extends AirbyteSourceBase<CircleCIConfig> {
`Will sync ${config.project_slugs.length} project slugs: ${config.project_slugs}`
);

return {config, catalog, state};
const {config: newConfig, state: newState} = applyRoundRobinBucketing(
config,
state,
this.logger.info.bind(this.logger)
);

return {config: newConfig as CircleCIConfig, catalog, state: newState};
}

streams(config: CircleCIConfig): AirbyteStreamBase[] {
Expand Down
10 changes: 9 additions & 1 deletion sources/circleci-source/src/streams/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ import {AirbyteLogger, AirbyteStreamBase} from 'faros-airbyte-cdk';

import {CircleCI, CircleCIConfig} from '../circleci/circleci';

export abstract class CircleCIStreamBase extends AirbyteStreamBase {
export abstract class StreamWithProjectSlices extends AirbyteStreamBase {
constructor(
protected readonly circleCI: CircleCI,
protected readonly cfg: CircleCIConfig,
protected readonly logger: AirbyteLogger
) {
super(logger);
}

async *streamSlices(): AsyncGenerator<StreamSlice> {
for (const projectSlug of this.cfg.project_slugs) {
if (this.circleCI.isProjectInBucket(projectSlug)) {
yield {projectSlug};
}
}
}
}

export type StreamSlice = {
Expand Down
10 changes: 2 additions & 8 deletions sources/circleci-source/src/streams/pipelines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import {SyncMode} from 'faros-airbyte-cdk';
import {Dictionary} from 'ts-essentials';

import {Pipeline} from '../circleci/types';
import {CircleCIStreamBase, StreamSlice} from './common';
import {StreamSlice, StreamWithProjectSlices} from './common';

type PipelineState = Dictionary<{lastUpdatedAt?: string}>;

export class Pipelines extends CircleCIStreamBase {
export class Pipelines extends StreamWithProjectSlices {
getJsonSchema(): Dictionary<any, string> {
return require('../../resources/schemas/pipelines.json');
}
Expand All @@ -19,12 +19,6 @@ export class Pipelines extends CircleCIStreamBase {
return ['computedProperties', 'updatedAt'];
}

async *streamSlices(): AsyncGenerator<StreamSlice> {
for (const projectSlug of this.cfg.project_slugs) {
yield {projectSlug};
}
}

async *readRecords(
syncMode: SyncMode,
cursorField?: string[],
Expand Down
10 changes: 2 additions & 8 deletions sources/circleci-source/src/streams/projects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import {SyncMode} from 'faros-airbyte-cdk';
import {Dictionary} from 'ts-essentials';

import {Project} from '../circleci/types';
import {CircleCIStreamBase, StreamSlice} from './common';
import {StreamSlice, StreamWithProjectSlices} from './common';

export class Projects extends CircleCIStreamBase {
export class Projects extends StreamWithProjectSlices {
getJsonSchema(): Dictionary<any, string> {
return require('../../resources/schemas/projects.json');
}
Expand All @@ -13,12 +13,6 @@ export class Projects extends CircleCIStreamBase {
return 'id';
}

async *streamSlices(): AsyncGenerator<StreamSlice> {
for (const projectSlug of this.cfg.project_slugs) {
yield {projectSlug};
}
}

async *readRecords(
syncMode: SyncMode,
cursorField?: string[],
Expand Down
4 changes: 2 additions & 2 deletions sources/circleci-source/src/streams/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import {SyncMode} from 'faros-airbyte-cdk';
import {Dictionary} from 'ts-essentials';

import {TestMetadata} from '../circleci/types';
import {CircleCIStreamBase, StreamSlice} from './common';
import {StreamSlice, StreamWithProjectSlices} from './common';

type TestsState = Dictionary<{lastUpdatedAt?: string}>;

export class Tests extends CircleCIStreamBase {
export class Tests extends StreamWithProjectSlices {
getJsonSchema(): Dictionary<any, string> {
return require('../../resources/schemas/tests.json');
}
Expand Down
Loading

0 comments on commit 7f59d44

Please sign in to comment.