diff --git a/python/dataproc_templates/bigquery/README.md b/python/dataproc_templates/bigquery/README.md index 5ac6d4f9a..4b2a63d5d 100644 --- a/python/dataproc_templates/bigquery/README.md +++ b/python/dataproc_templates/bigquery/README.md @@ -10,6 +10,7 @@ It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverl * `bigquery.gcs.output.location`: Cloud Storage location for output files (format: `gs://BUCKET/...`) * `bigquery.gcs.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append) #### Optional Arguments +* `bigquery.gcs.output.partition.column`: Partition column name to partition the final output in destination bucket * `bigquery.gcs.output.chartoescapequoteescaping`: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise * `bigquery.gcs.output.compression`: None * `bigquery.gcs.output.dateformat`: Sets the string that indicates a date format. This applies to date type @@ -38,6 +39,7 @@ usage: main.py [-h] --bigquery.gcs.output.format {avro,parquet,csv,json} --bigquery.gcs.output.location BIGQUERY.GCS.OUTPUT.LOCATION [--bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists}] + [--bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN] [--bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING] [--bigquery.gcs.output.compression BIGQUERY.GCS.OUTPUT.COMPRESSION] [--bigquery.gcs.output.dateformat BIGQUERY.GCS.OUTPUT.DATEFORMAT] @@ -66,6 +68,8 @@ options: Cloud Storage location for output files --bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists} Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append) + --bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN + Partition column name to partition the final output in destination bucket --bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise diff --git a/python/dataproc_templates/bigquery/bigquery_to_gcs.py b/python/dataproc_templates/bigquery/bigquery_to_gcs.py index ff96fe41b..5036a3dba 100644 --- a/python/dataproc_templates/bigquery/bigquery_to_gcs.py +++ b/python/dataproc_templates/bigquery/bigquery_to_gcs.py @@ -61,6 +61,13 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]: required=True, help='Cloud Storage location for output files' ) + parser.add_argument( + f'--{constants.BQ_GCS_OUTPUT_PARTITION_COLUMN}', + dest=constants.BQ_GCS_OUTPUT_PARTITION_COLUMN, + required=False, + default="", + help='Partition column name to partition the final output in destination bucket' + ) parser.add_argument( f'--{constants.BQ_GCS_OUTPUT_MODE}', dest=constants.BQ_GCS_OUTPUT_MODE, @@ -92,6 +99,7 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: # Arguments input_table: str = args[constants.BQ_GCS_INPUT_TABLE] output_mode: str = args[constants.BQ_GCS_OUTPUT_MODE] + output_partition_column: str = args[constants.BQ_GCS_OUTPUT_PARTITION_COLUMN] output_location: str = args[constants.BQ_GCS_OUTPUT_LOCATION] output_format: str = args[constants.BQ_GCS_OUTPUT_FORMAT] @@ -108,5 +116,8 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None: .load() # Write - writer: DataFrameWriter = input_data.write.mode(output_mode) + if output_partition_column: + writer: DataFrameWriter = input_data.write.mode(output_mode).partitionBy(output_partition_column) + else: + writer: DataFrameWriter = input_data.write.mode(output_mode) persist_dataframe_to_cloud_storage(writer, args, output_location, output_format, "bigquery.gcs.output.") diff --git a/python/dataproc_templates/util/template_constants.py b/python/dataproc_templates/util/template_constants.py index 9798eb5eb..d7ab397c7 100644 --- a/python/dataproc_templates/util/template_constants.py +++ b/python/dataproc_templates/util/template_constants.py @@ -334,6 +334,7 @@ def get_csv_output_spark_options(prefix): BQ_GCS_INPUT_TABLE = "bigquery.gcs.input.table" BQ_GCS_OUTPUT_FORMAT = "bigquery.gcs.output.format" BQ_GCS_OUTPUT_MODE = "bigquery.gcs.output.mode" +BQ_GCS_OUTPUT_PARTITION_COLUMN = "bigquery.gcs.output.partition.column" BQ_GCS_OUTPUT_LOCATION = "bigquery.gcs.output.location" # GCS To GCS with transformations