aws虚拟机信息抓取

基于Python3写的抓取aws虚拟机信息

拉取aws虚拟机信息 需要调用到基础接口脚本里的数据库读写和boto3库

import boto3
from BasicInterfaces import executeMysqlData
def assume_role(account_id,role_name,region_name):
    #假设角色,返回新的 AWS 会话
    sts_client = boto3.client("sts",region_name= region_name)
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    #假设角色
    response = sts_client.assume_role(RoleArn= role_arn,RoleSessionName="AssumeRoleSession")
    #获取临时凭证
    credentials = response["Credentials"]
    #返回新的会话
    return boto3.Session(
        aws_access_key_id= credentials["AccessKeyId"],
        aws_secret_access_key= credentials["SecretAccessKey"],
        aws_session_token= credentials["SessionToken"],
        region_name= region_name
        )
def get_all_regions(session):
    #获取所有支持的区域
    ec2_client = session.client("ec2")
    response = ec2_client.describe_regions(AllRegions= True)
    return [region["RegionName"] for region in response[" Regions"]]
def get_ec2_instance_info(session, region_name):
    #获取指定区域内的 EC2 实例信息
    ec2_client = session.client("ec2",region_name= region _ name)
    response = ec2_client.describe_instances()
    instance_info_list = []
    for reservation in response[" Reservations"]:
        for instance in reservation[" Instances"]:
            instance_info = {
                "InstanceId": instance.get("InstanceId"),
                "Name": "",
                "Platform": instance.get("Platform", "Linux/Unix"),
                "PrivateIp": instance.get("PrivateIpAddress"),
                "PublicIp": instance.get("PublicIpAddress"),
                "SecurityGroups": [sg["GroupName"] for sg in instance. get("SecurityGroups", [])],
                "Region": region_name
                }
                #获取实例名称(从标签中获取 Name 标签)
                if"Tags" in instance:
                    for tag in instance["Tags"]:
                        if tag["Key"] == "Name":
                            instance_info["Name"] = tag["Value"]
                            break
            instance_info_list.append(instance_info)
    return instance_info_list
def main():
    #多账户配置
    accounts = [{
        "account_id": "<awsID>", 
        "role_name": "<rolename>"
        },
        {
            " account_id": "<awsID>", " role_name": "<rolename>"
        }]
    all_instances = []
    for account in accounts:
        print(f"Fetching EC2 instances for account: {account['account_id']}")
        try:
            #假设角色到第一个区域以获取所有可用区域
            session = assume_role(account["account_id"], account["role_name"], "us-east-1")
            regions = get_all_regions(session)
            for region in regions:
                try:
                    print(f"Fetching EC2 instances for region: {region}")
                    #假设角色并切换区域
                    session = assume_role(account["account_id"], account["role_name"], region)
                    instances = get_ec2_instance_info(session, region)
                    #添加账户标识
                    for instance in instances:
                        instance["AccountId"] = account["account_id"]
                        all_instances.extend(instances)
                except Exception as e:
                    pass
        except Exception as e:
            pass
    #输出所有实例信息
    for instance in all_instances:
        if instance['PublicIp'] == None:
            instance['PublicIp'] = 'Unknown'
        sql= f"""INSERT INTO gcpinfo(project, name, extip, intip, tags, machinetype, cloud) VALUES ({"'" + instance['AccountId'] +  "'"},{"'" + instance['Name'] + "'"},{"'" + instance['PublicIp'] + "'"},{"'" + instance['PrivateIp'] + "'"},{"'" + instance['InstanceType'] + "'"},{"'" + instance['Platform'] +"'"},'aws')"""
        executeMysqlData(1,'write', sql)