aws IAM信息并判断权限是否开大
抓取 AWS IAM 信息并判断权限是否过大；需要调用基础接口脚本（BasicInterfaces）中的数据库读写函数以及 boto3 库。
import boto3,json
from botocore.exceptions import ClientError
from BasicInterfaces import executeMysqlData
def list_all_iam_users() -> list[str]:
    """Return the user name of every IAM user in the current account.

    Walks the ``list_users`` paginator so accounts with more than one page
    of users are fully enumerated. Only IAM *users* are listed; roles and
    groups (and whatever they grant) are not covered by this scan.
    """
    iam = boto3.client('iam')
    pages = iam.get_paginator('list_users').paginate()
    return [entry['UserName'] for page in pages for entry in page['Users']]
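def get_user_attached_policies(user_name: str) -> list[dict]:
    """Return the managed policies attached directly to ``user_name``.

    Each element is an ``AttachedPolicies`` entry as returned by IAM
    (``PolicyName`` / ``PolicyArn``). The ``list_attached_user_policies``
    paginator is used so a truncated response cannot silently drop policies.

    Scope caveat: only *directly attached managed* policies are returned.
    Inline user policies (``list_user_policies`` / ``get_user_policy``) and
    policies inherited through group membership are not included, so a user
    reported here with no attached policies may still hold broad permissions.
    """
    iam = boto3.client('iam')
    attached: list[dict] = []
    paginator = iam.get_paginator('list_attached_user_policies')
    for page in paginator.paginate(UserName=user_name):
        attached.extend(page['AttachedPolicies'])
    return attached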
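def get_policy_document(policy_arn: str) -> dict:
    """Return the policy document of a managed policy's default (in-effect) version.

    ``get_policy`` is called first to learn ``DefaultVersionId`` — only that
    version is evaluated by IAM, older versions are ignored here. boto3
    already URL-decodes the document, so the result is a plain dict.

    Fix vs. the original: the response keys were written as ``' Policy'`` and
    ``' Document'`` (leading space), which raise ``KeyError`` on every call.
    """
    iam = boto3.client('iam')
    default_version = iam.get_policy(PolicyArn=policy_arn)['Policy']['DefaultVersionId']
    version = iam.get_policy_version(PolicyArn=policy_arn, VersionId=default_version)
    return version['PolicyVersion']['Document']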
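def validate_policy_with_access_analyzer(policy_document: dict) -> list[dict]:
    """Run IAM Access Analyzer policy validation on an identity policy.

    Returns the complete list of finding dicts; each carries ``findingType``
    (ERROR / SECURITY_WARNING / WARNING / SUGGESTION) and ``findingDetails``.
    ``validate_policy`` pages its results through ``nextToken``; every page is
    collected so findings are not silently truncated.

    On ``ClientError`` (for example the caller lacks
    ``access-analyzer:ValidatePolicy``) the error is printed and an empty list
    is returned. Callers cannot distinguish that from "no findings", so an
    empty result next to an ``[ERROR]`` line must be treated as inconclusive.

    Fixes vs. the original: ``policy _ document`` was a syntax error and the
    response key ``' findings'`` had a leading space.
    """
    client = boto3.client('accessanalyzer')
    request = {
        'policyType': 'IDENTITY_POLICY',
        'policyDocument': json.dumps(policy_document),
    }
    findings: list[dict] = []
    try:
        while True:
            response = client.validate_policy(**request)
            findings.extend(response.get('findings', []))
            token = response.get('nextToken')
            if not token:
                return findings
            request['nextToken'] = token
    except ClientError as e:
        print(f"[ERROR] Access Analyzer 验证失败: {e}")
        return []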
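def _as_list(value) -> list:
    """Normalize an IAM policy element that may be absent, a scalar, or a list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def detect_overly_permissive(policy_document: dict) -> list[str]:
    """Flag ``Allow`` statements whose Action or Resource is a full wildcard.

    IAM lets ``Statement``, ``Action`` and ``Resource`` be either a single
    value or a list; all forms are normalized before matching, so
    ``"Action": "*"`` and ``"Action": ["*"]`` are treated identically.
    Matching is exact — ``*`` or ``iam:*`` for Action (compared
    case-insensitively, as IAM action names are), ``*`` for Resource.
    Partial wildcards such as ``s3:*`` or ``arn:aws:s3:::bucket/*`` are
    deliberately not reported (the original substring test flagged them only
    when the element happened to be a string, not a list).

    Returns one human-readable finding string per offending element, in
    statement order. ``NotAction`` / ``NotResource`` and ``Condition`` blocks
    are not evaluated: an ``Allow`` that uses ``NotAction`` grants everything
    except the listed actions and will *not* be flagged here.
    """
    findings: list[str] = []
    for statement in _as_list(policy_document.get('Statement')):
        if statement.get('Effect') != 'Allow':
            continue
        actions = [
            action.lower()
            for action in _as_list(statement.get('Action'))
            if isinstance(action, str)
        ]
        if any(action in ('*', 'iam:*') for action in actions):
            findings.append("found over operation privielges: {}".format(statement['Action']))
        if '*' in _as_list(statement.get('Resource')):
            findings.append("found over resource privielges: {}".format(statement['Resource']))
    return findings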
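def _sql_literal(value) -> str:
    """Quote ``value`` as a MySQL string literal.

    Backslashes and single quotes are escaped so user names, policy names and
    Access Analyzer finding text — all of which an attacker with IAM write
    access can influence — cannot break out of the literal. NOTE(review):
    this is a mitigation, not the proper fix; ``executeMysqlData`` should
    accept bound parameters (``cursor.execute(sql, params)``) — confirm its
    signature in BasicInterfaces before relying on this.
    """
    text = str(value).replace('\\', '\\\\').replace("'", "''")
    return f"'{text}'"


def _record_finding(user_name: str, policy_name: str, recommend: str) -> None:
    """Persist one finding row through the shared DB helper; action is always 'remove'."""
    sql = (
        "INSERT INTO gcpIamCheck (project, account, rules, action, recommend) "
        f"VALUES ('aws', {_sql_literal(user_name)}, {_sql_literal(policy_name)}, "
        f"'remove', {_sql_literal(recommend)})"
    )
    executeMysqlData(1, 'write', sql)


def analyze_user_policies(user_name: str) -> None:
    """Analyze every managed policy attached to one IAM user and persist findings.

    For each attached policy the default policy document is fetched, then:
      1. IAM Access Analyzer validation — each finding is printed and written
         to ``gcpIamCheck`` with its ``findingDetails`` as the recommendation.
         All finding types are recorded, including SUGGESTION-level ones;
         filter on ``findingType`` if that proves too noisy.
      2. The local wildcard check (``detect_overly_permissive``) — a full
         Resource wildcard is recorded as ``resource_all``; an Action
         wildcard is recorded with the finding text itself.

    Only directly attached managed policies are examined; inline and
    group-inherited policies are out of scope. Progress is printed to stdout.

    Fixes vs. the original: the Access Analyzer branch indexed the findings
    *list* with ``['findingDetails']`` (TypeError); manual findings were
    classified with ``'*' in f``, which is true for every finding (the
    matched wildcard is part of the message) so all rows became
    ``resource_all``; SQL values are now escaped; ``' write'`` had a leading
    space.
    """
    print(f"\n[INFO] 开始分析用户: {user_name}")
    policies = get_user_attached_policies(user_name)
    if not policies:
        print(f"[INFO] 用户 {user_name} 没有附加策略")
        return

    for policy in policies:
        policy_name = policy['PolicyName']
        print(f"分析策略: {policy_name}")
        policy_document = get_policy_document(policy['PolicyArn'])

        findings = validate_policy_with_access_analyzer(policy_document)
        if findings:
            print("Access Analyzer 发现策略问题:")
            for finding in findings:
                details = finding['findingDetails']
                print(f" - {details}")
                _record_finding(user_name, policy_name, details)

        print("自动分析策略完成,开始自定义扫描策略")
        manual_findings = detect_overly_permissive(policy_document)
        if not manual_findings:
            print(" 自定义扫描未发现宽泛权限")
            continue
        for f in manual_findings:
            print(f" !自定义检测发现:{f}")
            # Classify by the finding's own prefix rather than by '*' in the
            # text: the message always embeds the matched wildcard.
            if f.startswith("found over resource"):
                _record_finding(user_name, policy_name, 'resource_all')
            else:
                _record_finding(user_name, policy_name, f)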
def main() -> None:
    """Enumerate all IAM users in the account and analyze each one's attached policies.

    Prints the discovered user list, exits early when the account has no
    users, and otherwise hands every user to ``analyze_user_policies``.
    """
    print("[INFO] 开始扫描所有 IAM 用户…\n")
    users = list_all_iam_users()
    print(f"发现用户{users}")
    if not users:
        print("[INFO] 当前账户没有 IAM 用户")
        return
    for user_name in users:
        analyze_user_policies(user_name)
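# Guard the entry point so importing this module (e.g. to reuse
# detect_overly_permissive) does not kick off a full account scan.
if __name__ == "__main__":
    main()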