Add batch support to generator framework #14

Closed
44 changes: 36 additions & 8 deletions boundary_layer/builders/templates/generator_operator.j2
@@ -21,31 +21,59 @@ def {{ iterable_builder }}({{ ', '.join(builder_args) }}):
{% set blocklist = (node.name + '_blocklist') | sanitize_operator_name %}
{{ blocklist }} = {{ node.regex_blocklist }}

{% set item_name_builder = (node.name + '_item_name_builder') | sanitize_operator_name %}
def {{ item_name_builder }}(index, item):
{{ node.config.item_name_builder_code | add_leading_spaces(1) }}

{# Note: we use resolved_properties rather than operator_args, because the
latter would discard any default task args, expecting them to be filled-in
by airflow, while in fact airflow would not fill them in at all. #}
{% set properties = node.resolved_properties.values %}
for (index, item) in enumerate({{ iterable_builder }}(

items = {{ iterable_builder }}(
{% for arg in builder_args %}
{% if arg in properties %}
{{ arg }} = {{ properties[arg] | format_value }},
{% endif %}
{% endfor %}
)):
)

{% if node.batching.enabled %}
Member

What if, instead of using a conditional block for this, you:

  • use a default batch size of 1
  • create the {{item_name}} variable inside the builder function only if the batch size is 1
  • have the batch-name builder default to just choosing the item_name if the batch size is 1?

I think something along these lines might simplify a lot of the code, because many lines and branches have to go into dealing with differences between item_name and batch_name. I guess batch_name would become standard and item_name would only be a special case.
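
A rough sketch of what this suggestion could look like in the generated code, assuming batching always runs and a batch size of 1 falls back to the item name. The names `build_task_name` and `example_item_name_builder` are hypothetical and do not appear in the PR:

```python
def build_task_name(index, batch, batch_size, item_name_builder):
    """Name a batch; fall back to the per-item name when batch_size is 1.

    All names here are hypothetical illustrations of the suggestion above.
    """
    if batch_size == 1:
        # A batch of size 1 holds exactly one item, so reuse its item name.
        return item_name_builder(index, batch[0])
    return 'batch_%d_%d' % (index, len(batch))


def example_item_name_builder(index, item):
    # Stand-in for the user-supplied item_name_builder_code.
    return 'item_%d_%s' % (index, item)


single = build_task_name(0, ['a'], 1, example_item_name_builder)
multi = build_task_name(2, ['a', 'b', 'c'], 3, example_item_name_builder)
```

With this shape, the template would need only one code path, at the cost of the builder always receiving a list.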

Contributor Author

I get what you're saying, but if things technically run in "batch" mode all the time, then all related functions should return a list (even if that list only contains one item). For backwards compatibility, a single-item list could be simplified to its singular element, but IMO, that inconsistency complicates the API. 🤷‍♂️

Member

Alright yeah you convinced me 😄

I think I still prefer the interface described below in the comment about BatchingSchema: if batch_size is missing or None, we implicitly treat that as batching being disabled, so users don't need to fill in both the enabled field and the batch_size field. Thoughts on that?

Contributor Author

I like it. Makes usage a little bit easier. 👍
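
A minimal sketch of the interface agreed on here, assuming a single optional `batch_size` key replaces the `enabled`/`batch_size` pair. The helper name `resolve_batch_size` and the plain-dict config are hypothetical, not part of boundary-layer:

```python
def resolve_batch_size(generator_config):
    """Return the batch size, or None when batching is disabled.

    Hypothetical helper: a missing or None `batch_size` means "no batching".
    """
    batch_size = generator_config.get('batch_size')
    if batch_size is None:
        return None
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(batch_size, bool) or not isinstance(batch_size, int) or batch_size < 1:
        raise ValueError('batch_size must be a positive integer, got %r' % (batch_size,))
    return batch_size
```

The template could then branch on whether the resolved value is None instead of reading a separate enabled flag.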

{# Generate code for batched situations #}
{% set batch_name_builder = (node.name + '_batch_name_builder') | sanitize_operator_name %}
def {{ batch_name_builder }}(index, items):
    return 'batch_%d_%d' % (index, len(items))

{# TODO: Import this from some util module #}
Contributor Author

I'm not quite sure of the most appropriate way to do this...

Member

Ah, yeah, that's not really an option for us right now.

Contributor Author

Noted. I'll update the comment to say that this should be imported from a util module once that becomes possible.

{# https://stackoverflow.com/a/312464 #}
def grouped(l, n):
Member

Love the use of the generator for this. Could you rename this function to something more obscure? Right now, unfortunately, we don't have a way to ensure that variables/functions inserted here do not have name collisions with those inserted in other places. For example, if there were a task named grouped in some DAG, boundary-layer would create a local variable named grouped that would collide with this.

Contributor Author

Makes sense, will do. Thanks for calling this out!

    for i in range(0, len(l), n):
        yield l[i:i + n]

for (index, batch) in enumerate(grouped(items, {{ node.batching.batch_size }})):
    batch_name = {{ batch_name_builder }}(index, items)
    {# TODO: How do we handle this? #}
    blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }})
    if blocklist_match:
        continue

{% set item_input = 'items' %}
{% set name_input = 'batch_name' %}
{% else %}
{# Generate code for non-batched situations #}
{% set item_name_builder = (node.name + '_item_name_builder') | sanitize_operator_name %}
def {{ item_name_builder }}(index, item):
{{ node.config.item_name_builder_code | add_leading_spaces(1) }}

for (index, item) in enumerate(items):
    item_name = {{ item_name_builder }}(index, item)
    blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }})
    if blocklist_match:
        continue

{% set item_input = 'item' %}
{% set name_input = 'item_name' %}
{% endif %}
{{ node.target | sanitize_operator_name }}_builder(
Contributor Author

I broke this out of the conditional blocks and templated item_input and name_input to avoid repetition, but I'm actually not sure if that's the best idea here... Thoughts?

index = index,
item = item,
item_name = item_name,
{{ item_input }} = {{ item_input }},
{{ name_input }} = {{ name_input }},
dag = dag,
upstream_dependencies = {{ upstream_dependencies | sanitize_operator_name | verbatim | format_value }},
downstream_dependencies = {{ downstream_dependencies | sanitize_operator_name | verbatim | format_value }})
4 changes: 4 additions & 0 deletions boundary_layer/registry/types/generator.py
@@ -28,6 +28,10 @@ class GeneratorNode(SubdagNode):
    def regex_blocklist(self):
        return self.item.get('regex_blocklist', ())

    @property
    def batching(self):
        return self.item.get('batching', {'enabled': False, 'batch_size': 1})


class GeneratorRegistry(ConfigFileRegistry):
node_cls = GeneratorNode
6 changes: 6 additions & 0 deletions boundary_layer/schemas/dag.py
@@ -35,9 +35,15 @@ class ReferenceSchema(OperatorSchema):
    target = fields.String(required=True)


class BatchingSchema(StrictSchema):
Member

Minor, maybe, but I think you could do this with just a single optional batch_size argument in the generator schema. If that argument is missing, you implicitly assume that enabled == False. This would also simplify the part above where you say you think there would be a better way...

Contributor Author

See my comment in generator_operator.j2 for my logic here.

    enabled = fields.Boolean(required=True)
    batch_size = fields.Integer(required=True)


class GeneratorSchema(ReferenceSchema):
    auto_task_id_mode = fields.String()
    regex_blocklist = fields.List(fields.String())
    batching = fields.Nested(BatchingSchema)

    @validates_schema
    def check_task_id_mode(self, data):
Member

Oh I did not notice this in the original PR, but we'll have to add logic in this check_task_id_mode() method to allow the auto_task_id_mode value to be set to batch_name, otherwise I think the config parser will reject this setting. Maybe the logic that you already have for checking these values in operator.py could be moved here?
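
One way the check could look, written as a plain function so it stands alone outside the schema class. The accepted values `index` and `item_name`, and the rule that `batch_name` requires batching to be enabled, are assumptions for illustration; the actual check in boundary-layer may differ:

```python
# Assumed set of accepted modes; only `batch_name` is named in this thread.
ALLOWED_AUTO_TASK_ID_MODES = frozenset(['index', 'item_name', 'batch_name'])


def check_task_id_mode(data):
    """Raise ValueError for an unknown mode, or for batch_name without batching."""
    mode = data.get('auto_task_id_mode')
    if mode is None:
        return  # the field is optional

    if mode not in ALLOWED_AUTO_TASK_ID_MODES:
        raise ValueError('Invalid auto_task_id_mode: %r' % (mode,))

    # Assumption: batch_name only makes sense when batching is turned on.
    batching = data.get('batching') or {}
    if mode == 'batch_name' and not batching.get('enabled', False):
        raise ValueError('auto_task_id_mode batch_name requires batching to be enabled')
```

Inside the schema, the same logic would live in the method decorated with `@validates_schema` and raise the schema library's validation error rather than ValueError.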
