抓取 AWS IAM 信息并判断权限是否开得过大

基于 Python 3 编写,用于抓取 AWS IAM 信息并判断权限是否开得过大。

抓取 AWS IAM 信息并判断权限是否开得过大时,需要调用基础接口脚本(BasicInterfaces)里的数据库读写功能以及 boto3 库。

import boto3,json
from botocore.exceptions import ClientError
from BasicInterfaces import executeMysqlData
def list_all_iam_users():
    """Return the ``UserName`` of every IAM user reported by ``list_users``.

    All pages produced by the IAM ``list_users`` paginator are consumed and
    the names are returned in the order the pages yield them.
    """
    client = boto3.client('iam')
    pages = client.get_paginator('list_users').paginate()
    return [entry['UserName'] for page in pages for entry in page['Users']]
def get_user_attached_policies(user_name):
    """Return the ``AttachedPolicies`` entries IAM reports for a user.

    Args:
        user_name: Name of the IAM user, passed as ``UserName``.

    Returns:
        The ``AttachedPolicies`` value of a single
        ``list_attached_user_policies`` response (no pagination is done).
    """
    response = boto3.client('iam').list_attached_user_policies(UserName=user_name)
    return response['AttachedPolicies']
def get_policy_document(policy_arn):
    """Fetch the document of a policy's default version.

    Args:
        policy_arn: ARN of the policy to read.

    Returns:
        The ``Document`` field of the policy version whose id is the
        policy's ``DefaultVersionId``, exactly as boto3 returns it.
    """
    iam_client = boto3.client('iam')
    # The response keys are 'Policy' / 'Document'; the previous keys carried a
    # stray leading space and raised KeyError on every call.
    default_version_id = iam_client.get_policy(PolicyArn=policy_arn)['Policy']['DefaultVersionId']
    policy_version = iam_client.get_policy_version(
        PolicyArn=policy_arn,
        VersionId=default_version_id,
    )
    return policy_version['PolicyVersion']['Document']
def validate_policy_with_access_analyzer(policy_document):
    """Validate a policy document with IAM Access Analyzer.

    Args:
        policy_document: JSON-serialisable policy document; it is encoded
            with ``json.dumps`` before being sent.

    Returns:
        The ``findings`` list from the ``validate_policy`` response, or an
        empty list when the call raises ``ClientError`` (the error is
        printed, not re-raised).
    """
    accessanalyzer_client = boto3.client('accessanalyzer')
    try:
        response = accessanalyzer_client.validate_policy(
            policyType='IDENTITY_POLICY',
            policyDocument=json.dumps(policy_document),
        )
    except ClientError as e:
        print(f"[ERROR] Access Analyzer 验证失败: {e}")
        return []
    # Only the first response page is read; nextToken is not followed.
    return response['findings']
def detect_overly_permissive(policy_document):
    """Flag Allow statements whose Action or Resource is a bare wildcard.

    A statement is reported when its effect is ``Allow`` and either

    * one of its actions is exactly ``*`` or ``iam:*``, or
    * one of its resources is exactly ``*``.

    ``Statement``, ``Action`` and ``Resource`` may each be a single value or
    a list; single values are treated as one-element lists so that both
    spellings are judged by the same exact-match rule.

    Args:
        policy_document: Policy document as a dict.

    Returns:
        A list of human-readable finding strings, in statement order; empty
        when nothing matched.
    """
    def _as_list(value):
        # Wrap a lone string/object so membership tests compare whole
        # entries instead of doing a substring search on a string.
        return value if isinstance(value, list) else [value]

    findings = []
    for statement in _as_list(policy_document.get('Statement', [])):
        if statement.get('Effect') != 'Allow':
            continue
        if 'Action' in statement:
            actions = _as_list(statement['Action'])
            if '*' in actions or 'iam:*' in actions:
                # Message text is kept unchanged: callers inspect it.
                findings.append("found over operation privielges: {}".format(statement['Action']))
        if 'Resource' in statement and '*' in _as_list(statement['Resource']):
            findings.append("found over resource privielges: {}".format(statement['Resource']))
    return findings
def _sql_literal(value):
    """Render *value* as a single-quoted SQL string literal."""
    # Double backslashes and single quotes so free text (e.g. Access Analyzer
    # messages containing apostrophes) cannot terminate the literal early.
    # NOTE(review): assumes MySQL's default backslash-escape mode.
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{text}'"


def _build_iam_check_insert(account, rule, recommend):
    """Build the INSERT statement recording one finding in gcpIamCheck."""
    return (
        "INSERT INTO gcpIamCheck (project, account, rules, action, recommend) "
        f"VALUES ('aws',{_sql_literal(account)},{_sql_literal(rule)},"
        f"'remove',{_sql_literal(recommend)})"
    )


def analyze_user_policies(user_name):
    """Analyze every policy attached to one IAM user and record findings.

    For each attached policy the document is fetched, validated with Access
    Analyzer, and then scanned by ``detect_overly_permissive``. Progress is
    printed, and every finding is written through ``executeMysqlData`` as a
    row of ``gcpIamCheck``.

    Args:
        user_name: Name of the IAM user to analyze.

    Returns:
        None.
    """
    print(f"\n[INFO] 开始分析用户: {user_name}")
    policies = get_user_attached_policies(user_name)
    if not policies:
        print(f"[INFO] 用户 {user_name} 没有附加策略")
        return
    for policy in policies:
        policy_name = policy['PolicyName']
        print(f"分析策略: {policy_name}")
        policy_document = get_policy_document(policy['PolicyArn'])

        findings = validate_policy_with_access_analyzer(policy_document)
        if findings:
            print("Access Analyzer 发现策略问题:")
            for finding in findings:
                # Read the detail text from the individual finding, not from
                # the enclosing list.
                details = finding['findingDetails']
                print(f" - {details}")
                sql = _build_iam_check_insert(user_name, policy_name, details)
                # NOTE(review): the mode was previously ' write' with a leading
                # space, which looks like the same stray-space corruption seen
                # in the dict keys of this file; confirm against
                # executeMysqlData. Parameterized queries would be preferable
                # if that helper supports them.
                executeMysqlData(1, 'write', sql)

        print("自动分析策略完成,开始自定义扫描策略")
        manual_findings = detect_overly_permissive(policy_document)
        if manual_findings:
            for f in manual_findings:
                print(f"  !自定义检测发现:{f}")
                # NOTE(review): every message produced by
                # detect_overly_permissive embeds a wildcard value, so this
                # condition appears to always hold; confirm the intended rule.
                recommend = 'resource_all' if '*' in f else f
                sql = _build_iam_check_insert(user_name, policy_name, recommend)
                executeMysqlData(1, 'write', sql)
        else:
            print("  自定义扫描未发现宽泛权限")
def main():
    """Scan all IAM users and run the policy analysis on each of them.

    Prints the discovered user names first; when none exist a notice is
    printed and nothing else happens.
    """
    print("[INFO] 开始扫描所有 IAM 用户…\n")
    user_names = list_all_iam_users()
    print(f"发现用户{user_names}")
    if user_names:
        for name in user_names:
            analyze_user_policies(name)
    else:
        print("[INFO] 当前账户没有 IAM 用户")
# Run the scan only when executed as a script, so importing this module does
# not trigger AWS calls and database writes as a side effect.
if __name__ == "__main__":
    main()