Skip to content

Commit

Permalink
[SPARK-49833][K8S] Support user-defined annotations for OnDemand PVCs
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Currently for on-demand PVCs we cannot add user-defined annotations, user-defined annotations can greatly help to add tags in underlying storage.
For e.g. if we add `k8s-pvc-tagger/tags` annotation & provide a map like {"env":"dev"}, the same tags are reflected on underlying storage (for e.g. AWS EBS)

### Why are the changes needed?
Changes are needed so users can set custom annotations to PVCs

### Does this PR introduce _any_ user-facing change?
It does not break any existing behaviour but adds a new feature/improvement to enable custom annotations additions to ondemand PVCs

### How was this patch tested?
This was tested in internal/production k8 cluster

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #48299 from prathit06/ondemand-pvc-annotations.

Authored-by: prathit06 <malik.prathit@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
prathit06 authored and dongjoon-hyun committed Sep 30, 2024
1 parent a7fa270 commit d68048b
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 10 deletions.
18 changes: 18 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> annotations passed to the Kubernetes with <code>AnnotationName</code> as key having specified value, must conform with Kubernetes annotations format. For example,
<code>spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>(none)</td>
Expand Down Expand Up @@ -1236,6 +1245,15 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].annotation.[AnnotationName]</code></td>
<td>(none)</td>
<td>
Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> annotations passed to the Kubernetes with <code>AnnotationName</code> as key having specified value, must conform with Kubernetes annotations format. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.annotation.foo=bar</code>.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.local.dirs.tmpfs</code></td>
<td><code>false</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ private[spark] object Config extends Logging {
val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
val KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY = "options.server"
val KUBERNETES_VOLUMES_LABEL_KEY = "label."
val KUBERNETES_VOLUMES_ANNOTATION_KEY = "annotation."
val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."

val KUBERNETES_DNS_SUBDOMAIN_NAME_MAX_LENGTH = 253
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ private[spark] case class KubernetesPVCVolumeConf(
claimName: String,
storageClass: Option[String] = None,
size: Option[String] = None,
labels: Option[Map[String, String]] = None)
labels: Option[Map[String, String]] = None,
annotations: Option[Map[String, String]] = None)
extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ object KubernetesVolumeUtils {
val subPathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY"
val subPathExprKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_SUBPATHEXPR_KEY"
val labelKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_LABEL_KEY"
val annotationKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_ANNOTATION_KEY"
verifyMutuallyExclusiveOptionKeys(properties, subPathKey, subPathExprKey)

val volumeLabelsMap = properties
.filter(_._1.startsWith(labelKey))
.map {
case (k, v) => k.replaceAll(labelKey, "") -> v
}
val volumeAnnotationsMap = properties
.filter(_._1.startsWith(annotationKey))
.map {
case (k, v) => k.replaceAll(annotationKey, "") -> v
}

KubernetesVolumeSpec(
volumeName = volumeName,
Expand All @@ -62,7 +68,7 @@ object KubernetesVolumeUtils {
mountSubPathExpr = properties.getOrElse(subPathExprKey, ""),
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = parseVolumeSpecificConf(properties,
volumeType, volumeName, Option(volumeLabelsMap)))
volumeType, volumeName, Option(volumeLabelsMap), Option(volumeAnnotationsMap)))
}.toSeq
}

Expand All @@ -86,7 +92,8 @@ object KubernetesVolumeUtils {
options: Map[String, String],
volumeType: String,
volumeName: String,
labels: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
labels: Option[Map[String, String]],
annotations: Option[Map[String, String]]): KubernetesVolumeSpecificConf = {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
Expand All @@ -107,7 +114,8 @@ object KubernetesVolumeUtils {
options(claimNameKey),
options.get(storageClassKey),
options.get(sizeLimitKey),
labels)
labels,
annotations)

case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath, volumeType))

case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels) =>
case KubernetesPVCVolumeConf(claimNameTemplate, storageClass, size, labels, annotations) =>
val claimName = conf match {
case c: KubernetesExecutorConf =>
claimNameTemplate
Expand All @@ -91,12 +91,17 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf)
case Some(customLabelsMap) => (customLabelsMap ++ defaultVolumeLabels).asJava
case None => defaultVolumeLabels.asJava
}
val volumeAnnotations = annotations match {
case Some(value) => value.asJava
case None => Map[String, String]().asJava
}
additionalResources.append(new PersistentVolumeClaimBuilder()
.withKind(PVC)
.withApiVersion("v1")
.withNewMetadata()
.withName(claimName)
.addToLabels(volumeLabels)
.addToAnnotations(volumeAnnotations)
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClass.get)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,21 @@ object KubernetesTestConf {
KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> hostPath,
KUBERNETES_VOLUMES_OPTIONS_TYPE_KEY -> volumeType))

case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels) =>
case KubernetesPVCVolumeConf(claimName, storageClass, sizeLimit, labels, annotations) =>
val sconf = storageClass
.map { s => (KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY, s) }.toMap
val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
val llabels = labels match {
case Some(value) => value.map { case(k, v) => s"label.$k" -> v }
case None => Map()
}
val aannotations = annotations match {
case Some(value) => value.map { case (k, v) => s"annotation.$k" -> v }
case None => Map()
}
(KUBERNETES_VOLUMES_PVC_TYPE,
Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName) ++
sconf ++ lconf ++ llabels)
sconf ++ lconf ++ llabels ++ aannotations)

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf("claimName", labels = Some(Map())))
KubernetesPVCVolumeConf("claimName", labels = Some(Map()), annotations = Some(Map())))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes correctly with labels") {
Expand All @@ -113,7 +113,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Some(Map("env" -> "test", "foo" -> "bar"))))
labels = Some(Map("env" -> "test", "foo" -> "bar")),
annotations = Some(Map())))
}

test("SPARK-49598: Parses persistentVolumeClaim volumes & puts " +
Expand All @@ -128,7 +129,8 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map())))
KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
annotations = Some(Map())))
}

test("Parses emptyDir volumes correctly") {
Expand Down Expand Up @@ -280,4 +282,38 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
}.getMessage
assert(m.contains("smaller than 1KiB. Missing units?"))
}

test("SPARK-49833: Parses persistentVolumeClaim volumes correctly with annotations") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")
sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key1", "value1")
sparkConf.set("test.persistentVolumeClaim.volumeName.annotation.key2", "value2")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName",
labels = Some(Map()),
annotations = Some(Map("key1" -> "value1", "key2" -> "value2"))))
}

test("SPARK-49833: Parses persistentVolumeClaim volumes & puts " +
"annotations as empty Map if not provided") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimName")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf(claimName = "claimName", labels = Some(Map()),
annotations = Some(Map())))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,4 +496,81 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(mounts(1).getMountPath === "/tmp/bar")
assert(mounts(1).getSubPath === "bar")
}

test("SPARK-49833: Create and mounts persistentVolumeClaims in driver with annotations") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
annotations = Some(Map("env" -> "test")))
)

val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(pvcClaim.getClaimName.endsWith("-driver-pvc-0"))
}

test("SPARK-49833: Create and mounts persistentVolumeClaims in executors with annotations") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = MountVolumesFeatureStep.PVC_ON_DEMAND,
storageClass = Some("gp3"),
size = Some("1Mi"),
annotations = Some(Map("env" -> "exec-test")))
)

val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf))
val executorStep = new MountVolumesFeatureStep(executorConf)
val executorPod = executorStep.configurePod(SparkPod.initialPod())

assert(executorPod.pod.getSpec.getVolumes.size() === 1)
val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim
assert(executorPVC.getClaimName.endsWith("-exec-1-pvc-0"))
}

test("SPARK-49833: Mount multiple volumes to executor with annotations") {
val pvcVolumeConf1 = KubernetesVolumeSpec(
"checkpointVolume1",
"/checkpoints1",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim1",
storageClass = Some("gp3"),
size = Some("1Mi"),
annotations = Some(Map("env1" -> "exec-test-1")))
)

val pvcVolumeConf2 = KubernetesVolumeSpec(
"checkpointVolume2",
"/checkpoints2",
"",
"",
true,
KubernetesPVCVolumeConf(claimName = "pvcClaim2",
storageClass = Some("gp3"),
size = Some("1Mi"),
annotations = Some(Map("env2" -> "exec-test-2")))
)

val kubernetesConf = KubernetesTestConf.createExecutorConf(
volumes = Seq(pvcVolumeConf1, pvcVolumeConf2))
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
assert(configuredPod.container.getVolumeMounts.size() === 2)
}
}

0 comments on commit d68048b

Please sign in to comment.