Skip to content

Commit

Permalink
pylint issues
Browse files Browse the repository at this point in the history
  • Loading branch information
ford-at-aws committed Dec 13, 2024
1 parent 5484ca7 commit 2027309
Showing 1 changed file with 162 additions and 96 deletions.
258 changes: 162 additions & 96 deletions .tools/test/stacks/nuke/typescript/nuke_config_update.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Python class responsible for updating the nuke generic config , based on exceptions to be filtered
and also updates dynamically the region attribute passed in from the StepFunctions invocation. This should be modified to suit your needs.
Python class responsible for updating the nuke generic config, based on exceptions to be filtered,
and also updates dynamically the region attribute passed in from the StepFunctions invocation.
This should be modified to suit your needs.
"""

import argparse
import copy
import logging
from typing import Any, Dict, List, Tuple

import boto3
import yaml
from boto3.exceptions import ClientError

GLOBAL_RESOURCE_EXCEPTIONS = [
{"property": "tag:DoNotNuke", "value": "True"},
Expand All @@ -19,19 +27,42 @@


class StackInfo:
def __init__(self, account, target_regions):
"""
Class responsible for managing StackInfo operations.
Attributes:
session (boto3.Session): AWS session object.
regions (List[str]): List of target regions.
resources (Dict[str, List[Dict[str, str]]]): Dictionary of resources and their exceptions.
config (Dict[str, Any]): Configuration dictionary.
account (str): AWS account ID.
"""

def __init__(self, account: str, target_regions: List[str]) -> None:
"""
Initialize StackInfo object.
Args:
account (str): AWS account ID.
target_regions (List[str]): List of target regions.
"""
self.session = boto3.Session(profile_name="nuke")
# Regions to be targeted set from the Stepfunctions/CodeBuild workflow
self.regions = target_regions
self.resources = {}
self.config = {}
self.resources: Dict[str, List[Dict[str, str]]] = {}
self.config: Dict[str, Any] = {}
self.account = account

def Populate(self):
self.UpdateCFNStackList()
self.OverrideDefaultConfig()
def populate(self) -> None:
"""
Populate resources and override the default configuration.
"""
self.update_cfn_stack_list()
self.override_default_config()

def UpdateCFNStackList(self):
def update_cfn_stack_list(self) -> None:
"""
Update the list of CloudFormation stacks and resources.
"""
try:
for region in self.regions:
cfn_client = self.session.client("cloudformation", region_name=region)
Expand All @@ -46,22 +77,28 @@ def UpdateCFNStackList(self):
]
)
for page in responses:
for stack in page.get("StackSummaries"):
self.GetCFNResources(stack, cfn_client)
self.BuildIamExclusionList(region)
except Exception as e:
print("Error in calling UpdateCFNStackList:\n {}".format(e))
for stack in page.get("StackSummaries", []):
self.get_cfn_resources(stack, cfn_client)
self.build_iam_exclusion_list(region)
except ClientError as e:
logging.error(f"Error in calling update_cfn_stack_list: {e}")

def get_cfn_resources(self, stack: Dict[str, Any], cfn_client) -> None:
"""
Get resources from a CloudFormation stack.
def GetCFNResources(self, stack, cfn_client):
Args:
stack (Dict[str, Any]): CloudFormation stack details.
cfn_client: CloudFormation client object.
"""
try:
stack_name = stack.get("StackName")
stack_name = stack.get("StackName") or stack.get("PhysicalResourceId")

if stack_name is None:
stack_name = stack.get("PhysicalResourceId")
return

stack_description = cfn_client.describe_stacks(StackName=stack_name)
print("Stack Description: ", stack_description)
tags = stack_description.get("Stacks")[0].get("Tags")
tags = stack_description.get("Stacks", [{}])[0].get("Tags", [])
for tag in tags:
key = tag.get("Key")
value = tag.get("Value")
Expand All @@ -75,11 +112,11 @@ def GetCFNResources(self, stack, cfn_client):
stack_resources = cfn_client.list_stack_resources(
StackName=stack_name
)
for resource in stack_resources.get("StackResourceSummaries"):
for resource in stack_resources.get("StackResourceSummaries", []):
if resource.get("ResourceType") == "AWS::CloudFormation::Stack":
self.GetCFNResources(resource, cfn_client)
self.get_cfn_resources(resource, cfn_client)
else:
nuke_type = self.UpdateResourceName(
nuke_type = self.update_resource_name(
resource["ResourceType"]
)
if nuke_type in self.resources:
Expand All @@ -96,99 +133,128 @@ def GetCFNResources(self, stack, cfn_client):
"value": resource["PhysicalResourceId"],
}
]
except Exception as e:
print("Error calling GetCFNResources:\n {}".format(e))
except ClientError as e:
logging.error(f"Error calling get_cfn_resources: {e}")

def UpdateResourceName(self, resource):
nuke_type = str.replace(resource, "AWS::", "")
nuke_type = str.replace(nuke_type, "::", "")
nuke_type = str.replace(nuke_type, "Config", "ConfigService", 1)
def update_resource_name(self, resource: str) -> str:
"""
Update the resource name to match the nuke resource type.
Args:
resource (str): Resource name.
Returns:
str: Updated resource name.
"""
nuke_type = resource.replace("AWS::", "")
nuke_type = nuke_type.replace("::", "")
nuke_type = nuke_type.replace("Config", "ConfigService", 1)
return nuke_type

def BuildIamExclusionList(self, region):
# This excludes and appends to the config IAMRole resources , the roles that are federated principals
# You can add any other custom filterting logic based on regions for IAM/Global roles that should be excluded
iam_client = self.session.client("iam", region_name=region)
iam_paginator = iam_client.get_paginator("list_roles")
responses = iam_paginator.paginate()
for page in responses:
for role in page["Roles"]:
apd = role.get("AssumeRolePolicyDocument")
if apd is not None:
for item in apd.get("Statement"):
if item is not None:
for principal in item.get("Principal"):
if principal == "Federated":
if "IAMRole" in self.resources:
self.resources["IAMRole"].append(
role.get("RoleName")
)
else:
self.resources["IAMRole"] = [
role.get("RoleName")
]

def OverrideDefaultConfig(self):
# Open the nuke_generic_config.yaml and merge the captured resources/exclusions with it
def build_iam_exclusion_list(self, region: str) -> None:
"""
Build the IAM role exclusion list for the given region.
Args:
region (str): AWS region.
"""
try:
iam_client = self.session.client("iam", region_name=region)
iam_paginator = iam_client.get_paginator("list_roles")
responses = iam_paginator.paginate()
for page in responses:
for role in page.get("Roles", []):
assume_role_policy_document = role.get("AssumeRolePolicyDocument")
if assume_role_policy_document:
for statement in assume_role_policy_document.get(
"Statement", []
):
if statement.get("Principal", {}).get("Federated"):
if "IAMRole" in self.resources:
self.resources["IAMRole"].append(
role.get("RoleName")
)
else:
self.resources["IAMRole"] = [role.get("RoleName")]
except ClientError as e:
logging.error(f"Error building IAM exclusion list: {e}")

def override_default_config(self) -> None:
"""
Override the default configuration with captured resources and exclusions.
"""
try:
with open(r"nuke_generic_config.yaml") as config_file:
self.config = yaml.load(config_file)
with open("nuke_generic_config.yaml") as config_file:
self.config = yaml.safe_load(config_file)

# Not all resources handled by the tool, but we will add them to the exclusion anyhow.
for resource in self.resources:
if resource in self.config["accounts"]["ACCOUNT"]["filters"]:
self.config["accounts"]["ACCOUNT"]["filters"][resource].extend(
self.resources[resource]
)
for resource, exceptions in self.resources.items():
account_filters = self.config["accounts"]["ACCOUNT"]["filters"]
if resource in account_filters:
account_filters[resource].extend(exceptions)
else:
self.config["accounts"]["ACCOUNT"]["filters"][
resource
] = self.resources[resource]
account_filters[resource] = exceptions

self.config["accounts"][self.account] = copy.deepcopy(
self.config["accounts"]["ACCOUNT"]
)
if "ACCOUNT" in self.config["accounts"]:
self.config["accounts"].pop("ACCOUNT", None)
self.config["accounts"].pop("ACCOUNT", None)

# Global exclusions apply to every type of resource
for resource in self.config["accounts"][self.account]["filters"]:
for resource, exceptions in self.config["accounts"][self.account][
"filters"
].items():
for exception in GLOBAL_RESOURCE_EXCEPTIONS:
self.config["accounts"][self.account]["filters"][resource].append(
exception.copy()
)
config_file.close()
except Exception as e:
print("Failed merging nuke-config-test.yaml with error {}".format(e))
exit(1)
exceptions.append(exception.copy())
except ClientError as e:
logging.error(f"Failed merging nuke-config-test.yaml with error {e}")

def WriteConfig(self):
# CodeBuild script updates the target region in the generic config and is validated here.
def write_config(self) -> None:
"""
Write the configuration to separate files for each target region.
"""
try:
for region in self.config["regions"]:
local_config = stackInfo.config.copy()
local_config = self.config.copy()
local_config["regions"] = [region]
filename = "nuke_config_{}.yaml".format(region)
filename = f"nuke_config_{region}.yaml"
with open(filename, "w") as output_file:
output = yaml.dump(local_config, output_file)
output_file.close()
except Exception as e:
print("Failed opening nuke_config.yaml for writing with error {}".format(e))
yaml.safe_dump(local_config, output_file)
logging.info(f"Successfully wrote config to {filename}")
except KeyError:
logging.error("No 'regions' key found in the config dictionary")
except ClientError as e:
logging.error(f"An unexpected error occurred: {e}")


try:
def parse_arguments() -> Tuple[str, str]:
"""
Parse command-line arguments.
Returns:
Tuple[str, str]: AWS account ID and target region.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--account", dest="account", help="Account to nuke"
) # Account and Region from StepFunctions - CodeBuild overridden params
parser.add_argument("--region", dest="region", help="Region to target for nuke")
"--account", dest="account", help="Account to nuke", required=True
)
parser.add_argument(
"--region", dest="region", help="Region to target for nuke", required=True
)
args = parser.parse_args()
if not args.account or not args.region:
parser.print_help()
exit(1)
except Exception as e:
print(e)
exit(1)
return args.account, args.region


def main() -> None:
"""
Main entry point of the script.
"""
account, region = parse_arguments()
stack_info = StackInfo(account, [region])
stack_info.populate()
stack_info.write_config()


if __name__ == "__main__":
print("Incoming Args: ", args)
stackInfo = StackInfo(args.account, [args.region])
stackInfo.Populate()
stackInfo.WriteConfig()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
main()

0 comments on commit 2027309

Please sign in to comment.