From 1df63784fce8351b1dc8fbd6aa88a4ffc8b4109a Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Fri, 12 Jan 2024 13:19:02 +0530 Subject: [PATCH 1/3] adding changes for issue 828 to add partition while writing to GCS from BQ --- .../dataproc_templates/bigquery/bigquery_to_gcs.py | 13 ++++++++++++- .../dataproc_templates/util/template_constants.py | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/python/dataproc_templates/bigquery/bigquery_to_gcs.py b/python/dataproc_templates/bigquery/bigquery_to_gcs.py index ff96fe41b..5036a3dba 100644 --- a/python/dataproc_templates/bigquery/bigquery_to_gcs.py +++ b/python/dataproc_templates/bigquery/bigquery_to_gcs.py @@ -61,6 +61,13 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: required=True, help='Cloud Storage location for output files' ) + parser.add_argument( + f'--{constants.BQ_GCS_OUTPUT_PARTITION_COLUMN}', + dest=constants.BQ_GCS_OUTPUT_PARTITION_COLUMN, + required=False, + default="", + help='Partition column name to partition the final output in destination bucket' + ) parser.add_argument( f'--{constants.BQ_GCS_OUTPUT_MODE}', dest=constants.BQ_GCS_OUTPUT_MODE, @@ -92,6 +99,7 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: # Arguments input_table: str = args[constants.BQ_GCS_INPUT_TABLE] output_mode: str = args[constants.BQ_GCS_OUTPUT_MODE] + output_partition_column: str = args[constants.BQ_GCS_OUTPUT_PARTITION_COLUMN] output_location: str = args[constants.BQ_GCS_OUTPUT_LOCATION] output_format: str = args[constants.BQ_GCS_OUTPUT_FORMAT] @@ -108,5 +116,8 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: .load() # Write - writer: DataFrameWriter = input_data.write.mode(output_mode) + if output_partition_column: + writer: DataFrameWriter = input_data.write.mode(output_mode).partitionBy(output_partition_column) + else: + writer: DataFrameWriter = input_data.write.mode(output_mode) persist_dataframe_to_cloud_storage(writer, args, output_location, output_format, "bigquery.gcs.output.") diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py index 9798eb5eb..d7ab397c7 100644 --- a/python/dataproc_templates/util/template_constants.py +++ b/python/dataproc_templates/util/template_constants.py @@ -334,6 +334,7 @@ def get_csv_output_spark_options(prefix): BQ_GCS_INPUT_TABLE = "bigquery.gcs.input.table" BQ_GCS_OUTPUT_FORMAT = "bigquery.gcs.output.format" BQ_GCS_OUTPUT_MODE = "bigquery.gcs.output.mode" +BQ_GCS_OUTPUT_PARTITION_COLUMN = "bigquery.gcs.output.partition.column" BQ_GCS_OUTPUT_LOCATION = "bigquery.gcs.output.location" # GCS To GCS with transformations From 0008d81e9249698e7eee5ca4177c84c25710af24 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Thu, 18 Jan 2024 15:32:01 +0530 Subject: [PATCH 2/3] adding changes for output partition column in readme file for BQ to GCS --- python/dataproc_templates/bigquery/README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/dataproc_templates/bigquery/README.md b/python/dataproc_templates/bigquery/README.md index 5ac6d4f9a..13a2d98c7 100644 --- a/python/dataproc_templates/bigquery/README.md +++ b/python/dataproc_templates/bigquery/README.md @@ -10,6 +10,7 @@ It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverl * `bigquery.gcs.output.location`: Cloud Storage location for output files (format: `gs://BUCKET/...`) * `bigquery.gcs.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append) #### Optional Arguments +* `bigquery.gcs.output.partition.column`: Partition column name to partition the final output in destination bucket' * `bigquery.gcs.output.chartoescapequoteescaping`: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise * `bigquery.gcs.output.compression`: None * `bigquery.gcs.output.dateformat`: Sets the string that indicates a date format. This applies to date type @@ -38,6 +39,7 @@ usage: main.py [-h] --bigquery.gcs.output.format {avro,parquet,csv,json} --bigquery.gcs.output.location BIGQUERY.GCS.OUTPUT.LOCATION [--bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists}] + [--bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN] [--bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING] [--bigquery.gcs.output.compression BIGQUERY.GCS.OUTPUT.COMPRESSION] [--bigquery.gcs.output.dateformat BIGQUERY.GCS.OUTPUT.DATEFORMAT] @@ -66,6 +68,8 @@ options: Cloud Storage location for output files --bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists} Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append) + --bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN + Partition column name to partition the final output in destination bucket --bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise From 597d3fc211695e5557c7db5a10bbac4627c1d1e1 Mon Sep 17 00:00:00 2001 From: Shubham Pathak Date: Thu, 18 Jan 2024 15:33:39 +0530 Subject: [PATCH 3/3] adding changes for output partition column in readme file for BQ to GCS --- python/dataproc_templates/bigquery/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dataproc_templates/bigquery/README.md b/python/dataproc_templates/bigquery/README.md index 13a2d98c7..4b2a63d5d 100644 --- a/python/dataproc_templates/bigquery/README.md +++ b/python/dataproc_templates/bigquery/README.md @@ -10,7 +10,7 @@ It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverl * `bigquery.gcs.output.location`: Cloud Storage location for output files (format: `gs://BUCKET/...`) * `bigquery.gcs.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append) #### Optional Arguments -* `bigquery.gcs.output.partition.column`: Partition column name to partition the final output in destination bucket' +* `bigquery.gcs.output.partition.column`: Partition column name to partition the final output in destination bucket * `bigquery.gcs.output.chartoescapequoteescaping`: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise * `bigquery.gcs.output.compression`: None * `bigquery.gcs.output.dateformat`: Sets the string that indicates a date format. This applies to date type