Commit

Merge pull request #902 from GoogleCloudPlatform/feature/issue-828-partition-output-BQ-to-GCS

feat: Issue 828 add partition column functionality from bq to gcs
shubhampathakk authored Jan 18, 2024
2 parents 7bc3e09 + 597d3fc commit 7ba34dd
Showing 3 changed files with 17 additions and 1 deletion.
4 changes: 4 additions & 0 deletions python/dataproc_templates/bigquery/README.md
@@ -10,6 +10,7 @@ It uses the [Spark BigQuery connector](https://cloud.google.com/dataproc-serverl
 * `bigquery.gcs.output.location`: Cloud Storage location for output files (format: `gs://BUCKET/...`)
 * `bigquery.gcs.output.mode`: Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)
 #### Optional Arguments
+* `bigquery.gcs.output.partition.column`: Partition column name to partition the final output in destination bucket
 * `bigquery.gcs.output.chartoescapequoteescaping`: Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are different, \0 otherwise
 * `bigquery.gcs.output.compression`: None
 * `bigquery.gcs.output.dateformat`: Sets the string that indicates a date format. This applies to date type
@@ -38,6 +39,7 @@ usage: main.py [-h]
                --bigquery.gcs.output.format {avro,parquet,csv,json}
                --bigquery.gcs.output.location BIGQUERY.GCS.OUTPUT.LOCATION
                [--bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists}]
+               [--bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN]
                [--bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING]
                [--bigquery.gcs.output.compression BIGQUERY.GCS.OUTPUT.COMPRESSION]
                [--bigquery.gcs.output.dateformat BIGQUERY.GCS.OUTPUT.DATEFORMAT]
@@ -66,6 +68,8 @@ options:
                         Cloud Storage location for output files
   --bigquery.gcs.output.mode {overwrite,append,ignore,errorifexists}
                         Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)
+  --bigquery.gcs.output.partition.column BIGQUERY.GCS.OUTPUT.PARTITION.COLUMN
+                        Partition column name to partition the final output in destination bucket
   --bigquery.gcs.output.chartoescapequoteescaping BIGQUERY.GCS.OUTPUT.CHARTOESCAPEQUOTEESCAPING
                         Sets a single character used for escaping the escape for the quote character. The default value is escape character when escape and quote characters are
                         different, \0 otherwise
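
The optional flag documented in this README change can be reproduced in isolation. The following is a minimal sketch, not the template's actual `parse_args`: the helper name `parse_partition_arg` is hypothetical, and the constant is copied from the value this commit adds to `template_constants.py`. It shows that the dotted option name is also used as `dest`, so the parsed value is read back from a dictionary under the same dotted key, and that omitting the flag yields an empty string.

```python
import argparse
from typing import Any, Dict, Optional, Sequence

# Same value as the constant this commit adds to template_constants.py.
BQ_GCS_OUTPUT_PARTITION_COLUMN = "bigquery.gcs.output.partition.column"


def parse_partition_arg(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse only the partition-column option (hypothetical helper for illustration)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        f"--{BQ_GCS_OUTPUT_PARTITION_COLUMN}",
        dest=BQ_GCS_OUTPUT_PARTITION_COLUMN,
        required=False,
        default="",
        help="Partition column name to partition the final output in destination bucket",
    )
    # parse_known_args ignores any other template options that may be present.
    known_args, _ = parser.parse_known_args(args)
    return vars(known_args)


with_flag = parse_partition_arg(["--bigquery.gcs.output.partition.column", "country"])
without_flag = parse_partition_arg([])
print(with_flag)     # {'bigquery.gcs.output.partition.column': 'country'}
print(without_flag)  # {'bigquery.gcs.output.partition.column': ''}
```

Because the default is `""` rather than `None`, downstream code can test the value for truthiness to decide whether partitioning was requested.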
13 changes: 12 additions & 1 deletion python/dataproc_templates/bigquery/bigquery_to_gcs.py
@@ -61,6 +61,13 @@ def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
             required=True,
             help='Cloud Storage location for output files'
         )
+        parser.add_argument(
+            f'--{constants.BQ_GCS_OUTPUT_PARTITION_COLUMN}',
+            dest=constants.BQ_GCS_OUTPUT_PARTITION_COLUMN,
+            required=False,
+            default="",
+            help='Partition column name to partition the final output in destination bucket'
+        )
         parser.add_argument(
             f'--{constants.BQ_GCS_OUTPUT_MODE}',
             dest=constants.BQ_GCS_OUTPUT_MODE,
@@ -92,6 +99,7 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
         # Arguments
         input_table: str = args[constants.BQ_GCS_INPUT_TABLE]
         output_mode: str = args[constants.BQ_GCS_OUTPUT_MODE]
+        output_partition_column: str = args[constants.BQ_GCS_OUTPUT_PARTITION_COLUMN]

         output_location: str = args[constants.BQ_GCS_OUTPUT_LOCATION]
         output_format: str = args[constants.BQ_GCS_OUTPUT_FORMAT]
@@ -108,5 +116,8 @@ def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
             .load()

         # Write
-        writer: DataFrameWriter = input_data.write.mode(output_mode)
+        if output_partition_column:
+            writer: DataFrameWriter = input_data.write.mode(output_mode).partitionBy(output_partition_column)
+        else:
+            writer: DataFrameWriter = input_data.write.mode(output_mode)
         persist_dataframe_to_cloud_storage(writer, args, output_location, output_format, "bigquery.gcs.output.")
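
The write step above can also be expressed with a single writer that is partitioned only when a column name was supplied. The following is a sketch of that alternative, not the committed code: `build_writer` is a hypothetical helper, and `RecordingWriter` is a hypothetical stand-in that only imitates the chaining behaviour of PySpark's `DataFrameWriter.mode` and `DataFrameWriter.partitionBy`, so the example runs without a Spark session.

```python
from typing import Any, List, Tuple


class RecordingWriter:
    """Hypothetical stand-in for pyspark.sql.DataFrameWriter; records chained calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def mode(self, save_mode: str) -> "RecordingWriter":
        self.calls.append(("mode", save_mode))
        return self

    def partitionBy(self, *cols: str) -> "RecordingWriter":
        self.calls.append(("partitionBy", cols))
        return self


def build_writer(writer: Any, output_mode: str, output_partition_column: str) -> Any:
    """Set the write mode, then partition only if a column name was given."""
    writer = writer.mode(output_mode)
    if output_partition_column:  # "" (the argparse default) is falsy
        writer = writer.partitionBy(output_partition_column)
    return writer


partitioned = build_writer(RecordingWriter(), "overwrite", "country")
unpartitioned = build_writer(RecordingWriter(), "append", "")
print(partitioned.calls)    # [('mode', 'overwrite'), ('partitionBy', ('country',))]
print(unpartitioned.calls)  # [('mode', 'append')]
```

In the template itself, the first argument would be `input_data.write`. The behaviour matches the committed `if`/`else`; the only difference is that the mode is set once instead of in both branches.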
1 change: 1 addition & 0 deletions python/dataproc_templates/util/template_constants.py
@@ -334,6 +334,7 @@ def get_csv_output_spark_options(prefix):
 BQ_GCS_INPUT_TABLE = "bigquery.gcs.input.table"
 BQ_GCS_OUTPUT_FORMAT = "bigquery.gcs.output.format"
 BQ_GCS_OUTPUT_MODE = "bigquery.gcs.output.mode"
+BQ_GCS_OUTPUT_PARTITION_COLUMN = "bigquery.gcs.output.partition.column"
 BQ_GCS_OUTPUT_LOCATION = "bigquery.gcs.output.location"

 # GCS To GCS with transformations