Skip to content

Commit

Permalink
feat: support lists in 'resolve_keyed_by'
Browse files Browse the repository at this point in the history
  • Loading branch information
ahal committed Nov 21, 2024
1 parent 59e0e04 commit 5daf787
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
34 changes: 33 additions & 1 deletion src/taskgraph/util/keyed_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,40 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from typing import Any, Generator

from .attributes import keymatch
from taskgraph.util.attributes import keymatch


def iter_dot_path(
container: dict[str, Any], subfield: str
) -> Generator[tuple[dict[str, Any], str], None, None]:
"""Given a container and a subfield in dot path notation, yield the parent
container of the dotpath's leaf node, along with the leaf node name that it
contains.
If the dot path contains a list object, each item in the list will be
yielded.
Args:
container (dict): The container to search for the dot path.
subfield (str): The dot path to search for.
"""
while "." in subfield:
f, subfield = subfield.split(".", 1)

if f not in container:
return

if isinstance(container[f], list):
for item in container[f]:
yield from iter_dot_path(item, subfield)
return

container = container[f]

if isinstance(container, dict) and subfield in container:
yield container, subfield


def evaluate_keyed_by(
Expand Down
31 changes: 9 additions & 22 deletions src/taskgraph/util/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import voluptuous

import taskgraph

from .keyed_by import evaluate_keyed_by
from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path


def validate_schema(schema, obj, msg_prefix):
Expand Down Expand Up @@ -125,26 +124,14 @@ def resolve_keyed_by(
Returns:
dict: item which has also been modified in-place.
"""
# find the field, returning the item unchanged if anything goes wrong
container, subfield = item, field
while "." in subfield:
f, subfield = subfield.split(".", 1)
if f not in container:
return item
container = container[f]
if not isinstance(container, dict):
return item

if subfield not in container:
return item

container[subfield] = evaluate_keyed_by(
value=container[subfield],
item_name=f"`{field}` in `{item_name}`",
defer=defer,
enforce_single_match=enforce_single_match,
attributes=dict(item, **extra_values),
)
for container, subfield in iter_dot_path(item, field):
container[subfield] = evaluate_keyed_by(
value=container[subfield],
item_name=f"`{field}` in `{item_name}`",
defer=defer,
enforce_single_match=enforce_single_match,
attributes=dict(item, **extra_values),
)

return item

Expand Down
29 changes: 29 additions & 0 deletions test/test_util_keyed_by.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from pprint import pprint

import pytest

from taskgraph.util.keyed_by import iter_dot_path


@pytest.mark.parametrize(
"container,subfield,expected",
(
pytest.param(
{"a": {"b": {"c": 1}}, "d": 2}, "a.b.c", [({"c": 1}, "c")], id="simple case"
),
pytest.param(
{"a": [{"b": 1}, {"b": 2}, {"b": 3}], "d": 2},
"a.b",
[({"b": 1}, "b"), ({"b": 2}, "b"), ({"b": 3}, "b")],
id="list case",
),
),
)
def test_iter_dot_paths(container, subfield, expected):
result = list(iter_dot_path(container, subfield))
pprint(result, indent=2)
assert result == expected
14 changes: 14 additions & 0 deletions test/test_util_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ def test_nested(self):
{"x": {"by-bar": {"B1": 11, "B2": 12}}},
)

def test_list(self):
item = {
"y": {
"by-foo": {
"F1": 10,
"F2": 20,
},
}
}
self.assertEqual(
resolve_keyed_by({"x": [item, item]}, "x.y", "name", foo="F1"),
{"x": [{"y": 10}, {"y": 10}]},
)

def test_no_by_empty_dict(self):
self.assertEqual(resolve_keyed_by({"x": {}}, "x", "n"), {"x": {}})

Expand Down

0 comments on commit 5daf787

Please sign in to comment.