feat: added support for running python templates in CLUSTER mode #862

Merged
36 changes: 29 additions & 7 deletions python/README.md
@@ -28,11 +28,11 @@
* [TextToBigQuery](/python/dataproc_templates/gcs#text-to-bigquery)


Dataproc Templates (Python - PySpark) submit jobs to Dataproc Serverless using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark).
Dataproc Templates (Python - PySpark) support submitting jobs both to Dataproc Serverless, using [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark), and to a Dataproc cluster, using [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark).

## Run using PyPi package

In this README, you see instructions on how to submit Dataproc Serverless template jobs.
In this README, you will find instructions on how to run the templates.
Currently, 3 options are described:
- Using bin/start.sh
- Using gcloud CLI
@@ -110,7 +110,33 @@ coverage run \
coverage report --show-missing
```

## Submitting templates to Dataproc Serverless

## Running Templates

The Dataproc Templates (Python - PySpark) support both serverless and cluster modes. By default, serverless mode is used. To run these templates, use the `gcloud` CLI directly or the provided `bin/start.sh` shell script.

### Serverless Mode (Default)

Submits the job to Dataproc Serverless using the [batches submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/batches/submit/pyspark) command. This is the default when `JOB_TYPE` is not set.
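
For example, a minimal serverless submission through `bin/start.sh` might look like the sketch below; the project, region, bucket, and template name are placeholders, and any template-specific arguments would follow after `--template`:

```sh
export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
./bin/start.sh \
-- --template HIVETOBIGQUERY
```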

### Cluster Mode

Submits the job to a standard Dataproc cluster using the [jobs submit pyspark](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/pyspark) command.

To run the templates on an existing cluster, you must additionally specify the `JOB_TYPE=CLUSTER` and `CLUSTER=<full clusterId>` environment variables. For example:

```sh
export GCP_PROJECT=my-gcp-project
export REGION=gcp-region
export GCS_STAGING_LOCATION=gs://my-bucket/temp
export JOB_TYPE=CLUSTER
export CLUSTER=${DATAPROC_CLUSTER_NAME}
./bin/start.sh \
-- --template HIVETOBIGQUERY
```
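
For reference, in cluster mode `bin/start.sh` assembles a `gcloud dataproc jobs submit pyspark` invocation roughly like the sketch below; all values, including the packaged egg path, are illustrative placeholders:

```sh
gcloud dataproc jobs submit pyspark main.py \
    --project=my-gcp-project \
    --region=gcp-region \
    --cluster=my-dataproc-cluster \
    --py-files=dist/dataproc_templates_distribution.egg \
    -- --template HIVETOBIGQUERY
```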


## Submitting templates

A shell script is provided to:
- Build the python package
@@ -126,10 +126,6 @@ When submitting, there are 3 types of properties/parameters for the user to provide
- The **--log_level** parameter is optional and defaults to INFO (see the example below).
- Possible choices are the Spark log levels: ["ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"].
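
For instance, a run with verbose logging could pass the flag alongside the other template arguments after `--` (a sketch; the template name is a placeholder):

```sh
./bin/start.sh \
-- --template HIVETOBIGQUERY --log_level DEBUG
```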





<hr>

**bin/start.sh usage**:
60 changes: 45 additions & 15 deletions python/bin/start.sh
@@ -53,6 +53,9 @@ OPT_PY_FILES="--py-files=${PROJECT_ROOT_DIR}/${PACKAGE_EGG_FILE}"
if [ -n "${SUBNET}" ]; then
OPT_SUBNET="--subnet=${SUBNET}"
fi
if [ -n "${CLUSTER}" ]; then
OPT_CLUSTER="--cluster=${CLUSTER}"
fi
if [ -n "${HISTORY_SERVER_CLUSTER}" ]; then
OPT_HISTORY_SERVER_CLUSTER="--history-server-cluster=${HISTORY_SERVER_CLUSTER}"
fi
@@ -71,6 +74,10 @@ fi
if [ -n "${SPARK_PROPERTIES}" ]; then
OPT_PROPERTIES="--properties=${SPARK_PROPERTIES}"
fi
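# Default to serverless submission when JOB_TYPE is not set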
if [ -z "${JOB_TYPE}" ]; then
JOB_TYPE=SERVERLESS
fi

# If an HBase catalog is passed, then the required HBase dependencies are copied to the staging location and added to jars
if [ -n "${CATALOG}" ]; then
echo "Downloading Hbase jar dependency"
@@ -100,23 +107,46 @@ if [ -n "${HBASE_SITE_PATH}" ]; then
fi
fi

command=$(cat << EOF
gcloud beta dataproc batches submit pyspark \
${PROJECT_ROOT_DIR}/main.py \
${OPT_SPARK_VERSION} \
${OPT_PROJECT} \
${OPT_REGION} \
${OPT_JARS} \
${OPT_LABELS} \
${OPT_DEPS_BUCKET} \
${OPT_FILES} \
${OPT_PY_FILES} \
${OPT_PROPERTIES} \
${OPT_SUBNET} \
${OPT_HISTORY_SERVER_CLUSTER} \
${OPT_METASTORE_SERVICE}
# Construct the command based on JOB_TYPE
if [ "${JOB_TYPE}" == "CLUSTER" ]; then
echo "JOB_TYPE is CLUSTER, so will submit on an existing Dataproc cluster"
check_required_envvar CLUSTER
command=$(cat << EOF
gcloud dataproc jobs submit pyspark \
${PROJECT_ROOT_DIR}/main.py \
${OPT_PROJECT} \
${OPT_REGION} \
${OPT_CLUSTER} \
${OPT_JARS} \
${OPT_LABELS} \
${OPT_FILES} \
${OPT_PY_FILES} \
${OPT_PROPERTIES}
EOF
)
elif [ "${JOB_TYPE}" == "SERVERLESS" ]; then
echo "JOB_TYPE is SERVERLESS, so will submit on serverless Spark"
command=$(cat << EOF
gcloud beta dataproc batches submit pyspark \
${PROJECT_ROOT_DIR}/main.py \
${OPT_SPARK_VERSION} \
${OPT_PROJECT} \
${OPT_REGION} \
${OPT_JARS} \
${OPT_LABELS} \
${OPT_DEPS_BUCKET} \
${OPT_FILES} \
${OPT_PY_FILES} \
${OPT_PROPERTIES} \
${OPT_SUBNET} \
${OPT_HISTORY_SERVER_CLUSTER} \
${OPT_METASTORE_SERVICE}
EOF
)
else
echo "Unknown JOB_TYPE \"${JOB_TYPE}\""
exit 1
fi

echo "Triggering Spark Submit job"
echo ${command} "$@"