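# AWS firewall information collection.
# Pulls AWS firewall information. Requires the database read/write helper from the basic-interface script (BasicInterfaces) and the boto3 library.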
import boto3
from botocore.exceptions import BotoCoreError, ClientError

from BasicInterfaces import executeMysqlData
#获取所有可用区域
def get_all_regions():
    """Return the names of the AWS regions reported by EC2 ``DescribeRegions``.

    The EC2 client is created from boto3's default session, i.e. with the
    default credentials and configuration rather than an assumed-role session.

    Returns:
        list[str]: The ``RegionName`` of every region in the response, or an
        empty list when the lookup fails, so callers can iterate the result
        unconditionally.
    """
    try:
        # Client creation stays inside the ``try``: client-side problems such
        # as a missing default region or missing credentials surface as
        # BotoCoreError rather than ClientError.
        ec2_client = boto3.client('ec2')
        response = ec2_client.describe_regions()
    except (ClientError, BotoCoreError) as e:
        print(f" Error fetching regions: {e}")
        return []
    return [region['RegionName'] for region in response['Regions']]
#获取指定区域的安全组
def get_security_groups_in_region(session, region):
    """Return every security group visible to ``session`` in one region.

    Args:
        session: Object exposing ``client(service, region_name=...)``, such as
            the session returned by ``assume_role``.
        region: Region name passed to the EC2 client as ``region_name``.

    Returns:
        list: The ``SecurityGroups`` entries of all result pages, or an empty
        list when the request fails.
    """
    try:
        ec2_client = session.client('ec2', region_name=region)
        # Go through the paginator so results that span several pages are
        # all collected instead of only the first response.
        paginator = ec2_client.get_paginator('describe_security_groups')
        security_groups = []
        for page in paginator.paginate():
            security_groups.extend(page['SecurityGroups'])
        return security_groups
    except (ClientError, BotoCoreError) as e:
        # BotoCoreError covers client-side failures (e.g. an unreachable
        # regional endpoint) that would otherwise abort the whole crawl.
        print(f" Error describing security groups in region {region}: {e}")
        return []
#遍历所有区域获取安全组
def get_security_groups_by_region(session):
    """Map each region name to the security groups found in that region.

    The region list comes from ``get_all_regions()``, which is called without
    ``session``; the per-region lookups go through ``session``.

    Args:
        session: Session forwarded to ``get_security_groups_in_region``.

    Returns:
        dict: ``{region_name: [security_group, ...]}`` with one entry per
        region returned by ``get_all_regions()``.
    """
    groups_per_region = {}
    for region_name in get_all_regions():
        print(f" Processing region: {region_name}... ")
        groups_per_region[region_name] = get_security_groups_in_region(session, region_name)
    return groups_per_region
#示例:遍历所有账户和区域的安全组
def collect_security_groups_by_account_and_region(target_accounts):
    """Collect the security groups of every target account, grouped by region.

    For each role ARN the role is assumed via ``assume_role`` and the
    resulting session is used to list security groups in every region.
    ARNs that are malformed or whose role cannot be assumed are skipped.

    Args:
        target_accounts: Iterable of IAM role ARN strings, one per account.

    Returns:
        dict: ``{account_id: {region_name: [security_group, ...]}}``. Results
        are keyed by account id, so a later ARN for the same account replaces
        the earlier entry.
    """
    account_security_groups = {}
    for role_arn in target_accounts:
        # Surrounding whitespace is never part of a valid ARN; drop it before
        # parsing the ARN and before handing it to STS.
        role_arn = role_arn.strip()
        # The account id is the fifth colon-separated field of the ARN.
        arn_fields = role_arn.split(":")
        if len(arn_fields) < 5:
            print(f" Skipping malformed role ARN: {role_arn!r}")
            continue
        account_id = arn_fields[4]
        print(f" Processing account {account_id}... ")
        # Switch to the target account's role.
        session = assume_role(role_arn)
        if not session:
            continue
        account_security_groups[account_id] = get_security_groups_by_region(session)
    return account_security_groups
# AssumeRole 函数(参考前面代码)
def assume_role(role_arn, session_name="CrossAccountSession"):
    """Assume an IAM role through STS and return a session using its credentials.

    Args:
        role_arn: ARN of the role to assume (sent as ``RoleArn``).
        session_name: Value sent as ``RoleSessionName``.

    Returns:
        A ``boto3.session.Session`` built from the temporary credentials in
        the STS response, or ``None`` when the role cannot be assumed.
    """
    try:
        sts_client = boto3.client('sts')
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    except (ClientError, BotoCoreError) as e:
        # BotoCoreError covers client-side failures such as missing
        # credentials or connection problems; treat them like a rejected
        # request so the caller can skip this account.
        print(f" Error assuming role {role_arn}: {e}")
        return None
    credentials = response['Credentials']
    return boto3.session.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
# Example list of target-account role ARNs (placeholders to be filled in).
TARGET_ACCOUNTS = [
    "arn:aws:iam::<awsID>:role/<跨账号访问的角色名>",
    "arn:aws:iam::<awsID>:role/<本地账号角色>",
]


def _sql_literal(value):
    """Return ``value`` as a single-quoted SQL string literal.

    Backslashes and single quotes are escaped so text returned by the AWS
    API cannot terminate the literal early.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


# Collect the security groups of every account and region.
security_groups_by_account_and_region = collect_security_groups_by_account_and_region(TARGET_ACCOUNTS)

# Write one row per ingress rule that is open to 0.0.0.0/0.
# Only IPv4 ranges (IpRanges) are inspected, and rules whose IpProtocol is
# '-1' are skipped.
# NOTE(review): the region is stored in the `project` column and the account
# id in the `direction` column of `gcpfirewall` - confirm this is the intended
# mapping.
for account_id, regions in security_groups_by_account_and_region.items():
    for region, security_groups in regions.items():
        for sg in security_groups:
            rulename = sg['GroupName']
            for rule in sg['IpPermissions']:
                protocol = rule['IpProtocol']
                if protocol == '-1':
                    continue
                # FromPort/ToPort can be absent for protocols that have no
                # port range; per the EC2 API such rules apply to all ports.
                # NOTE(review): confirm 'all' is an acceptable `ports` value
                # for consumers of this table.
                from_port = rule.get('FromPort')
                to_port = rule.get('ToPort')
                if from_port is None or to_port is None:
                    ports = 'all'
                else:
                    ports = f"{from_port}-{to_port}"
                for ip_range in rule['IpRanges']:
                    if '0.0.0.0/0' not in ip_range['CidrIp']:
                        continue
                    sql = (
                        "INSERT INTO gcpfirewall "
                        "(cloud,project,rulename,action,direction,protocol,ports) "
                        f"VALUES('aws',{_sql_literal(region)},{_sql_literal(rulename)},"
                        f"'allow',{_sql_literal(account_id)},{_sql_literal(protocol)},"
                        f"{_sql_literal(ports)})"
                    )
                    # NOTE(review): the mode was ' write' (leading space) in
                    # the original; assumed to be a typo for 'write' - verify
                    # against executeMysqlData.
                    executeMysqlData(1, 'write', sql)


