From 9b09eda3f33029d063739927522d9dc77ea22818 Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Mon, 30 Dec 2024 12:43:12 -0500 Subject: [PATCH] Update xlang kinesis to v2 (#33416) * [WIP] Update xlang kinesis to v2 * cleanup * Add missed file * Fix up * Fix up * Fix up * fix * fmt * Fix test * lint * Add serializer * Add serializer * Allow configuration to be serialized * Allow configuration to be serialized * Allow configuration to be serialized * Allow configuration to be serialized * debug info * debug info * debug info * debug info * debug info * debug info * Allow writebuilder to be serialized * Try skipping certs * Make sure it gets set for now * put behind flag * Doc + debug further * Merge in master * Debug info * Pass through param * Remove debug * Remove debug * override trust manager * checkstyle * Try disabling aggregation * easier debugging * Try upgrading localstack * change how containers are started * change how containers are started * force http1 * Add back all tests * Update changes wording * Better change description --- .../trigger_files/beam_PostCommit_Python.json | 2 +- CHANGES.md | 2 +- .../expansion-service/build.gradle | 39 +++ .../io/aws2/common/ClientBuilderFactory.java | 50 ++- .../io/aws2/common/ClientConfiguration.java | 14 + .../kinesis/KinesisTransformRegistrar.java | 322 ++++++++++++++++++ .../io/external/xlang_kinesisio_it_test.py | 23 +- sdks/python/apache_beam/io/kinesis.py | 22 +- .../python/test-suites/portable/common.gradle | 7 +- settings.gradle.kts | 1 + 10 files changed, 451 insertions(+), 31 deletions(-) create mode 100644 sdks/java/io/amazon-web-services2/expansion-service/build.gradle create mode 100644 sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 9c7a70ceed74..dd3d3e011a0c 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 7 + "modification": 8 } diff --git a/CHANGES.md b/CHANGES.md index edca7a196127..d5cbb76fb3d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -70,7 +70,7 @@ ## Breaking Changes -* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)). ## Deprecations diff --git a/sdks/java/io/amazon-web-services2/expansion-service/build.gradle b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle new file mode 100644 index 000000000000..fd712737f53c --- /dev/null +++ b/sdks/java/io/amazon-web-services2/expansion-service/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +apply plugin: 'application' +mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService" + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service', + exportJavadoc: false, + validateShadowJar: false, + shadowClosure: {}, +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service" +ext.summary = "Expansion service serving AWS2" + +dependencies { + implementation project(":sdks:java:expansion-service") + permitUnusedDeclared project(":sdks:java:expansion-service") + implementation project(":sdks:java:io:amazon-web-services2") + permitUnusedDeclared project(":sdks:java:io:amazon-web-services2") + runtimeOnly library.java.slf4j_jdk14 +} \ No newline at end of file diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 6398de57b5c3..8d8531ce5cdf 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -24,10 +24,14 @@ import java.io.Serializable; import java.net.URI; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.time.Duration; import java.util.function.Consumer; import java.util.function.Function; import javax.annotation.Nullable; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import org.apache.beam.sdk.io.aws2.options.AwsOptions; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -37,9 +41,12 @@ import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.TlsTrustManagersProvider; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.internal.http.NoneTlsKeyManagersProvider; import software.amazon.awssdk.regions.Region; /** @@ -113,6 +120,32 @@ static , ClientT> ClientT b return ClientBuilderFactory.getFactory(options).create(builder, config, options).build(); } + /** Trust provider to skip certificate verification. Should only be used for test pipelines. */ + class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider { + public SkipCertificateVerificationTrustManagerProvider() {} + + @Override + public TrustManager[] trustManagers() { + TrustManager tm = + new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509CertificateArr, String str) + throws CertificateException {} + + @Override + public void checkServerTrusted(X509Certificate[] x509CertificateArr, String str) + throws CertificateException {} + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + }; + TrustManager[] tms = {tm}; + return tms; + } + } + /** * Default implementation of {@link ClientBuilderFactory}. This implementation can configure both, * synchronous clients using {@link ApacheHttpClient} as well as asynchronous clients using {@link @@ -161,7 +194,11 @@ public , ClientT> BuilderT HttpClientConfiguration httpConfig = options.getHttpClientConfiguration(); ProxyConfiguration proxyConfig = options.getProxyConfiguration(); - if (proxyConfig != null || httpConfig != null) { + boolean skipCertificateVerification = false; + if (config.skipCertificateVerification() != null) { + skipCertificateVerification = config.skipCertificateVerification(); + } + if (proxyConfig != null || httpConfig != null || skipCertificateVerification) { if (builder instanceof SdkSyncClientBuilder) { ApacheHttpClient.Builder client = syncClientBuilder(); @@ -177,6 +214,11 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConnections); } + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider()); + } + // must use builder to make sure client is managed by the SDK ((SdkSyncClientBuilder) builder).httpClientBuilder(client); } else if (builder instanceof SdkAsyncClientBuilder) { @@ -201,6 +243,12 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConcurrency); } + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider()); + client.protocol(Protocol.HTTP1_1); + } + // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java index 08fb595bd037..385a25b5a13f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable { return regionId() != null ? Region.of(regionId()) : null; } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. If + * set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract @Nullable @Pure Boolean skipCertificateVerification(); + /** * Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set, * this overwrites the default in {@link AwsOptions#getEndpoint()}. @@ -156,6 +163,13 @@ public Builder retry(Consumer retry) { return retry(builder.build()); } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. + * If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract Builder skipCertificateVerification(boolean skipCertificateVerification); + abstract Builder regionId(String region); abstract Builder credentialsProviderAsJson(String credentialsProvider); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java new file mode 100644 index 000000000000..51d4202e4027 --- /dev/null +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis; + +import com.google.auto.service.AutoService; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; +import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; +import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ExternalTransformBuilder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.kinesis.common.InitialPositionInStream; + +/** + * Exposes {@link org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Write} and {@link + * org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read} as an external transform for cross-language + * usage. + */ +@AutoService(ExternalTransformRegistrar.class) +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class KinesisTransformRegistrar implements ExternalTransformRegistrar { + public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v2"; + public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v2"; + + @Override + public Map> knownBuilderInstances() { + return ImmutableMap.of(WRITE_URN, new WriteBuilder(), READ_DATA_URN, new ReadDataBuilder()); + } + + private abstract static class CrossLanguageConfiguration { + String streamName; + String awsAccessKey; + String awsSecretKey; + String region; + @Nullable String serviceEndpoint; + boolean verifyCertificate; + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public void setAwsAccessKey(String awsAccessKey) { + this.awsAccessKey = awsAccessKey; + } + + public void setAwsSecretKey(String awsSecretKey) { + this.awsSecretKey = awsSecretKey; + } + + public void setRegion(String region) { + this.region = region; + } + + public void setServiceEndpoint(@Nullable String serviceEndpoint) { + this.serviceEndpoint = serviceEndpoint; + } + + public void setVerifyCertificate(@Nullable Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate == null || verifyCertificate; + } + } + + public static class WriteBuilder + implements ExternalTransformBuilder< + WriteBuilder.Configuration, PCollection, KinesisIO.Write.Result> { + + public static class Configuration extends CrossLanguageConfiguration { + private String partitionKey; + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + } + + @Override + public PTransform, KinesisIO.Write.Result> buildExternal( + Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + String pk = configuration.partitionKey; + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + SerializableFunction serializer = v -> v; + @Nullable URI endpoint = null; + if (configuration.serviceEndpoint != null) { + try { + endpoint = new URI(configuration.serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + } + } + KinesisIO.Write writeTransform = + KinesisIO.write() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) + .build()) + .withPartitioner(p -> pk) + .withRecordAggregationDisabled() + .withSerializer(serializer); + + return writeTransform; + } + } + + public static class ReadDataBuilder + implements ExternalTransformBuilder< + ReadDataBuilder.Configuration, PBegin, PCollection> { + + public static class Configuration extends CrossLanguageConfiguration { + private @Nullable Long maxNumRecords; + private @Nullable Duration maxReadTime; + private @Nullable InitialPositionInStream initialPositionInStream; + private @Nullable Instant initialTimestampInStream; + private @Nullable Integer requestRecordsLimit; + private @Nullable Duration upToDateThreshold; + private @Nullable Long maxCapacityPerShard; + private @Nullable WatermarkPolicy watermarkPolicy; + private @Nullable Duration watermarkIdleDurationThreshold; + private @Nullable Duration rateLimit; + + public void setMaxNumRecords(@Nullable Long maxNumRecords) { + this.maxNumRecords = maxNumRecords; + } + + public void setMaxReadTime(@Nullable Long maxReadTime) { + if (maxReadTime != null) { + this.maxReadTime = Duration.millis(maxReadTime); + } + } + + public void setInitialPositionInStream(@Nullable String initialPositionInStream) { + if (initialPositionInStream != null) { + this.initialPositionInStream = InitialPositionInStream.valueOf(initialPositionInStream); + } + } + + public void setInitialTimestampInStream(@Nullable Long initialTimestampInStream) { + if (initialTimestampInStream != null) { + this.initialTimestampInStream = Instant.ofEpochMilli(initialTimestampInStream); + } + } + + public void setRequestRecordsLimit(@Nullable Long requestRecordsLimit) { + if (requestRecordsLimit != null) { + this.requestRecordsLimit = requestRecordsLimit.intValue(); + } + } + + public void setUpToDateThreshold(@Nullable Long upToDateThreshold) { + if (upToDateThreshold != null) { + this.upToDateThreshold = Duration.millis(upToDateThreshold); + } + } + + public void setMaxCapacityPerShard(@Nullable Long maxCapacityPerShard) { + this.maxCapacityPerShard = maxCapacityPerShard; + } + + public void setWatermarkPolicy(@Nullable String watermarkPolicy) { + if (watermarkPolicy != null) { + this.watermarkPolicy = WatermarkPolicy.valueOf(watermarkPolicy); + } + } + + public void setWatermarkIdleDurationThreshold(@Nullable Long watermarkIdleDurationThreshold) { + if (watermarkIdleDurationThreshold != null) { + this.watermarkIdleDurationThreshold = Duration.millis(watermarkIdleDurationThreshold); + } + } + + public void setRateLimit(@Nullable Long rateLimit) { + if (rateLimit != null) { + this.rateLimit = Duration.millis(rateLimit); + } + } + } + + private enum WatermarkPolicy { + ARRIVAL_TIME, + PROCESSING_TIME + } + + @Override + public PTransform> buildExternal( + ReadDataBuilder.Configuration configuration) { + AwsBasicCredentials creds = + AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); + StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + @Nullable URI endpoint = null; + if (configuration.serviceEndpoint != null) { + try { + endpoint = new URI(configuration.serviceEndpoint); + } catch (URISyntaxException ex) { + throw new RuntimeException( + String.format( + "Service endpoint must be URI format, got: %s", configuration.serviceEndpoint)); + } + } + KinesisIO.Read readTransform = + KinesisIO.read() + .withStreamName(configuration.streamName) + .withClientConfiguration( + ClientConfiguration.builder() + .credentialsProvider(provider) + .region(Region.of(configuration.region)) + .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) + .build()); + + if (configuration.maxNumRecords != null) { + readTransform = readTransform.withMaxNumRecords(configuration.maxNumRecords); + } + if (configuration.upToDateThreshold != null) { + readTransform = readTransform.withUpToDateThreshold(configuration.upToDateThreshold); + } + if (configuration.maxCapacityPerShard != null) { + readTransform = + readTransform.withMaxCapacityPerShard(configuration.maxCapacityPerShard.intValue()); + } + if (configuration.watermarkPolicy != null) { + switch (configuration.watermarkPolicy) { + case ARRIVAL_TIME: + readTransform = + configuration.watermarkIdleDurationThreshold != null + ? readTransform.withArrivalTimeWatermarkPolicy( + configuration.watermarkIdleDurationThreshold) + : readTransform.withArrivalTimeWatermarkPolicy(); + break; + case PROCESSING_TIME: + readTransform = readTransform.withProcessingTimeWatermarkPolicy(); + break; + default: + throw new RuntimeException( + String.format( + "Unsupported watermark policy type: %s", configuration.watermarkPolicy)); + } + } + if (configuration.rateLimit != null) { + readTransform = readTransform.withFixedDelayRateLimitPolicy(configuration.rateLimit); + } + if (configuration.maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(configuration.maxReadTime); + } + if (configuration.initialPositionInStream != null) { + readTransform = + readTransform.withInitialPositionInStream(configuration.initialPositionInStream); + } + if (configuration.requestRecordsLimit != null) { + readTransform = readTransform.withRequestRecordsLimit(configuration.requestRecordsLimit); + } + if (configuration.initialTimestampInStream != null) { + readTransform = + readTransform.withInitialTimestampInStream(configuration.initialTimestampInStream); + } + + return new KinesisReadToBytes(readTransform); + } + } + + public static class KinesisReadToBytes extends PTransform> { + private KinesisIO.Read readTransform; + + private KinesisReadToBytes(KinesisIO.Read readTransform) { + this.readTransform = readTransform; + } + + @Override + public PCollection expand(PBegin input) { + // Convert back to bytes to keep consistency with previous verison: + // https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java + return input + .apply(this.readTransform) + .apply( + "Convert to bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public byte[] processElement(ProcessContext c) { + KinesisRecord record = c.element(); + return record.getDataAsBytes(); + } + })); + } + } +} diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index 151d63d84684..c9181fb2a721 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -64,7 +64,7 @@ DockerContainer = None # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports -LOCALSTACK_VERSION = '0.11.3' +LOCALSTACK_VERSION = '3.8.1' NUM_RECORDS = 10 MAX_READ_TIME = 5 * 60 * 1000 # 5min NOW_SECONDS = time.time() @@ -116,9 +116,7 @@ def run_kinesis_write(self): region=self.aws_region, service_endpoint=self.aws_service_endpoint, verify_certificate=(not self.use_localstack), - partition_key='1', - producer_properties=self.producer_properties, - )) + partition_key='1')) def run_kinesis_read(self): records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)] @@ -145,12 +143,11 @@ def run_kinesis_read(self): def set_localstack(self): self.localstack = DockerContainer('localstack/localstack:{}' - .format(LOCALSTACK_VERSION))\ - .with_env('SERVICES', 'kinesis')\ - .with_env('KINESIS_PORT', '4568')\ - .with_env('USE_SSL', 'true')\ - .with_exposed_ports(4568)\ - .with_volume_mapping('/var/run/docker.sock', '/var/run/docker.sock', 'rw') + .format(LOCALSTACK_VERSION))\ + .with_bind_ports(4566, 4566) + + for i in range(4510, 4560): + self.localstack = self.localstack.with_bind_ports(i, i) # Repeat if ReadTimeout is raised. for i in range(4): @@ -164,7 +161,7 @@ def set_localstack(self): self.aws_service_endpoint = 'https://{}:{}'.format( self.localstack.get_container_host_ip(), - self.localstack.get_exposed_port('4568'), + self.localstack.get_exposed_port('4566'), ) def setUp(self): @@ -219,10 +216,6 @@ def setUp(self): self.aws_service_endpoint = known_args.aws_service_endpoint self.use_localstack = not known_args.use_real_aws self.expansion_service = known_args.expansion_service - self.producer_properties = { - 'CollectionMaxCount': str(NUM_RECORDS), - 'ConnectTimeout': str(MAX_READ_TIME), - } if self.use_localstack: self.set_localstack() diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index bc5e1fa787b4..ce0bb2623a38 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -49,7 +49,8 @@ In this option, Python SDK will either download (for released Beam version) or build (when running from a Beam Git clone) a expansion service jar and use that to expand transforms. Currently Kinesis transforms use the - 'beam-sdks-java-io-kinesis-expansion-service' jar for this purpose. + 'beam-sdks-java-io-amazon-web-services2-expansion-service' jar for this + purpose. *Option 2: specify a custom expansion service* @@ -81,7 +82,6 @@ import logging import time -from typing import Mapping from typing import NamedTuple from typing import Optional @@ -99,7 +99,7 @@ def default_io_expansion_service(): return BeamJarExpansionService( - 'sdks:java:io:kinesis:expansion-service:shadowJar') + 'sdks:java:io:amazon-web-services2:expansion-service:shadowJar') WriteToKinesisSchema = NamedTuple( @@ -112,7 +112,6 @@ def default_io_expansion_service(): ('partition_key', str), ('service_endpoint', Optional[str]), ('verify_certificate', Optional[bool]), - ('producer_properties', Optional[Mapping[str, str]]), ], ) @@ -123,7 +122,7 @@ class WriteToKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_write:v1' + URN = 'beam:transform:org.apache.beam:kinesis_write:v2' def __init__( self, @@ -148,11 +147,15 @@ def __init__( :param verify_certificate: Enable or disable certificate verification. Never set to False on production. True by default. :param partition_key: Specify default partition key. - :param producer_properties: Specify the configuration properties for Kinesis - Producer Library (KPL) as dictionary. - Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} + :param producer_properties: (Deprecated) This option no longer is available + since the AWS IOs upgraded to v2. Trying to set it will lead to an + error. For more info, see https://github.com/apache/beam/issues/33430. :param expansion_service: The address (host:port) of the ExpansionService. """ + if producer_properties is not None: + raise ValueError( + 'producer_properties is no longer supported and will be removed ' + + 'in a future release.') super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -164,7 +167,6 @@ def __init__( partition_key=partition_key, service_endpoint=service_endpoint, verify_certificate=verify_certificate, - producer_properties=producer_properties, )), expansion_service or default_io_expansion_service(), ) @@ -199,7 +201,7 @@ class ReadDataFromKinesis(ExternalTransform): Experimental; no backwards compatibility guarantees. """ - URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1' + URN = 'beam:transform:org.apache.beam:kinesis_read_data:v2' def __init__( self, diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle index be87be749862..2d216a01f320 100644 --- a/sdks/python/test-suites/portable/common.gradle +++ b/sdks/python/test-suites/portable/common.gradle @@ -265,7 +265,8 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}") { ':runners:spark:3:job-server:shadowJar', 'portableLocalRunnerJuliaSetWithSetupPy', 'portableWordCountSparkRunnerBatch', - 'portableLocalRunnerTestWithRequirementsFile'] + 'portableLocalRunnerTestWithRequirementsFile' + ] } project.tasks.register("flinkExamples") { @@ -376,7 +377,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") { ':sdks:java:testing:kafka-service:buildTestKafkaServiceJar', ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] @@ -426,7 +427,7 @@ project.tasks.register("xlangSpannerIOIT") { ":sdks:java:container:${currentJavaVersion}:docker", ':sdks:java:io:expansion-service:shadowJar', ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar', - ':sdks:java:io:kinesis:expansion-service:shadowJar', + ':sdks:java:io:amazon-web-services2:expansion-service:shadowJar', ':sdks:java:extensions:schemaio-expansion-service:shadowJar', ':sdks:java:io:debezium:expansion-service:shadowJar' ] diff --git a/settings.gradle.kts b/settings.gradle.kts index a8bee45a05ac..624e9f970d9d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -206,6 +206,7 @@ include(":sdks:java:harness") include(":sdks:java:harness:jmh") include(":sdks:java:io:amazon-web-services") include(":sdks:java:io:amazon-web-services2") +include(":sdks:java:io:amazon-web-services2:expansion-service") include(":sdks:java:io:amqp") include(":sdks:java:io:azure") include(":sdks:java:io:azure-cosmos")