diff --git a/.tools/test/stacks/nuke/typescript/nuke_config_update.py b/.tools/test/stacks/nuke/typescript/nuke_config_update.py index 6faf6f791f8..6a338accb22 100644 --- a/.tools/test/stacks/nuke/typescript/nuke_config_update.py +++ b/.tools/test/stacks/nuke/typescript/nuke_config_update.py @@ -1,12 +1,20 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + """ - Python class responsible for updating the nuke generic config , based on exceptions to be filtered - and also updates dynamically the region attribute passed in from the StepFunctions invocation. This should be modified to suit your needs. +Python class responsible for updating the nuke generic config, based on exceptions to be filtered, +and also updates dynamically the region attribute passed in from the StepFunctions invocation. +This should be modified to suit your needs. """ + import argparse import copy +import logging +from typing import Any, Dict, List, Tuple import boto3 import yaml +from boto3.exceptions import ClientError GLOBAL_RESOURCE_EXCEPTIONS = [ {"property": "tag:DoNotNuke", "value": "True"}, @@ -19,19 +27,42 @@ class StackInfo: - def __init__(self, account, target_regions): + """ + Class responsible for managing StackInfo operations. + + Attributes: + session (boto3.Session): AWS session object. + regions (List[str]): List of target regions. + resources (Dict[str, List[Dict[str, str]]]): Dictionary of resources and their exceptions. + config (Dict[str, Any]): Configuration dictionary. + account (str): AWS account ID. + """ + + def __init__(self, account: str, target_regions: List[str]) -> None: + """ + Initialize StackInfo object. + + Args: + account (str): AWS account ID. + target_regions (List[str]): List of target regions. + """ self.session = boto3.Session(profile_name="nuke") - # Regions to be targeted set from the Stepfunctions/CodeBuild workflow self.regions = target_regions - self.resources = {} - self.config = {} + self.resources: Dict[str, List[Dict[str, str]]] = {} + self.config: Dict[str, Any] = {} self.account = account - def Populate(self): - self.UpdateCFNStackList() - self.OverrideDefaultConfig() + def populate(self) -> None: + """ + Populate resources and override the default configuration. + """ + self.update_cfn_stack_list() + self.override_default_config() - def UpdateCFNStackList(self): + def update_cfn_stack_list(self) -> None: + """ + Update the list of CloudFormation stacks and resources. + """ try: for region in self.regions: cfn_client = self.session.client("cloudformation", region_name=region) @@ -46,22 +77,28 @@ def UpdateCFNStackList(self): ] ) for page in responses: - for stack in page.get("StackSummaries"): - self.GetCFNResources(stack, cfn_client) - self.BuildIamExclusionList(region) - except Exception as e: - print("Error in calling UpdateCFNStackList:\n {}".format(e)) + for stack in page.get("StackSummaries", []): + self.get_cfn_resources(stack, cfn_client) + self.build_iam_exclusion_list(region) + except ClientError as e: + logging.error(f"Error in calling update_cfn_stack_list: {e}") + + def get_cfn_resources(self, stack: Dict[str, Any], cfn_client) -> None: + """ + Get resources from a CloudFormation stack. - def GetCFNResources(self, stack, cfn_client): + Args: + stack (Dict[str, Any]): CloudFormation stack details. + cfn_client: CloudFormation client object. + """ try: - stack_name = stack.get("StackName") + stack_name = stack.get("StackName") or stack.get("PhysicalResourceId") if stack_name is None: - stack_name = stack.get("PhysicalResourceId") + return stack_description = cfn_client.describe_stacks(StackName=stack_name) - print("Stack Description: ", stack_description) - tags = stack_description.get("Stacks")[0].get("Tags") + tags = stack_description.get("Stacks", [{}])[0].get("Tags", []) for tag in tags: key = tag.get("Key") value = tag.get("Value") @@ -75,11 +112,11 @@ def GetCFNResources(self, stack, cfn_client): stack_resources = cfn_client.list_stack_resources( StackName=stack_name ) - for resource in stack_resources.get("StackResourceSummaries"): + for resource in stack_resources.get("StackResourceSummaries", []): if resource.get("ResourceType") == "AWS::CloudFormation::Stack": - self.GetCFNResources(resource, cfn_client) + self.get_cfn_resources(resource, cfn_client) else: - nuke_type = self.UpdateResourceName( + nuke_type = self.update_resource_name( resource["ResourceType"] ) if nuke_type in self.resources: @@ -96,99 +133,128 @@ def GetCFNResources(self, stack, cfn_client): "value": resource["PhysicalResourceId"], } ] - except Exception as e: - print("Error calling GetCFNResources:\n {}".format(e)) + except ClientError as e: + logging.error(f"Error calling get_cfn_resources: {e}") - def UpdateResourceName(self, resource): - nuke_type = str.replace(resource, "AWS::", "") - nuke_type = str.replace(nuke_type, "::", "") - nuke_type = str.replace(nuke_type, "Config", "ConfigService", 1) + def update_resource_name(self, resource: str) -> str: + """ + Update the resource name to match the nuke resource type. + + Args: + resource (str): Resource name. + + Returns: + str: Updated resource name. + """ + nuke_type = resource.replace("AWS::", "") + nuke_type = nuke_type.replace("::", "") + nuke_type = nuke_type.replace("Config", "ConfigService", 1) return nuke_type - def BuildIamExclusionList(self, region): - # This excludes and appends to the config IAMRole resources , the roles that are federated principals - # You can add any other custom filterting logic based on regions for IAM/Global roles that should be excluded - iam_client = self.session.client("iam", region_name=region) - iam_paginator = iam_client.get_paginator("list_roles") - responses = iam_paginator.paginate() - for page in responses: - for role in page["Roles"]: - apd = role.get("AssumeRolePolicyDocument") - if apd is not None: - for item in apd.get("Statement"): - if item is not None: - for principal in item.get("Principal"): - if principal == "Federated": - if "IAMRole" in self.resources: - self.resources["IAMRole"].append( - role.get("RoleName") - ) - else: - self.resources["IAMRole"] = [ - role.get("RoleName") - ] - - def OverrideDefaultConfig(self): - # Open the nuke_generic_config.yaml and merge the captured resources/exclusions with it + def build_iam_exclusion_list(self, region: str) -> None: + """ + Build the IAM role exclusion list for the given region. + + Args: + region (str): AWS region. + """ + try: + iam_client = self.session.client("iam", region_name=region) + iam_paginator = iam_client.get_paginator("list_roles") + responses = iam_paginator.paginate() + for page in responses: + for role in page.get("Roles", []): + assume_role_policy_document = role.get("AssumeRolePolicyDocument") + if assume_role_policy_document: + for statement in assume_role_policy_document.get( + "Statement", [] + ): + if statement.get("Principal", {}).get("Federated"): + if "IAMRole" in self.resources: + self.resources["IAMRole"].append( + role.get("RoleName") + ) + else: + self.resources["IAMRole"] = [role.get("RoleName")] + except ClientError as e: + logging.error(f"Error building IAM exclusion list: {e}") + + def override_default_config(self) -> None: + """ + Override the default configuration with captured resources and exclusions. + """ try: - with open(r"nuke_generic_config.yaml") as config_file: - self.config = yaml.load(config_file) + with open("nuke_generic_config.yaml") as config_file: + self.config = yaml.safe_load(config_file) + # Not all resources handled by the tool, but we will add them to the exclusion anyhow. - for resource in self.resources: - if resource in self.config["accounts"]["ACCOUNT"]["filters"]: - self.config["accounts"]["ACCOUNT"]["filters"][resource].extend( - self.resources[resource] - ) + for resource, exceptions in self.resources.items(): + account_filters = self.config["accounts"]["ACCOUNT"]["filters"] + if resource in account_filters: + account_filters[resource].extend(exceptions) else: - self.config["accounts"]["ACCOUNT"]["filters"][ - resource - ] = self.resources[resource] + account_filters[resource] = exceptions + self.config["accounts"][self.account] = copy.deepcopy( self.config["accounts"]["ACCOUNT"] ) - if "ACCOUNT" in self.config["accounts"]: - self.config["accounts"].pop("ACCOUNT", None) + self.config["accounts"].pop("ACCOUNT", None) + # Global exclusions apply to every type of resource - for resource in self.config["accounts"][self.account]["filters"]: + for resource, exceptions in self.config["accounts"][self.account][ + "filters" + ].items(): for exception in GLOBAL_RESOURCE_EXCEPTIONS: - self.config["accounts"][self.account]["filters"][resource].append( - exception.copy() - ) - config_file.close() - except Exception as e: - print("Failed merging nuke-config-test.yaml with error {}".format(e)) - exit(1) + exceptions.append(exception.copy()) + except ClientError as e: + logging.error(f"Failed merging nuke-config-test.yaml with error {e}") - def WriteConfig(self): - # CodeBuild script updates the target region in the generic config and is validated here. + def write_config(self) -> None: + """ + Write the configuration to separate files for each target region. + """ try: for region in self.config["regions"]: - local_config = stackInfo.config.copy() + local_config = self.config.copy() local_config["regions"] = [region] - filename = "nuke_config_{}.yaml".format(region) + filename = f"nuke_config_{region}.yaml" with open(filename, "w") as output_file: - output = yaml.dump(local_config, output_file) - output_file.close() - except Exception as e: - print("Failed opening nuke_config.yaml for writing with error {}".format(e)) + yaml.safe_dump(local_config, output_file) + logging.info(f"Successfully wrote config to {filename}") + except KeyError: + logging.error("No 'regions' key found in the config dictionary") + except ClientError as e: + logging.error(f"An unexpected error occurred: {e}") -try: +def parse_arguments() -> Tuple[str, str]: + """ + Parse command-line arguments. + + Returns: + Tuple[str, str]: AWS account ID and target region. + """ parser = argparse.ArgumentParser() parser.add_argument( - "--account", dest="account", help="Account to nuke" - ) # Account and Region from StepFunctions - CodeBuild overridden params - parser.add_argument("--region", dest="region", help="Region to target for nuke") + "--account", dest="account", help="Account to nuke", required=True + ) + parser.add_argument( + "--region", dest="region", help="Region to target for nuke", required=True + ) args = parser.parse_args() - if not args.account or not args.region: - parser.print_help() - exit(1) -except Exception as e: - print(e) - exit(1) + return args.account, args.region + + +def main() -> None: + """ + Main entry point of the script. + """ + account, region = parse_arguments() + stack_info = StackInfo(account, [region]) + stack_info.populate() + stack_info.write_config() + if __name__ == "__main__": - print("Incoming Args: ", args) - stackInfo = StackInfo(args.account, [args.region]) - stackInfo.Populate() - stackInfo.WriteConfig() + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + main()