Skip to content

Commit

Permalink
AIP-81 Add Overwrite for Bulk Insert Connection API (apache#45396)
Browse files Browse the repository at this point in the history
  • Loading branch information
jason810496 authored Jan 4, 2025
1 parent 3a9a032 commit c81610b
Show file tree
Hide file tree
Showing 9 changed files with 384 additions and 73 deletions.
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,4 @@ class ConnectionBulkBody(BaseModel):
"""Connections Serializer for requests body."""

connections: list[ConnectionBody]
overwrite: bool | None = Field(default=False)
22 changes: 17 additions & 5 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1738,21 +1738,21 @@ paths:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/connections/bulk:
post:
put:
tags:
- Connection
summary: Post Connections
summary: Put Connections
description: Create connection entry.
operationId: post_connections
operationId: put_connections
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionBulkBody'
required: true
responses:
'201':
description: Successful Response
'200':
description: Created with overwrite
content:
application/json:
schema:
Expand All @@ -1775,6 +1775,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
'201':
description: Created
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionCollectionResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -6761,6 +6767,12 @@ components:
$ref: '#/components/schemas/ConnectionBody'
type: array
title: Connections
overwrite:
anyOf:
- type: boolean
- type: 'null'
title: Overwrite
default: false
type: object
required:
- connections
Expand Down
44 changes: 37 additions & 7 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import os
from typing import Annotated, cast

from fastapi import Depends, HTTPException, Query, status
from fastapi import Depends, HTTPException, Query, Response, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import select
Expand Down Expand Up @@ -135,18 +135,48 @@ def post_connection(
return connection


@connections_router.post(
@connections_router.put(
"/bulk",
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_409_CONFLICT]),
responses={
**create_openapi_http_exception_doc([status.HTTP_409_CONFLICT]),
status.HTTP_201_CREATED: {
"description": "Created",
"model": ConnectionCollectionResponse,
},
status.HTTP_200_OK: {
"description": "Created with overwrite",
"model": ConnectionCollectionResponse,
},
},
)
def post_connections(
def put_connections(
response: Response,
post_body: ConnectionBulkBody,
session: SessionDep,
) -> ConnectionCollectionResponse:
"""Create connection entry."""
connections = [Connection(**body.model_dump(by_alias=True)) for body in post_body.connections]
session.add_all(connections)
response.status_code = status.HTTP_201_CREATED if not post_body.overwrite else status.HTTP_200_OK
connections: list[Connection]
if not post_body.overwrite:
connections = [Connection(**body.model_dump(by_alias=True)) for body in post_body.connections]
session.add_all(connections)
else:
connection_ids = [conn.connection_id for conn in post_body.connections]
existed_connections = session.execute(
select(Connection).filter(Connection.conn_id.in_(connection_ids))
).scalars()
existed_connections_dict = {conn.conn_id: conn for conn in existed_connections}
connections = []
# if conn_id exists, update the corresponding connection, else add a new connection
for body in post_body.connections:
if body.connection_id in existed_connections_dict:
connection = existed_connections_dict[body.connection_id]
for key, val in body.model_dump(by_alias=True).items():
setattr(connection, key, val)
connections.append(connection)
else:
connections.append(Connection(**body.model_dump(by_alias=True)))
session.add_all(connections)
return ConnectionCollectionResponse(
connections=cast(list[ConnectionResponse], connections),
total_entries=len(connections),
Expand Down
6 changes: 3 additions & 3 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1599,9 +1599,6 @@ export type BackfillServiceCreateBackfillMutationResult = Awaited<
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
export type ConnectionServicePostConnectionsMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnections>
>;
export type ConnectionServiceTestConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.testConnection>
>;
Expand Down Expand Up @@ -1635,6 +1632,9 @@ export type BackfillServiceUnpauseBackfillMutationResult = Awaited<
export type BackfillServiceCancelBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.cancelBackfill>
>;
export type ConnectionServicePutConnectionsMutationResult = Awaited<
ReturnType<typeof ConnectionService.putConnections>
>;
export type DagParsingServiceReparseDagFileMutationResult = Awaited<
ReturnType<typeof DagParsingService.reparseDagFile>
>;
Expand Down
75 changes: 38 additions & 37 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2747,43 +2747,6 @@ export const useConnectionServicePostConnection = <
ConnectionService.postConnection({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Post Connections
* Create connection entry.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionCollectionResponse Successful Response
* @throws ApiError
*/
export const useConnectionServicePostConnections = <
TData = Common.ConnectionServicePostConnectionsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: ConnectionBulkBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: ConnectionBulkBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
ConnectionService.postConnections({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Test Connection
* Test an API connection.
Expand Down Expand Up @@ -3291,6 +3254,44 @@ export const useBackfillServiceCancelBackfill = <
BackfillService.cancelBackfill({ backfillId }) as unknown as Promise<TData>,
...options,
});
/**
* Put Connections
* Create connection entry.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionCollectionResponse Created with overwrite
* @returns ConnectionCollectionResponse Created
* @throws ApiError
*/
export const useConnectionServicePutConnections = <
TData = Common.ConnectionServicePutConnectionsMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: ConnectionBulkBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: ConnectionBulkBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
ConnectionService.putConnections({ requestBody }) as unknown as Promise<TData>,
...options,
});
/**
* Reparse Dag File
* Request re-parsing a DAG file.
Expand Down
12 changes: 12 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,18 @@ export const $ConnectionBulkBody = {
type: "array",
title: "Connections",
},
overwrite: {
anyOf: [
{
type: "boolean",
},
{
type: "null",
},
],
title: "Overwrite",
default: false,
},
},
type: "object",
required: ["connections"],
Expand Down
13 changes: 7 additions & 6 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ import type {
GetConnectionsResponse,
PostConnectionData,
PostConnectionResponse,
PostConnectionsData,
PostConnectionsResponse,
PutConnectionsData,
PutConnectionsResponse,
TestConnectionData,
TestConnectionResponse,
GetDagRunData,
Expand Down Expand Up @@ -1108,16 +1108,17 @@ export class ConnectionService {
}

/**
* Post Connections
* Put Connections
* Create connection entry.
* @param data The data for the request.
* @param data.requestBody
* @returns ConnectionCollectionResponse Successful Response
* @returns ConnectionCollectionResponse Created with overwrite
* @returns ConnectionCollectionResponse Created
* @throws ApiError
*/
public static postConnections(data: PostConnectionsData): CancelablePromise<PostConnectionsResponse> {
public static putConnections(data: PutConnectionsData): CancelablePromise<PutConnectionsResponse> {
return __request(OpenAPI, {
method: "POST",
method: "PUT",
url: "/public/connections/bulk",
body: data.requestBody,
mediaType: "application/json",
Expand Down
15 changes: 10 additions & 5 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ export type ConnectionBody = {
*/
export type ConnectionBulkBody = {
connections: Array<ConnectionBody>;
overwrite?: boolean | null;
};

/**
Expand Down Expand Up @@ -1617,11 +1618,11 @@ export type PostConnectionData = {

export type PostConnectionResponse = ConnectionResponse;

export type PostConnectionsData = {
export type PutConnectionsData = {
requestBody: ConnectionBulkBody;
};

export type PostConnectionsResponse = ConnectionCollectionResponse;
export type PutConnectionsResponse = ConnectionCollectionResponse;

export type TestConnectionData = {
requestBody: ConnectionBody;
Expand Down Expand Up @@ -2989,11 +2990,15 @@ export type $OpenApiTs = {
};
};
"/public/connections/bulk": {
post: {
req: PostConnectionsData;
put: {
req: PutConnectionsData;
res: {
/**
* Successful Response
* Created with overwrite
*/
200: ConnectionCollectionResponse;
/**
* Created
*/
201: ConnectionCollectionResponse;
/**
Expand Down
Loading

0 comments on commit c81610b

Please sign in to comment.