Skip to content

Commit

Permalink
Support key iteration in Python ULR. (#29839)
Browse files Browse the repository at this point in the history
This fixes #20878.
  • Loading branch information
robertwb authored Jan 3, 2024
1 parent 6d534fd commit 36bdff4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
11 changes: 0 additions & 11 deletions runners/portability/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,6 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = ""
// https://github.com/apache/beam/issues/20374
excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'

// https://github.com/apache/beam/issues/20878
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testSideInputAnnotationWithMultipleSideInputs'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMapAsEntrySetSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMultimapAsEntrySetSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMapAsEntrySetSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMultimapAsEntrySetSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMapSideInputWithNonDeterministicKeyCoder'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInput'
excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptyMultimapSideInputWithNonDeterministicKeyCoder'

for (String test : sickbayTests) {
excludeTestsMatching test
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,9 @@ def commit_side_inputs_to_state(
window=window))
self.state_servicer.append_raw(state_key, elements_data)
elif func_spec.urn == common_urns.side_inputs.MULTIMAP.urn:
# TODO(robertwb): Consider computing these lazily on demand rather than
# anticipating all potentail state requests which will be more cpu and
# memory efficient for large side inputs.
for (key, window, elements_data,
elements_count) in elements_by_window.encoded_items():
state_key = beam_fn_api_pb2.StateKey(
Expand All @@ -998,6 +1001,14 @@ def commit_side_inputs_to_state(
key=key))
self.state_servicer.append_raw(state_key, elements_data)

key_iter_state_key = beam_fn_api_pb2.StateKey(
multimap_keys_side_input=beam_fn_api_pb2.StateKey.
MultimapKeysSideInput(
transform_id=consuming_transform_id,
side_input_id=tag,
window=window))
self.state_servicer.append_raw(key_iter_state_key, key)

kv_iter_state_key = beam_fn_api_pb2.StateKey(
multimap_keys_values_side_input=beam_fn_api_pb2.StateKey.
MultimapKeysValuesSideInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer,
_SUPPORTED_STATE_TYPES = frozenset([
'runner',
'multimap_side_input',
'multimap_keys_side_input',
'multimap_keys_values_side_input',
'iterable_side_input',
'bag_user_state',
Expand Down

0 comments on commit 36bdff4

Please sign in to comment.