diff --git a/src/taskgraph/util/keyed_by.py b/src/taskgraph/util/keyed_by.py index 00c84ba98..226b0194f 100644 --- a/src/taskgraph/util/keyed_by.py +++ b/src/taskgraph/util/keyed_by.py @@ -2,8 +2,40 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from typing import Any, Dict, Generator, Tuple -from .attributes import keymatch +from taskgraph.util.attributes import keymatch + + +def iter_dot_path( + container: Dict[str, Any], subfield: str +) -> Generator[Tuple[dict[str, Any], str], None, None]: + """Given a container and a subfield in dot path notation, yield the parent + container of the dotpath's leaf node, along with the leaf node name that it + contains. + + If the dot path contains a list object, each item in the list will be + yielded. + + Args: + container (dict): The container to search for the dot path. + subfield (str): The dot path to search for. + """ + while "." in subfield: + f, subfield = subfield.split(".", 1) + + if f not in container: + return + + if isinstance(container[f], list): + for item in container[f]: + yield from iter_dot_path(item, subfield) + return + + container = container[f] + + if isinstance(container, dict) and subfield in container: + yield container, subfield def evaluate_keyed_by( diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index 8b2b59051..590273676 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -10,8 +10,7 @@ import voluptuous import taskgraph - -from .keyed_by import evaluate_keyed_by +from taskgraph.util.keyed_by import evaluate_keyed_by, iter_dot_path def validate_schema(schema, obj, msg_prefix): @@ -125,26 +124,14 @@ def resolve_keyed_by( Returns: dict: item which has also been modified in-place. """ - # find the field, returning the item unchanged if anything goes wrong - container, subfield = item, field - while "." in subfield: - f, subfield = subfield.split(".", 1) - if f not in container: - return item - container = container[f] - if not isinstance(container, dict): - return item - - if subfield not in container: - return item - - container[subfield] = evaluate_keyed_by( - value=container[subfield], - item_name=f"`{field}` in `{item_name}`", - defer=defer, - enforce_single_match=enforce_single_match, - attributes=dict(item, **extra_values), - ) + for container, subfield in iter_dot_path(item, field): + container[subfield] = evaluate_keyed_by( + value=container[subfield], + item_name=f"`{field}` in `{item_name}`", + defer=defer, + enforce_single_match=enforce_single_match, + attributes=dict(item, **extra_values), + ) return item diff --git a/test/test_util_keyed_by.py b/test/test_util_keyed_by.py new file mode 100644 index 000000000..1a8ae5e79 --- /dev/null +++ b/test/test_util_keyed_by.py @@ -0,0 +1,29 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from pprint import pprint + +import pytest + +from taskgraph.util.keyed_by import iter_dot_path + + +@pytest.mark.parametrize( + "container,subfield,expected", + ( + pytest.param( + {"a": {"b": {"c": 1}}, "d": 2}, "a.b.c", [({"c": 1}, "c")], id="simple case" + ), + pytest.param( + {"a": [{"b": 1}, {"b": 2}, {"b": 3}], "d": 2}, + "a.b", + [({"b": 1}, "b"), ({"b": 2}, "b"), ({"b": 3}, "b")], + id="list case", + ), + ), +) +def test_iter_dot_paths(container, subfield, expected): + result = list(iter_dot_path(container, subfield)) + pprint(result, indent=2) + assert result == expected diff --git a/test/test_util_schema.py b/test/test_util_schema.py index c56f818ac..5a8d6df4f 100644 --- a/test/test_util_schema.py +++ b/test/test_util_schema.py @@ -106,6 +106,20 @@ def test_nested(self): {"x": {"by-bar": {"B1": 11, "B2": 12}}}, ) + def test_list(self): + item = { + "y": { + "by-foo": { + "F1": 10, + "F2": 20, + }, + } + } + self.assertEqual( + resolve_keyed_by({"x": [item, item]}, "x.y", "name", foo="F1"), + {"x": [{"y": 10}, {"y": 10}]}, + ) + def test_no_by_empty_dict(self): self.assertEqual(resolve_keyed_by({"x": {}}, "x", "n"), {"x": {}})