Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch support to generator framework #14

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions boundary_layer/builders/templates/generator_operator.j2
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,61 @@ def {{ item_name_builder }}(index, item):
latter would discard any default task args, expecting them to be filled-in
by airflow, while in fact airflow would not fill them in at all. #}
{% set properties = node.resolved_properties.values %}
for (index, item) in enumerate({{ iterable_builder }}(

{% set all_items = (node.name + '_all_items') | sanitize_operator_name %}
{{ all_items }} = {{ iterable_builder }}(
{% for arg in builder_args %}
{% if arg in properties %}
{{ arg }} = {{ properties[arg] | format_value }},
{% endif %}
{% endfor %}
)):
)

{% if node.batching_enabled %}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so I did what you suggested, but I ended up adding a batching_enabled helper property to make it easier to resolve the batching situation. Like you proposed, we can implicitly know that batching is enabled if batching settings exist (for now, that means there's a batch_size), and we can use the optional disabled property to turn batching off even when we have the batch size configured. However, the simple check if not node.batching.disabled returns false positives here when the batching config does not exist at all, so the check needed to be more like if node.batching|length > 0 and not node.batching.disabled. Since this is a bit cumbersome, I created the batching_enabled property to avoid having to duplicate this logic in multiple places (3 distinct places in this PR need to perform this check).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooh, I like this solution 👍

{# Generate code for batched situations #}
{% set batch_name_builder = (node.name + '_batch_name_builder') | sanitize_operator_name %}
def {{ batch_name_builder }}(index, items):
return 'batch_%d_%d' % (index, len(items))

{# TODO: Import this from some util module when such functionality is possible #}
def generator_helper_filter_with_blocklist(items, item_name_builder, blocklist):
def not_in_blocklist(index, item):
item_name = item_name_builder(index, item)
return not any(re.match(i, item_name) for i in blocklist)

filtered = filter(lambda (index, item): not_in_blocklist(index, item), enumerate(items))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately python 3 does not support the syntax in which function/lambda args are automatically expanded into tuples (though I will admit I have not carefully considered whether other parts of the code-generating templates generate python3 compliant code... we may have some work to do in that area).

Maybe it would be better to just map and filter using a comprehension like

return [ index for (index, item) in enumerate(items)
    if not_in_blocklist(index, item) ]

?


return map(lambda t: t[1], filtered)

{# TODO: Import this from some util module when such functionality is possible #}
{# Borrowed from: https://stackoverflow.com/a/312464 #}
def generator_helper_grouped_list(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]

{% set filtered = (node.name + '_filtered') | sanitize_operator_name %}
{{ filtered }} = generator_helper_filter_with_blocklist({{ all_items }}, {{ item_name_builder }}, {{ blocklist }})

for (index, items) in enumerate(generator_helper_grouped_list({{ filtered }}, {{ node.batching.batch_size }})):
batch_name = {{ batch_name_builder }}(index, items)

{% set item_input = 'items' %}
{% set name_input = 'batch_name' %}
{% else %}
{# Generate code for non-batched situations #}
for (index, item) in enumerate({{ all_items }}):
item_name = {{ item_name_builder }}(index, item)
blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }})
if blocklist_match:
continue

{% set item_input = 'item' %}
{% set name_input = 'item_name' %}
{% endif %}
{{ node.target | sanitize_operator_name }}_builder(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I broke this out of the conditional blocks and templated item_input and name_input to avoid repetition, but I'm actually not sure if that's the best idea here... Thoughts?

index = index,
item = item,
item_name = item_name,
{{ item_input }} = {{ item_input }},
{{ name_input }} = {{ name_input }},
dag = dag,
upstream_dependencies = {{ upstream_dependencies | sanitize_operator_name | verbatim | format_value }},
downstream_dependencies = {{ downstream_dependencies | sanitize_operator_name | verbatim | format_value }})
Expand Down
11 changes: 9 additions & 2 deletions boundary_layer/builders/templates/generator_preamble.j2
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ You may obtain a copy of the License at
See the License for the specific language governing permissions and
limitations under the License.
#}
{% if referring_node.batching_enabled %}
{%- set item_input = 'items' %}
{%- set name_input = 'batch_name' %}
{% else %}
{%- set item_input = 'item' %}
{%- set name_input = 'item_name' %}
{% endif %}
def {{ generator_operator_name | sanitize_operator_name }}_builder(
index,
item,
item_name,
{{ item_input }},
{{ name_input }},
dag,
upstream_dependencies,
downstream_dependencies):
8 changes: 8 additions & 0 deletions boundary_layer/registry/types/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ class GeneratorNode(SubdagNode):
def regex_blocklist(self):
return self.item.get('regex_blocklist', ())

@property
def batching(self):
return self.item.get('batching', {})

@property
def batching_enabled(self):
return not self.batching.get('disabled') if self.batching else False


class GeneratorRegistry(ConfigFileRegistry):
node_cls = GeneratorNode
Expand Down
14 changes: 12 additions & 2 deletions boundary_layer/registry/types/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,18 @@ def _build_task_id(self, execution_context):
return base_name

suffix_mode = execution_context.referrer.item.get('auto_task_id_mode')
if not suffix_mode or suffix_mode == 'item_name':
return base_name + '-<<item_name>>'
batching_enabled = execution_context.referrer.batching_enabled
# Validate suffix_mode based on batching config
if batching_enabled and suffix_mode == 'item_name':
raise Exception(
'Cannot use `item_name` for auto_task_id_mode when batching is enabled')
elif not batching_enabled and suffix_mode == 'batch_name':
raise Exception(
'Cannot use `batch_name` for auto_task_id_mode when batching is disabled')

name_var = 'batch_name' if batching_enabled else 'item_name'
if not suffix_mode or suffix_mode == name_var:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, should probably support either batch_name or item_name when batching is enabled, but only item_name if batching is not enabled. Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yup, if I had read this comment first I would have type a lot less up there ^^ haha

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I don't think it makes sense to use item_name in a batching scenario. There's not really a good way to construct the item name in that case, so I think we'd have to use batch_name. Let me know if I'm missing something here... Otherwise, I'll add some validation around this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return base_name + '-<<' + name_var + '>>'
elif suffix_mode == 'index':
return base_name + '-<<str(index)>>'

Expand Down
6 changes: 6 additions & 0 deletions boundary_layer/schemas/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ class ReferenceSchema(OperatorSchema):
target = fields.String(required=True)


class BatchingSchema(StrictSchema):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor maybe, but you could do this with just a single optional batch_size argument in the generator schema, i think? and if that argument is missing then you implicitly assume that enabled == False. This would also simplify the part above where you say you think there would be a better way...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment in generator_operator.j2 for my logic here.

disabled = fields.Boolean()
batch_size = fields.Integer(required=True)


class GeneratorSchema(ReferenceSchema):
auto_task_id_mode = fields.String()
regex_blocklist = fields.List(fields.String())
batching = fields.Nested(BatchingSchema)

@validates_schema
def check_task_id_mode(self, data):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I did not notice this in the original PR, but we'll have to add logic in this check_task_id_mode() method to allow the auto_task_id_mode value to be set to batch_name, otherwise I think the config parser will reject this setting. Maybe the logic that you already have for checking these values in operator.py could be moved here?

Expand Down
194 changes: 192 additions & 2 deletions test/test_generators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from boundary_layer.registry import NodeTypes
from boundary_layer.containers import ExecutionContext
import copy
import re
import yaml

from boundary_layer import plugins
from boundary_layer.builders import PrimaryDagBuilder
from boundary_layer.containers import ExecutionContext
from boundary_layer.registry import NodeTypes
from boundary_layer.registry.types.generator import GeneratorNode
from boundary_layer.schemas.internal.generators import GeneratorSpecSchema


def test_default_param_filler():
Expand All @@ -25,3 +32,186 @@ def test_default_param_filler():
'timeout_sec': 5,
'headers': {}
}


# Tests for batching functionality
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably went a little overboard here, but I like to be thorough... 😬

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg this is awesome! <3 overboard tests


BASE_GENERATOR_CONFIG = {
'name': 'test_generator',
'type': 'list_generator',
'target': 'some_target',
'properties': {
'items': ['a', 'b', 'c', 'd', 'e', 'f', 'g']
}
}

GENERATOR_CONFIG_YAML = """
name: list_generator
iterator_builder_method_code: return items
item_name_builder_code: return item
parameters_jsonschema:
properties:
items:
type: array
items:
type: string
additionalProperties: false
required:
- items
"""


class BatchingTestHelper(object):
"""
Helper class to reduce code required to test code generation under different cases.
"""

def __init__(self):
self.builder = PrimaryDagBuilder(None, None, None, None)
self.generator_spec_schema = GeneratorSpecSchema().load(yaml.load(GENERATOR_CONFIG_YAML))

def build_generator_node(self, batching_config):
node_config = copy.deepcopy(BASE_GENERATOR_CONFIG)
if batching_config is not None:
node_config['batching'] = batching_config

return GeneratorNode(config=self.generator_spec_schema.data, item=node_config)

def run_preamble_template_test(self, batching_config):
node = self.build_generator_node(batching_config)
template = self.builder.get_jinja_template('generator_preamble.j2')

rendered = template.render(
generator_operator_name='foo',
referring_node=node
)

items_batch_name_regex = re.compile(r'\s+items,\s+batch_name,')
item_item_name_regex = re.compile(r'\s+item,\s+item_name,')

items_batch_name_match = items_batch_name_regex.search(rendered)
item_item_name_match = item_item_name_regex.search(rendered)

return {
'items_batch_name': items_batch_name_match,
'item_item_name': item_item_name_match
}

def run_operator_template_test(self, batching_config):
node = self.build_generator_node(batching_config)
template = self.builder.get_jinja_template('generator_operator.j2')

node.resolve_properties(
execution_context=ExecutionContext(None, {}),
default_task_args={},
base_operator_loader=None,
preprocessor_loader=None
)

rendered = template.render(
node=node,
upstream_dependencies='upstream_foo',
downstream_dependencies='downstream_bar'
)

item_name_builder_regex = re.compile(r'.*def %s_item_name_builder\(.*' % node.name)
batch_name_builder_regex = re.compile(r'.*def %s_batch_name_builder\(.*' % node.name)
filter_helper_regex = re.compile(r'.*def generator_helper_filter_with_blocklist\(.*')
grouped_helper_regex = re.compile(r'.*def generator_helper_grouped_list\(.*')
builder_invocation = r'\s+%s_builder\(\s+index = index,' % node.target
items_batch_name_regex = re.compile(
r'%s\s+items = items,\s+batch_name = batch_name,' % builder_invocation
)
item_item_name_regex = re.compile(
r'%s\s+item = item,\s+item_name = item_name,' % builder_invocation
)

return {
'item_name_builder': item_name_builder_regex.search(rendered),
'batch_name_builder': batch_name_builder_regex.search(rendered),
'filter_helper': filter_helper_regex.search(rendered),
'grouped_helper': grouped_helper_regex.search(rendered),
'items_batch_name': items_batch_name_regex.search(rendered),
'item_item_name': item_item_name_regex.search(rendered),
}


HELPER = BatchingTestHelper()


def test_batching_enabled_enabled():
batching_config = {'batch_size': 3}
node = HELPER.build_generator_node(batching_config)

assert node.batching_enabled is True


def test_batching_enabled_disabled():
batching_config = {'batch_size': 3, 'disabled': True}
node = HELPER.build_generator_node(batching_config)

assert node.batching_enabled is False


def test_batching_enabled_undefined():
node = HELPER.build_generator_node(None)

assert node.batching_enabled is False


def test_preamble_template_batching_enabled():
batching_config = {'batch_size': 3}
matches = HELPER.run_preamble_template_test(batching_config)

assert matches['items_batch_name'] is not None
assert matches['item_item_name'] is None


def test_preamble_template_batching_disabled():
batching_config = {'batch_size': 3, 'disabled': True}
matches = HELPER.run_preamble_template_test(batching_config)

assert matches['item_item_name'] is not None
assert matches['items_batch_name'] is None


def test_preamble_template_batching_undefined():
matches = HELPER.run_preamble_template_test(None)

assert matches['item_item_name'] is not None
assert matches['items_batch_name'] is None


def test_operator_template_batching_enabled():
batching_config = {'batch_size': 3}
matches = HELPER.run_operator_template_test(batching_config)

assert matches['item_name_builder'] is not None
assert matches['batch_name_builder'] is not None
assert matches['filter_helper'] is not None
assert matches['grouped_helper'] is not None
assert matches['items_batch_name'] is not None
assert matches['item_item_name'] is None


def test_operator_template_batching_disabled():
batching_config = {'batch_size': 3, 'disabled': True}
matches = HELPER.run_operator_template_test(batching_config)

assert matches['item_name_builder'] is not None
assert matches['batch_name_builder'] is None
assert matches['filter_helper'] is None
assert matches['grouped_helper'] is None
assert matches['items_batch_name'] is None
assert matches['item_item_name'] is not None


def test_operator_template_batching_undefined():
matches = HELPER.run_operator_template_test(None)

assert matches['item_name_builder'] is not None
assert matches['batch_name_builder'] is None
assert matches['filter_helper'] is None
assert matches['grouped_helper'] is None
assert matches['items_batch_name'] is None
assert matches['item_item_name'] is not None
Loading