-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add batch support to generator framework #14
Changes from 7 commits
563a4bd
d751977
b2aa9ad
416a929
dd156a0
c3f45a5
109fcd0
9ce9e10
e3ebf6f
74165a4
2e98922
a7107ad
490c52c
709d5bf
c7fdb29
eeb596c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,23 +29,61 @@ def {{ item_name_builder }}(index, item): | |
latter would discard any default task args, expecting them to be filled-in | ||
by airflow, while in fact airflow would not fill them in at all. #} | ||
{% set properties = node.resolved_properties.values %} | ||
for (index, item) in enumerate({{ iterable_builder }}( | ||
|
||
{% set all_items = (node.name + '_all_items') | sanitize_operator_name %} | ||
{{ all_items }} = {{ iterable_builder }}( | ||
{% for arg in builder_args %} | ||
{% if arg in properties %} | ||
{{ arg }} = {{ properties[arg] | format_value }}, | ||
{% endif %} | ||
{% endfor %} | ||
)): | ||
) | ||
|
||
{% if node.batching.enabled %} | ||
{# Generate code for batched situations #} | ||
{% set batch_name_builder = (node.name + '_batch_name_builder') | sanitize_operator_name %} | ||
def {{ batch_name_builder }}(index, items): | ||
return 'batch_%d_%d' % (index, len(items)) | ||
|
||
{# TODO: Import this from some util module #} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not quite sure of the most appropriate way to do this... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aw yeah that's not really an option for us right now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noted. I'll update the comment to reference such a time in the future where this functionality exists. |
||
def filter_with_blocklist(items, item_name_builder, blocklist): | ||
def not_in_blocklist(index, item): | ||
item_name = item_name_builder(index, item) | ||
return not any(re.match(i, item_name) for i in blocklist) | ||
|
||
filtered = filter(lambda (index, item): not_in_blocklist(index, item), enumerate(items)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unfortunately python 3 does not support the syntax in which function/lambda args are automatically expanded into tuples (though I will admit I have not carefully considered whether other parts of the code-generating templates generate python3 compliant code... we may have some work to do in that area). Maybe it would be better to just map and filter using a comprehension like return [ index for (index, item) in enumerate(items)
if not_in_blocklist(index, item) ] ? |
||
|
||
return map(lambda t: t[1], filtered) | ||
|
||
{# TODO: Import this from some util module #} | ||
{# https://stackoverflow.com/a/312464 #} | ||
def grouped(l, n): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Love the use of the generator for this. Could you rename this function to something more obscure? Right now, unfortunately, we don't have a way to ensure that variables/functions inserted here do not have name collisions with those inserted in other places. For example, if there were a task named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, will do. Thanks for calling this out! |
||
for i in range(0, len(l), n): | ||
yield l[i:i + n] | ||
|
||
{% set filtered = (node.name + '_filtered') | sanitize_operator_name %} | ||
{{ filtered }} = filter_with_blocklist({{ all_items }}, {{ item_name_builder }}, {{ blocklist }}) | ||
|
||
for (index, items) in enumerate(grouped({{ filtered }}, {{ node.batching.batch_size }})): | ||
batch_name = {{ batch_name_builder }}(index, items) | ||
|
||
{% set item_input = 'items' %} | ||
{% set name_input = 'batch_name' %} | ||
{% else %} | ||
{# Generate code for non-batched situations #} | ||
for (index, item) in enumerate({{ all_items }}): | ||
item_name = {{ item_name_builder }}(index, item) | ||
blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }}) | ||
if blocklist_match: | ||
continue | ||
|
||
{% set item_input = 'item' %} | ||
{% set name_input = 'item_name' %} | ||
{% endif %} | ||
{{ node.target | sanitize_operator_name }}_builder( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I broke this out of the conditional blocks and templated |
||
index = index, | ||
item = item, | ||
item_name = item_name, | ||
{{ item_input }} = {{ item_input }}, | ||
{{ name_input }} = {{ name_input }}, | ||
dag = dag, | ||
upstream_dependencies = {{ upstream_dependencies | sanitize_operator_name | verbatim | format_value }}, | ||
downstream_dependencies = {{ downstream_dependencies | sanitize_operator_name | verbatim | format_value }}) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -422,8 +422,9 @@ def _build_task_id(self, execution_context): | |
return base_name | ||
|
||
suffix_mode = execution_context.referrer.item.get('auto_task_id_mode') | ||
if not suffix_mode or suffix_mode == 'item_name': | ||
return base_name + '-<<item_name>>' | ||
name_var = 'batch_name' if execution_context.referrer.item.get('batching', {'enabled': False})['enabled'] else 'item_name' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like there's probably a better way to do this... 🤔 |
||
if not suffix_mode or suffix_mode == name_var: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, should probably support either There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yup, if I had read this comment first I would have type a lot less up there ^^ haha There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On second thought, I don't think it makes sense to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return base_name + '-<<' + name_var + '>>' | ||
elif suffix_mode == 'index': | ||
return base_name + '-<<str(index)>>' | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,9 +35,15 @@ class ReferenceSchema(OperatorSchema): | |
target = fields.String(required=True) | ||
|
||
|
||
class BatchingSchema(StrictSchema): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor maybe, but you could do this with just a single optional There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comment in |
||
enabled = fields.Boolean(required=True) | ||
batch_size = fields.Integer(required=True) | ||
|
||
|
||
class GeneratorSchema(ReferenceSchema): | ||
auto_task_id_mode = fields.String() | ||
regex_blocklist = fields.List(fields.String()) | ||
batching = fields.Nested(BatchingSchema) | ||
|
||
@validates_schema | ||
def check_task_id_mode(self, data): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I did not notice this in the original PR, but we'll have to add logic in this |
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if, instead of using a conditional block for this, you could instead:
{{item_name}}
variable inside the builder function only if the batch size is 1I think something along these lines might simplify a lot of the code, because many lines and branches have to go into dealing with differences between
item_name
andbatch_name
. I guessbatch_name
would become standard anditem_name
would only be a special case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get what you're saying, but if things technically run in "batch" mode all the time, then all related functions should return a list (even if that list only contains one item). For backwards compatibility, a single-item list could be simplified to its singular element, but IMO, that inconsistency complicates the API. 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright yeah you convinced me 😄
I think I still do prefer the interface described below in the comment about
BatchingSchema
, where ifbatch_size
is missing orNone
then we implicitly interpret that to be equivalent to disabling batching, so that we don't need to fill in both theenabled
field and thebatch_size
field. thoughts on that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it. Makes usage a little bit easier. 👍