-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add batch support to generator framework #14
Changes from all commits
563a4bd
d751977
b2aa9ad
416a929
dd156a0
c3f45a5
109fcd0
9ce9e10
e3ebf6f
74165a4
2e98922
a7107ad
490c52c
709d5bf
c7fdb29
eeb596c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,23 +29,61 @@ def {{ item_name_builder }}(index, item): | |
latter would discard any default task args, expecting them to be filled-in | ||
by airflow, while in fact airflow would not fill them in at all. #} | ||
{% set properties = node.resolved_properties.values %} | ||
for (index, item) in enumerate({{ iterable_builder }}( | ||
|
||
{% set all_items = (node.name + '_all_items') | sanitize_operator_name %} | ||
{{ all_items }} = {{ iterable_builder }}( | ||
{% for arg in builder_args %} | ||
{% if arg in properties %} | ||
{{ arg }} = {{ properties[arg] | format_value }}, | ||
{% endif %} | ||
{% endfor %} | ||
)): | ||
) | ||
|
||
{% if node.batching_enabled %} | ||
{# Generate code for batched situations #} | ||
{% set batch_name_builder = (node.name + '_batch_name_builder') | sanitize_operator_name %} | ||
def {{ batch_name_builder }}(index, items): | ||
return 'batch_%d_%d' % (index, len(items)) | ||
|
||
{# TODO: Import this from some util module when such functionality is possible #} | ||
def generator_helper_filter_with_blocklist(items, item_name_builder, blocklist): | ||
def not_in_blocklist(index, item): | ||
item_name = item_name_builder(index, item) | ||
return not any(re.match(i, item_name) for i in blocklist) | ||
|
||
filtered = filter(lambda (index, item): not_in_blocklist(index, item), enumerate(items)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately, Python 3 does not support the syntax in which a tuple argument is automatically unpacked into function/lambda parameters (though I will admit I have not carefully considered whether other parts of the code-generating templates generate Python 3-compliant code... we may have some work to do in that area). Maybe it would be better to just map and filter using a comprehension like `return [item for (index, item) in enumerate(items) if not_in_blocklist(index, item)]`? |
||
|
||
return map(lambda t: t[1], filtered) | ||
|
||
{# TODO: Import this from some util module when such functionality is possible #} | ||
{# Borrowed from: https://stackoverflow.com/a/312464 #} | ||
def generator_helper_grouped_list(l, n): | ||
for i in range(0, len(l), n): | ||
yield l[i:i + n] | ||
|
||
{% set filtered = (node.name + '_filtered') | sanitize_operator_name %} | ||
{{ filtered }} = generator_helper_filter_with_blocklist({{ all_items }}, {{ item_name_builder }}, {{ blocklist }}) | ||
|
||
for (index, items) in enumerate(generator_helper_grouped_list({{ filtered }}, {{ node.batching.batch_size }})): | ||
batch_name = {{ batch_name_builder }}(index, items) | ||
|
||
{% set item_input = 'items' %} | ||
{% set name_input = 'batch_name' %} | ||
{% else %} | ||
{# Generate code for non-batched situations #} | ||
for (index, item) in enumerate({{ all_items }}): | ||
item_name = {{ item_name_builder }}(index, item) | ||
blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }}) | ||
if blocklist_match: | ||
continue | ||
|
||
{% set item_input = 'item' %} | ||
{% set name_input = 'item_name' %} | ||
{% endif %} | ||
{{ node.target | sanitize_operator_name }}_builder( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I broke this out of the conditional blocks and templated |
||
index = index, | ||
item = item, | ||
item_name = item_name, | ||
{{ item_input }} = {{ item_input }}, | ||
{{ name_input }} = {{ name_input }}, | ||
dag = dag, | ||
upstream_dependencies = {{ upstream_dependencies | sanitize_operator_name | verbatim | format_value }}, | ||
downstream_dependencies = {{ downstream_dependencies | sanitize_operator_name | verbatim | format_value }}) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -422,8 +422,18 @@ def _build_task_id(self, execution_context): | |
return base_name | ||
|
||
suffix_mode = execution_context.referrer.item.get('auto_task_id_mode') | ||
if not suffix_mode or suffix_mode == 'item_name': | ||
return base_name + '-<<item_name>>' | ||
batching_enabled = execution_context.referrer.batching_enabled | ||
# Validate suffix_mode based on batching config | ||
if batching_enabled and suffix_mode == 'item_name': | ||
raise Exception( | ||
'Cannot use `item_name` for auto_task_id_mode when batching is enabled') | ||
elif not batching_enabled and suffix_mode == 'batch_name': | ||
raise Exception( | ||
'Cannot use `batch_name` for auto_task_id_mode when batching is disabled') | ||
|
||
name_var = 'batch_name' if batching_enabled else 'item_name' | ||
if not suffix_mode or suffix_mode == name_var: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, should probably support either There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yup, if I had read this comment first I would have typed a lot less up there ^^ haha There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On second thought, I don't think it makes sense to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return base_name + '-<<' + name_var + '>>' | ||
elif suffix_mode == 'index': | ||
return base_name + '-<<str(index)>>' | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,9 +35,15 @@ class ReferenceSchema(OperatorSchema): | |
target = fields.String(required=True) | ||
|
||
|
||
class BatchingSchema(StrictSchema): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor maybe, but you could do this with just a single optional There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment in |
||
disabled = fields.Boolean() | ||
batch_size = fields.Integer(required=True) | ||
|
||
|
||
class GeneratorSchema(ReferenceSchema): | ||
auto_task_id_mode = fields.String() | ||
regex_blocklist = fields.List(fields.String()) | ||
batching = fields.Nested(BatchingSchema) | ||
|
||
@validates_schema | ||
def check_task_id_mode(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I did not notice this in the original PR, but we'll have to add logic in this |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,13 @@ | ||
from boundary_layer.registry import NodeTypes | ||
from boundary_layer.containers import ExecutionContext | ||
import copy | ||
import re | ||
import yaml | ||
|
||
from boundary_layer import plugins | ||
from boundary_layer.builders import PrimaryDagBuilder | ||
from boundary_layer.containers import ExecutionContext | ||
from boundary_layer.registry import NodeTypes | ||
from boundary_layer.registry.types.generator import GeneratorNode | ||
from boundary_layer.schemas.internal.generators import GeneratorSpecSchema | ||
|
||
|
||
def test_default_param_filler(): | ||
|
@@ -25,3 +32,186 @@ def test_default_param_filler(): | |
'timeout_sec': 5, | ||
'headers': {} | ||
} | ||
|
||
|
||
# Tests for batching functionality | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably went a little overboard here, but I like to be thorough... 😬 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. omg this is awesome! <3 overboard tests |
||
|
||
BASE_GENERATOR_CONFIG = { | ||
'name': 'test_generator', | ||
'type': 'list_generator', | ||
'target': 'some_target', | ||
'properties': { | ||
'items': ['a', 'b', 'c', 'd', 'e', 'f', 'g'] | ||
} | ||
} | ||
|
||
GENERATOR_CONFIG_YAML = """ | ||
name: list_generator | ||
iterator_builder_method_code: return items | ||
item_name_builder_code: return item | ||
parameters_jsonschema: | ||
properties: | ||
items: | ||
type: array | ||
items: | ||
type: string | ||
additionalProperties: false | ||
required: | ||
- items | ||
""" | ||
|
||
|
||
class BatchingTestHelper(object): | ||
""" | ||
Helper class to reduce code required to test code generation under different cases. | ||
""" | ||
|
||
def __init__(self): | ||
self.builder = PrimaryDagBuilder(None, None, None, None) | ||
self.generator_spec_schema = GeneratorSpecSchema().load(yaml.load(GENERATOR_CONFIG_YAML)) | ||
|
||
def build_generator_node(self, batching_config): | ||
node_config = copy.deepcopy(BASE_GENERATOR_CONFIG) | ||
if batching_config is not None: | ||
node_config['batching'] = batching_config | ||
|
||
return GeneratorNode(config=self.generator_spec_schema.data, item=node_config) | ||
|
||
def run_preamble_template_test(self, batching_config): | ||
node = self.build_generator_node(batching_config) | ||
template = self.builder.get_jinja_template('generator_preamble.j2') | ||
|
||
rendered = template.render( | ||
generator_operator_name='foo', | ||
referring_node=node | ||
) | ||
|
||
items_batch_name_regex = re.compile(r'\s+items,\s+batch_name,') | ||
item_item_name_regex = re.compile(r'\s+item,\s+item_name,') | ||
|
||
items_batch_name_match = items_batch_name_regex.search(rendered) | ||
item_item_name_match = item_item_name_regex.search(rendered) | ||
|
||
return { | ||
'items_batch_name': items_batch_name_match, | ||
'item_item_name': item_item_name_match | ||
} | ||
|
||
def run_operator_template_test(self, batching_config): | ||
node = self.build_generator_node(batching_config) | ||
template = self.builder.get_jinja_template('generator_operator.j2') | ||
|
||
node.resolve_properties( | ||
execution_context=ExecutionContext(None, {}), | ||
default_task_args={}, | ||
base_operator_loader=None, | ||
preprocessor_loader=None | ||
) | ||
|
||
rendered = template.render( | ||
node=node, | ||
upstream_dependencies='upstream_foo', | ||
downstream_dependencies='downstream_bar' | ||
) | ||
|
||
item_name_builder_regex = re.compile(r'.*def %s_item_name_builder\(.*' % node.name) | ||
batch_name_builder_regex = re.compile(r'.*def %s_batch_name_builder\(.*' % node.name) | ||
filter_helper_regex = re.compile(r'.*def generator_helper_filter_with_blocklist\(.*') | ||
grouped_helper_regex = re.compile(r'.*def generator_helper_grouped_list\(.*') | ||
builder_invocation = r'\s+%s_builder\(\s+index = index,' % node.target | ||
items_batch_name_regex = re.compile( | ||
r'%s\s+items = items,\s+batch_name = batch_name,' % builder_invocation | ||
) | ||
item_item_name_regex = re.compile( | ||
r'%s\s+item = item,\s+item_name = item_name,' % builder_invocation | ||
) | ||
|
||
return { | ||
'item_name_builder': item_name_builder_regex.search(rendered), | ||
'batch_name_builder': batch_name_builder_regex.search(rendered), | ||
'filter_helper': filter_helper_regex.search(rendered), | ||
'grouped_helper': grouped_helper_regex.search(rendered), | ||
'items_batch_name': items_batch_name_regex.search(rendered), | ||
'item_item_name': item_item_name_regex.search(rendered), | ||
} | ||
|
||
|
||
HELPER = BatchingTestHelper() | ||
|
||
|
||
def test_batching_enabled_enabled(): | ||
batching_config = {'batch_size': 3} | ||
node = HELPER.build_generator_node(batching_config) | ||
|
||
assert node.batching_enabled is True | ||
|
||
|
||
def test_batching_enabled_disabled(): | ||
batching_config = {'batch_size': 3, 'disabled': True} | ||
node = HELPER.build_generator_node(batching_config) | ||
|
||
assert node.batching_enabled is False | ||
|
||
|
||
def test_batching_enabled_undefined(): | ||
node = HELPER.build_generator_node(None) | ||
|
||
assert node.batching_enabled is False | ||
|
||
|
||
def test_preamble_template_batching_enabled(): | ||
batching_config = {'batch_size': 3} | ||
matches = HELPER.run_preamble_template_test(batching_config) | ||
|
||
assert matches['items_batch_name'] is not None | ||
assert matches['item_item_name'] is None | ||
|
||
|
||
def test_preamble_template_batching_disabled(): | ||
batching_config = {'batch_size': 3, 'disabled': True} | ||
matches = HELPER.run_preamble_template_test(batching_config) | ||
|
||
assert matches['item_item_name'] is not None | ||
assert matches['items_batch_name'] is None | ||
|
||
|
||
def test_preamble_template_batching_undefined(): | ||
matches = HELPER.run_preamble_template_test(None) | ||
|
||
assert matches['item_item_name'] is not None | ||
assert matches['items_batch_name'] is None | ||
|
||
|
||
def test_operator_template_batching_enabled(): | ||
batching_config = {'batch_size': 3} | ||
matches = HELPER.run_operator_template_test(batching_config) | ||
|
||
assert matches['item_name_builder'] is not None | ||
assert matches['batch_name_builder'] is not None | ||
assert matches['filter_helper'] is not None | ||
assert matches['grouped_helper'] is not None | ||
assert matches['items_batch_name'] is not None | ||
assert matches['item_item_name'] is None | ||
|
||
|
||
def test_operator_template_batching_disabled(): | ||
batching_config = {'batch_size': 3, 'disabled': True} | ||
matches = HELPER.run_operator_template_test(batching_config) | ||
|
||
assert matches['item_name_builder'] is not None | ||
assert matches['batch_name_builder'] is None | ||
assert matches['filter_helper'] is None | ||
assert matches['grouped_helper'] is None | ||
assert matches['items_batch_name'] is None | ||
assert matches['item_item_name'] is not None | ||
|
||
|
||
def test_operator_template_batching_undefined(): | ||
matches = HELPER.run_operator_template_test(None) | ||
|
||
assert matches['item_name_builder'] is not None | ||
assert matches['batch_name_builder'] is None | ||
assert matches['filter_helper'] is None | ||
assert matches['grouped_helper'] is None | ||
assert matches['items_batch_name'] is None | ||
assert matches['item_item_name'] is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so I did what you suggested, but I ended up adding a `batching_enabled` helper property to make it easier to resolve the batching situation. Like you proposed, we can implicitly know that batching is enabled if batching settings exist (for now, that means there's a `batch_size`), and we can use the optional `disabled` property to turn batching off even when we have the batch size configured. However, the simple check `if not node.batching.disabled` returns false positives here when the batching config does not exist at all, so the check needed to be more like `if node.batching|length > 0 and not node.batching.disabled`. Since this is a bit cumbersome, I created the `batching_enabled` property to avoid having to duplicate this logic in multiple places (3 distinct places in this PR need to perform this check).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ooh, I like this solution 👍