Skip to content

Commit

Permalink
Support mounting volumes in the driver
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Feb 7, 2024
1 parent 1670a26 commit 0b2bf77
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
14 changes: 14 additions & 0 deletions spark_on_k8s/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ def submit_app(
should_print: bool | ArgNotSet = NOTSET,
secret_values: dict[str, str] | ArgNotSet = NOTSET,
driver_env_vars_from_secrets: list[str] | ArgNotSet = NOTSET,
volumes: list[k8s.V1Volume] | ArgNotSet = NOTSET,
driver_volume_mounts: list[k8s.V1VolumeMount] | ArgNotSet = NOTSET,
executor_volume_mounts: list[k8s.V1VolumeMount] | ArgNotSet = NOTSET,
) -> str:
"""Submit a Spark app to Kubernetes
Expand Down Expand Up @@ -152,6 +155,9 @@ def submit_app(
secret_values: Dictionary of secret values to pass to the application as environment variables
driver_env_vars_from_secrets: List of secret names to load environment variables from for
the driver
volumes: List of volumes to mount to the driver and/or executors
driver_volume_mounts: List of volume mounts to mount to the driver
executor_volume_mounts: List of volume mounts to mount to the executors
Returns:
Name of the Spark application pod
Expand Down Expand Up @@ -226,6 +232,12 @@ def submit_app(
driver_env_vars_from_secrets = Configuration.SPARK_ON_K8S_DRIVER_ENV_VARS_FROM_SECRET
if driver_env_vars_from_secrets:
env_from_secrets.extend(driver_env_vars_from_secrets)
if volumes is NOTSET:
volumes = []
if driver_volume_mounts is NOTSET:
driver_volume_mounts = []
if executor_volume_mounts is NOTSET:
executor_volume_mounts = []

spark_conf = spark_conf or {}
main_class_parameters = app_arguments or []
Expand Down Expand Up @@ -292,6 +304,8 @@ def submit_app(
},
},
env_from_secrets=env_from_secrets,
volumes=volumes,
volume_mounts=driver_volume_mounts,
)
with self.k8s_client_manager.client() as client:
api = k8s.CoreV1Api(client)
Expand Down
9 changes: 9 additions & 0 deletions spark_on_k8s/utils/app_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ def create_spark_pod_spec(
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] = "IfNotPresent",
extra_labels: dict[str, str] | None = None,
env_from_secrets: list[str] | None = None,
volumes: list[k8s.V1Volume] | None = None,
volume_mounts: list[k8s.V1VolumeMount] | None = None,
) -> k8s.V1PodTemplateSpec:
"""Create a pod spec for a Spark application
Expand All @@ -336,6 +338,8 @@ def create_spark_pod_spec(
image_pull_policy: Image pull policy for the driver and executors, defaults to "IfNotPresent"
extra_labels: Dictionary of extra labels to add to the pod template
env_from_secrets: List of secrets to load environment variables from
volumes: List of volumes to mount in the pod
volume_mounts: List of volume mounts to mount in the container
Returns:
Pod template spec for the Spark application
Expand All @@ -361,8 +365,10 @@ def create_spark_pod_spec(
args=args,
image_pull_policy=image_pull_policy,
env_from_secrets=env_from_secrets,
volume_mounts=volume_mounts,
)
],
volumes=volumes,
)
template = k8s.V1PodTemplateSpec(
metadata=pod_metadata,
Expand All @@ -380,6 +386,7 @@ def create_driver_container(
args: list[str] | None = None,
image_pull_policy: Literal["Always", "Never", "IfNotPresent"] = "IfNotPresent",
env_from_secrets: list[str] | None = None,
volume_mounts: list[k8s.V1VolumeMount] | None = None,
) -> k8s.V1Container:
"""Create a container spec for the Spark driver
Expand All @@ -391,6 +398,7 @@ def create_driver_container(
args: List of arguments to pass to the container
image_pull_policy: Image pull policy for the driver and executors, defaults to "IfNotPresent"
env_from_secrets: List of secrets to load environment variables from
volume_mounts: List of volume mounts to mount in the container
Returns:
Container spec for the Spark driver
Expand Down Expand Up @@ -432,6 +440,7 @@ def create_driver_container(
)
for secret_name in (env_from_secrets or [])
],
volume_mounts=volume_mounts,
)

@staticmethod
Expand Down

0 comments on commit 0b2bf77

Please sign in to comment.