aws虚拟机信息抓取
拉取aws虚拟机信息 需要调用到基础接口脚本里的数据库读写和boto3库
import boto3
from BasicInterfaces import executeMysqlData
def assume_role(account_id, role_name, region_name):
    """Assume an IAM role in the given account and return a session for it.

    The role ARN is built from ``account_id`` and ``role_name``, STS
    ``AssumeRole`` is called through a client created for ``region_name``,
    and the temporary credentials from the response are wrapped in a new
    ``boto3.Session`` pinned to the same region.

    The returned session carries short-lived secrets (access key, secret key,
    session token); they are passed straight into the session and should
    never be printed or logged by callers.
    """
    target_arn = "arn:aws:iam::{}:role/{}".format(account_id, role_name)
    sts = boto3.client("sts", region_name=region_name)

    # The session name is fixed, so every run of this script appears under the
    # same assumed-role session name in the target account's audit trail.
    temp_creds = sts.assume_role(
        RoleArn=target_arn,
        RoleSessionName="AssumeRoleSession",
    )["Credentials"]

    # Build the new session from the temporary credentials only.
    return boto3.Session(
        aws_access_key_id=temp_creds["AccessKeyId"],
        aws_secret_access_key=temp_creds["SecretAccessKey"],
        aws_session_token=temp_creds["SessionToken"],
        region_name=region_name,
    )
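def get_all_regions(session):
    """Return the names of all EC2 regions visible to ``session``.

    Calls ``describe_regions`` with ``AllRegions=True``, so the list also
    contains regions that are not enabled for the account. Calls made against
    those regions will fail, and callers have to tolerate (and should report)
    those failures rather than treat them as an empty inventory.

    Args:
        session: a ``boto3.Session`` (or compatible object) used to create
            the EC2 client.

    Returns:
        A list of region name strings, in the order the API returned them.
    """
    ec2_client = session.client("ec2")
    response = ec2_client.describe_regions(AllRegions=True)
    # The response key is "Regions"; a key with a stray leading space raises
    # KeyError and aborts the whole account scan.
    return [region["RegionName"] for region in response["Regions"]]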
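def get_ec2_instance_info(session, region_name):
    """Collect a summary record for every EC2 instance in one region.

    Uses the ``describe_instances`` paginator so that instances beyond the
    first response page are not silently dropped from the inventory.

    Args:
        session: a ``boto3.Session`` (or compatible object) used to create
            the EC2 client.
        region_name: region to query; also stored in each record.

    Returns:
        A list of dicts with the keys ``InstanceId``, ``Name``, ``Platform``,
        ``PrivateIp``, ``PublicIp``, ``SecurityGroups``, ``Region`` and
        ``InstanceType``. ``PrivateIp``, ``PublicIp`` and ``InstanceType`` are
        ``None`` when the API response does not carry them.
    """
    ec2_client = session.client("ec2", region_name=region_name)
    paginator = ec2_client.get_paginator("describe_instances")

    instance_info_list = []
    for page in paginator.paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                # The display name is the value of the "Name" tag, if any.
                # Tag values are free text set by whoever may tag instances in
                # the account, so treat them as untrusted downstream.
                name = next(
                    (
                        tag["Value"]
                        for tag in instance.get("Tags", [])
                        if tag["Key"] == "Name"
                    ),
                    "",
                )
                instance_info_list.append(
                    {
                        "InstanceId": instance.get("InstanceId"),
                        "Name": name,
                        # The original code defaults a missing Platform field
                        # to "Linux/Unix".
                        "Platform": instance.get("Platform", "Linux/Unix"),
                        "PrivateIp": instance.get("PrivateIpAddress"),
                        "PublicIp": instance.get("PublicIpAddress"),
                        "SecurityGroups": [
                            sg["GroupName"]
                            for sg in instance.get("SecurityGroups", [])
                        ],
                        "Region": region_name,
                        # main() reads this key when it builds the INSERT.
                        "InstanceType": instance.get("InstanceType"),
                    }
                )
    return instance_info_list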
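def _sql_literal(value):
    """Render ``value`` as a single-quoted MySQL string literal.

    ``None`` becomes ``'Unknown'``, matching how a missing public IP is
    stored. Backslashes, NUL characters and single quotes are escaped so that
    a value cannot terminate the literal.

    NOTE(review): this is a stop-gap. The signature of executeMysqlData is not
    visible here; if it accepts bound parameters, pass the values that way
    instead. The escaping assumes a connection character set in which a
    backslash or quote is always a single byte (e.g. utf8mb4).
    """
    if value is None:
        return "'Unknown'"
    text = str(value)
    text = text.replace("\\", "\\\\").replace("\x00", "\\0").replace("'", "''")
    return "'" + text + "'"


def main():
    """Inventory EC2 instances across the configured accounts and store them.

    For each account the configured role is assumed in us-east-1 to list the
    regions, then once more per region to list that region's instances. Every
    instance record is tagged with its account id and written to the
    ``gcpinfo`` table with ``cloud`` set to ``'aws'``.

    Failures for one region or one account are reported and skipped, so a
    single denied or disabled region does not stop the run, but no longer
    passes unnoticed as an empty result.
    """
    # Multi-account configuration.
    accounts = [
        {
            "account_id": "<awsID>",
            "role_name": "<rolename>",
        },
        {
            "account_id": "<awsID>",
            "role_name": "<rolename>",
        },
    ]
    all_instances = []
    for account in accounts:
        account_id = account["account_id"]
        role_name = account["role_name"]
        print(f"Fetching EC2 instances for account: {account_id}")
        try:
            # Assume the role in a first region to discover all regions.
            session = assume_role(account_id, role_name, "us-east-1")
            regions = get_all_regions(session)
        except Exception as e:
            # Best effort: report the account and move on to the next one.
            print(f"Failed to list regions for account {account_id}: {e}")
            continue
        for region in regions:
            try:
                print(f"Fetching EC2 instances for region: {region}")
                # Assume the role again with the client bound to this region.
                session = assume_role(account_id, role_name, region)
                instances = get_ec2_instance_info(session, region)
            except Exception as e:
                # An inventory with silent gaps is worse than a noisy one:
                # say which account/region is missing from the result.
                print(
                    f"Failed to fetch EC2 instances for account {account_id} "
                    f"in region {region}: {e}"
                )
                continue
            # Record which account each instance belongs to.
            for instance in instances:
                instance["AccountId"] = account_id
            all_instances.extend(instances)

    # Write every collected instance to the database.
    for instance in all_instances:
        if instance["PublicIp"] is None:
            instance["PublicIp"] = "Unknown"
        # Instance names come from tags and are untrusted free text, so every
        # value goes through _sql_literal instead of raw concatenation.
        # InstanceType is read with .get() because the record may not carry
        # that key; a missing value (like a missing private IP) is stored as
        # 'Unknown'.
        # NOTE(review): InstanceType lands in the `tags` column and Platform
        # in `machinetype`, as in the original statement; confirm the mapping.
        values = ",".join(
            _sql_literal(field)
            for field in (
                instance["AccountId"],
                instance["Name"],
                instance["PublicIp"],
                instance["PrivateIp"],
                instance.get("InstanceType"),
                instance["Platform"],
            )
        )
        sql = f"""INSERT INTO gcpinfo(project, name, extip, intip, tags, machinetype, cloud) VALUES ({values},'aws')"""
        executeMysqlData(1, 'write', sql)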