From 6bd6942eea541d45ea42c7bd462ab55909c4e28e Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Fri, 24 Nov 2023 12:35:21 +0100 Subject: [PATCH] [FLINK-33559] Externalize Kafka Python connector code --- .github/workflows/push_pr.yml | 8 + .gitignore | 17 +- flink-python/MANIFEST.in | 21 + flink-python/README.txt | 14 + flink-python/dev/integration_test.sh | 54 + flink-python/pom.xml | 209 +++ .../pyflink/datastream/connectors/kafka.py | 1163 +++++++++++++++++ .../datastream/connectors/tests/test_kafka.py | 669 ++++++++++ .../pyflink/pyflink_gateway_server.py | 291 +++++ flink-python/setup.py | 158 +++ flink-python/tox.ini | 51 + pom.xml | 1 + 12 files changed, 2655 insertions(+), 1 deletion(-) create mode 100644 flink-python/MANIFEST.in create mode 100644 flink-python/README.txt create mode 100755 flink-python/dev/integration_test.sh create mode 100644 flink-python/pom.xml create mode 100644 flink-python/pyflink/datastream/connectors/kafka.py create mode 100644 flink-python/pyflink/datastream/connectors/tests/test_kafka.py create mode 100644 flink-python/pyflink/pyflink_gateway_server.py create mode 100644 flink-python/setup.py create mode 100644 flink-python/tox.ini diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index ddc50ab8d..71d3b6f55 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -29,3 +29,11 @@ jobs: uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} + + python_test: + strategy: + matrix: + flink: [ 1.17.1, 1.18.0 ] + uses: pvary/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils + with: + flink_version: ${{ matrix.flink }} diff --git a/.gitignore b/.gitignore index 5f0068cda..586f64c74 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,19 @@ out/ tools/flink tools/flink-* tools/releasing/release -tools/japicmp-output \ No newline at end of file +tools/japicmp-output + +# Generated file, do not store in git +flink-python/pyflink/datastream/connectors/kafka_connector_version.py +flink-python/apache_flink_connectors_kafka.egg-info/ +flink-python/.tox/ +flink-python/build +flink-python/dist +flink-python/dev/download +flink-python/dev/.conda/ +flink-python/dev/log/ +flink-python/dev/.stage.txt +flink-python/dev/install_command.sh +flink-python/dev/lint-python.sh +flink-python/dev/build-wheels.sh +flink-python/dev/dev-requirements.txt diff --git a/flink-python/MANIFEST.in b/flink-python/MANIFEST.in new file mode 100644 index 000000000..3578d2dfc --- /dev/null +++ b/flink-python/MANIFEST.in @@ -0,0 +1,21 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +graft pyflink +global-exclude *.py[cod] __pycache__ .DS_Store + diff --git a/flink-python/README.txt b/flink-python/README.txt new file mode 100644 index 000000000..a12c13e5e --- /dev/null +++ b/flink-python/README.txt @@ -0,0 +1,14 @@ +This is the official Apache Flink Kafka Python connector. + +For the latest information about the Flink connector, please visit our website at: + + https://flink.apache.org + +and our GitHub repository for the Kafka connector + + https://github.com/apache/flink-connector-kafka + +If you have any questions, ask on our mailing lists: + + user@flink.apache.org + dev@flink.apache.org diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh new file mode 100755 index 000000000..436c43bc2 --- /dev/null +++ b/flink-python/dev/integration_test.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +function test_module() { + module="$FLINK_PYTHON_DIR/pyflink/$1" + echo "test module $module" + pytest --durations=20 ${module} $2 + if [[ $?
-ne 0 ]]; then + echo "test module $module failed" + exit 1 + fi +} + +function test_all_modules() { + # test datastream module + test_module "datastream" +} + +# CURRENT_DIR is "flink-connector-kafka/flink-python/dev/" +CURRENT_DIR="$(cd "$( dirname "$0" )" && pwd)" + +# FLINK_PYTHON_DIR is "flink-connector-kafka/flink-python" +FLINK_PYTHON_DIR=$(dirname "$CURRENT_DIR") + +# FLINK_SOURCE_DIR is "flink-connector-kafka" +FLINK_SOURCE_DIR=$(dirname "$FLINK_PYTHON_DIR") + +# set the FLINK_TEST_LIB_DIR to "flink-connector-kafka" +export FLINK_TEST_LIB_DIR="$FLINK_SOURCE_DIR" + +# Temporarily update the installed 'pyflink_gateway_server.py' files with the new one +# Needed only until Flink 1.19 release +echo "Checking ${FLINK_TEST_LIB_DIR} for 'pyflink_gateway_server.py'" +find "${FLINK_TEST_LIB_DIR}/flink-python" -name pyflink_gateway_server.py +find "${FLINK_TEST_LIB_DIR}/flink-python/.tox" -name pyflink_gateway_server.py -exec cp "${FLINK_TEST_LIB_DIR}/flink-python/pyflink/pyflink_gateway_server.py" {} \; + +# python test +test_all_modules diff --git a/flink-python/pom.xml b/flink-python/pom.xml new file mode 100644 index 000000000..73981d231 --- /dev/null +++ b/flink-python/pom.xml @@ -0,0 +1,209 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connector-kafka-parent + 3.1-SNAPSHOT + + + flink-connector-kafka-python + Flink : Connectors : SQL : Kafka : Python + + jar + + + + + + + org.apache.flink + flink-sql-connector-kafka + ${project.version} + + + org.apache.flink + flink-runtime + ${flink.version} + + + org.apache.flink + flink-streaming-java + ${flink.version} + + + org.apache.flink + flink-connector-test-utils + ${flink.version} + + + org.apache.flink + flink-sql-avro + ${flink.version} + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + clean + clean + + run + + + + + + + + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + package + + copy + + + + + org.apache.flink + flink-sql-connector-kafka + + + org.apache.flink + flink-runtime + + tests + + + org.apache.flink + flink-streaming-java + + tests + + + org.apache.flink + flink-connector-test-utils + + + org.apache.flink + flink-test-utils + + + org.apache.flink + flink-sql-avro + + + ${project.build.directory}/test-dependencies + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + junit + ${project.build.directory}/test-dependencies + + + + + + + org.codehaus.mojo + wagon-maven-plugin + 2.0.2 + + + download-install + validate + + download-single + + + https://raw.githubusercontent.com/apache/flink/release-${flink.version}/flink-python/dev/install_command.sh + ${project.basedir}/dev + ${python.infra.download.skip} + + + + download-lint + validate + + download-single + + + https://raw.githubusercontent.com/apache/flink/release-${flink.version}/flink-python/dev/lint-python.sh + ${project.basedir}/dev + ${python.infra.download.skip} + + + + download-build-wheels + validate + + download-single + + + https://raw.githubusercontent.com/apache/flink/release-${flink.version}/flink-python/dev/build-wheels.sh + ${project.basedir}/dev + ${python.infra.download.skip} + + + + + + + diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py new file mode 100644 index 000000000..0e0a12893 --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/kafka.py @@ -0,0 +1,1163 @@ +################################################################################ +# 
Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import warnings +from abc import ABC, abstractmethod +from enum import Enum +from typing import Dict, Union, List, Set, Callable, Any, Optional + +from py4j.java_gateway import JavaObject, get_java_class + +from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \ + Types, Row +from pyflink.datastream.connectors import Source, Sink +from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportsPreprocessing, \ + StreamTransformer +from pyflink.datastream.functions import SinkFunction, SourceFunction +from pyflink.java_gateway import get_gateway +from pyflink.util.java_utils import to_jarray, get_field, get_field_value + +__all__ = [ + 'FlinkKafkaConsumer', + 'FlinkKafkaProducer', + 'KafkaSource', + 'KafkaSourceBuilder', + 'KafkaSink', + 'KafkaSinkBuilder', + 'Semantic', + 'KafkaTopicPartition', + 'KafkaOffsetsInitializer', + 'KafkaOffsetResetStrategy', + 'KafkaRecordSerializationSchema', + 'KafkaRecordSerializationSchemaBuilder', + 'KafkaTopicSelector' +] + + +# ---- FlinkKafkaConsumer ---- + +class FlinkKafkaConsumerBase(SourceFunction, ABC): + """ + Base class of all Flink Kafka Consumer data sources. This implements the common behavior across + all kafka versions. + + The Kafka version specific behavior is defined mainly in the specific subclasses. + """ + + def __init__(self, j_flink_kafka_consumer): + super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer) + + def set_commit_offsets_on_checkpoints(self, + commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase': + """ + Specifies whether or not the consumer should commit offsets back to kafka on checkpoints. + This setting will only have effect if checkpointing is enabled for the job. If checkpointing + isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+) + property settings will be used. + """ + self._j_function = self._j_function \ + .setCommitOffsetsOnCheckpoints(commit_on_checkpoints) + return self + + def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase': + """ + Specifies the consumer to start reading from the earliest offset for all partitions. This + lets the consumer ignore any committed group offsets in Zookeeper/ Kafka brokers. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. 
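+
+        Example (an illustrative sketch; the topic, bootstrap server and group id below are
+        placeholders):
+        ::
+
+            >>> consumer = FlinkKafkaConsumer(
+            ...     topics='my_topic',
+            ...     deserialization_schema=SimpleStringSchema(),
+            ...     properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'my_group'})
+            >>> consumer.set_start_from_earliest()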
+ """ + self._j_function = self._j_function.setStartFromEarliest() + return self + + def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase': + """ + Specifies the consumer to start reading from the latest offset for all partitions. This lets + the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + """ + self._j_function = self._j_function.setStartFromLatest() + return self + + def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase': + """ + Specifies the consumer to start reading partitions from a specified timestamp. The specified + timestamp must be before the current timestamp. This lets the consumer ignore any committed + group offsets in Zookeeper / Kafka brokers. + + The consumer will look up the earliest offset whose timestamp is greater than or equal to + the specific timestamp from Kafka. If there's no such offset, the consumer will use the + latest offset to read data from Kafka. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + + :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds from + epoch. + """ + self._j_function = self._j_function.setStartFromTimestamp( + startup_offsets_timestamp) + return self + + def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase': + """ + Specifies the consumer to start reading from any committed group offsets found in Zookeeper / + Kafka brokers. The 'group.id' property must be set in the configuration properties. If no + offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the + configuration properties will be used for the partition. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used. + """ + self._j_function = self._j_function.setStartFromGroupOffsets() + return self + + def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase': + """ + By default, when restoring from a checkpoint / savepoint, the consumer always ignores + restored partitions that are no longer associated with the current specified topics or topic + pattern to subscribe to. + + This method does not affect where partitions are read from when the consumer is restored + from a checkpoint or savepoint. When the consumer is restored from a checkpoint or + savepoint, only the offsets in the restored state will be used.
+ """ + self._j_function = self._j_function \ + .disableFilterRestoredPartitionsWithSubscribedTopics() + return self + + def get_produced_type(self) -> TypeInformation: + return typeinfo._from_java_type(self._j_function.getProducedType()) + + +def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz): + if not isinstance(topics, list): + topics = [topics] + gateway = get_gateway() + j_properties = gateway.jvm.java.util.Properties() + for key, value in properties.items(): + j_properties.setProperty(key, value) + + j_flink_kafka_consumer = j_consumer_clz(topics, + deserialization_schema._j_deserialization_schema, + j_properties) + return j_flink_kafka_consumer + + +class FlinkKafkaConsumer(FlinkKafkaConsumerBase): + """ + The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from + Apache Kafka. The consumer can run in multiple parallel instances, each of which will + pull data from one or more Kafka partitions. + + The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost + during a failure, and that the computation processes elements 'exactly once'. (These guarantees + naturally assume that Kafka itself does not lose any data.) + + Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. + The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in + sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of + how far the Flink Kafka consumer has consumed a topic. + + Please refer to Kafka's documentation for the available configuration properties: + http://kafka.apache.org/documentation.html#newconsumerconfigs + """ + + def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema, + properties: Dict): + """ + Creates a new Kafka streaming source consumer for Kafka 0.10.x. + + This constructor allows passing multiple topics to the consumer. + + :param topics: The Kafka topics to read from. + :param deserialization_schema: The de-/serializer used to convert between Kafka's byte + messages and Flink's objects. + :param properties: The properties that are used to configure both the fetcher and the offset + handler. + """ + + warnings.warn("Deprecated in 1.16. Use KafkaSource instead.", DeprecationWarning) + JFlinkKafkaConsumer = get_gateway().jvm \ + .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer + j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, + JFlinkKafkaConsumer) + super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer) + + +# ---- FlinkKafkaProducer ---- + + +class Semantic(Enum): + """ + Semantics that can be chosen. + + :data: `EXACTLY_ONCE`: + + The Flink producer will write all messages in a Kafka transaction that will be committed to + Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of + FlinkKafkaProducers. Between each checkpoint a new Kafka transaction is created, which is + committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint + complete notifications are running late, FlinkKafkaProducer can run out of + FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshotState() + requests will fail and the FlinkKafkaProducer will keep using the + FlinkKafkaProducer from the previous checkpoint. To decrease the chances of failing checkpoints + there are four options: + + 1.
decrease the number of max concurrent checkpoints + 2. make checkpoints more reliable (so that they complete faster) + 3. increase the delay between checkpoints + 4. increase the size of the FlinkKafkaProducers pool + + :data: `AT_LEAST_ONCE`: + + The Flink producer will wait for all outstanding messages in the Kafka buffers to be + acknowledged by the Kafka producer on a checkpoint. + + :data: `NONE`: + + Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of + failure. + + """ + + EXACTLY_ONCE = 0, + AT_LEAST_ONCE = 1, + NONE = 2 + + def _to_j_semantic(self): + JSemantic = get_gateway().jvm \ + .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic + return getattr(JSemantic, self.name) + + +class FlinkKafkaProducerBase(SinkFunction, ABC): + """ + Flink Sink to produce data into a Kafka topic. + + Please note that this producer provides at-least-once reliability guarantees when checkpoints + are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide any + reliability guarantees. + """ + + def __init__(self, j_flink_kafka_producer): + super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer) + + def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase': + """ + Defines whether the producer should fail on errors, or only log them. If this is set to + true, then exceptions will only be logged; if set to false, exceptions will eventually be + thrown and cause the streaming program to fail (and enter recovery). + + :param log_failures_only: The flag to indicate logging-only on exceptions. + """ + self._j_function.setLogFailuresOnly(log_failures_only) + return self + + def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase': + """ + If set to true, the Flink producer will wait for all outstanding messages in the Kafka + buffers to be acknowledged by the Kafka producer on a checkpoint. + + This way, the producer can guarantee that messages in the Kafka buffers are part of the + checkpoint. + + :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint) + """ + self._j_function.setFlushOnCheckpoint(flush_on_checkpoint) + return self + + def set_write_timestamp_to_kafka(self, + write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase': + """ + If set to true, Flink will write the (event time) timestamp attached to each record into + Kafka. Timestamps must be positive for Kafka to accept them. + + :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written + to Kafka. + """ + self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka) + return self + + +class FlinkKafkaProducer(FlinkKafkaProducerBase): + """ + Flink Sink to produce data into a Kafka topic. By + default, the producer will use the AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to + Flink's Kafka connector documentation. + """ + + def __init__(self, topic: str, serialization_schema: SerializationSchema, + producer_config: Dict, kafka_producer_pool_size: int = 5, + semantic=Semantic.AT_LEAST_ONCE): + """ + Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. + + Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. + This default partitioner maps each sink subtask to a single Kafka partition (i.e. all + records received by a sink subtask will end up in the same Kafka partition). + + :param topic: ID of the Kafka topic.
:param serialization_schema: User defined key-less serialization schema. + :param producer_config: Properties with the producer configuration. + """ + gateway = get_gateway() + j_properties = gateway.jvm.java.util.Properties() + for key, value in producer_config.items(): + j_properties.setProperty(key, value) + + JFlinkKafkaProducer = gateway.jvm \ + .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer + + j_flink_kafka_producer = JFlinkKafkaProducer( + topic, serialization_schema._j_serialization_schema, j_properties, None, + semantic._to_j_semantic(), kafka_producer_pool_size) + super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer) + + def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer': + """ + Disables the propagation of exceptions thrown when committing presumably timed out Kafka + transactions during recovery of the job. If a Kafka transaction is timed out, a commit will + never be successful. Hence, use this feature to avoid recovery loops of the job. Exceptions + will still be logged to inform the user that data loss might have occurred. + + Note that we use System.currentTimeMillis() to track the age of a transaction. Moreover, + only exceptions thrown during the recovery are caught, i.e., the producer will attempt at + least one commit of the transaction before giving up. + + :return: This FlinkKafkaProducer. + """ + self._j_function.ignoreFailuresAfterTransactionTimeout() + return self + + +# ---- KafkaSource ---- + + +class KafkaSource(Source): + """ + The Source implementation of Kafka. Please use a :class:`KafkaSourceBuilder` to construct a + :class:`KafkaSource`. The following example shows how to create a KafkaSource emitting records + of String type. + + :: + + >>> source = KafkaSource \\ + ... .builder() \\ + ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ + ... .set_group_id('MY_GROUP') \\ + ... .set_topics('TOPIC1', 'TOPIC2') \\ + ... .set_value_only_deserializer(SimpleStringSchema()) \\ + ... .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \\ + ... .build() + + .. versionadded:: 1.16.0 + """ + + def __init__(self, j_kafka_source: JavaObject): + super().__init__(j_kafka_source) + + @staticmethod + def builder() -> 'KafkaSourceBuilder': + """ + Get a KafkaSourceBuilder to build a :class:`KafkaSource`. + + :return: a Kafka source builder. + """ + return KafkaSourceBuilder() + + +class KafkaSourceBuilder(object): + """ + The builder class for :class:`KafkaSource` to make it easier for the users to construct a + :class:`KafkaSource`. + + The following example shows the minimum setup to create a KafkaSource that reads String + values from a Kafka topic. + + :: + + >>> source = KafkaSource.builder() \\ + ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ + ... .set_topics('TOPIC1', 'TOPIC2') \\ + ... .set_value_only_deserializer(SimpleStringSchema()) \\ + ... .build() + + The bootstrap servers, topics/partitions to consume, and the record deserializer are required + fields that must be set. + + To specify the starting offsets of the KafkaSource, one can call :meth:`set_starting_offsets`. + + By default, the KafkaSource runs in CONTINUOUS_UNBOUNDED mode and never stops until the Flink + job is canceled or fails. To let the KafkaSource run in CONTINUOUS_UNBOUNDED mode but stop at some + given offsets, one can call :meth:`set_unbounded`. For example, the following KafkaSource + stops after it consumes up to the latest partition offsets at the point when the Flink job starts.
+ + :: + + >>> source = KafkaSource.builder() \\ + ... .set_bootstrap_servers('MY_BOOTSTRAP_SERVERS') \\ + ... .set_topics('TOPIC1', 'TOPIC2') \\ + ... .set_value_only_deserializer(SimpleStringSchema()) \\ + ... .set_unbounded(KafkaOffsetsInitializer.latest()) \\ + ... .build() + + .. versionadded:: 1.16.0 + """ + + def __init__(self): + self._j_builder = get_gateway().jvm.org.apache.flink.connector.kafka.source \ + .KafkaSource.builder() + + def build(self) -> 'KafkaSource': + return KafkaSource(self._j_builder.build()) + + def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSourceBuilder': + """ + Sets the bootstrap servers for the KafkaConsumer of the KafkaSource. + + :param bootstrap_servers: the bootstrap servers of the Kafka cluster. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setBootstrapServers(bootstrap_servers) + return self + + def set_group_id(self, group_id: str) -> 'KafkaSourceBuilder': + """ + Sets the consumer group id of the KafkaSource. + + :param group_id: the group id of the KafkaSource. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setGroupId(group_id) + return self + + def set_topics(self, *topics: str) -> 'KafkaSourceBuilder': + """ + Set a list of topics the KafkaSource should consume from. All the topics in the list should + have existed in the Kafka cluster. Otherwise, an exception will be thrown. To allow some + topics to be created lazily, please use :meth:`set_topic_pattern` instead. + + :param topics: the list of topics to consume from. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setTopics(to_jarray(get_gateway().jvm.java.lang.String, topics)) + return self + + def set_topic_pattern(self, topic_pattern: str) -> 'KafkaSourceBuilder': + """ + Set a topic pattern to consume from use the java Pattern. For grammar, check out + `JavaDoc `_ . + + :param topic_pattern: the pattern of the topic name to consume from. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setTopicPattern(get_gateway().jvm.java.util.regex + .Pattern.compile(topic_pattern)) + return self + + def set_partitions(self, partitions: Set['KafkaTopicPartition']) -> 'KafkaSourceBuilder': + """ + Set a set of partitions to consume from. + + Example: + :: + + >>> KafkaSource.builder().set_partitions({ + ... KafkaTopicPartition('TOPIC1', 0), + ... KafkaTopicPartition('TOPIC1', 1), + ... }) + + :param partitions: the set of partitions to consume from. + :return: this KafkaSourceBuilder. + """ + j_set = get_gateway().jvm.java.util.HashSet() + for tp in partitions: + j_set.add(tp._to_j_topic_partition()) + self._j_builder.setPartitions(j_set) + return self + + def set_starting_offsets(self, starting_offsets_initializer: 'KafkaOffsetsInitializer') \ + -> 'KafkaSourceBuilder': + """ + Specify from which offsets the KafkaSource should start consume from by providing an + :class:`KafkaOffsetsInitializer`. + + The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the + box. Currently, customized offset initializer is not supported in PyFlink. + + * :meth:`KafkaOffsetsInitializer.earliest` - starting from the earliest offsets. This is + also the default offset initializer of the KafkaSource for starting offsets. + * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. + * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of + the consumer group. 
If there is no committed offsets, starting from the offsets + specified by the :class:`KafkaOffsetResetStrategy`. + * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each + partition. + * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each + partition. Note that the guarantee here is that all the records in Kafka whose timestamp + is greater than the given starting timestamp will be consumed. However, it is possible + that some consumer records whose timestamp is smaller than the given starting timestamp + are also consumed. + + :param starting_offsets_initializer: the :class:`KafkaOffsetsInitializer` setting the + starting offsets for the Source. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer) + return self + + def set_unbounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \ + -> 'KafkaSourceBuilder': + """ + By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never + stops until the Flink job fails or is canceled. To let the KafkaSource run as a streaming + source but still stops at some point, one can set an :class:`KafkaOffsetsInitializer` + to specify the stopping offsets for each partition. When all the partitions have reached + their stopping offsets, the KafkaSource will then exit. + + This method is different from :meth:`set_bounded` that after setting the stopping offsets + with this method, KafkaSource will still be CONTINUOUS_UNBOUNDED even though it will stop at + the stopping offsets specified by the stopping offset initializer. + + The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the + box. Currently, customized offset initializer is not supported in PyFlink. + + * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. + * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of + the consumer group. If there is no committed offsets, starting from the offsets + specified by the :class:`KafkaOffsetResetStrategy`. + * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each + partition. + * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each + partition. Note that the guarantee here is that all the records in Kafka whose timestamp + is greater than the given starting timestamp will be consumed. However, it is possible + that some consumer records whose timestamp is smaller than the given starting timestamp + are also consumed. + + :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the + stopping offsets. + :return: this KafkaSourceBuilder + """ + self._j_builder.setUnbounded(stopping_offsets_initializer._j_initializer) + return self + + def set_bounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') \ + -> 'KafkaSourceBuilder': + """ + By default, the KafkaSource is set to run in CONTINUOUS_UNBOUNDED manner and thus never + stops until the Flink job fails or is canceled. To let the KafkaSource run in BOUNDED manner + and stop at some point, one can set an :class:`KafkaOffsetsInitializer` to specify the + stopping offsets for each partition. When all the partitions have reached their stopping + offsets, the KafkaSource will then exit. 
+ + This method is different from :meth:`set_unbounded` that after setting the stopping offsets + with this method, :meth:`KafkaSource.get_boundedness` will return BOUNDED instead of + CONTINUOUS_UNBOUNDED. + + The following :class:`KafkaOffsetsInitializer` s are commonly used and provided out of the + box. Currently, customized offset initializer is not supported in PyFlink. + + * :meth:`KafkaOffsetsInitializer.latest` - starting from the latest offsets. + * :meth:`KafkaOffsetsInitializer.committedOffsets` - starting from the committed offsets of + the consumer group. If there is no committed offsets, starting from the offsets + specified by the :class:`KafkaOffsetResetStrategy`. + * :meth:`KafkaOffsetsInitializer.offsets` - starting from the specified offsets for each + partition. + * :meth:`KafkaOffsetsInitializer.timestamp` - starting from the specified timestamp for each + partition. Note that the guarantee here is that all the records in Kafka whose timestamp + is greater than the given starting timestamp will be consumed. However, it is possible + that some consumer records whose timestamp is smaller than the given starting timestamp + are also consumed. + + :param stopping_offsets_initializer: the :class:`KafkaOffsetsInitializer` to specify the + stopping offsets. + :return: this KafkaSourceBuilder + """ + self._j_builder.setBounded(stopping_offsets_initializer._j_initializer) + return self + + def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) \ + -> 'KafkaSourceBuilder': + """ + Sets the :class:`~pyflink.common.serialization.DeserializationSchema` for deserializing the + value of Kafka's ConsumerRecord. The other information (e.g. key) in a ConsumerRecord will + be ignored. + + :param deserialization_schema: the :class:`DeserializationSchema` to use for + deserialization. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setValueOnlyDeserializer(deserialization_schema._j_deserialization_schema) + return self + + def set_client_id_prefix(self, prefix: str) -> 'KafkaSourceBuilder': + """ + Sets the client id prefix of this KafkaSource. + + :param prefix: the client id prefix to use for this KafkaSource. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setClientIdPrefix(prefix) + return self + + def set_property(self, key: str, value: str) -> 'KafkaSourceBuilder': + """ + Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found + in ConsumerConfig and KafkaSourceOptions. + + Note that the following keys will be overridden by the builder when the KafkaSource is + created. + + * ``key.deserializer`` is always set to ByteArrayDeserializer. + * ``value.deserializer`` is always set to ByteArrayDeserializer. + * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by + :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default + :meth:`KafkaOffsetsInitializer.earliest`. + * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been + invoked. + + :param key: the key of the property. + :param value: the value of the property. + :return: this KafkaSourceBuilder. + """ + self._j_builder.setProperty(key, value) + return self + + def set_properties(self, props: Dict) -> 'KafkaSourceBuilder': + """ + Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found + in ConsumerConfig and KafkaSourceOptions. 
+ + Note that the following keys will be overridden by the builder when the KafkaSource is + created. + + * ``key.deserializer`` is always set to ByteArrayDeserializer. + * ``value.deserializer`` is always set to ByteArrayDeserializer. + * ``auto.offset.reset.strategy`` is overridden by AutoOffsetResetStrategy returned by + :class:`KafkaOffsetsInitializer` for the starting offsets, which is by default + :meth:`KafkaOffsetsInitializer.earliest`. + * ``partition.discovery.interval.ms`` is overridden to -1 when :meth:`set_bounded` has been + invoked. + * ``client.id`` is overridden to "client.id.prefix-RANDOM_LONG", or "group.id-RANDOM_LONG" + if the client id prefix is not set. + + :param props: the properties to set for the KafkaSource. + :return: this KafkaSourceBuilder. + """ + gateway = get_gateway() + j_properties = gateway.jvm.java.util.Properties() + for key, value in props.items(): + j_properties.setProperty(key, value) + self._j_builder.setProperties(j_properties) + return self + + +class KafkaTopicPartition(object): + """ + Corresponding to Java ``org.apache.kafka.common.TopicPartition`` class. + + Example: + :: + + >>> topic_partition = KafkaTopicPartition('TOPIC1', 0) + + .. versionadded:: 1.16.0 + """ + + def __init__(self, topic: str, partition: int): + self._topic = topic + self._partition = partition + + def _to_j_topic_partition(self): + jvm = get_gateway().jvm + return jvm.org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition( + self._topic, self._partition) + + def __eq__(self, other): + if not isinstance(other, KafkaTopicPartition): + return False + return self._topic == other._topic and self._partition == other._partition + + def __hash__(self): + return 31 * (31 + self._partition) + hash(self._topic) + + +class KafkaOffsetResetStrategy(Enum): + """ + Corresponding to Java ``org.apache.kafka.client.consumer.OffsetResetStrategy`` class. + + .. versionadded:: 1.16.0 + """ + + LATEST = 0 + EARLIEST = 1 + NONE = 2 + + def _to_j_offset_reset_strategy(self): + JOffsetResetStrategy = get_gateway().jvm.org.apache.flink.kafka.shaded.org.apache.kafka.\ + clients.consumer.OffsetResetStrategy + return getattr(JOffsetResetStrategy, self.name) + + +class KafkaOffsetsInitializer(object): + """ + An interface for users to specify the starting / stopping offset of a KafkaPartitionSplit. + + .. versionadded:: 1.16.0 + """ + + def __init__(self, j_initializer: JavaObject): + self._j_initializer = j_initializer + + @staticmethod + def committed_offsets( + offset_reset_strategy: 'KafkaOffsetResetStrategy' = KafkaOffsetResetStrategy.NONE) -> \ + 'KafkaOffsetsInitializer': + """ + Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the committed + offsets. An exception will be thrown at runtime if there is no committed offsets. + + An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets if + the committed offsets does not exist. + + :param offset_reset_strategy: the offset reset strategy to use when the committed offsets do + not exist. + :return: an offset initializer which initialize the offsets to the committed offsets. 
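+
+        Example (illustrative; the second call falls back to the earliest offsets when no
+        committed offsets exist):
+        ::
+
+            >>> KafkaOffsetsInitializer.committed_offsets()
+            >>> KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)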
+ """ + JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source.\ + enumerator.initializer.OffsetsInitializer + return KafkaOffsetsInitializer(JOffsetsInitializer.committedOffsets( + offset_reset_strategy._to_j_offset_reset_strategy())) + + @staticmethod + def timestamp(timestamp: int) -> 'KafkaOffsetsInitializer': + """ + Get an :class:`KafkaOffsetsInitializer` which initializes the offsets in each partition so + that the initialized offset is the offset of the first record whose record timestamp is + greater than or equals the give timestamp. + + :param timestamp: the timestamp to start the consumption. + :return: an :class:`OffsetsInitializer` which initializes the offsets based on the given + timestamp. + """ + JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ + enumerator.initializer.OffsetsInitializer + return KafkaOffsetsInitializer(JOffsetsInitializer.timestamp(timestamp)) + + @staticmethod + def earliest() -> 'KafkaOffsetsInitializer': + """ + Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest + available offsets of each partition. + + :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the earliest + available offsets. + """ + JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ + enumerator.initializer.OffsetsInitializer + return KafkaOffsetsInitializer(JOffsetsInitializer.earliest()) + + @staticmethod + def latest() -> 'KafkaOffsetsInitializer': + """ + Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest offsets + of each partition. + + :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the latest + offsets. + """ + JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ + enumerator.initializer.OffsetsInitializer + return KafkaOffsetsInitializer(JOffsetsInitializer.latest()) + + @staticmethod + def offsets(offsets: Dict['KafkaTopicPartition', int], + offset_reset_strategy: 'KafkaOffsetResetStrategy' = + KafkaOffsetResetStrategy.EARLIEST) -> 'KafkaOffsetsInitializer': + """ + Get an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified + offsets. + + An optional :class:`KafkaOffsetResetStrategy` can be specified to initialize the offsets in + case the specified offset is out of range. + + Example: + :: + + >>> KafkaOffsetsInitializer.offsets({ + ... KafkaTopicPartition('TOPIC1', 0): 0, + ... KafkaTopicPartition('TOPIC1', 1): 10000 + ... }, KafkaOffsetResetStrategy.EARLIEST) + + :param offsets: the specified offsets for each partition. + :param offset_reset_strategy: the :class:`KafkaOffsetResetStrategy` to use when the + specified offset is out of range. + :return: an :class:`KafkaOffsetsInitializer` which initializes the offsets to the specified + offsets. + """ + jvm = get_gateway().jvm + j_map_wrapper = jvm.org.apache.flink.python.util.HashMapWrapper( + None, get_java_class(jvm.Long)) + for tp, offset in offsets.items(): + j_map_wrapper.put(tp._to_j_topic_partition(), offset) + + JOffsetsInitializer = get_gateway().jvm.org.apache.flink.connector.kafka.source. \ + enumerator.initializer.OffsetsInitializer + return KafkaOffsetsInitializer(JOffsetsInitializer.offsets( + j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy())) + + +class KafkaSink(Sink, SupportsPreprocessing): + """ + Flink Sink to produce data into a Kafka topic. 
The sink supports all delivery guarantees + described by :class:`DeliveryGuarantee`. + + * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case + of issues on the Kafka broker and messages may be duplicated in case of a Flink failure. + * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the + Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be + lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink + restarts. + * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in + a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer + reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates + will be seen in case of a Flink restart. However, this delays record writing effectively + until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure + that you use unique transactional id prefixes across your applications running on the same + Kafka cluster such that multiple running jobs do not interfere in their transactions! + Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum + checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an + uncommitted transaction. + + .. versionadded:: 1.16.0 + """ + + def __init__(self, j_kafka_sink, transformer: Optional[StreamTransformer] = None): + super().__init__(j_kafka_sink) + self._transformer = transformer + + @staticmethod + def builder() -> 'KafkaSinkBuilder': + """ + Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`. + """ + return KafkaSinkBuilder() + + def get_transformer(self) -> Optional[StreamTransformer]: + return self._transformer + + +class KafkaSinkBuilder(object): + """ + Builder to construct :class:`KafkaSink`. + + The following example shows the minimum setup to create a KafkaSink that writes String values + to a Kafka topic. + + :: + + >>> record_serializer = KafkaRecordSerializationSchema.builder() \\ + ... .set_topic(MY_SINK_TOPIC) \\ + ... .set_value_serialization_schema(SimpleStringSchema()) \\ + ... .build() + >>> sink = KafkaSink.builder() \\ + ... .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\ + ... .set_record_serializer(record_serializer) \\ + ... .build() + + One can also configure different :class:`DeliveryGuarantee` by using + :meth:`set_delivery_guarantee` but keep in mind when using + :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix + :meth:`set_transactional_id_prefix`. + + .. versionadded:: 1.16.0 + """ + + def __init__(self): + jvm = get_gateway().jvm + self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder() + self._preprocessing = None + + def build(self) -> 'KafkaSink': + """ + Constructs the :class:`KafkaSink` with the configured properties. + """ + return KafkaSink(self._j_builder.build(), self._preprocessing) + + def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSinkBuilder': + """ + Sets the Kafka bootstrap servers. + + :param bootstrap_servers: A comma separated list of valid URIs to reach the Kafka broker. + """ + self._j_builder.setBootstrapServers(bootstrap_servers) + return self + + def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'KafkaSinkBuilder': + """ + Sets the wanted :class:`DeliveryGuarantee`. 
The default delivery guarantee is + :attr:`DeliveryGuarantee.NONE`. + + :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`. + """ + self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee()) + return self + + def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 'KafkaSinkBuilder': + """ + Sets the prefix for all created transactionalIds if :attr:`DeliveryGuarantee.EXACTLY_ONCE` + is configured. + + It is mandatory to always set this value with :attr:`DeliveryGuarantee.EXACTLY_ONCE` to + prevent corrupted transactions if multiple jobs using the KafkaSink run against the same + Kafka Cluster. The default prefix is ``"kafka-sink"``. + + The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) formatted with UTF-8. + + It is important to keep the prefix stable across application restarts. If the prefix changes + it might happen that lingering transactions are not correctly aborted and newly written + messages are not immediately consumable until transactions timeout. + + :param transactional_id_prefix: The transactional id prefix. + """ + self._j_builder.setTransactionalIdPrefix(transactional_id_prefix) + return self + + def set_record_serializer(self, record_serializer: 'KafkaRecordSerializationSchema') \ + -> 'KafkaSinkBuilder': + """ + Sets the :class:`KafkaRecordSerializationSchema` that transforms incoming records to kafka + producer records. + + :param record_serializer: The :class:`KafkaRecordSerializationSchema`. + """ + # NOTE: If topic selector is a generated first-column selector, do extra preprocessing + j_topic_selector = get_field_value(record_serializer._j_serialization_schema, + 'topicSelector') + if ( + j_topic_selector.getClass().getCanonicalName() == + 'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.' + 'CachingTopicSelector' + ) and ( + get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() + is not None and + (get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() + .startswith('com.sun.proxy') or + get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName() + .startswith('jdk.proxy')) + ): + record_serializer._wrap_serialization_schema() + self._preprocessing = record_serializer._build_preprocessing() + + self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema) + return self + + def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder': + """ + Sets kafka producer config. + + :param key: Kafka producer config key. + :param value: Kafka producer config value. + """ + self._j_builder.setProperty(key, value) + return self + + +class KafkaRecordSerializationSchema(SerializationSchema): + """ + A serialization schema which defines how to convert the stream record to kafka producer record. + + .. versionadded:: 1.16.0 + """ + + def __init__(self, j_serialization_schema, + topic_selector: Optional['KafkaTopicSelector'] = None): + super().__init__(j_serialization_schema) + self._topic_selector = topic_selector + + @staticmethod + def builder() -> 'KafkaRecordSerializationSchemaBuilder': + """ + Creates a default schema builder to provide common building blocks i.e. key serialization, + value serialization, topic selection. 
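+
+        Example (a minimal sketch; the topic name is a placeholder):
+        ::
+
+            >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
+            ...     .set_topic('my_topic') \\
+            ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+            ...     .build()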
+ """ + return KafkaRecordSerializationSchemaBuilder() + + def _wrap_serialization_schema(self): + jvm = get_gateway().jvm + + def _wrap_schema(field_name): + j_schema_field = get_field(self._j_serialization_schema.getClass(), field_name) + if j_schema_field.get(self._j_serialization_schema) is not None: + j_schema_field.set( + self._j_serialization_schema, + jvm.org.apache.flink.python.util.PythonConnectorUtils + .SecondColumnSerializationSchema( + j_schema_field.get(self._j_serialization_schema) + ) + ) + + _wrap_schema('keySerializationSchema') + _wrap_schema('valueSerializationSchema') + + def _build_preprocessing(self) -> StreamTransformer: + class SelectTopicTransformer(StreamTransformer): + + def __init__(self, topic_selector: KafkaTopicSelector): + self._topic_selector = topic_selector + + def apply(self, ds): + output_type = Types.ROW([Types.STRING(), ds.get_type()]) + return ds.map(lambda v: Row(self._topic_selector.apply(v), v), + output_type=output_type) + + return SelectTopicTransformer(self._topic_selector) + + +class KafkaRecordSerializationSchemaBuilder(object): + """ + Builder to construct :class:`KafkaRecordSerializationSchema`. + + Example: + :: + + >>> KafkaRecordSerializationSchema.builder() \\ + ... .set_topic('topic') \\ + ... .set_key_serialization_schema(SimpleStringSchema()) \\ + ... .set_value_serialization_schema(SimpleStringSchema()) \\ + ... .build() + + And the sink topic can be calculated dynamically from each record: + :: + + >>> KafkaRecordSerializationSchema.builder() \\ + ... .set_topic_selector(lambda row: 'topic-' + row['category']) \\ + ... .set_value_serialization_schema( + ... JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\ + ... .build() + + It is necessary to configure exactly one serialization method for the value and a topic. + + .. versionadded:: 1.16.0 + """ + + def __init__(self): + jvm = get_gateway().jvm + self._j_builder = jvm.org.apache.flink.connector.kafka.sink \ + .KafkaRecordSerializationSchemaBuilder() + self._fixed_topic = True # type: bool + self._topic_selector = None # type: Optional[KafkaTopicSelector] + self._key_serialization_schema = None # type: Optional[SerializationSchema] + self._value_serialization_schema = None # type: Optional[SerializationSchema] + + def build(self) -> 'KafkaRecordSerializationSchema': + """ + Constructs the :class:`KafkaRecordSerializationSchemaBuilder` with the configured + properties. + """ + if self._fixed_topic: + return KafkaRecordSerializationSchema(self._j_builder.build()) + else: + return KafkaRecordSerializationSchema(self._j_builder.build(), self._topic_selector) + + def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder': + """ + Sets a fixed topic which used as destination for all records. + + :param topic: The fixed topic. + """ + self._j_builder.setTopic(topic) + self._fixed_topic = True + return self + + def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 'KafkaTopicSelector'])\ + -> 'KafkaRecordSerializationSchemaBuilder': + """ + Sets a topic selector which computes the target topic for every incoming record. + + :param topic_selector: A :class:`KafkaTopicSelector` implementation or a function that + consumes each incoming record and return the topic string. 
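+
+        Example (illustrative; assumes each incoming record is a Row-like value with a
+        'category' field):
+        ::
+
+            >>> class CategoryTopicSelector(KafkaTopicSelector):
+            ...     def apply(self, data) -> str:
+            ...         return 'topic-' + data['category']
+            >>> KafkaRecordSerializationSchema.builder() \\
+            ...     .set_topic_selector(CategoryTopicSelector()) \\
+            ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+            ...     .build()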
+ """ + if not isinstance(topic_selector, KafkaTopicSelector) and not callable(topic_selector): + raise TypeError('topic_selector must be KafkaTopicSelector or a callable') + if not isinstance(topic_selector, KafkaTopicSelector): + class TopicSelectorFunctionAdapter(KafkaTopicSelector): + + def __init__(self, f: Callable[[Any], str]): + self._f = f + + def apply(self, data) -> str: + return self._f(data) + + topic_selector = TopicSelectorFunctionAdapter(topic_selector) + + jvm = get_gateway().jvm + self._j_builder.setTopicSelector( + jvm.org.apache.flink.python.util.PythonConnectorUtils.createFirstColumnTopicSelector( + get_java_class(jvm.org.apache.flink.connector.kafka.sink.TopicSelector) + ) + ) + self._fixed_topic = False + self._topic_selector = topic_selector + return self + + def set_key_serialization_schema(self, key_serialization_schema: SerializationSchema) \ + -> 'KafkaRecordSerializationSchemaBuilder': + """ + Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the + key of the producer record. The key serialization is optional, if not set, the key of the + producer record will be null. + + :param key_serialization_schema: The :class:`SerializationSchema` to serialize each incoming + record as the key of producer record. + """ + self._key_serialization_schema = key_serialization_schema + self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema) + return self + + def set_value_serialization_schema(self, value_serialization_schema: SerializationSchema) \ + -> 'KafkaRecordSerializationSchemaBuilder': + """ + Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the + value of the producer record. The value serialization is required. + + :param value_serialization_schema: The :class:`SerializationSchema` to serialize each data + record as the key of producer record. + """ + self._value_serialization_schema = value_serialization_schema + self._j_builder.setValueSerializationSchema( + value_serialization_schema._j_serialization_schema) + return self + + +class KafkaTopicSelector(ABC): + """ + Select topic for an incoming record + + .. versionadded:: 1.16.0 + """ + + @abstractmethod + def apply(self, data) -> str: + pass diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py new file mode 100644 index 000000000..dea06b3e0 --- /dev/null +++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py @@ -0,0 +1,669 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import json +from typing import Dict + +import pyflink.datastream.data_stream as data_stream +from pyflink.common import typeinfo + +from pyflink.common.configuration import Configuration +from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema +from pyflink.common.typeinfo import Types +from pyflink.common.types import Row +from pyflink.common.watermark_strategy import WatermarkStrategy +from pyflink.datastream.connectors.base import DeliveryGuarantee +from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \ + KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink, \ + FlinkKafkaProducer, FlinkKafkaConsumer +from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema +from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema +from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema +from pyflink.java_gateway import get_gateway +from pyflink.testing.test_case_utils import ( + PyFlinkStreamingTestCase, + PyFlinkTestCase, + invoke_java_object_method, + to_java_data_structure, +) +from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value + + +class KafkaSourceTests(PyFlinkStreamingTestCase): + + def test_legacy_kafka_connector(self): + source_topic = 'test_source_topic' + sink_topic = 'test_sink_topic' + props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'} + type_info = Types.ROW([Types.INT(), Types.STRING()]) + + # Test for kafka consumer + deserialization_schema = JsonRowDeserializationSchema.builder() \ + .type_info(type_info=type_info).build() + + flink_kafka_consumer = FlinkKafkaConsumer(source_topic, deserialization_schema, props) + flink_kafka_consumer.set_start_from_earliest() + flink_kafka_consumer.set_commit_offsets_on_checkpoints(True) + + j_properties = get_field_value(flink_kafka_consumer.get_java_function(), 'properties') + self.assertEqual('localhost:9092', j_properties.getProperty('bootstrap.servers')) + self.assertEqual('test_group', j_properties.getProperty('group.id')) + self.assertTrue(get_field_value(flink_kafka_consumer.get_java_function(), + 'enableCommitOnCheckpoints')) + j_start_up_mode = get_field_value(flink_kafka_consumer.get_java_function(), 'startupMode') + + j_deserializer = get_field_value(flink_kafka_consumer.get_java_function(), 'deserializer') + j_deserialize_type_info = invoke_java_object_method(j_deserializer, "getProducedType") + deserialize_type_info = typeinfo._from_java_type(j_deserialize_type_info) + self.assertTrue(deserialize_type_info == type_info) + self.assertTrue(j_start_up_mode.equals(get_gateway().jvm + .org.apache.flink.streaming.connectors + .kafka.config.StartupMode.EARLIEST)) + j_topic_desc = get_field_value(flink_kafka_consumer.get_java_function(), + 'topicsDescriptor') + j_topics = invoke_java_object_method(j_topic_desc, 'getFixedTopics') + self.assertEqual(['test_source_topic'], list(j_topics)) + + # Test for kafka producer + serialization_schema = JsonRowSerializationSchema.builder().with_type_info(type_info) \ + .build() + flink_kafka_producer = FlinkKafkaProducer(sink_topic, serialization_schema, props) + flink_kafka_producer.set_write_timestamp_to_kafka(False) + + j_producer_config = get_field_value(flink_kafka_producer.get_java_function(), + 'producerConfig') + self.assertEqual('localhost:9092', 
j_producer_config.getProperty('bootstrap.servers')) + self.assertEqual('test_group', j_producer_config.getProperty('group.id')) + self.assertFalse(get_field_value(flink_kafka_producer.get_java_function(), + 'writeTimestampToKafka')) + + def test_compiling(self): + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .build() + + ds = self.env.from_source(source=source, + watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(), + source_name='kafka source') + ds.print() + plan = json.loads(self.env.get_execution_plan()) + self.assertEqual('Source: kafka source', plan['nodes'][0]['type']) + + def test_set_properties(self): + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_group_id('test_group_id') \ + .set_client_id_prefix('test_client_id_prefix') \ + .set_property('test_property', 'test_value') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .build() + conf = self._get_kafka_source_configuration(source) + self.assertEqual(conf.get_string('bootstrap.servers', ''), 'localhost:9092') + self.assertEqual(conf.get_string('group.id', ''), 'test_group_id') + self.assertEqual(conf.get_string('client.id.prefix', ''), 'test_client_id_prefix') + self.assertEqual(conf.get_string('test_property', ''), 'test_value') + + def test_set_topics(self): + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic1', 'test_topic2') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .build() + kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber') + self.assertEqual( + kafka_subscriber.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber' + ) + topics = get_field_value(kafka_subscriber, 'topics') + self.assertTrue(is_instance_of(topics, get_gateway().jvm.java.util.List)) + self.assertEqual(topics.size(), 2) + self.assertEqual(topics[0], 'test_topic1') + self.assertEqual(topics[1], 'test_topic2') + + def test_set_topic_pattern(self): + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topic_pattern('test_topic*') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .build() + kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber') + self.assertEqual( + kafka_subscriber.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicPatternSubscriber' + ) + topic_pattern = get_field_value(kafka_subscriber, 'topicPattern') + self.assertTrue(is_instance_of(topic_pattern, get_gateway().jvm.java.util.regex.Pattern)) + self.assertEqual(topic_pattern.toString(), 'test_topic*') + + def test_set_partitions(self): + topic_partition_1 = KafkaTopicPartition('test_topic', 1) + topic_partition_2 = KafkaTopicPartition('test_topic', 2) + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_partitions({topic_partition_1, topic_partition_2}) \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .build() + kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber') + self.assertEqual( + kafka_subscriber.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.subscriber.PartitionSetSubscriber' + ) + partitions = get_field_value(kafka_subscriber, 'subscribedPartitions') + self.assertTrue(is_instance_of(partitions, 
get_gateway().jvm.java.util.Set)) + self.assertTrue(topic_partition_1._to_j_topic_partition() in partitions) + self.assertTrue(topic_partition_2._to_j_topic_partition() in partitions) + + def test_set_starting_offsets(self): + def _build_source(initializer: KafkaOffsetsInitializer): + return KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .set_group_id('test_group') \ + .set_starting_offsets(initializer) \ + .build() + + self._check_latest_offsets_initializer( + _build_source(KafkaOffsetsInitializer.latest())) + self._check_reader_handled_offsets_initializer( + _build_source(KafkaOffsetsInitializer.earliest()), -2, + KafkaOffsetResetStrategy.EARLIEST + ) + self._check_reader_handled_offsets_initializer( + _build_source(KafkaOffsetsInitializer.committed_offsets()), -3, + KafkaOffsetResetStrategy.NONE + ) + self._check_reader_handled_offsets_initializer( + _build_source(KafkaOffsetsInitializer.committed_offsets( + KafkaOffsetResetStrategy.LATEST + )), -3, KafkaOffsetResetStrategy.LATEST + ) + self._check_timestamp_offsets_initializer( + _build_source(KafkaOffsetsInitializer.timestamp(100)), 100 + ) + specified_offsets = { + KafkaTopicPartition('test_topic1', 1): 1000, + KafkaTopicPartition('test_topic2', 2): 2000 + } + self._check_specified_offsets_initializer( + _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)), specified_offsets, + KafkaOffsetResetStrategy.EARLIEST + ) + self._check_specified_offsets_initializer( + _build_source(KafkaOffsetsInitializer.offsets( + specified_offsets, + KafkaOffsetResetStrategy.LATEST + )), + specified_offsets, + KafkaOffsetResetStrategy.LATEST + ) + + def test_bounded(self): + def _build_source(initializer: KafkaOffsetsInitializer): + return KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .set_group_id('test_group') \ + .set_bounded(initializer) \ + .build() + + def _check_bounded(source: KafkaSource): + self.assertEqual( + get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED' + ) + + self._test_set_bounded_or_unbounded(_build_source, _check_bounded) + + def test_unbounded(self): + def _build_source(initializer: KafkaOffsetsInitializer): + return KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(SimpleStringSchema()) \ + .set_group_id('test_group') \ + .set_unbounded(initializer) \ + .build() + + def _check_bounded(source: KafkaSource): + self.assertEqual( + get_field_value(source.get_java_function(), 'boundedness').toString(), + 'CONTINUOUS_UNBOUNDED' + ) + + self._test_set_bounded_or_unbounded(_build_source, _check_bounded) + + def _test_set_bounded_or_unbounded(self, _build_source, _check_boundedness): + source = _build_source(KafkaOffsetsInitializer.latest()) + _check_boundedness(source) + self._check_latest_offsets_initializer(source, False) + source = _build_source(KafkaOffsetsInitializer.earliest()) + _check_boundedness(source) + self._check_reader_handled_offsets_initializer( + source, -2, KafkaOffsetResetStrategy.EARLIEST, False + ) + source = _build_source(KafkaOffsetsInitializer.committed_offsets()) + _check_boundedness(source) + self._check_reader_handled_offsets_initializer( + source, -3, KafkaOffsetResetStrategy.NONE, False + ) + source = _build_source(KafkaOffsetsInitializer.committed_offsets( + 
KafkaOffsetResetStrategy.LATEST + )) + _check_boundedness(source) + self._check_reader_handled_offsets_initializer( + source, -3, KafkaOffsetResetStrategy.LATEST, False + ) + source = _build_source(KafkaOffsetsInitializer.timestamp(100)) + _check_boundedness(source) + self._check_timestamp_offsets_initializer(source, 100, False) + specified_offsets = { + KafkaTopicPartition('test_topic1', 1): 1000, + KafkaTopicPartition('test_topic2', 2): 2000 + } + source = _build_source(KafkaOffsetsInitializer.offsets(specified_offsets)) + _check_boundedness(source) + self._check_specified_offsets_initializer( + source, specified_offsets, KafkaOffsetResetStrategy.EARLIEST, False + ) + source = _build_source(KafkaOffsetsInitializer.offsets( + specified_offsets, + KafkaOffsetResetStrategy.LATEST) + ) + _check_boundedness(source) + self._check_specified_offsets_initializer( + source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False + ) + + def test_set_value_only_deserializer(self): + def _check(schema: DeserializationSchema, class_name: str): + source = KafkaSource.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_topics('test_topic') \ + .set_value_only_deserializer(schema) \ + .build() + deserialization_schema_wrapper = get_field_value(source.get_java_function(), + 'deserializationSchema') + self.assertEqual( + deserialization_schema_wrapper.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.reader.deserializer' + '.KafkaValueOnlyDeserializationSchemaWrapper' + ) + deserialization_schema = get_field_value(deserialization_schema_wrapper, + 'deserializationSchema') + self.assertEqual(deserialization_schema.getClass().getCanonicalName(), + class_name) + + _check(SimpleStringSchema(), 'org.apache.flink.api.common.serialization.SimpleStringSchema') + _check( + JsonRowDeserializationSchema.builder().type_info(Types.ROW([Types.STRING()])).build(), + 'org.apache.flink.formats.json.JsonRowDeserializationSchema' + ) + _check( + CsvRowDeserializationSchema.Builder(Types.ROW([Types.STRING()])).build(), + 'org.apache.flink.formats.csv.CsvRowDeserializationSchema' + ) + avro_schema_string = """ + { + "type": "record", + "name": "test_record", + "fields": [] + } + """ + _check( + AvroRowDeserializationSchema(avro_schema_string=avro_schema_string), + 'org.apache.flink.formats.avro.AvroRowDeserializationSchema' + ) + + def _check_reader_handled_offsets_initializer(self, + source: KafkaSource, + offset: int, + reset_strategy: KafkaOffsetResetStrategy, + is_start: bool = True): + if is_start: + field_name = 'startingOffsetsInitializer' + else: + field_name = 'stoppingOffsetsInitializer' + offsets_initializer = get_field_value(source.get_java_function(), field_name) + self.assertEqual( + offsets_initializer.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.initializer' + '.ReaderHandledOffsetsInitializer' + ) + + starting_offset = get_field_value(offsets_initializer, 'startingOffset') + self.assertEqual(starting_offset, offset) + + offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy') + self.assertTrue( + offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy()) + ) + + def _check_latest_offsets_initializer(self, + source: KafkaSource, + is_start: bool = True): + if is_start: + field_name = 'startingOffsetsInitializer' + else: + field_name = 'stoppingOffsetsInitializer' + offsets_initializer = get_field_value(source.get_java_function(), field_name) + self.assertEqual( + 
offsets_initializer.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.initializer' + '.LatestOffsetsInitializer' + ) + + def _check_timestamp_offsets_initializer(self, + source: KafkaSource, + timestamp: int, + is_start: bool = True): + if is_start: + field_name = 'startingOffsetsInitializer' + else: + field_name = 'stoppingOffsetsInitializer' + offsets_initializer = get_field_value(source.get_java_function(), field_name) + self.assertEqual( + offsets_initializer.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.initializer' + '.TimestampOffsetsInitializer' + ) + + starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp') + self.assertEqual(starting_timestamp, timestamp) + + def _check_specified_offsets_initializer(self, + source: KafkaSource, + offsets: Dict[KafkaTopicPartition, int], + reset_strategy: KafkaOffsetResetStrategy, + is_start: bool = True): + if is_start: + field_name = 'startingOffsetsInitializer' + else: + field_name = 'stoppingOffsetsInitializer' + offsets_initializer = get_field_value(source.get_java_function(), field_name) + self.assertEqual( + offsets_initializer.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.source.enumerator.initializer' + '.SpecifiedOffsetsInitializer' + ) + + initial_offsets = get_field_value(offsets_initializer, 'initialOffsets') + self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map)) + self.assertEqual(initial_offsets.size(), len(offsets)) + for j_topic_partition in initial_offsets: + topic_partition = KafkaTopicPartition(j_topic_partition.topic(), + j_topic_partition.partition()) + self.assertIsNotNone(offsets.get(topic_partition)) + self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition]) + + offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy') + self.assertTrue( + offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy()) + ) + + @staticmethod + def _get_kafka_source_configuration(source: KafkaSource): + jvm = get_gateway().jvm + j_source = source.get_java_function() + j_to_configuration = j_source.getClass().getDeclaredMethod( + 'getConfiguration', to_jarray(jvm.java.lang.Class, []) + ) + j_to_configuration.setAccessible(True) + j_configuration = j_to_configuration.invoke(j_source, to_jarray(jvm.java.lang.Object, [])) + return Configuration(j_configuration=j_configuration) + + +class KafkaSinkTests(PyFlinkStreamingTestCase): + + def test_compile(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + + ds = self.env.from_collection([], type_info=Types.STRING()) + ds.sink_to(sink) + + plan = json.loads(self.env.get_execution_plan()) + self.assertEqual(plan['nodes'][1]['type'], 'Sink: Writer') + self.assertEqual(plan['nodes'][2]['type'], 'Sink: Committer') + + def test_set_bootstrap_severs(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092,localhost:9093') \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + config = get_field_value(sink.get_java_function(), 'kafkaProducerConfig') + self.assertEqual(config.get('bootstrap.servers'), 'localhost:9092,localhost:9093') + + def test_set_delivery_guarantee(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + guarantee = 
get_field_value(sink.get_java_function(), 'deliveryGuarantee') + self.assertEqual(guarantee.toString(), 'none') + + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee') + self.assertEqual(guarantee.toString(), 'at-least-once') + + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee') + self.assertEqual(guarantee.toString(), 'exactly-once') + + def test_set_transactional_id_prefix(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_transactional_id_prefix('test-prefix') \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + prefix = get_field_value(sink.get_java_function(), 'transactionalIdPrefix') + self.assertEqual(prefix, 'test-prefix') + + def test_set_property(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_record_serializer(self._build_serialization_schema()) \ + .set_property('test-key', 'test-value') \ + .build() + config = get_field_value(sink.get_java_function(), 'kafkaProducerConfig') + self.assertEqual(config.get('test-key'), 'test-value') + + def test_set_record_serializer(self): + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_record_serializer(self._build_serialization_schema()) \ + .build() + serializer = get_field_value(sink.get_java_function(), 'recordSerializer') + self.assertEqual(serializer.getClass().getCanonicalName(), + 'org.apache.flink.connector.kafka.sink.' + 'KafkaRecordSerializationSchemaBuilder.' 
+ 'KafkaRecordSerializationSchemaWrapper') + topic_selector = get_field_value(serializer, 'topicSelector') + self.assertEqual(topic_selector.apply(None), 'test-topic') + value_serializer = get_field_value(serializer, 'valueSerializationSchema') + self.assertEqual(value_serializer.getClass().getCanonicalName(), + 'org.apache.flink.api.common.serialization.SimpleStringSchema') + + @staticmethod + def _build_serialization_schema() -> KafkaRecordSerializationSchema: + return KafkaRecordSerializationSchema.builder() \ + .set_topic('test-topic') \ + .set_value_serialization_schema(SimpleStringSchema()) \ + .build() + + +class KafkaRecordSerializationSchemaTests(PyFlinkTestCase): + + def test_set_topic(self): + input_type = Types.ROW([Types.STRING()]) + + serialization_schema = KafkaRecordSerializationSchema.builder() \ + .set_topic('test-topic') \ + .set_value_serialization_schema( + JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \ + .build() + jvm = get_gateway().jvm + serialization_schema._j_serialization_schema.open( + jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext(), + jvm.org.apache.flink.connector.kafka.sink.DefaultKafkaSinkContext( + 0, 1, jvm.java.util.Properties())) + + j_record = serialization_schema._j_serialization_schema.serialize( + to_java_data_structure(Row('test')), None, None + ) + self.assertEqual(j_record.topic(), 'test-topic') + self.assertIsNone(j_record.key()) + self.assertEqual(j_record.value(), b'{"f0":"test"}') + + def test_set_topic_selector(self): + def _select(data): + data = data[0] + if data == 'a': + return 'topic-a' + elif data == 'b': + return 'topic-b' + else: + return 'topic-dead-letter' + + def _check_record(data, topic, serialized_data): + input_type = Types.ROW([Types.STRING()]) + + serialization_schema = KafkaRecordSerializationSchema.builder() \ + .set_topic_selector(_select) \ + .set_value_serialization_schema( + JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \ + .build() + jvm = get_gateway().jvm + serialization_schema._j_serialization_schema.open( + jvm.org.apache.flink.connector.testutils.formats.DummyInitializationContext(), + jvm.org.apache.flink.connector.kafka.sink.DefaultKafkaSinkContext( + 0, 1, jvm.java.util.Properties())) + sink = KafkaSink.builder() \ + .set_bootstrap_servers('localhost:9092') \ + .set_record_serializer(serialization_schema) \ + .build() + + ds = MockDataStream(Types.ROW([Types.STRING()])) + ds.sink_to(sink) + row = Row(data) + topic_row = ds.feed(row) # type: Row + j_record = serialization_schema._j_serialization_schema.serialize( + to_java_data_structure(topic_row), None, None + ) + self.assertEqual(j_record.topic(), topic) + self.assertIsNone(j_record.key()) + self.assertEqual(j_record.value(), serialized_data) + + _check_record('a', 'topic-a', b'{"f0":"a"}') + _check_record('b', 'topic-b', b'{"f0":"b"}') + _check_record('c', 'topic-dead-letter', b'{"f0":"c"}') + _check_record('d', 'topic-dead-letter', b'{"f0":"d"}') + + def test_set_key_serialization_schema(self): + def _check_key_serialization_schema(key_serialization_schema, expected_class): + serialization_schema = KafkaRecordSerializationSchema.builder() \ + .set_topic('test-topic') \ + .set_key_serialization_schema(key_serialization_schema) \ + .set_value_serialization_schema(SimpleStringSchema()) \ + .build() + schema_field = get_field_value(serialization_schema._j_serialization_schema, + 'keySerializationSchema') + self.assertIsNotNone(schema_field) + 
self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class) + + self._check_serialization_schema_implementations(_check_key_serialization_schema) + + def test_set_value_serialization_schema(self): + def _check_value_serialization_schema(value_serialization_schema, expected_class): + serialization_schema = KafkaRecordSerializationSchema.builder() \ + .set_topic('test-topic') \ + .set_value_serialization_schema(value_serialization_schema) \ + .build() + schema_field = get_field_value(serialization_schema._j_serialization_schema, + 'valueSerializationSchema') + self.assertIsNotNone(schema_field) + self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class) + + self._check_serialization_schema_implementations(_check_value_serialization_schema) + + @staticmethod + def _check_serialization_schema_implementations(check_function): + input_type = Types.ROW([Types.STRING()]) + + check_function( + JsonRowSerializationSchema.builder().with_type_info(input_type).build(), + 'org.apache.flink.formats.json.JsonRowSerializationSchema' + ) + check_function( + CsvRowSerializationSchema.Builder(input_type).build(), + 'org.apache.flink.formats.csv.CsvRowSerializationSchema' + ) + avro_schema_string = """ + { + "type": "record", + "name": "test_record", + "fields": [] + } + """ + check_function( + AvroRowSerializationSchema(avro_schema_string=avro_schema_string), + 'org.apache.flink.formats.avro.AvroRowSerializationSchema' + ) + check_function( + SimpleStringSchema(), + 'org.apache.flink.api.common.serialization.SimpleStringSchema' + ) + + +class MockDataStream(data_stream.DataStream): + + def __init__(self, original_type=None): + super().__init__(None) + self._operators = [] + self._type = original_type + + def feed(self, data): + for op in self._operators: + data = op(data) + return data + + def get_type(self): + return self._type + + def map(self, f, output_type=None): + self._operators.append(f) + self._type = output_type + + def sink_to(self, sink): + ds = self + from pyflink.datastream.connectors.base import SupportsPreprocessing + if isinstance(sink, SupportsPreprocessing) and sink.get_transformer() is not None: + ds = sink.get_transformer().apply(self) + return ds diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py new file mode 100644 index 000000000..132ec81e0 --- /dev/null +++ b/flink-python/pyflink/pyflink_gateway_server.py @@ -0,0 +1,291 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This is a copy of the pyflink_gateway_server.py file from the Flink. 
+# The original file is accessible here:
+# https://github.com/apache/flink/blob/master/flink-python/pyflink/pyflink_gateway_server.py
+# The additional change is the handling of the FLINK_TEST_LIB_DIR environment variable,
+# which can be used to add extra testing jars to the gateway classpath.
+# The plan is to remove this copy once PyFlink 1.19 is released.
+
+import argparse
+import getpass
+import glob
+import os
+import platform
+import signal
+import socket
+import sys
+from collections import namedtuple
+from string import Template
+from subprocess import Popen, PIPE
+
+from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
+
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts.all"
+KEY_ENV_JAVA_OPTS_DEPRECATED = "env.java.opts"
+
+
+def on_windows():
+    return platform.system() == "Windows"
+
+
+def read_from_config(key, default_value, flink_conf_file):
+    value = default_value
+    # Resolve the real path of the (potentially tainted) value to avoid CWE-22: constructing
+    # a path or URI from a tainted value might allow an attacker to access, modify, or test
+    # the existence of critical or sensitive files.
+    with open(os.path.realpath(flink_conf_file), "r") as f:
+        while True:
+            line = f.readline()
+            if not line:
+                break
+            if line.startswith("#") or len(line.strip()) == 0:
+                continue
+            k, v = line.split(":", 1)
+            if k.strip() == key:
+                value = v.strip()
+    return value
+
+
+def find_java_executable():
+    java_executable = "java.exe" if on_windows() else "java"
+    flink_home = _find_flink_home()
+    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
+
+    if java_home is None and "JAVA_HOME" in os.environ:
+        java_home = os.environ["JAVA_HOME"]
+
+    if java_home is not None:
+        java_executable = os.path.join(java_home, "bin", java_executable)
+
+    return java_executable
+
+
+def prepare_environment_variables(env):
+    flink_home = _find_flink_home()
+    # Resolve the real path of the (potentially tainted) value to avoid CWE-22: constructing
+    # a path or URI from a tainted value might allow an attacker to access, modify, or test
+    # the existence of critical or sensitive files.
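+    # Illustrative example (hypothetical path): a FLINK_HOME of "/opt/flink-1.18.0/." is
+    # normalized to "/opt/flink-1.18.0"; unless FLINK_CONF_DIR, FLINK_LIB_DIR, FLINK_OPT_DIR
+    # or FLINK_PLUGINS_DIR are already set in the environment, the conf/, lib/, opt/ and
+    # plugins/ sub-directories are derived from that normalized path below.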
+ real_flink_home = os.path.realpath(flink_home) + + if 'FLINK_CONF_DIR' in env: + flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR']) + else: + flink_conf_directory = os.path.join(real_flink_home, "conf") + env['FLINK_CONF_DIR'] = flink_conf_directory + + if 'FLINK_LIB_DIR' in env: + flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR']) + else: + flink_lib_directory = os.path.join(real_flink_home, "lib") + env['FLINK_LIB_DIR'] = flink_lib_directory + + if 'FLINK_OPT_DIR' in env: + flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR']) + else: + flink_opt_directory = os.path.join(real_flink_home, "opt") + env['FLINK_OPT_DIR'] = flink_opt_directory + + if 'FLINK_PLUGINS_DIR' in env: + flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR']) + else: + flink_plugins_directory = os.path.join(real_flink_home, "plugins") + env['FLINK_PLUGINS_DIR'] = flink_plugins_directory + + env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin") + + +def construct_log_settings(env): + templates = [ + "-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log", + "-Dlog4j.configuration=${log4j_properties}", + "-Dlog4j.configurationFile=${log4j_properties}", + "-Dlogback.configurationFile=${logback_xml}" + ] + + flink_home = os.path.realpath(_find_flink_home()) + flink_conf_dir = env['FLINK_CONF_DIR'] + flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") + + if "FLINK_LOG_DIR" in env: + flink_log_dir = env["FLINK_LOG_DIR"] + else: + flink_log_dir = read_from_config( + KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file) + + if "LOG4J_PROPERTIES" in env: + log4j_properties = env["LOG4J_PROPERTIES"] + else: + log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir + + if "LOGBACK_XML" in env: + logback_xml = env["LOGBACK_XML"] + else: + logback_xml = "%s/logback.xml" % flink_conf_dir + + if "FLINK_IDENT_STRING" in env: + flink_ident_string = env["FLINK_IDENT_STRING"] + else: + flink_ident_string = getpass.getuser() + + hostname = socket.gethostname() + log_settings = [] + for template in templates: + log_settings.append(Template(template).substitute( + log4j_properties=log4j_properties, + logback_xml=logback_xml, + flink_log_dir=flink_log_dir, + flink_ident_string=flink_ident_string, + hostname=hostname)) + return log_settings + + +def get_jvm_opts(env): + flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml") + jvm_opts = env.get( + 'FLINK_ENV_JAVA_OPTS', + read_from_config( + KEY_ENV_JAVA_OPTS, + read_from_config(KEY_ENV_JAVA_OPTS_DEPRECATED, "", flink_conf_file), + flink_conf_file)) + + # Remove leading and ending double quotes (if present) of value + jvm_opts = jvm_opts.strip("\"") + return jvm_opts.split(" ") + + +def construct_flink_classpath(env): + flink_home = _find_flink_home() + flink_lib_directory = env['FLINK_LIB_DIR'] + flink_opt_directory = env['FLINK_OPT_DIR'] + + if on_windows(): + # The command length is limited on Windows. To avoid the problem we should shorten the + # command length as much as possible. 
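+        # For example (illustrative path only): this yields something like "C:\flink\lib\*",
+        # letting the JVM expand the wildcard itself instead of listing every jar on the
+        # command line the way the non-Windows branch below does.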
+        lib_jars = os.path.join(flink_lib_directory, "*")
+    else:
+        lib_jars = os.pathsep.join(glob.glob(os.path.join(flink_lib_directory, "*.jar")))
+
+    flink_python_jars = glob.glob(os.path.join(flink_opt_directory, "flink-python*.jar"))
+    if len(flink_python_jars) < 1:
+        print("The flink-python jar is not found in the opt folder of the FLINK_HOME: %s" %
+              flink_home)
+        return lib_jars
+    flink_python_jar = flink_python_jars[0]
+
+    return os.pathsep.join([lib_jars, flink_python_jar])
+
+
+def construct_hadoop_classpath(env):
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    hadoop_conf_dir = ""
+    if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
+        if os.path.isdir("/etc/hadoop/conf"):
+            print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or "
+                  "HADOOP_CLASSPATH was set.")
+            hadoop_conf_dir = "/etc/hadoop/conf"
+
+    hbase_conf_dir = ""
+    if 'HBASE_CONF_DIR' not in env:
+        if os.path.isdir("/etc/hbase/conf"):
+            print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.")
+            hbase_conf_dir = "/etc/hbase/conf"
+
+    return os.pathsep.join(
+        [env.get("HADOOP_CLASSPATH", ""),
+         env.get("YARN_CONF_DIR",
+                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+         env.get("HADOOP_CONF_DIR",
+                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)),
+         env.get("HBASE_CONF_DIR",
+                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))])
+
+
+def construct_test_classpath(env):
+    test_jar_patterns = [
+        "flink-python/target/test-dependencies/*",
+        "flink-python/target/artifacts/testDataStream.jar",
+        "flink-python/target/flink-python*-tests.jar",
+    ]
+    test_jars = []
+
+    # Connector tests need to add specific jars to the gateway classpath
+    if 'FLINK_TEST_LIB_DIR' in env:
+        flink_source_root = env['FLINK_TEST_LIB_DIR']
+    else:
+        flink_source_root = _find_flink_source_root()
+
+    for pattern in test_jar_patterns:
+        pattern = pattern.replace("/", os.path.sep)
+        test_jars += glob.glob(os.path.join(flink_source_root, pattern))
+    return os.path.pathsep.join(test_jars)
+
+
+def construct_program_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-c", "--class", required=True)
+    parser.add_argument("cluster_type", choices=["local", "remote", "yarn"])
+    parse_result, other_args = parser.parse_known_args(args)
+    main_class = getattr(parse_result, "class")
+    cluster_type = parse_result.cluster_type
+    return namedtuple(
+        "ProgramArgs", ["main_class", "cluster_type", "other_args"])(
+        main_class, cluster_type, other_args)
+
+
+def launch_gateway_server_process(env, args):
+    prepare_environment_variables(env)
+    program_args = construct_program_args(args)
+    if program_args.cluster_type == "local":
+        java_executable = find_java_executable()
+        log_settings = construct_log_settings(env)
+        jvm_args = env.get('JVM_ARGS', '')
+        jvm_opts = get_jvm_opts(env)
+        classpath = os.pathsep.join(
+            [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+        if "FLINK_TESTING" in env:
+            classpath = os.pathsep.join([classpath, construct_test_classpath(env)])
+        command = [java_executable, jvm_args, "-XX:+IgnoreUnrecognizedVMOptions",
+                   "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED"] \
+            + jvm_opts + log_settings \
+            + ["-cp", classpath, program_args.main_class] + program_args.other_args
+    else:
+        command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + program_args.other_args \
+
["-c", program_args.main_class] + preexec_fn = None + if not on_windows(): + def preexec_func(): + # ignore ctrl-c / SIGINT + signal.signal(signal.SIGINT, signal.SIG_IGN) + preexec_fn = preexec_func + return Popen(list(filter(lambda c: len(c) != 0, command)), + stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env) + + +if __name__ == "__main__": + launch_gateway_server_process(os.environ, sys.argv[1:]) diff --git a/flink-python/setup.py b/flink-python/setup.py new file mode 100644 index 000000000..438ff271e --- /dev/null +++ b/flink-python/setup.py @@ -0,0 +1,158 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from __future__ import print_function + +import glob +import io +import os +import sys + +from setuptools import setup +from shutil import copy, rmtree +from xml.etree import ElementTree as ET + +PACKAGE_NAME = 'apache-flink-connectors-kafka' +# Source files, directories +CURRENT_DIR = os.path.abspath(os.path.dirname(__file__)) +POM_FILE = os.path.join(CURRENT_DIR, '../pom.xml') +README_FILE = os.path.join(CURRENT_DIR, 'README.txt') + +# Generated files and directories +VERSION_FILE = os.path.join(CURRENT_DIR, 'pyflink/datastream/connectors/kafka_connector_version.py') +LIB_PATH = os.path.join(CURRENT_DIR, 'pyflink/lib') +DEPENDENCY_FILE = os.path.join(CURRENT_DIR, 'dev/dev-requirements.txt') + + +# Removes a file or directory if exists. +def remove_if_exists(file_path): + if os.path.exists(file_path): + if os.path.isfile(file_path): + os.remove(file_path) + if os.path.isdir(file_path): + rmtree(file_path) + + +# Reads the content of the README.txt file. +def readme_content(): + with io.open(README_FILE, 'r', encoding='utf-8') as f: + return f.read() + + +# Reads the parameters used by the setup command. +# The source is the kafka_connector_version.py and the README.txt. +def setup_parameters(): + try: + exec(open(VERSION_FILE).read()) + return locals()['__connector_version__'], locals()['__flink_dependency__'], readme_content() + except IOError: + print("Failed to load PyFlink version file for packaging. " + + "'%s' not found!" % VERSION_FILE, + file=sys.stderr) + sys.exit(-1) + + +# Reads and parses the flink-connector-kafka main pom.xml. 
+# Based on the version data in the pom.xml prepares the pyflink dir: +# - Generates kafka_connector_version.py +# - Generates dev-requirements.txt +# - Copies the flink-sql-connector-kafka*.jar to the pyflink/lib dir +def prepare_pyflink_dir(): + # source files + pom_root = ET.parse(POM_FILE).getroot() + flink_version = pom_root.findall( + "./{http://maven.apache.org/POM/4.0.0}properties/" + + "{http://maven.apache.org/POM/4.0.0}flink.version" + )[0].text + connector_version = pom_root.findall( + "./{http://maven.apache.org/POM/4.0.0}version")[0].text.replace("-SNAPSHOT", ".dev0") + + flink_dependency = "apache-flink>=" + flink_version + + os.makedirs(LIB_PATH) + connector_jar = \ + glob.glob(CURRENT_DIR + '/target/test-dependencies/flink-sql-connector-kafka*.jar')[0] + copy(connector_jar, LIB_PATH) + + with io.open(VERSION_FILE, 'w', encoding='utf-8') as f: + f.write('# Generated file, do not edit\n') + f.write('__connector_version__ = "' + connector_version + '"\n') + f.write('__flink_dependency__ = "' + flink_dependency + '"\n') + + with io.open(DEPENDENCY_FILE, 'w', encoding='utf-8') as f: + f.write('# Generated file, do not edit\n') + f.write(flink_dependency + '\n') + + +# Main +print("Python version used to package: " + sys.version) + +# Python version check +if sys.version_info < (3, 7): + print("Python versions prior to 3.7 are not supported for PyFlink.", + file=sys.stderr) + sys.exit(-1) + +# Checks the running environment: +# - In the connector source root directory - package preparation +# - Otherwise - package deployment +in_flink_source = os.path.isfile("../flink-connector-kafka/src/main/" + + "java/org/apache/flink/connector/kafka/source/KafkaSource.java") + +# Cleans up the generated files and directories and regenerate them. +if in_flink_source: + remove_if_exists(VERSION_FILE) + remove_if_exists(DEPENDENCY_FILE) + remove_if_exists(LIB_PATH) + prepare_pyflink_dir() + print("\nPreparing Flink Kafka connector package") + +# Reads the current setup data from the kafka_connector_version.py file and the README.txt +(connector_version, flink_dependency, long_description) = setup_parameters() + +print("\nConnector version: " + connector_version) +print("Flink dependency: " + flink_dependency + "\n") + +if in_flink_source: + # Removes temporary directory used by the setup tool + remove_if_exists(PACKAGE_NAME.replace('-', '_') + '.egg-info') + +# Runs the python setup +setup( + name=PACKAGE_NAME, + version=connector_version, + include_package_data=True, + url='https://flink.apache.org', + license='https://www.apache.org/licenses/LICENSE-2.0', + author='Apache Software Foundation', + author_email='dev@flink.apache.org', + python_requires='>=3.8', + install_requires=[flink_dependency], + description='Apache Flink Python Kafka Connector API', + long_description=long_description, + long_description_content_type='text/plain', + zip_safe=False, + classifiers=[ + 'Development Status :: 5 - Production/Stable', + 'License :: OSI Approved :: Apache Software License', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11'] +) + +print("\nFlink Kafka connector package is ready\n") diff --git a/flink-python/tox.ini b/flink-python/tox.ini new file mode 100644 index 000000000..c21c00f7f --- /dev/null +++ b/flink-python/tox.ini @@ -0,0 +1,51 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) 
under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +[tox] +# tox (https://tox.readthedocs.io/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. +# new environments will be excluded by default unless explicitly added to envlist. +envlist = {py38, py39, py310}-cython + +[testenv] +whitelist_externals = /bin/bash +deps = apache-flink +passenv = * +commands = + python --version + pip install pytest + bash ./dev/integration_test.sh +# Replace the default installation command with a custom retry installation script, because on high-speed +# networks, downloading a package may raise a ConnectionResetError: [Errno 104] Peer reset connection. +install_command = {toxinidir}/dev/install_command.sh {opts} {packages} + +[flake8] +# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one exception: lines can be +# up to 100 characters in length, not 79. +ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504 +max-line-length=100 +exclude=.tox/*,dev/*,lib/*,target/*,build/*,dist/* + +[mypy] +files=pyflink/datastream/connectors/*.py +ignore_missing_imports = True +strict_optional=False + +[mypy-pyflink.fn_execution.*] +ignore_errors = True diff --git a/pom.xml b/pom.xml index ff8efac36..d4cd25524 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ under the License. flink-connector-kafka flink-sql-connector-kafka flink-connector-kafka-e2e-tests + flink-python