Skip to content

Commit

Permalink
Merge branch 'master' into add-rag-ingestion-enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
claudevdm committed Dec 23, 2024
2 parents 358440d + 7c86bf3 commit e2274a8
Show file tree
Hide file tree
Showing 108 changed files with 2,671 additions and 514 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ github:

protected_branches:
master: {}
release-2.62.0: {}
release-2.61.0: {}
release-2.60.0: {}
release-2.59.0: {}
Expand Down
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3
"modification": 6
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test"
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31798": "noting that PR #31798 should run this test",
"https://github.com/apache/beam/pull/32546": "noting that PR #32546 should run this test",
"https://github.com/apache/beam/pull/33267": "noting that PR #33267 should run this test",
"https://github.com/apache/beam/pull/33322": "noting that PR #33322 should run this test"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

name: Wordcount Python Cost Benchmarks Dataflow
name: Python Cost Benchmarks Dataflow

on:
schedule:
- cron: '30 18 * * 6' # Run at 6:30 pm UTC on Saturdays
workflow_dispatch:

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
Expand Down Expand Up @@ -47,16 +49,17 @@ env:
INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}

jobs:
beam_Inference_Python_Benchmarks_Dataflow:
beam_Python_Cost_Benchmarks_Dataflow:
if: |
github.event_name == 'workflow_dispatch'
github.event_name == 'workflow_dispatch' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam')
runs-on: [self-hosted, ubuntu-20.04, main]
timeout-minutes: 900
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_Wordcount_Python_Cost_Benchmarks_Dataflow"]
job_phrase: ["Run Wordcount Cost Benchmark"]
job_name: ["beam_Python_CostBenchmark_Dataflow"]
job_phrase: ["Run Python Dataflow Cost Benchmarks"]
steps:
- uses: actions/checkout@v4
- name: Setup repository
Expand All @@ -76,10 +79,11 @@ jobs:
test-language: python
argument-file-paths: |
${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/python_tf_mnist_classification.txt
# The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
- name: get current time
run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
- name: run wordcount on Dataflow Python
- name: Run wordcount on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
with:
Expand All @@ -88,4 +92,15 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
- name: Run Tensorflow MNIST Image Classification on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
with:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.tensorflow_mnist_classification_cost_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/tensorflow_tests_requirements.txt \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/inference/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

--region=us-central1
--machine_type=n1-standard-2
--num_workers=1
--disk_size_gb=50
--autoscaling_algorithm=NONE
--input_options={}
--staging_location=gs://temp-storage-for-perf-tests/loadtests
--temp_location=gs://temp-storage-for-perf-tests/loadtests
--requirements_file=apache_beam/ml/inference/tensorflow_tests_requirements.txt
--publish_to_big_query=true
--metrics_dataset=beam_run_inference
--metrics_table=tf_mnist_classification
--runner=DataflowRunner
41 changes: 31 additions & 10 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
## New Features / Improvements
* The datetime module is now available for use in jinja templatization for yaml.
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
Expand All @@ -54,7 +53,7 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.62.0] - Unreleased
# [2.63.0] - Unreleased

## Highlights

Expand All @@ -63,20 +62,14 @@

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).

## New Features / Improvements

* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.

## Breaking Changes

* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations
Expand All @@ -86,16 +79,44 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.62.0] - Unreleased

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)).
* The datetime module is now available for use in jinja templatization for yaml.
* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).

## Breaking Changes

* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.

## Bugfixes

* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).

## Security Fixes

* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

# [2.61.0] - 2024-11-25

## Highlights
Expand Down
3 changes: 3 additions & 0 deletions contributor-docs/code-change-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ Follow these steps for Maven projects.
<id>Maven-Snapshot</id>
<name>maven snapshot repository</name>
<url>https://repository.apache.org/content/groups/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
```

Expand Down
52 changes: 25 additions & 27 deletions examples/multi-language/python/wordcount_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
Expand Down Expand Up @@ -60,39 +60,35 @@
--expansion_service_port <PORT>
"""

# Original Java transform is in ExtractWordsProvider.java
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
# Original Java transform is in JavaCountProvider.java
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
# Original Java transform is in WriteWordsProvider.java
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"


def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)

# Discover and get external transforms from this expansion service
provider = ExternalTransformProvider("localhost:" + expansion_service_port)
# Get transforms with identifiers, then use them as you would a regular
# native PTransform
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

with beam.Pipeline(options=pipeline_options) as p:
lines = p | 'Read' >> ReadFromText(input_path)

words = (lines
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract())
word_counts = words | 'Count Words' >> Count()
formatted_words = (
word_counts
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
RowTypeConstraint.from_fields([('line', str)])))

formatted_words | 'Write' >> Write(file_path_prefix=output_path)
expansion_service = BeamJarExpansionService(
"examples:multi-language:shadowJar")
if expansion_service_port:
expansion_service = "localhost:" + expansion_service_port

provider = ExternalTransformProvider(expansion_service)
# Retrieve portable transforms
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

_ = (p
| 'Read' >> beam.io.ReadFromText(input_path)
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract(drop=["king", "palace"])
| 'Count Words' >> Count()
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
RowTypeConstraint.from_fields([('line', str)]))
| 'Write' >> Write(file_path_prefix=output_path))


if __name__ == '__main__':
Expand All @@ -110,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args):
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
required=True,
help='Expansion service port')
required=False,
help='Expansion service port. If left empty, the '
'existing multi-language examples service will '
'be used by default.')
known_args, pipeline_args = parser.parse_known_args()

run(known_args.input, known_args.output, known_args.expansion_service_port,
Expand Down
Loading

0 comments on commit e2274a8

Please sign in to comment.