Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BV-599] Kubeconfig helper #281

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
100 changes: 90 additions & 10 deletions leverage/containers/kubectl.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

from click.exceptions import Exit
from docker.types import Mount
import ruamel.yaml
import simple_term_menu

from leverage import logger
from leverage._utils import AwsCredsEntryPoint, ExitError
from leverage._utils import AwsCredsEntryPoint, ExitError, CustomEntryPoint
from leverage.container import TerraformContainer


@dataclass
class ClusterInfo:
cluster_name: str
profile: str
region: str


class MetadataTypes(Enum):
K8S_CLUSTER = "k8s-eks-cluster"


class KubeCtlContainer(TerraformContainer):
"""Container specifically tailored to run kubectl commands."""

KUBECTL_CLI_BINARY = "/usr/local/bin/kubectl"
KUBECTL_CONFIG_PATH = Path(f"/home/{TerraformContainer.CONTAINER_USER}/.kube")
KUBECTL_CONFIG_FILE = KUBECTL_CONFIG_PATH / Path("config")
METADATA_FILENAME = "metadata.yaml"

def __init__(self, client):
super().__init__(client)
Expand All @@ -38,18 +55,23 @@ def start_shell(self):
with AwsCredsEntryPoint(self, override_entrypoint=""):
self._start(self.SHELL)

def configure(self):
# make sure we are on the cluster layer
self.paths.check_for_cluster_layer()

logger.info("Retrieving k8s cluster information...")
# generate the command that will configure the new cluster
with AwsCredsEntryPoint(self, override_entrypoint=""):
add_eks_cluster_cmd = self._get_eks_kube_config()
def configure(self, ci: ClusterInfo = None):
"""
Add the given EKS cluster configuration to the .kube/ files.
"""
if ci:
# if you have the details, generate the command right away
cmd = f"aws eks update-kubeconfig --region {ci.region} --name {ci.cluster_name} --profile {ci.profile}"
else:
# otherwise go get them from the layer
logger.info("Retrieving k8s cluster information...")
with CustomEntryPoint(self, entrypoint=""):
cmd = self._get_eks_kube_config()

logger.info("Configuring context...")
with AwsCredsEntryPoint(self, override_entrypoint=""):
exit_code = self._start(add_eks_cluster_cmd)
exit_code = self._start(cmd)

Comment on lines +58 to +74
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance AWS command construction robustness.

The AWS command construction could be more robust by:

  1. Sanitizing input parameters to prevent command injection
  2. Validating region format
  3. Adding error handling for AWS CLI command execution

Apply this change:

+    def _validate_cluster_info(self, ci: ClusterInfo) -> bool:
+        """Validate cluster information parameters."""
+        if not ci.region.strip().isalnum():
+            logger.error("Invalid region format")
+            return False
+        if not ci.cluster_name.strip().isalnum():
+            logger.error("Invalid cluster name format")
+            return False
+        return True

     def configure(self, ci: ClusterInfo = None):
         if ci:
+            if not self._validate_cluster_info(ci):
+                raise ExitError(1, "Invalid cluster configuration")
+            # Sanitize inputs and use shlex.quote if needed
             cmd = f"aws eks update-kubeconfig --region {ci.region} --name {ci.cluster_name} --profile {ci.profile}"

Committable suggestion skipped: line range outside the PR's diff.

if exit_code:
raise Exit(exit_code)

Expand All @@ -62,3 +84,61 @@ def _get_eks_kube_config(self) -> str:

aws_eks_cmd = next(op for op in output.split("\r\n") if op.startswith("aws eks update-kubeconfig"))
return aws_eks_cmd + f" --region {self.region}"

def _scan_clusters(self):
"""
Scan all the subdirectories in search of "cluster" metadata files.
"""
for root, dirs, files in os.walk(self.paths.cwd):
# exclude hidden directories
dirs[:] = [d for d in dirs if d[0] != "."]

for file in files:
if file != self.METADATA_FILENAME:
continue

cluster_file = Path(root) / file
try:
with open(cluster_file) as cluster_yaml_file:
data = ruamel.yaml.safe_load(cluster_yaml_file)
if data.get("type") != MetadataTypes.K8S_CLUSTER.value:
continue
except Exception as exc:
logger.warning(exc)
continue
else:
yield Path(root), data

Comment on lines +88 to +111
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance error handling in _scan_clusters method.

The generic Exception catch block could mask specific issues. Consider handling specific exceptions and providing more detailed error messages.

-                try:
+                try:
                     with open(cluster_file) as cluster_yaml_file:
                         data = ruamel.yaml.safe_load(cluster_yaml_file)
                     if data.get("type") != MetadataTypes.K8S_CLUSTER.value:
                         continue
-                except Exception as exc:
+                except (OSError, IOError) as exc:
+                    logger.warning(f"Failed to read cluster file {cluster_file}: {exc}")
+                    continue
+                except ruamel.yaml.YAMLError as exc:
+                    logger.warning(f"Invalid YAML in cluster file {cluster_file}: {exc}")
                     continue
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _scan_clusters(self):
"""
Scan all the subdirectories in search of "cluster" metadata files.
"""
for root, dirs, files in os.walk(self.paths.cwd):
# exclude hidden directories
dirs[:] = [d for d in dirs if d[0] != "."]
for file in files:
if file != self.METADATA_FILENAME:
continue
cluster_file = Path(root) / file
try:
with open(cluster_file) as cluster_yaml_file:
data = ruamel.yaml.safe_load(cluster_yaml_file)
if data.get("type") != MetadataTypes.K8S_CLUSTER.value:
continue
except Exception as exc:
logger.warning(exc)
continue
else:
yield Path(root), data
def _scan_clusters(self):
"""
Scan all the subdirectories in search of "cluster" metadata files.
"""
for root, dirs, files in os.walk(self.paths.cwd):
# exclude hidden directories
dirs[:] = [d for d in dirs if d[0] != "."]
for file in files:
if file != self.METADATA_FILENAME:
continue
cluster_file = Path(root) / file
try:
with open(cluster_file) as cluster_yaml_file:
data = ruamel.yaml.safe_load(cluster_yaml_file)
if data.get("type") != MetadataTypes.K8S_CLUSTER.value:
continue
except (OSError, IOError) as exc:
logger.warning(f"Failed to read cluster file {cluster_file}: {exc}")
continue
except ruamel.yaml.YAMLError as exc:
logger.warning(f"Invalid YAML in cluster file {cluster_file}: {exc}")
continue
else:
yield Path(root), data

def discover(self):
"""
Do a scan down the tree of subdirectories looking for k8s clusters metadata files.
Open up a menu with all the found items, where you can pick up and configure it on your .kubeconfig file.
"""
cluster_files = [(path, data) for path, data in self._scan_clusters()]
if not cluster_files:
raise ExitError(1, "No clusters found.")

terminal_menu = simple_term_menu.TerminalMenu(
[f"{c[1]['data']['cluster_name']}: {str(c[0])}" for c in cluster_files], title="Clusters found:"
)
Comment on lines +121 to +123
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle potential KeyError exceptions when accessing cluster data

When generating the menu options, accessing nested dictionary keys like c[1]['data']['cluster_name'] without checking if they exist may lead to a KeyError if the expected keys are missing. Consider adding checks or using the get method with default values to prevent the application from crashing due to unexpected data.

Apply this change:

-terminal_menu = simple_term_menu.TerminalMenu(
-    [f"{c[1]['data']['cluster_name']}: {str(c[0])}" for c in cluster_files], title="Clusters found:"
+terminal_menu = simple_term_menu.TerminalMenu(
+    [f"{c[1]['data'].get('cluster_name', 'Unknown')}: {str(c[0])}" for c in cluster_files], title="Clusters found:"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
terminal_menu = simple_term_menu.TerminalMenu(
[f"{c[1]['data']['cluster_name']}: {str(c[0])}" for c in cluster_files], title="Clusters found:"
)
terminal_menu = simple_term_menu.TerminalMenu(
[f"{c[1]['data'].get('cluster_name', 'Unknown')}: {str(c[0])}" for c in cluster_files], title="Clusters found:"
)

menu_entry_index = terminal_menu.show()
if menu_entry_index is None:
# selection cancelled
return

layer_path = cluster_files[menu_entry_index][0]
cluster_data = cluster_files[menu_entry_index][1]
cluster_info = ClusterInfo(
cluster_name=cluster_data["data"]["cluster_name"],
profile=cluster_data["data"]["profile"],
region=cluster_data["data"]["region"],
)
Comment on lines +131 to +135
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Validate cluster data before creating ClusterInfo instance

Before creating a ClusterInfo instance, ensure that the necessary keys (cluster_name, profile, and region) are present in cluster_data["data"]. Missing keys could result in a KeyError. Adding validation will make the code more robust.

Apply this change:

+required_keys = {'cluster_name', 'profile', 'region'}
+if not required_keys.issubset(cluster_data["data"]):
+    logger.error("Cluster data is missing required information.")
+    return

Insert the above code before creating the ClusterInfo instance.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cluster_info = ClusterInfo(
cluster_name=cluster_data["data"]["cluster_name"],
profile=cluster_data["data"]["profile"],
region=cluster_data["data"]["region"],
)
required_keys = {'cluster_name', 'profile', 'region'}
if not required_keys.issubset(cluster_data["data"]):
logger.error("Cluster data is missing required information.")
return
cluster_info = ClusterInfo(
cluster_name=cluster_data["data"]["cluster_name"],
profile=cluster_data["data"]["profile"],
region=cluster_data["data"]["region"],
)


# cluster is the host path, so in order to be able to run commands in that layer
# we need to convert it into a relative inside the container
self.container_config["working_dir"] = (
self.paths.guest_base_path / layer_path.relative_to(self.paths.cwd)
).as_posix()
# now simulate we are standing on the chosen layer folder
self.paths.update_cwd(layer_path)
self.configure(cluster_info)
2 changes: 1 addition & 1 deletion leverage/modules/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def get_profiles(cli):
# these are files from the layer we are currently on
for name in ("config.tf", "locals.tf"):
try:
with open(name) as tf_file:
with open(cli.paths.cwd / name) as tf_file:
tf_config = hcl2.load(tf_file)
except FileNotFoundError:
continue
Expand Down
9 changes: 8 additions & 1 deletion leverage/modules/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ def kubectl(context, state, args):
"""Run Kubectl commands in a custom containerized environment."""
state.container = KubeCtlContainer(get_docker_client())
state.container.ensure_image()
state.container.paths.check_for_layer_location()
_handle_subcommand(context=context, cli_container=state.container, args=args)


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def shell(kctl: KubeCtlContainer):
"""Spawn a shell with the kubectl credentials pre-configured."""
kctl.paths.check_for_layer_location()
kctl.start_shell()


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def configure(kctl: KubeCtlContainer):
"""Automatically add the EKS cluster from the layer into your kubectl config file."""
kctl.paths.check_for_cluster_layer()
kctl.configure()


@kubectl.command(context_settings=CONTEXT_SETTINGS)
@pass_container
def discover(kctl: KubeCtlContainer):
kctl.discover()
27 changes: 19 additions & 8 deletions leverage/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Utilities to obtain relevant files' and directories' locations
"""
import os
import pathlib
from pathlib import Path
from subprocess import CalledProcessError
from subprocess import PIPE
Expand Down Expand Up @@ -169,6 +170,14 @@ def __init__(self, env_conf: dict, container_user: str):
self.host_aws_credentials_dir.mkdir(parents=True)
self.sso_cache = self.host_aws_credentials_dir / "sso" / "cache"

def update_cwd(self, new_cwd):
self.cwd = new_cwd
acc_folder = new_cwd.relative_to(self.root_dir).parts[0]

self.account_config_dir = self.root_dir / acc_folder / "config"
account_config_path = self.account_config_dir / self.ACCOUNT_TF_VARS
self.account_conf = hcl2.loads(account_config_path.read_text())

Comment on lines +173 to +180
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add validation and error handling to update_cwd

The method needs additional safeguards:

  1. Validate that new_cwd exists and is within the project
  2. Handle ValueError from relative_to() if new_cwd is not under root_dir
  3. Verify account config file exists
  4. Handle potential HCL parsing errors

Consider this safer implementation:

 def update_cwd(self, new_cwd):
+    if not new_cwd.exists():
+        raise ExitError(1, f"Directory {new_cwd} does not exist")
+    try:
+        relative_path = new_cwd.relative_to(self.root_dir)
+    except ValueError:
+        raise ExitError(1, f"Directory {new_cwd} is not within the project")
+
     self.cwd = new_cwd
-    acc_folder = new_cwd.relative_to(self.root_dir).parts[0]
+    acc_folder = relative_path.parts[0]
 
     self.account_config_dir = self.root_dir / acc_folder / "config"
     account_config_path = self.account_config_dir / self.ACCOUNT_TF_VARS
-    self.account_conf = hcl2.loads(account_config_path.read_text())
+    if not account_config_path.exists():
+        raise ExitError(1, f"Account config file not found: {account_config_path}")
+    try:
+        self.account_conf = hcl2.loads(account_config_path.read_text())
+    except Exception as e:
+        raise ExitError(1, f"Failed to parse account config: {e}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def update_cwd(self, new_cwd):
self.cwd = new_cwd
acc_folder = new_cwd.relative_to(self.root_dir).parts[0]
self.account_config_dir = self.root_dir / acc_folder / "config"
account_config_path = self.account_config_dir / self.ACCOUNT_TF_VARS
self.account_conf = hcl2.loads(account_config_path.read_text())
def update_cwd(self, new_cwd):
if not new_cwd.exists():
raise ExitError(1, f"Directory {new_cwd} does not exist")
try:
relative_path = new_cwd.relative_to(self.root_dir)
except ValueError:
raise ExitError(1, f"Directory {new_cwd} is not within the project")
self.cwd = new_cwd
acc_folder = relative_path.parts[0]
self.account_config_dir = self.root_dir / acc_folder / "config"
account_config_path = self.account_config_dir / self.ACCOUNT_TF_VARS
if not account_config_path.exists():
raise ExitError(1, f"Account config file not found: {account_config_path}")
try:
self.account_conf = hcl2.loads(account_config_path.read_text())
except Exception as e:
raise ExitError(1, f"Failed to parse account config: {e}")

@property
def guest_account_base_path(self):
return f"{self.guest_base_path}/{self.account_dir.relative_to(self.root_dir).as_posix()}"
Expand Down Expand Up @@ -257,25 +266,27 @@ def guest_config_file(self, file):
def tf_cache_dir(self):
return os.getenv("TF_PLUGIN_CACHE_DIR")

def check_for_layer_location(self):
"""Make sure the command is being ran at layer level. If not, bail."""
if self.cwd in (self.common_config_dir, self.account_config_dir):
def check_for_layer_location(self, path: Path = None):
"""Make sure the command is being run at layer level. If not, bail."""
path = path or self.cwd
if path in (self.common_config_dir, self.account_config_dir):
raise ExitError(1, "Currently in a configuration directory, no Terraform command can be run here.")

if self.cwd in (self.root_dir, self.account_dir):
if path in (self.root_dir, self.account_dir):
raise ExitError(
1,
"Terraform commands cannot run neither in the root of the project or in"
" the root directory of an account.",
)

if not list(self.cwd.glob("*.tf")):
if not list(path.glob("*.tf")):
raise ExitError(1, "This command can only run at [bold]layer[/bold] level.")

def check_for_cluster_layer(self):
self.check_for_layer_location()
def check_for_cluster_layer(self, path: Path = None):
path = path or self.cwd
self.check_for_layer_location(path)
# assuming the "cluster" layer will contain the expected EKS outputs
if self.cwd.parts[-1] != "cluster":
if path.parts[-1] != "cluster":
raise ExitError(1, "This command can only run at the [bold]cluster layer[/bold].")


Expand Down
Loading
Loading