Skip to content

Commit

Permalink
Allow fetching XCom with forward slash from the API and escape it in …
Browse files Browse the repository at this point in the history
…the UI (apache#45134)
  • Loading branch information
shahar1 authored Dec 21, 2024
1 parent ad9bbf9 commit 9316ed6
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 12 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5528,6 +5528,7 @@ components:
name: xcom_key
schema:
type: string
format: path
required: true
description: The XCom key.

Expand Down
18 changes: 10 additions & 8 deletions airflow/www/static/js/api/useTaskXcom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ export const useTaskXcomEntry = ({
}: TaskXcomProps) =>
useQuery(
["taskXcom", dagId, dagRunId, taskId, mapIndex, xcomKey, tryNumber],
() =>
axios.get<AxiosResponse, API.XCom>(
getMetaValue("task_xcom_entry_api")
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId)
.replace("_XCOM_KEY_", xcomKey),
{ params: { map_index: mapIndex, stringify: false } }
),
() => {
const taskXcomEntryApiUrl = getMetaValue("task_xcom_entry_api")
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId)
.replace("_XCOM_KEY_", encodeURIComponent(xcomKey));

return axios.get<AxiosResponse, API.XCom>(taskXcomEntryApiUrl, {
params: { map_index: mapIndex, stringify: false },
});
},
{
enabled: !!xcomKey,
}
Expand Down
1 change: 1 addition & 0 deletions newsfragments/45134.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(v2 API & UI) Allow fetching XCom with forward slash from the API and escape it in the UI
23 changes: 19 additions & 4 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,52 +226,67 @@ def _create_xcom_entry(
)

@pytest.mark.parametrize(
"allowed, query, expected_status_or_value",
"allowed, query, expected_status_or_value, key",
[
pytest.param(
True,
"?deserialize=true",
"real deserialized TEST_VALUE",
"key",
id="true",
),
pytest.param(
False,
"?deserialize=true",
400,
"key",
id="disallowed",
),
pytest.param(
True,
"?deserialize=false",
"orm deserialized TEST_VALUE",
"key",
id="false-irrelevant",
),
pytest.param(
False,
"?deserialize=false",
"orm deserialized TEST_VALUE",
"key",
id="false",
),
pytest.param(
True,
"",
"orm deserialized TEST_VALUE",
"key",
id="default-irrelevant",
),
pytest.param(
False,
"",
"orm deserialized TEST_VALUE",
"key",
id="default",
),
pytest.param(
False,
"",
"orm deserialized TEST_VALUE",
"key/with/slashes",
id="key-with-slashes",
),
],
)
@conf_vars({("core", "xcom_backend"): "tests.api_connexion.endpoints.test_xcom_endpoint.CustomXCom"})
def test_custom_xcom_deserialize(self, allowed: bool, query: str, expected_status_or_value: int | str):
def test_custom_xcom_deserialize(
self, allowed: bool, query: str, expected_status_or_value: int | str, key: str
):
XCom = resolve_xcom_backend()
self._create_xcom_entry("dag", "run", utcnow(), "task", "key", backend=XCom)
self._create_xcom_entry("dag", "run", utcnow(), "task", key, backend=XCom)

url = f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/key{query}"
url = f"/api/v1/dags/dag/dagRuns/run/taskInstances/task/xcomEntries/{key}{query}"
with mock.patch("airflow.api_connexion.endpoints.xcom_endpoint.XCom", XCom):
with conf_vars({("api", "enable_xcom_deserialize_support"): str(allowed)}):
response = self.client.get(url, environ_overrides={"REMOTE_USER": "test"})
Expand Down

0 comments on commit 9316ed6

Please sign in to comment.