Skip to content

Commit

Permalink
feat: add support for packages param (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jul 17, 2024
1 parent 5206c4e commit 3cfea92
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ and the CLI. Here is a list of the available environment variables:
| SPARK_ON_K8S_SERVICE_ACCOUNT | The service account to use | spark |
| SPARK_ON_K8S_SPARK_CONF | The spark configuration to use | {} |
| SPARK_ON_K8S_CLASS_NAME | The class name to use | |
| SPARK_ON_K8S_PACKAGES | The maven packages list to add to the classpath |
| SPARK_ON_K8S_APP_ARGUMENTS | The arguments to pass to the app | [] |
| SPARK_ON_K8S_APP_WAITER | The waiter to use to wait for the app to finish | no_wait |
| SPARK_ON_K8S_IMAGE_PULL_POLICY | The image pull policy to use | IfNotPresent |
Expand Down
4 changes: 4 additions & 0 deletions spark_on_k8s/airflow/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class SparkOnK8SOperator(BaseOperator):
the format %Y%m%d%H%M%S prefixed with a dash.
spark_conf (dict[str, str], optional): Spark configuration. Defaults to None.
class_name (str, optional): Spark application class name. Defaults to None.
packages: List of maven coordinates of jars to include in the classpath. Defaults to None.
app_arguments (list[str], optional): Spark application arguments. Defaults to None.
app_waiter (Literal["no_wait", "wait", "log"], optional): Spark application waiter.
Defaults to "wait".
Expand Down Expand Up @@ -124,6 +125,7 @@ def __init__(
app_id_suffix: str = None,
spark_conf: dict[str, str] | None = None,
class_name: str | None = None,
packages: list[str] | None = None,
app_arguments: list[str] | None = None,
app_waiter: Literal["no_wait", "wait", "log"] = "wait",
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] = "IfNotPresent",
Expand Down Expand Up @@ -160,6 +162,7 @@ def __init__(
self.app_id_suffix = app_id_suffix
self.spark_conf = spark_conf
self.class_name = class_name
self.packages = packages
self.app_arguments = app_arguments
self.app_waiter = app_waiter
self.image_pull_policy = image_pull_policy
Expand Down Expand Up @@ -302,6 +305,7 @@ def _submit_new_job(self, context: Context):
app_name=self.app_name,
spark_conf=self.spark_conf,
class_name=self.class_name,
packages=self.packages,
app_arguments=self.app_arguments,
app_waiter="no_wait",
image_pull_policy=self.image_pull_policy,
Expand Down
8 changes: 8 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def submit_app(
app_name: str | ArgNotSet = NOTSET,
spark_conf: dict[str, str] | ArgNotSet = NOTSET,
class_name: str | ArgNotSet = NOTSET,
packages: list[str] | ArgNotSet = NOTSET,
app_arguments: list[str] | ArgNotSet = NOTSET,
app_id_suffix: Callable[[], str] | ArgNotSet = NOTSET,
app_waiter: Literal["no_wait", "wait", "log"] | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -148,6 +149,7 @@ def submit_app(
`spark-app{app_id_suffix()}`
spark_conf: Dictionary of spark configuration to pass to the application
class_name: Name of the class to execute
packages: List of maven coordinates of jars to include in the classpath
app_arguments: List of arguments to pass to the application
app_id_suffix: Function to generate a suffix for the application ID, defaults to
`default_app_id_suffix`
Expand Down Expand Up @@ -204,6 +206,10 @@ def submit_app(
spark_conf = Configuration.SPARK_ON_K8S_SPARK_CONF
if class_name is NOTSET:
class_name = Configuration.SPARK_ON_K8S_CLASS_NAME
if packages is NOTSET:
packages = (
Configuration.SPARK_ON_K8S_PACKAGES.split(",") if Configuration.SPARK_ON_K8S_PACKAGES else []
)
if app_arguments is NOTSET:
app_arguments = Configuration.SPARK_ON_K8S_APP_ARGUMENTS
if app_id_suffix is NOTSET:
Expand Down Expand Up @@ -330,6 +336,8 @@ def submit_app(
driver_command_args = ["driver", "--master", "k8s://https://kubernetes.default.svc.cluster.local:443"]
if class_name:
driver_command_args.extend(["--class", class_name])
if packages:
driver_command_args.extend(["--packages", ",".join(packages)])
driver_command_args.extend(
self._spark_config_to_arguments({**basic_conf, **spark_conf}) + [app_path, *main_class_parameters]
)
Expand Down
1 change: 1 addition & 0 deletions spark_on_k8s/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Configuration:
SPARK_ON_K8S_APP_NAME = getenv("SPARK_ON_K8S_APP_NAME")
SPARK_ON_K8S_SPARK_CONF = json.loads(getenv("SPARK_ON_K8S_SPARK_CONF", "{}"))
SPARK_ON_K8S_CLASS_NAME = getenv("SPARK_ON_K8S_CLASS_NAME")
SPARK_ON_K8S_PACKAGES = getenv("SPARK_ON_K8S_PACKAGES", "")
SPARK_ON_K8S_APP_ARGUMENTS = json.loads(getenv("SPARK_ON_K8S_APP_ARGUMENTS", "[]"))
SPARK_ON_K8S_APP_WAITER = getenv("SPARK_ON_K8S_APP_WAITER", "no_wait")
SPARK_ON_K8S_IMAGE_PULL_POLICY = getenv("SPARK_ON_K8S_IMAGE_PULL_POLICY", "IfNotPresent")
Expand Down
3 changes: 3 additions & 0 deletions tests/airflow/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def test_execute(self, mock_submit_app):
app_arguments=["100000"],
app_name="pyspark-job-example",
service_account="spark",
packages=["some-package"],
app_waiter="no_wait",
driver_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
executor_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
Expand Down Expand Up @@ -76,6 +77,7 @@ def test_execute(self, mock_submit_app):
ui_reverse_proxy=True,
spark_conf=None,
class_name=None,
packages=["some-package"],
secret_values=None,
volumes=None,
driver_volume_mounts=None,
Expand Down Expand Up @@ -202,6 +204,7 @@ def test_rendering_templates(self, mock_submit_app):
"KEY2": "value from connection",
},
class_name=None,
packages=None,
volumes=None,
driver_volume_mounts=None,
executor_volume_mounts=None,
Expand Down

0 comments on commit 3cfea92

Please sign in to comment.