Add batch support to generator framework #14
Conversation
item_name = {{ item_name_builder }}(index, item)
blocklist_match = any(re.match(i, item_name) for i in {{ blocklist }})
if blocklist_match:
    continue

{% set item_input = 'item' %}
{% set name_input = 'item_name' %}
{% endif %}
{{ node.target | sanitize_operator_name }}_builder(
I broke this out of the conditional blocks and templated item_input and name_input to avoid repetition, but I'm actually not sure if that's the best idea here... Thoughts?
def {{ batch_name_builder }}(index, items):
    return 'batch_%d_%d' % (index, len(items))

{# TODO: Import this from some util module #}
I'm not quite sure of the most appropriate way to do this...
aw yeah that's not really an option for us right now.
Noted. I'll update the comment to reference such a time in the future where this functionality exists.
@@ -422,8 +422,9 @@ def _build_task_id(self, execution_context):
    return base_name

suffix_mode = execution_context.referrer.item.get('auto_task_id_mode')
if not suffix_mode or suffix_mode == 'item_name':
    return base_name + '-<<item_name>>'
name_var = 'batch_name' if execution_context.referrer.item.get('batching', {'enabled': False})['enabled'] else 'item_name'
I feel like there's probably a better way to do this... 🤔
if not suffix_mode or suffix_mode == 'item_name':
    return base_name + '-<<item_name>>'
name_var = 'batch_name' if execution_context.referrer.item.get('batching', {'enabled': False})['enabled'] else 'item_name'
if not suffix_mode or suffix_mode == name_var:
Hmm, should probably support either batch_name or item_name when batching is enabled, but only item_name if batching is not enabled. Thoughts?
Oh yup, if I had read this comment first I would have typed a lot less up there ^^ haha
On second thought, I don't think it makes sense to use item_name in a batching scenario. There's not really a good way to construct the item name in that case, so I think we'd have to use batch_name. Let me know if I'm missing something here... Otherwise, I'll add some validation around this.
👍
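The suffix selection agreed on above can be sketched as a standalone function; here `execution_context.referrer.item` is reduced to a plain dict, and the function name and call shape are assumptions for illustration only:

```python
# Hypothetical standalone sketch of the suffix-mode logic from the diff above.
def build_task_id(item, base_name):
    batching = item.get('batching', {'enabled': False})
    # batch_name takes over as the suffix variable when batching is enabled
    name_var = 'batch_name' if batching['enabled'] else 'item_name'
    suffix_mode = item.get('auto_task_id_mode')
    if not suffix_mode or suffix_mode == name_var:
        # the '<<...>>' placeholder is substituted by the generated code later
        return base_name + '-<<%s>>' % name_var
    return base_name
```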
)):
)

{% if node.batching.enabled %}
What if, instead of using a conditional block for this, you could instead:
- use a default batch size of 1
- create the {{item_name}} variable inside the builder function only if the batch size is 1
- have the batch-name builder default to just choosing the item_name if the batch size is 1?

I think something along these lines might simplify a lot of the code, because many lines and branches have to go into dealing with differences between item_name and batch_name. I guess batch_name would become standard and item_name would only be a special case.
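A minimal sketch of this suggestion, with builder names mirroring the template variables above (both are assumptions, not the merged implementation):

```python
# Sketch of the "batch size defaults to 1" idea: batch_name becomes the
# standard path and item_name is just the single-item special case.
def item_name_builder(index, item):
    # stand-in for the per-item name builder the templates generate
    return 'item_%s' % item

def batch_name_builder(index, items):
    if len(items) == 1:
        # a batch of one falls back to the plain item name
        return item_name_builder(index, items[0])
    return 'batch_%d_%d' % (index, len(items))
```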
I get what you're saying, but if things technically run in "batch" mode all the time, then all related functions should return a list (even if that list only contains one item). For backwards compatibility, a single-item list could be simplified to its singular element, but IMO, that inconsistency complicates the API. 🤷♂️
Alright yeah you convinced me 😄
I think I still do prefer the interface described below in the comment about BatchingSchema, where if batch_size is missing or None then we implicitly interpret that to be equivalent to disabling batching, so that we don't need to fill in both the enabled field and the batch_size field. Thoughts on that?
I like it. Makes usage a little bit easier. 👍
{# TODO: Import this from some util module #}
{# https://stackoverflow.com/a/312464 #}
def grouped(l, n):
Love the use of the generator for this. Could you rename this function to something more obscure? Right now, unfortunately, we don't have a way to ensure that variables/functions inserted here do not have name collisions with those inserted in other places. For example, if there were a task named grouped in some DAG, boundary-layer would create a local variable named grouped that would collide with this.
Makes sense, will do. Thanks for calling this out!
@@ -35,9 +35,15 @@ class ReferenceSchema(OperatorSchema):
    target = fields.String(required=True)


class BatchingSchema(StrictSchema):
minor maybe, but you could do this with just a single optional batch_size argument in the generator schema, i think? and if that argument is missing then you implicitly assume that enabled == False. This would also simplify the part above where you say you think there would be a better way...
See my comment in generator_operator.j2 for my logic here.
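The single-field idea reduces the enabled check to a presence test; a minimal sketch, with the helper name assumed:

```python
# Hypothetical helper for the reviewer's suggestion: a missing or None
# batch_size is implicitly "batching disabled", no separate enabled flag.
def batching_enabled(generator_config):
    return generator_config.get('batch_size') is not None
```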
@@ -0,0 +1,57 @@
from boundary_layer.schemas.dag import BatchingSchema |
Let me know if there's a better place to put this.
oh ya I wouldn't bet on us having perfect fidelity with this, but we have tried to align the locations of files in test/ with the directory structure that is used in the main package. So could you please put this file into a directory test/schemas?
Alright, finally finished tweaking this to my liking... I added lots of unit tests to test the new functionality, including tests to show that the templates generate the expected Python code. I also did a little manual testing to verify that way as well. Results can be seen here: https://gist.github.com/nickmoorman/4a55a69f07f5c68a09939014eaf4179a
I think this should be good to go now. Let me know your thoughts! =]
)):
)

{% if node.batching_enabled %}
Ok, so I did what you suggested, but I ended up adding a batching_enabled helper property to make it easier to resolve the batching situation. Like you proposed, we can implicitly know that batching is enabled if batching settings exist (for now, that means there's a batch_size), and we can use the optional disabled property to turn batching off even when we have the batch size configured. However, the simple check if not node.batching.disabled returns false positives here when the batching config does not exist at all, so the check needed to be more like if node.batching|length > 0 and not node.batching.disabled. Since this is a bit cumbersome, I created the batching_enabled property to avoid having to duplicate this logic in multiple places (3 distinct places in this PR need to perform this check).
ooh, I like this solution 👍
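A sketch of that batching_enabled helper, with the node reduced to a minimal class (the real property lives on boundary-layer's node objects, whose structure is assumed here):

```python
class GeneratorNode:
    def __init__(self, batching=None):
        # batching is the raw config dict, e.g. {'batch_size': 10}; None or an
        # empty dict means no batching config was provided at all
        self.batching = batching or {}

    @property
    def batching_enabled(self):
        # the length guard prevents the false positive described above, where
        # "not disabled" alone would report batching as enabled even when no
        # batching config exists
        return len(self.batching) > 0 and not self.batching.get('disabled')
```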
@@ -25,3 +32,186 @@ def test_default_param_filler():
        'timeout_sec': 5,
        'headers': {}
    }


# Tests for batching functionality
Probably went a little overboard here, but I like to be thorough... 😬
omg this is awesome! <3 overboard tests
Hey @nickmoorman this looks great. I really like the solution you settled upon for determining whether batching is enabled (and sorry for leading you down a partial rabbit hole with it, I had not anticipated the false positive that you found).
I had a few minor comments: one regarding python3 compliance and one that I think will prevent config files from being parsed with the batch_name setting for auto_task_id_mode, which is pretty esoteric so I'm not at all surprised that no tests caught it.
Oh also I responded to your question about the location of the schemas test file ;)
If you wouldn't mind making these changes, we can get this merged asap.
Also, I am 😍 at all those tests!
Thanks again!
item_name = item_name_builder(index, item)
return not any(re.match(i, item_name) for i in blocklist)

filtered = filter(lambda (index, item): not_in_blocklist(index, item), enumerate(items))
unfortunately python 3 does not support the syntax in which function/lambda args are automatically expanded into tuples (though I will admit I have not carefully considered whether other parts of the code-generating templates generate python3 compliant code... we may have some work to do in that area). Maybe it would be better to just map and filter using a comprehension like "return [index for (index, item) in enumerate(items) if not_in_blocklist(index, item)]"?
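A runnable Python 3 version of the filter, with a toy item_name_builder and blocklist standing in for the template-generated names:

```python
import re

def item_name_builder(index, item):
    # stand-in for the generated per-item name builder
    return '%s_%d' % (item, index)

blocklist = [r'skip_.*']

def not_in_blocklist(index, item):
    item_name = item_name_builder(index, item)
    return not any(re.match(pattern, item_name) for pattern in blocklist)

# Python 3 dropped tuple-unpacking lambda parameters, so a comprehension
# unpacks the (index, item) pairs from enumerate() instead:
filtered = [(index, item) for (index, item) in enumerate(['keep_a', 'skip_b'])
            if not_in_blocklist(index, item)]
```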
class GeneratorSchema(ReferenceSchema):
    auto_task_id_mode = fields.String()
    regex_blocklist = fields.List(fields.String())
    batching = fields.Nested(BatchingSchema)

    @validates_schema
    def check_task_id_mode(self, data):
Oh I did not notice this in the original PR, but we'll have to add logic in this check_task_id_mode() method to allow the auto_task_id_mode value to be set to batch_name, otherwise I think the config parser will reject this setting. Maybe the logic that you already have for checking these values in operator.py could be moved here?
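A sketch of the extra validation; the names and the return-errors-as-a-list style are assumptions (the real version would live in GeneratorSchema.check_task_id_mode and raise a marshmallow validation error):

```python
# Hypothetical validation: allow auto_task_id_mode == 'batch_name' only when
# a batching config is present.
VALID_TASK_ID_MODES = ('item_name', 'batch_name')

def check_task_id_mode(data):
    mode = data.get('auto_task_id_mode')
    if mode is None:
        return []  # unset mode falls back to the default behavior
    if mode not in VALID_TASK_ID_MODES:
        return ['unknown auto_task_id_mode: %s' % mode]
    if mode == 'batch_name' and not data.get('batching'):
        return ["auto_task_id_mode 'batch_name' requires a batching config"]
    return []
```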
Closing due to inactivity
This pull request adds batching support to the generator framework (issue #13). To enable batching in a generator, users can simply add a block like this to their generator's configuration:
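The configuration block referenced here did not survive extraction; based on the discussion above (a batching section keyed by batch_size, with an optional disabled flag), it presumably looked something like:

```yaml
batching:
  batch_size: 10
  # optional: keep the config but fall back to the non-batching flow
  # disabled: true
```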
If this configuration exists, batching is assumed to be desired. If for some reason a user doesn't want to remove the batching configuration but doesn't want to actually use it (for example, during development, etc.), disabled: true can be added to fall back to the original non-batching generator flow.