From d68048b06a046cc67ff431fdd8a687b0a1f43603 Mon Sep 17 00:00:00 2001 From: prathit06 Date: Mon, 30 Sep 2024 14:26:01 -0700 Subject: [PATCH] [SPARK-49833][K8S] Support user-defined annotations for OnDemand PVCs ### What changes were proposed in this pull request? Currently for on-demand PVCs we cannot add user-defined annotations; user-defined annotations can greatly help to add tags in underlying storage. For e.g. if we add `k8s-pvc-tagger/tags` annotation & provide a map like {"env":"dev"}, the same tags are reflected on underlying storage (for e.g. AWS EBS) ### Why are the changes needed? Changes are needed so users can set custom annotations for PVCs ### Does this PR introduce _any_ user-facing change? It does not break any existing behaviour but adds a new feature/improvement to enable custom annotation additions to on-demand PVCs ### How was this patch tested? This was tested in internal/production k8s cluster ### Was this patch authored or co-authored using generative AI tooling? No Closes #48299 from prathit06/ondemand-pvc-annotations. 
Authored-by: prathit06 Signed-off-by: Dongjoon Hyun --- docs/running-on-kubernetes.md | 18 +++++ .../org/apache/spark/deploy/k8s/Config.scala | 1 + .../deploy/k8s/KubernetesVolumeSpec.scala | 3 +- .../deploy/k8s/KubernetesVolumeUtils.scala | 14 +++- .../features/MountVolumesFeatureStep.scala | 7 +- .../spark/deploy/k8s/KubernetesTestConf.scala | 8 +- .../k8s/KubernetesVolumeUtilsSuite.scala | 42 +++++++++- .../MountVolumesFeatureStepSuite.scala | 77 +++++++++++++++++++ 8 files changed, 160 insertions(+), 10 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index d8be32e047717..f8b935fd77f5c 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1191,6 +1191,15 @@ See the [configuration page](configuration.html) for information on Spark config 4.0.0 + + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName] + (none) + + Configure Kubernetes Volume annotations passed to the Kubernetes with AnnotationName as key having specified value, must conform with Kubernetes annotations format. For example, + spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar. + + 4.0.0 + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path (none) @@ -1236,6 +1245,15 @@ See the [configuration page](configuration.html) for information on Spark config 4.0.0 + + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName] + (none) + + Configure Kubernetes Volume annotations passed to the Kubernetes with AnnotationName as key having specified value, must conform with Kubernetes annotations format. For example, + spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar. 
+ + 4.0.0 + spark.kubernetes.local.dirs.tmpfs false diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9c50f8ddb00cc..db7fc85976c2a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -779,6 +779,7 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit" val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server" val KUBERNETES_VOLUMES_LABEL_KEY = "label." + val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation." val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index b4fe414e3cde5..b7113a562fa06 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -25,7 +25,8 @@ private[spark] case class KubernetesPVCVolumeConf( claimName: String, storageClass: Option[String] = None, size: Option[String] = None, - labels: Option[Map[String, String]] = None) + labels: Option[Map[String, String]] = None, + annotations: Option[Map[String, String]] = None) extends KubernetesVolumeSpecificConf private[spark] case class KubernetesEmptyDirVolumeConf( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 
88bb998d88b7d..95821a909f351 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -47,6 +47,7 @@ object KubernetesVolumeUtils { val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY" val subPathExprKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY" val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY" + val annotationKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_ANNOTATION_KEY" verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey) val volumeLabelsMap = properties @@ -54,6 +55,11 @@ object KubernetesVolumeUtils { .map { case (k, v) => k.replaceAll(labelKey, "") -> v } + val volumeAnnotationsMap = properties + .filter(_._1.startsWith(annotationKey)) + .map { + case (k, v) => k.replaceAll(annotationKey, "") -> v + } KubernetesVolumeSpec( volumeName = volumeName, @@ -62,7 +68,7 @@ object KubernetesVolumeUtils { mountSubPathExpr = properties.getOrElse(subPathExprKey, ""), mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), volumeConf = parseVolumeSpecificConf(properties, - volumeType, volumeName, Option(volumeLabelsMap))) + volumeType, volumeName, Option(volumeLabelsMap), Option(volumeAnnotationsMap))) }.toSeq } @@ -86,7 +92,8 @@ object KubernetesVolumeUtils { options: Map[String, String], volumeType: String, volumeName: String, - labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = { + labels: Option[Map[String, String]], + annotations: Option[Map[String, String]]): KubernetesVolumeSpecificConf = { volumeType match { case KUBERNETES_VOLUMES_HOSTPATH_TYPE => val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" @@ -107,7 +114,8 @@ object KubernetesVolumeUtils { options(claimNameKey), options.get(storageClassKey), options.get(sizeLimitKey), - labels) 
+ labels, + annotations) case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index eea4604010b21..3d89696f19fcc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath, volumeType)) - case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) => + case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels, annotations) => val claimName = conf match { case c: KubernetesExecutorConf => claimNameTemplate @@ -91,12 +91,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava case None => defaultVolumeLabels.asJava } + val volumeAnnotations = annotations match { + case Some(value) => value.asJava + case None => Map[String, String]().asJava + } additionalResources.append(new PersistentVolumeClaimBuilder() .withKind(PVC) .withApiVersion("v1") .withNewMetadata() .withName(claimName) .addToLabels(volumeLabels) + .addToAnnotations(volumeAnnotations) .endMetadata() .withNewSpec() .withStorageClassName(storageClass.get) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index e0ddcd3d416f0..e5ed79718d733 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -118,7 +118,7 @@ object KubernetesTestConf { KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath, KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType)) - case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) => + case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels, annotations) => val sconf = storageClass .map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap @@ -126,9 +126,13 @@ object KubernetesTestConf { case Some(value) => value.map { case(k, v) => s"label.$k" -> v } case None => Map() } + val aannotations = annotations match { + case Some(value) => value.map { case (k, v) => s"annotation.$k" -> v } + case None => Map() + } (KUBERNETES_VOLUMES_PVC_TYPE, Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++ - sconf ++ lconf ++ llabels) + sconf ++ lconf ++ llabels ++ aannotations) case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index 1e62db725fb6e..3c57cba9a7ff0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -96,7 +96,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly) 
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === - KubernetesPVCVolumeConf("claimName", labels = Some(Map()))) + KubernetesPVCVolumeConf("claimName", labels = Some(Map()), annotations = Some(Map()))) } test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") { @@ -113,7 +113,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountReadOnly) assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === KubernetesPVCVolumeConf(claimName = "claimName", - labels = Some(Map("env" -> "test", "foo" -> "bar")))) + labels = Some(Map("env" -> "test", "foo" -> "bar")), + annotations = Some(Map()))) } test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " + @@ -128,7 +129,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly) assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === - KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()))) + KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()), + annotations = Some(Map()))) } test("Parses emptyDir volumes correctly") { @@ -280,4 +282,38 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { }.getMessage assert(m.contains("smaller than 1KiB. 
Missing units?")) } + + test("SPARK-49833: Parses persistentVolumeClaim volumes correctly with annotations") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName") + sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key1", "value1") + sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key2", "value2") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === + KubernetesPVCVolumeConf(claimName = "claimName", + labels = Some(Map()), + annotations = Some(Map("key1" -> "value1", "key2" -> "value2")))) + } + + test("SPARK-49833: Parses persistentVolumeClaim volumes & puts " + + "annotations as empty Map if not provided") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === + KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()), + annotations = Some(Map()))) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index c94a7a6ec26a7..293773ddb9ec5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -496,4 +496,81 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(mounts(1).getMountPath === "/tmp/bar") assert(mounts(1).getSubPath === "bar") } + + test("SPARK-49833: Create and mounts persistentVolumeClaims in driver with annotations") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + "", + true, + KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND, + storageClass = Some("gp3"), + size = Some("1Mi"), + annotations = Some(Map("env" -> "test"))) + ) + + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0")) + } + + test("SPARK-49833: Create and mounts persistentVolumeClaims in executors with annotations") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + "", + true, + KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND, + storageClass = Some("gp3"), + size = Some("1Mi"), + annotations = Some(Map("env" -> "exec-test"))) + ) + + val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf)) + val executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + 
assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0")) + } + + test("SPARK-49833: Mount multiple volumes to executor with annotations") { + val pvcVolumeConf1 = KubernetesVolumeSpec( + "checkpointVolume1", + "/checkpoints1", + "", + "", + true, + KubernetesPVCVolumeConf(claimName = "pvcClaim1", + storageClass = Some("gp3"), + size = Some("1Mi"), + annotations = Some(Map("env1" -> "exec-test-1"))) + ) + + val pvcVolumeConf2 = KubernetesVolumeSpec( + "checkpointVolume2", + "/checkpoints2", + "", + "", + true, + KubernetesPVCVolumeConf(claimName = "pvcClaim2", + storageClass = Some("gp3"), + size = Some("1Mi"), + annotations = Some(Map("env2" -> "exec-test-2"))) + ) + + val kubernetesConf = KubernetesTestConf.createExecutorConf( + volumes = Seq(pvcVolumeConf1, pvcVolumeConf2)) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 2) + assert(configuredPod.container.getVolumeMounts.size() === 2) + } }