使用nmap扫描外租机开放的端口并识别

使用nmap扫描外租机开放的端口并识别

使用nmap扫描外租机开放的端口并识别端口上的服务。需要用到pubapi来写入数据库;不适用于所有环境,脚本只做技术储备。

from requests.packages import urllib3
from BasicInterface import executeMysqlData
from base64 import b64encode
import json, time,re,requests,datetime
urllib3.disable_warnings()
#将cmdb的ip地址提交到gcp的nmap接口去扫描开放端口
def isPublicIpv4(ip):
    """Return True when *ip* is a dotted-quad IPv4 string outside the private/loopback ranges.

    An address counts as non-public when it falls in 10.0.0.0/8, 127.0.0.0/8,
    172.16.0.0/12 or 192.168.0.0/16. Anything that is not a string, or not a
    syntactically valid IPv4 address, yields False instead of raising.
    """
    if not isinstance(ip, str):
        return False
    octet = r'(?:[1-9]?\d|1\d\d|2[0-4]\d|25[0-5])'
    pattern = r'(?:' + octet + r'\.){3}' + octet
    private_pattern = (
        r'(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}'
        r'|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}'
        r'|192\.168\.\d{1,3}\.\d{1,3}'
    )
    # fullmatch (rather than match with ^...$) also rejects a trailing newline.
    return re.fullmatch(pattern, ip) is not None and re.fullmatch(private_pattern, ip) is None
    # NOTE: the statements that follow are module-level script code; while they sat indented after the return above they could never run.
# An IP that was scanned within the last 7 days (today included) is not scanned
# again, so collect those IPs up front.
queryIPExist = "SELECT ip FROM ScanResult WHERE ScanTime >= CURDATE() - INTERVAL 6 DAY AND ScanTime < CURDATE() + INTERVAL 1 DAY GROUP BY ip;"
resultQuery = executeMysqlData(1,'read',queryIPExist)
# Flatten the result rows; a set keeps the later "already scanned?" lookups cheap.
resultList = {column for row in resultQuery for column in row}
cookies = {'sso' : '<ssoKey>'}
# The CMDB endpoint is paginated, so the loop below walks the pages; n is the page number.
n = 1
ipListJson = {}
# Locally maintained extra targets; append further addresses in the same format.
ipList = ['<ipaddress>']
# Placeholder or junk values that show up in the CMDB ip column and must never be scanned.
ignoredIpValues = {'0.0.0.0', '1.1.1.1', 'N/a', '0', '', '127.0.0.1'}
while True:
    response = requests.get('<serverUrl>'%n, cookies= cookies , verify= False)
    # The code treats a 404 as "past the last page"; check it before parsing the body.
    if response.status_code == 404:
        break
    records = response.json()['result']
    if not records:
        # An empty page also ends the walk, so a server that never answers 404 cannot loop forever.
        break
    for record in records:
        ip = record['ip']
        # The ip column sometimes holds free text, so only well-formed public addresses are kept.
        if isPublicIpv4(ip) and ip not in ignoredIpValues:
            ipListJson[record['name']] = {
                'ip': ip,
                'user': record['user'],
                'app': record['app'],
            }
    n += 1
for i in ipList:
    # Manually listed targets are keyed by their own address and have no owner.
    ipListJson[i] = {'ip': i, 'user': ''}

# Address of the rented scanner host. A Flask service there listens on port 1080
# and only accepts plain-text request bodies.
url='<serverurl>'


def _sqlQuote(value):
    """Return *value* as a single-quoted SQL literal with quotes and backslashes escaped.

    NOTE(review): prefer parameterized queries if executeMysqlData supports them.
    """
    return "'" + str(value).replace("\\", "\\\\").replace("'", "''") + "'"


def _submitScan(targetIp, maxAttempts=3, retryDelay=30):
    """POST *targetIp* to the scanner; return the response, or None once every attempt failed."""
    for attempt in range(1, maxAttempts + 1):
        try:
            return requests.post(url, data= targetIp, timeout=1800)
        except requests.RequestException:
            print (f'扫描失败, 正在进行第{attempt}次重试')
            time.sleep(retryDelay)
    print (f'重试{maxAttempts}次失败, 跳过。。。。')
    return None


def _userLiteral(user):
    """Turn the CMDB owner field (None, str or list) into a quoted SQL literal."""
    if user is None:
        return _sqlQuote('unknown')
    if isinstance(user, str):
        return _sqlQuote(user)
    if isinstance(user, list):
        return _sqlQuote(", ".join(str(entry) for entry in user))
    # Unexpected shape: show it for diagnosis and store a neutral owner.
    print (user)
    return _sqlQuote('unknown')


# One request per target: the scanner handles a single IP per call.
for j in ipListJson:
    mysqlDateFormat = "'" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "'"
    ip = ipListJson[j]['ip']
    if ip.split('.')[-1] == '255':
        print (f'该ip地址{ip}无法进行扫描, 忽略。。。')
        continue
    if ip in resultList:
        print (f'ip:{ip}已经扫描完毕, 忽略。。。')
        continue
    print (f'{ip}提交到接口进行扫描')
    response = _submitScan(ip)
    if response is None:
        # Nothing came back for this target; never reuse an earlier target's response.
        continue
    userStr = _userLiteral(ipListJson[j]['user'])
    try:
        # A failed scan may not return JSON at all; such targets are recorded
        # below as scanned without any open port.
        scanResultJson = response.json()
        for k in scanResultJson:
            # Hosts whose ports are all closed come back without a 'portstatus' key.
            portStatus = scanResultJson[k].get('portstatus')
            if not portStatus:
                sql= f"""INSERT INTO ScanResult (ScanTime, ip, user) VALUES ({mysqlDateFormat},{_sqlQuote(k)},{userStr})"""
                print (f'no port scaned open, will insert into sql through {sql}')
                executeMysqlData(1,'write', sql)
                continue
            # One row per open port.
            for port in portStatus:
                applicationStr = portStatus[port]['application']
                if applicationStr == '':
                    applicationStr = 'Unknown'
                sql= f"""INSERT INTO ScanResult (ScanTime, application, ip, name, openport, service, user) VALUES ({mysqlDateFormat},{_sqlQuote(applicationStr)},{_sqlQuote(k)},{_sqlQuote(j)},{_sqlQuote(port)},{_sqlQuote(portStatus[port]['service'])},{userStr})"""
                executeMysqlData(1,'write', sql)
    except (ValueError, KeyError, TypeError, AttributeError):
        sql= f"""INSERT INTO ScanResult (ScanTime, ip, user) VALUES ({mysqlDateFormat},{_sqlQuote(ip)},{userStr})"""
        executeMysqlData(1,'write', sql)

同步多个gcp项目的防火墙

同步多个gcp项目的防火墙

同步多个gcp项目的防火墙:用于在多个gcp项目(或账号)之间同步黑名单数据。需要使用基础库的数据库读写。

from requests.packages import urllib3
import time, re
from google.cloud import compute _v1
from google. oauth2 import service_account
from BasicInterfaces import executeMysqlData
def listLessThan5kFirewallRule(credentials,projectId):
    """Find the first "blocklist" firewall rule of *projectId* that still has room.

    Args:
        credentials: credentials object handed to ``compute_v1.FirewallsClient``.
        projectId: GCP project whose firewall rules are listed.

    Returns:
        ``(rule name, free slots)`` for the first rule whose name contains
        "blocklist" and that holds fewer than 5000 source ranges, or ``None``
        when no such rule exists.
    """
    client = compute_v1.FirewallsClient(credentials=credentials)
    query = "blocklist"
    maxSourceRanges = 5000
    for firewallRule in client.list(project=projectId):
        if query not in firewallRule.name:
            continue
        usedSlots = len(firewallRule.source_ranges)
        if usedSlots < maxSourceRanges:
            return firewallRule.name, maxSourceRanges - usedSlots
        # A full rule is skipped so that later blocklist rules are still considered.
    # NOTE(review): the previous version carried an unreachable branch that tried to
    # remove the entry '224.1.1.132/32' from rules holding exactly 2 ranges. It never
    # ran, so it is not revived here; confirm whether that clean-up is still wanted.
    return None
def addSourceIPToFirewallRule(credentials,projectId,firewallRuleName,ipaddressForAdd):
    """Append the given source ranges to an existing firewall rule.

    Args:
        credentials: credentials object handed to ``compute_v1.FirewallsClient``.
        projectId: GCP project owning the rule.
        firewallRuleName: name of the rule to extend.
        ipaddressForAdd: iterable of source ranges to append.

    Returns:
        ``None`` on success (or when there is nothing to add), otherwise the last
        whitespace-separated word of the error raised by the update.
    """
    client = compute_v1.FirewallsClient(credentials=credentials)
    firewall = client.get(project=projectId,firewall=firewallRuleName)
    if not ipaddressForAdd:
        return None
    # Add the whole batch first, then push a single update instead of one API call per address.
    firewall.source_ranges.extend(ipaddressForAdd)
    try:
        operation = client.update(project=projectId, firewall=firewallRuleName, firewall_resource=firewall)
        operation.result()
    except Exception as e:
        words = str(e).split()
        return words[-1] if words else repr(e)
    return None
def removeSourceIPToFirewallRule(credentials,projectId,firewallRuleName,ipaddressForAdd):
    """Remove one source range from an existing firewall rule and wait for the update.

    Args:
        credentials: credentials object handed to ``compute_v1.FirewallsClient``.
        projectId: GCP project owning the rule.
        firewallRuleName: name of the rule to modify.
        ipaddressForAdd: the source range to remove (the parameter name is historical).

    Errors from the sequence ``remove`` (entry not present) and from the API
    calls are not caught here and propagate to the caller.
    """
    client = compute_v1.FirewallsClient(credentials=credentials)
    firewall = client.get(project=projectId, firewall=firewallRuleName)
    firewall.source_ranges.remove(ipaddressForAdd)
    operation = client.update(project=projectId, firewall=firewallRuleName, firewall_resource=firewall)
    operation.result()
def getResource0fFirewall(credentials,projectId,firewallRule):
    """Return the source ranges of the first rule whose name contains *firewallRule*.

    Args:
        credentials: credentials object handed to ``compute_v1.FirewallsClient``.
        projectId: GCP project whose firewall rules are listed.
        firewallRule: substring looked for in each rule name.

    Returns:
        The matching rule's ``source_ranges``, or ``None`` when no rule name matches.
    """
    client = compute_v1.FirewallsClient(credentials=credentials)
    for candidate in client.list(project=projectId):
        if firewallRule in candidate.name:
            return candidate.source_ranges
    return None
def newFirewallPolicy(credentials,projectId, firewallRuleName, ipaddresses):
    """Create an ingress deny-all firewall rule on the default network.

    Args:
        credentials: credentials object handed to ``compute_v1.FirewallsClient``.
        projectId: GCP project in which the rule is created.
        firewallRuleName: name of the new rule.
        ipaddresses: source ranges the rule should deny.

    Returns:
        ``True`` when the rule was created, otherwise the last two words of the
        error text (for example ``'already exists'``).
    """
    client = compute_v1.FirewallsClient(credentials= credentials)
    firewall = {
        "denied": [{"I_p_protocol": "all"}],
        "direction": "INGRESS",
        "name": firewallRuleName,
        "network": "global/networks/default",
        "priority": 999,
        "source_ranges": ipaddresses,
    }
    try:
        operation = client.insert(project=projectId, firewall_resource = firewall)
        # Wait for completion, as the update helpers in this script do, so a
        # failed creation is not reported as a success.
        operation.result()
        return True
    except Exception as e:
        # Joining a slice keeps this safe for messages with fewer than two words.
        return " ".join(str(e).split()[-2:])
projectsList = ["<projectname>"]
credentialsCST = service_account.Credentials.from_service_account_file('<xxx.json>')
# Number of source ranges written into each newly created blocklist rule.
newRuleBatchSize = 5000


def _markIpsSynced(projectInfo, ipBatch):
    """Flag every IP of *ipBatch* as synced for the project column *projectInfo*."""
    for syncedIp in ipBatch:
        sql= f"""update sync_ip_info set {projectInfo}=1 where ip='{syncedIp}' and {projectInfo}=0"""
        executeMysqlData(1,'write', sql)


def _createRuleForBatch(projectInfo, ipBatch):
    """Create the first free "blocklist<N>" rule holding *ipBatch*; return its name or None."""
    for ruleIndex in range(1, 1000):
        candidateName = "blocklist" + str(ruleIndex)
        outcome = newFirewallPolicy(credentialsCST, projectInfo, candidateName, ipBatch)
        if outcome is True:
            return candidateName
        if outcome != 'already exists':
            # Any other error is a real failure, not a name clash; stop trying.
            print (f'创建防火墙规则{candidateName}失败: {outcome}')
            return None
    return None


for projectInfo in projectsList:
    print (f'开始处理{projectInfo}黑名单。。。')
    queryScanData = f"select ip from sync_ip_info where {projectInfo}=0"
    scanData = executeMysqlData(1,'read',queryScanData)
    ips = [item[0] for item in scanData]
    if not ips:
        continue

    # Allow-listed addresses are never blocked: drop them and mark them as handled.
    whiteIPsList = ['<ips>']
    for i in whiteIPsList:
        if i not in ips:
            continue
        ips.remove(i)
        print(f'发现白名单 ip{i}在列表中, 尝试移除。。。')
        _markIpsSynced(projectInfo, [i])
        print (f'白名单 ip{i}移除完成')
    print (f'找到未同步的 ip{len(ips)}个')

    # Step 1: top up an existing blocklist rule that still has room, if there is one.
    try:
        existingRule = listLessThan5kFirewallRule(credentialsCST,projectInfo)
    except Exception:
        existingRule = None
    if existingRule is not None:
        firewallRuleName, capacityFirewall = existingRule
        print (f'找到防火墙规则{firewallRuleName}, 尝试写入...')
        batch = ips[0:capacityFirewall]
        resultAddIP = addSourceIPToFirewallRule(credentialsCST,projectInfo,firewallRuleName,batch)
        if resultAddIP is None:
            print (f'更新黑名单ip成功,同步数据库中。。。')
            # Only the addresses actually written are marked as synced.
            _markIpsSynced(projectInfo, batch)
            print (f'数据库同步完成')
        else:
            # The batch stays unsynced in the database and is retried on the next run.
            print (f'防火墙规则{firewallRuleName}写入失败: {resultAddIP}')
        ips = ips[capacityFirewall:]
        if ips:
            print (f'防火墙规则{firewallRuleName}写入完成,还剩下{len(ips)}个ip')

    # Step 2: whatever is left goes into freshly created rules, one batch per rule.
    while ips:
        batch = ips[0:newRuleBatchSize]
        firewallRuleName = _createRuleForBatch(projectInfo, batch)
        if firewallRuleName is None:
            print (f'无法创建新的防火墙规则, 剩余{len(ips)}个ip留待下次同步')
            break
        print (f"防火墙规则{firewallRuleName}创建并写入成功!")
        print (f'更新黑名单ip成功,同步数据库中。。')
        _markIpsSynced(projectInfo, batch)
        print (f'数据库同步完成')
        ips = ips[newRuleBatchSize:]
        if ips:
            print (f'防火墙规则{firewallRuleName}写入完成,还剩下{len(ips)}个ip')
    else:
        print (f"本次黑名单同步完成! ")

收集windows上软件安装信息

收集windows上软件安装信息

收集Windows上的软件安装信息。如果目标机器上没有Python环境,需要先编译成exe文件后运行。需要用到pubapi来写入数据库。由于wazuh有同样的功能可以替代,故本脚本只做技术储备。

import winreg, requests, json, platform, socket, time, random
from datetime import date
from requests import urllib3
urllib3.disable_warnings()
# The delay can be shortened while testing. It is kept because the receiving
# service is written in Flask and might fall over if every host reported at once.
delay=random.randint(1,1200)
# Count down in steps of at most 10 seconds so the operator sees progress.
for secondsLeft in range(delay, 0, -10):
    napLength = min(secondsLeft, 10)
    print(f" This script will run in {secondsLeft} seconds, please wait... ")
    time.sleep(napLength)
def getInstalledSoftware():
    """Return the de-duplicated (name, version) tuples found under both Uninstall registry keys.

    The native key and the WOW6432Node key (32-bit software on 64-bit Windows)
    are read through ``getSoftwareFromRegistry``. A key that does not exist on
    this machine is skipped.
    """
    uninstallPaths = (
        r"Software\Microsoft\Windows\CurrentVersion\Uninstall",
        r"Software\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall",
    )
    softwareSet = set()
    for registryPath in uninstallPaths:
        try:
            uninstallKey = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, registryPath)
        except FileNotFoundError:
            continue
        # The handle is a context manager; leaving the block closes it.
        with uninstallKey:
            softwareSet.update(getSoftwareFromRegistry(uninstallKey))
    return list(softwareSet)
# Display names that are ignored when they match exactly.
_IGNORED_SOFTWARE_NAMES = frozenset((
    'aic94xx-firmware', 'Azure Data Studio', 'cim-schema', 'galera-4', 'hdparm',
    'iDRAC-with-Lifecycle-Controller', 'Integration Services',
    'Kits Configuration Installer', 'Microsoft Edge Update', 'SDK Debuggers',
))
# Fragments that cause an entry to be ignored when they occur anywhere in the display name.
_IGNORED_SOFTWARE_FRAGMENTS = (
    '系統更新應用程式', 'Update for', 'for SQL Server', 'Cloudbase', 'dstat', 'Hotfix',
    'IIS URL', 'Analysis Services', 'Language Pack', 'Microsoft Exchange Speech',
    'Microsoft Help Viewer', 'Microsoft Server Speech', 'Microsoft Speech', 'T-SQL',
    'Setup Support Files', 'Native Client', 'RsFx Driver', 'Setup (English)',
    'Microsoft Unified Communications Managed API', 'Microsoft Visual Studio Tools',
    'Microsoft VSS Writer', 'Notepad++', 'Security Update for', 'Spice Agent',
    'Batch Parser', 'Tools for', 'Client Tools', 'Common Files', 'Connection Info',
    'Database Engine', 'DMF', 'Documentation Components', 'Shared Management Objects',
    'SQL Diagnostics', 'Reporting Services', 'Trellix', 'for SSMS', 'Wazuh Agent',
    'SDK EULA', 'Software Development Kit',
)


def getSoftwareFromRegistry(regKey):
    """Return (DisplayName, DisplayVersion) tuples for the sub-keys of *regKey*.

    Sub-keys that cannot be opened or that lack either value are skipped, and so
    are entries on the ignore lists above.

    Args:
        regKey: an open registry key handle whose sub-keys are enumerated.

    Returns:
        A list of ``(displayName, displayVersion)`` tuples.
    """
    softwareList = []
    subKeyCount = winreg.QueryInfoKey(regKey)[0]
    for position in range(subKeyCount):
        try:
            with winreg.OpenKey(regKey, winreg.EnumKey(regKey, position)) as subKey:
                displayName = winreg.QueryValueEx(subKey, "DisplayName")[0]
                displayVersion = winreg.QueryValueEx(subKey, "DisplayVersion")[0]
        except OSError:
            # Missing value or unreadable sub-key: not a reportable product.
            continue
        if not isinstance(displayName, str):
            continue
        if displayName in _IGNORED_SOFTWARE_NAMES:
            continue
        if any(fragment in displayName for fragment in _IGNORED_SOFTWARE_FRAGMENTS):
            continue
        # Tuples are hashable, so the caller can de-duplicate them in a set.
        softwareList.append((displayName, displayVersion))
    return softwareList
def getComputerInfo():
    """Describe the local machine.

    Returns:
        A dict with the keys ``ip`` (address the host name resolves to),
        ``hostname``, ``osname``, ``osversion`` and ``date`` (today as YYYY-MM-DD).
    """
    machineName = socket.gethostname()
    return {
        "ip": socket.gethostbyname(machineName),
        "hostname": machineName,
        "osname": platform.system(),
        "osversion": platform.version(),
        "date": date.today().strftime("%Y-%m-%d"),
    }
softwareInfo = getInstalledSoftware()
computerInfo = getComputerInfo()
finalData = []
for software in softwareInfo:
    if "'" in software[0]:
        # Names containing an apostrophe are cut down to their first word.
        # NOTE(review): the first word can still contain the apostrophe; confirm
        # whether split("'")[0] was intended.
        softwareName = software[0].split()[0]
    else:
        softwareName = software[0]
    # One record per installed product, each carrying the host facts.
    finalData.append({
        "ip": computerInfo["ip"],
        "hostname": computerInfo["hostname"],
        "software": softwareName,
        "version": software[1],
        "osname": computerInfo["osname"],
        "osversion": computerInfo["osversion"],
    })
jsonData = json.dumps(finalData)
url=' <domainname>/insert'
headers = {'Content-Type': 'application/json'}
print (requests.post(url,verify= False, data=jsonData, headers=headers).text)

收集网络管理软件上的网段信息

收集网络管理软件上的网段信息

收集网络管理软件上的网段信息 需要用到pubapi来写入数据库 不适用于所有环境,脚本只做技术储备

import mysql.connector
import requests, json
from requests.packages import urllib3
from Basicinterface import executeMysqlData
# Periodically pull subnet data from the address-management system into our own
# database. Subnets of our own can also be added by hand. Entries whose purpose
# changed are updated later on, and unknown subnets are created.
# NOTE(review): the API token is hard-coded in source; consider loading it from
# configuration or the environment and rotating this value.
apiToken = '33cbb6a54b5fa924e5be1a0917d0b856'
server=  '<serverUrl>'
serverQuery = [{'serverIP':server,'apiToken':apiToken}]
urllib3.disable_warnings()
subnetDict = {}
for i in serverQuery:
    # Pages are numbered from 1 for every server.
    n = 1
    while True:
        pageUrl = f"https://{i['serverIP']}:8861/api/json/ipam/getAllSubnetSummary?apiKey={i['apiToken']}&&rows=100&page={n}"
        try:
            response = requests.get(pageUrl, verify= False)
        except requests.RequestException:
            print(f"{i['serverIP']}获取失败!")
            # Without a response there is nothing to parse; stop paging this server.
            break
        try:
            jsonServerData = json.loads(response.text)['rows']
        except (ValueError, KeyError, TypeError):
            print(f"{i['serverIP']}转换成 json失败!!")
            jsonServerData = []
        if not jsonServerData:
            # An empty (or unreadable) page ends the walk.
            break
        for j in jsonServerData:
            subnetCidr = j['subnet-address-cidr']
            subnetDict.setdefault(subnetCidr, {})['subnetName'] = j['subnet-name']
        n += 1
for i in subnetDict:
    # Insert subnets that are not stored yet; rename stored ones whose name changed.
    subnetName = subnetDict[i]['subnetName']
    subnets = "'"+ i + "'"
    nameOfSubnet = "'" + subnetName + "'"
    querySubnet = f"SELECT * from IPStatus WHERE subnet = {subnets}"
    # NOTE(review): the lookup uses database id 1 while the writes below use id 3,
    # exactly as the ids appeared before; confirm both point at the same IPStatus table.
    result = executeMysqlData(1,'read',querySubnet)
    if len(result) == 0:
        sql= f"""INSERT INTO IPStatus (subnet, name) VALUES ({subnets},{nameOfSubnet})"""
        executeMysqlData(3,'write',sql)
    elif result[0][2] != subnetName:
        # Index 2 of the row is compared with the subnet name, i.e. treated as the name column.
        sql= f"""UPDATE IPStatus set name = {nameOfSubnet} WHERE subnet = {subnets}"""
        executeMysqlData(3,'write',sql)

统计hfish节点信息

统计蜜罐节点信息

统计蜜罐节点信息 需要用到pubapi来写入数据库 不适用于所有环境,脚本只做技术储备

from BasicInterface import executeMysqlData
# Pull the endpoint IPs of every honeypot node and register the ones not stored yet.
serverlist = [2,5,4]
# Label printed for each source database id.
serverLabels = {2: 'CST', 5: 'SA', 4: '公网'}
sql= """ select net_info from hosts"""
for server in serverlist:
    if server in serverLabels:
        print (serverLabels[server])
    result = executeMysqlData(server,'read', sql)
    for i in result:
        if not i[0]:
            continue
        # net_info holds '|'-separated entries; each entry is '&&'-separated and
        # its third field is used as the IP.
        for j in i[0].split('|'):
            fields = j.split("&&")
            if len(fields) < 3:
                continue
            ipLiteral = "'" + fields[2] + "'"
            sqlForCheck = f"""select up from honeypot where ip = {ipLiteral}"""
            resultForCheck = executeMysqlData(3,'read',sqlForCheck)
            if len(resultForCheck) == 0:
                sqlForInsert = f"""insert into honeypot (ip) values({ipLiteral})"""
                executeMysqlData (3,'write',sqlForInsert)

读取ELK信息并写入数据库

读取ELK信息并写入数据库

读取ELK信息并写入数据库 需要用到pubapi来写入数据库 不适用于所有环境,脚本只做技术储备

from BasicInterface import *
from datetime import datetime, timedelta
import re, time
from requests.packages import urllib3
urllib3.disable_warnings()
# Alert data is pulled once per hour so that Grafana can read and audit it.
# Both bounds derive from one clock reading, so the window is exactly 60 minutes.
# NOTE(review): the "+0800" offset is appended to naive local time; this is only
# correct when the host clock runs in UTC+8.
_windowEnd = datetime.now()
time1 = _windowEnd.strftime("%Y-%m-%dT%H:%M:%S" + "+0800")
time2 = (_windowEnd - timedelta(minutes=60)).strftime("%Y-%m-%dT%H:%M:%S" + "+0800")
def _wazuhEventTime(source):
    """Return the hit's timestamp as a quoted 'YYYY-MM-DD HH:MM:SS' SQL literal."""
    datePart = source['timestamp'].split('T')[0]
    # Drop the '+offset' suffix and the fractional seconds.
    clockPart = source['timestamp'].split('T')[1].split('+')[0].split('.')[0]
    return "'" + str(datetime.strptime(datePart + " " + clockPart, "%Y-%m-%d %H:%M:%S")) + "'"


def _wazuhAccount(source):
    """Return (username, domain) SQL literals taken from the 'detection User' event field."""
    try:
        usernameDomain = source['data']['win']['eventdata']['detection User']
    except (KeyError, TypeError):
        return "'Unknown'", "'None'"
    if '\\' in usernameDomain:
        parts = usernameDomain.split('\\')
        return "'" + parts[-1] + "'", "'" + parts[0] + "'"
    # No domain prefix: keep the whole value as the account name.
    return "'" + usernameDomain + "'", "'None'"


def _wazuhContext(source):
    """Return the text to store for a hit, or None when the hit should be skipped."""
    data = source.get('data')
    if data is None:
        return source.get('full_log')
    if 'vulnerability' in data:
        return data['vulnerability']['title']
    if 'full_log' in source:
        # NOTE(review): logs starting with "CVE" were never given a context
        # before (undefined variable); they are skipped here. Confirm the intent.
        if re.match("^CVE", source['full_log']) is not None:
            return None
        return source['full_log']
    win = data.get('win', {})
    eventdata = win.get('eventdata', {})
    if 'category Name' in eventdata:
        return eventdata['category Name'] + ":" + eventdata.get('path', '')
    return win.get('system', {}).get('message')


def requestWazuhLv12(server, index):
    """Copy Wazuh alerts of rule level 12 and above from Elasticsearch into LogCollection.

    For every WazuhExcept row of the given server one query is sent through
    ``readELK``. The row supplies the field/values excluded via ``must_not`` and
    an optional time window; without a window the last 60 minutes are used.
    Each hit becomes one INSERT executed through ``executeMysqlData``.

    Args:
        server: server id; 3, 4 and 5 are supported.
        index: Elasticsearch index passed on to ``readELK``.

    Raises:
        ValueError: if *server* is not one of the supported ids.
    """
    serverNames = {3: 'sec', 4: 'saint', 5: 'saext'}
    if server not in serverNames:
        raise ValueError(f'unsupported wazuh server id: {server}')
    srv = "'" + serverNames[server] + "'"
    sql= f""" select * from WazuhExcept where server={srv}"""
    result = executeMysqlData(1,'read',sql)
    timeFormat = "%Y-%m-%dT%H:%M:%S" + "+0800"
    # Agent address prefixes attributed to the technology department (server 3 only).
    techPrefixes = ('10.70.4.', '10.70.5.', '10.70.6.', '10.70.8.', '10.78.6.', '10.79.27.', '10.21.25.')
    for item in result:
        key = item[2]
        value = item[1]
        windowStart = item[4]
        windowEnd = item[5]
        # Excluded values are stored comma-separated; anything else is passed through as is.
        valueList = value.split(',') if isinstance(value, str) else value
        if windowStart is None or windowEnd is None:
            now = datetime.now()
            rangeFrom = (now - timedelta(minutes=60)).strftime(timeFormat)
            rangeTo = now.strftime(timeFormat)
        else:
            rangeFrom = windowStart.strftime(timeFormat)
            rangeTo = windowEnd.strftime(timeFormat)
        body = {
            "query": {
                "bool": {
                    "filter": [
                        {"bool": {"should": {"range": {"rule.level": {"gte": 12}}}}},
                        {"range": {"@timestamp": {"gte": rangeFrom, "lte": rangeTo}}},
                    ],
                    "must_not": [{"terms": {key: valueList}}],
                }
            },
            "size": 100,
        }
        response = readELK(server, index, body,7)
        if response == False:
            print ('wazuh接口调用失败,故跳过本次查询!')
            continue
        hits = response['hits']['hits']
        if len(hits) == 0:
            print (f'本次扫描完成, {server}没有发现12级以上的告警')
            continue
        for hit in hits:
            source = hit['_source']
            contextText = _wazuhContext(source)
            if contextText is None:
                continue
            # Quotes are stripped because the statement is assembled as text, and
            # the context is limited to 250 characters.
            contextWazuh = "'" + contextText.replace("'", "")[:250] + "'"
            sourceHostname = "'" + source['agent']['name'] + "'"
            timeWazuh = _wazuhEventTime(source)
            username, domain = _wazuhAccount(source)
            if server == 3:
                agentIp = source['agent'].get('ip', '')
                if agentIp.startswith(techPrefixes):
                    department = "'" + 'Techology' + "'"
                else:
                    department = "'" + 'unknown' + "'"
                try:
                    infoSource = "'" + source["data"]["win"]["system"]["providerName"] + "'"
                except (KeyError, TypeError):
                    infoSource = "'"+ 'wazuh' + "'"
                # Windows Defender findings go to a different auditor than the rest.
                if 'Microsoft-Windows-Windows Defender' in infoSource:
                    auditer = "'Doctor'"
                else:
                    auditer = "'Matteo'"
            elif server == 4:
                infoSource, auditer, department = "'SaIntWazuh'", "'hsDanny'", "'SA'"
            else:
                infoSource, auditer, department = "'SaExtWazuh'", "'hsDanny'", "'SA'"
            # NOTE(review): values are embedded as text; switch to parameterized
            # queries if executeMysqlData supports them.
            sql=f"""INSERT INTO LogCollection (Type, Context, Infosource,HostnameOfSource, Domain, Account,TimeOfHappened, Auditer, Department) VALUES ('Audit',{contextWazuh},{infoSource},{sourceHostname},{domain},{username},{timeWazuh},{auditer},{department})"""
            resultSql = executeMysqlData(3,'write', sql)
            if resultSql == True:
                print ('wazuh数据写入成功')
            else:
                print ('wazuh数据写入失败,请检查')
def requestWazuhAccount():
    """Record privileged-account logon events seen on mail hosts.

    The query covers the module-level window ``time2``..``time1`` and the target
    user names listed below, minus a set of excluded rule descriptions,
    workstations, rule ids and logon processes. Only hits whose agent name
    contains "mail" (case-insensitive) are written to LogCollection.
    """
    # (field, value) pairs that must NOT match.
    excludedTerms = [
        ("rule.description", "Windows User Logoff."),
        ("rule.description", "Windows logon success."),
        ("data.win.eventdata.workstationName", "<computername>"),
        ("data.win.eventdata.workstationName", "<computername>"),
        ("rule.id", "60122"),
        ("data.win.eventdata.logonProcessName", "NtLmSsp"),
    ]
    mustClauses = [{
        "terms": {
            "data.win.eventdata.targetUserName": [
                "admin", "administrator", "zohoadm", "exadm", "administrat0r"]
        }
    }]
    mustClauses.extend(
        {"bool": {"must_not": {"term": {field: excluded}}}}
        for field, excluded in excludedTerms
    )
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {
                        "@timestamp": {
                            "gte": time2,
                            "lte": time1
                        }
                    }
                }],
                "filter": {
                    "bool": {
                        "must": mustClauses
                    }
                }
            }
        }
    }
    result = readELK(serverId=3,indexData="<indexname>",bodyData=body,esVersion=7)
    if result == False:
        print ('wazuh接口调用失败,故跳过本次查询!')
        return
    for hit in result['hits']['hits']:
        source = hit['_source']
        if not re.search(r'mail', source['agent']['name'], re.IGNORECASE):
            continue
        eventdata = source['data']['win']['eventdata']
        logForSql = "'"+ source['data']['win']['system']['severityValue'] + ", " + eventdata['logonProcessName'] + "'"
        accountForSql = "'" + eventdata['targetUserName'] + "'"
        hostForSql = "'" + source['agent']['name'] + "'"
        ipForSql = "'" + source['agent']['ip'] + "'"
        # Keep date and clock time only: drop the '+offset' suffix and fractional seconds.
        datePart = source['timestamp'].split('T')[0]
        clockPart = source['timestamp'].split('T')[1].split('+')[0].split('.')[0]
        timeForSql = "'" + str(datetime.strptime(datePart + " " + clockPart, "%Y-%m-%d %H:%M:%S")) + "'"
        sql=f"""INSERT INTO LogCollection (Type, Context,SourceIP, Infosource,HostnameOfSource, Account,TimeOfHappened) VALUES ('Audit',{logForSql},{ipForSql},'wazuh-特权账号登入',{hostForSql},{accountForSql},{timeForSql})"""
        resultSql = executeMysqlData(3,'write', sql)
        if resultSql == True:
            print ('wazuh数据写入成功')
        else:
            print ('wazuh数据写入失败,请检查!')
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so text coming from the McAfee
    API cannot terminate the literal early.
    NOTE(review): assumes MySQL's default sql_mode (backslash escapes
    enabled) -- confirm against the target server.
    """
    escaped = str(value).replace('\\', '\\\\').replace("'", "''")
    return "'" + escaped + "'"


def _split_domain_user(raw):
    r"""Split a ``DOMAIN\user`` value into ``(username, domain)``.

    A one-character value is treated as a placeholder, and a value without a
    backslash has no domain part; ``'Unknown'`` stands in for what is missing.
    """
    if len(raw) == 1:
        return 'Unknown', 'Unknown'
    if '\\' in raw:
        parts = raw.split('\\')
        return parts[-1], parts[0]
    return raw, 'Unknown'


def requestMcafee():
    """Fetch events for the configured McAfee queries and store them in LogCollection.

    For every query id, ``apiFromMcafee(2, query_id)`` is called. A response
    is processed only when it starts with ``OK``; the JSON payload is read
    from offset 4. Events without a target user name are skipped. Each
    remaining event becomes one INSERT executed through ``executeMysqlData``.
    """
    query_ids = ['877', '878']
    for query_id in query_ids:
        data = apiFromMcafee(2, query_id)
        if data[0:2] != 'OK':
            continue
        # Very short responses carry no payload worth parsing.
        events = json.loads(data[4:]) if len(data) > 10 else []
        if not events:
            print('mcafee近一小时没有收到告警, no news is good news~!!')
            continue
        for event in events:
            raw_user = event['EPOEvents.TargetUserName']
            if raw_user is None:
                continue
            username, domain = _split_domain_user(raw_user)
            # Prefer the natural-language description; fall back to the threat name.
            text = event.get('EPExtendedEvent.NaturalLangDescription')
            if text is None:
                text = event.get('EPOEvents.ThreatName')
            # The stored description is capped at 200 characters.
            description = str(text if text is not None else '')[:200]
            host = event.get('EPOComputerProperties.IPHostName') or 'Unknown'
            sql = (
                "INSERT INTO LogCollection (Type, Context, Infosource,HostnameOfSource, Domain,Account, Auditer) "
                f"VALUES ('Audit',{_sql_quote(description)},'mcafee',{_sql_quote(host)},"
                f"{_sql_quote(domain)},{_sql_quote(username)},'Doctor')"
            )
            if executeMysqlData(3, 'write', sql) == True:
                print('mcafee数据写入成功')
            else:
                print('mcafee数据写入失败,请检查!')
# Wazuh servers to query, keyed by the id handed to requestWazuhLv12 together
# with the index name configured for that server.
serverlist = {
    3: {"index": "<indexname>"},
    4: {"index": "<indexname>"},
    5: {"index": "<indexname>"},
}
for server, settings in serverlist.items():
    requestWazuhLv12(server, settings['index'])
# The account and McAfee collectors run once, after the per-server pass.
requestWazuhAccount()
requestMcafee()

抓取 AWS IAM 信息并判断权限是否开大

基于Python3写的抓取aws IAM信息并判断权限是否开大

抓取 AWS IAM 信息并判断权限是否开大。需要调用基础接口脚本里的数据库读写函数以及 boto3 库。

import boto3,json
from botocore.exceptions import ClientError
from BasicInterfaces import executeMysqlData
def list_all_iam_users():
    """Return the ``UserName`` of every IAM user, walking all ``list_users`` pages."""
    pages = boto3.client('iam').get_paginator('list_users').paginate()
    return [entry['UserName'] for page in pages for entry in page['Users']]
def get_user_attached_policies(user_name):
    """Return every managed policy attached directly to ``user_name``.

    The IAM ``list_attached_user_policies`` call is paginated; the paginator
    is used so policies beyond the first page are not silently dropped.

    Returns:
        A list of the ``AttachedPolicies`` entries (dicts carrying
        ``PolicyName`` and ``PolicyArn``), empty when nothing is attached.
    """
    iam_client = boto3.client('iam')
    paginator = iam_client.get_paginator('list_attached_user_policies')
    policies = []
    for page in paginator.paginate(UserName=user_name):
        policies.extend(page['AttachedPolicies'])
    return policies
def get_policy_document(policy_arn):
    """Return the document of the default version of the managed policy ``policy_arn``.

    Two IAM calls are made: ``get_policy`` to learn the default version id,
    then ``get_policy_version`` to fetch that version's document.
    """
    iam_client = boto3.client('iam')
    policy_details = iam_client.get_policy(PolicyArn=policy_arn)
    default_version_id = policy_details['Policy']['DefaultVersionId']
    policy_version = iam_client.get_policy_version(
        PolicyArn=policy_arn,
        VersionId=default_version_id,
    )
    return policy_version['PolicyVersion']['Document']
def validate_policy_with_access_analyzer(policy_document):
    """Validate an identity policy with IAM Access Analyzer.

    Args:
        policy_document: The policy as a JSON-serialisable object; it is
            serialised with ``json.dumps`` before being sent.

    Returns:
        The ``findings`` list from the ``validate_policy`` response, or an
        empty list when the call raises ``ClientError`` (the error is printed).
    """
    accessanalyzer_client = boto3.client('accessanalyzer')
    try:
        response = accessanalyzer_client.validate_policy(
            policyType='IDENTITY_POLICY',
            policyDocument=json.dumps(policy_document),
        )
    except ClientError as e:
        print(f"[ERROR] Access Analyzer 验证失败: {e}")
        return []
    return response['findings']
def detect_overly_permissive(policy_document):
    """Flag ``Allow`` statements whose Action or Resource is overly broad.

    A statement is reported when its ``Action`` is (or contains) ``*`` or
    ``iam:*``, and separately when its ``Resource`` is (or contains) ``*``.
    IAM allows ``Statement``, ``Action`` and ``Resource`` to be either a
    single value or a list; both forms are normalised to a list first so a
    scoped value such as ``arn:aws:s3:::bucket/*`` is judged the same way in
    either form.

    Returns:
        A list of human-readable finding strings, empty when nothing matched.
    """
    def _as_list(value):
        # A lone string (or a lone statement dict) stands for a one-item list.
        return [value] if isinstance(value, (str, dict)) else list(value)

    findings = []
    for statement in _as_list(policy_document.get('Statement', [])):
        if statement.get('Effect') != 'Allow':
            continue
        if 'Action' in statement:
            actions = _as_list(statement['Action'])
            if '*' in actions or 'iam:*' in actions:
                findings.append("found over operation privielges: {}".format(statement['Action']))
        if 'Resource' in statement and '*' in _as_list(statement['Resource']):
            findings.append("found over resource privielges: {}".format(statement['Resource']))
    return findings
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so the text cannot end the
    literal early.
    NOTE(review): assumes MySQL's default sql_mode (backslash escapes
    enabled) -- confirm against the target server.
    """
    escaped = str(value).replace('\\', '\\\\').replace("'", "''")
    return "'" + escaped + "'"


def analyze_user_policies(user_name):
    """Analyse every policy attached to ``user_name`` and record the findings.

    Each attached policy is checked twice: first with Access Analyzer
    (``validate_policy_with_access_analyzer``), then with the local wildcard
    scan (``detect_overly_permissive``). Every finding is written as one row
    of ``gcpIamCheck`` with project ``'aws'`` and action ``'remove'``.
    """
    insert_prefix = (
        "INSERT INTO gcpIamCheck (project, account, rules, action, recommend) VALUES "
    )

    def _record(policy_name, recommend):
        # One row per finding; all dynamic values are quoted and escaped.
        sql = (
            f"{insert_prefix}('aws',{_sql_quote(user_name)},{_sql_quote(policy_name)},"
            f"'remove',{_sql_quote(recommend)})"
        )
        executeMysqlData(1, 'write', sql)

    print(f"\n[INFO] 开始分析用户: {user_name}")
    policies = get_user_attached_policies(user_name)
    if not policies:
        print(f"[INFO] 用户 {user_name} 没有附加策略")
        return
    for policy in policies:
        policy_name = policy['PolicyName']
        print(f"分析策略: {policy_name}")
        policy_document = get_policy_document(policy['PolicyArn'])

        findings = validate_policy_with_access_analyzer(policy_document)
        if findings:
            print("Access Analyzer 发现策略问题:")
            for finding in findings:
                print(f" - {finding['findingDetails']}")
                _record(policy_name, finding['findingDetails'])

        print("自动分析策略完成,开始自定义扫描策略")
        manual_findings = detect_overly_permissive(policy_document)
        if not manual_findings:
            print("  自定义扫描未发现宽泛权限")
            continue
        for f in manual_findings:
            print(f"  !自定义检测发现:{f}")
            # Findings that mention a wildcard are stored under one shared label.
            _record(policy_name, 'resource_all' if '*' in f else f)
def main():
    """Scan every IAM user of the account and analyse each one's attached policies."""
    print("[INFO] 开始扫描所有 IAM 用户…\n")
    user_names = list_all_iam_users()
    print("发现用户{}".format(user_names))
    if user_names:
        for name in user_names:
            analyze_user_policies(name)
    else:
        print("[INFO] 当前账户没有 IAM 用户")


main()

aws虚拟机信息抓取

基于Python3写的抓取aws虚拟机信息

拉取 AWS 虚拟机信息。需要调用基础接口脚本里的数据库读写函数以及 boto3 库。

import boto3
from BasicInterfaces import executeMysqlData
def assume_role(account_id,role_name,region_name):
    """Assume ``role_name`` in ``account_id`` through STS and return a new boto3 Session.

    The STS client is created for ``region_name`` and the returned session is
    bound to the same region, using the temporary credentials from the call.
    """
    sts = boto3.client("sts", region_name=region_name)
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    temporary = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName="AssumeRoleSession",
    )["Credentials"]
    session_kwargs = {
        "aws_access_key_id": temporary["AccessKeyId"],
        "aws_secret_access_key": temporary["SecretAccessKey"],
        "aws_session_token": temporary["SessionToken"],
        "region_name": region_name,
    }
    return boto3.Session(**session_kwargs)
def get_all_regions(session):
    """Return the names of all EC2 regions visible through ``session``.

    ``describe_regions`` is called with ``AllRegions=True``, so regions that
    are not enabled for the account are listed as well.
    """
    ec2_client = session.client("ec2")
    response = ec2_client.describe_regions(AllRegions=True)
    return [region["RegionName"] for region in response["Regions"]]
def get_ec2_instance_info(session, region_name):
    """Collect a summary of every EC2 instance in ``region_name``.

    All pages of ``describe_instances`` are read. Each instance is reduced to
    a dict with the keys ``InstanceId``, ``Name`` (value of the ``Name`` tag,
    ``""`` when absent), ``Platform`` (``"Linux/Unix"`` when the API omits
    it), ``InstanceType``, ``PrivateIp``, ``PublicIp``, ``SecurityGroups``
    (group names) and ``Region``. ``InstanceType`` is included because the
    insert built in ``main`` reads it.
    """
    ec2_client = session.client("ec2", region_name=region_name)
    paginator = ec2_client.get_paginator("describe_instances")
    instance_info_list = []
    for page in paginator.paginate():
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                # The display name lives in the tag list, not in a dedicated field.
                name = next(
                    (tag["Value"] for tag in instance.get("Tags", []) if tag["Key"] == "Name"),
                    "",
                )
                instance_info_list.append({
                    "InstanceId": instance.get("InstanceId"),
                    "Name": name,
                    "Platform": instance.get("Platform", "Linux/Unix"),
                    "InstanceType": instance.get("InstanceType"),
                    "PrivateIp": instance.get("PrivateIpAddress"),
                    "PublicIp": instance.get("PublicIpAddress"),
                    "SecurityGroups": [sg["GroupName"] for sg in instance.get("SecurityGroups", [])],
                    "Region": region_name,
                })
    return instance_info_list
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so the text cannot end the
    literal early.
    NOTE(review): assumes MySQL's default sql_mode (backslash escapes
    enabled) -- confirm against the target server.
    """
    escaped = str(value).replace('\\', '\\\\').replace("'", "''")
    return "'" + escaped + "'"


def main():
    """Collect EC2 instances from every configured account and region and store them.

    For each account the role is assumed in ``us-east-1`` to list the regions,
    then assumed again per region before the instances are read. An account
    or region that fails is reported and skipped so the remaining ones are
    still processed. Every collected instance becomes one row in ``gcpinfo``
    with ``cloud`` set to ``'aws'``.
    """
    # Multi-account configuration.
    accounts = [
        {"account_id": "<awsID>", "role_name": "<rolename>"},
        {"account_id": "<awsID>", "role_name": "<rolename>"},
    ]
    all_instances = []
    for account in accounts:
        account_id = account["account_id"]
        role_name = account["role_name"]
        print(f"Fetching EC2 instances for account: {account_id}")
        try:
            # Assume the role in a first region just to enumerate the regions.
            session = assume_role(account_id, role_name, "us-east-1")
            regions = get_all_regions(session)
        except Exception as e:
            print(f"Skipping account {account_id}: {e}")
            continue
        for region in regions:
            print(f"Fetching EC2 instances for region: {region}")
            try:
                # Assume the role again with the target region.
                session = assume_role(account_id, role_name, region)
                instances = get_ec2_instance_info(session, region)
            except Exception as e:
                print(f"Skipping region {region} of account {account_id}: {e}")
                continue
            # Tag each instance with its account, then add the batch once.
            for instance in instances:
                instance["AccountId"] = account_id
            all_instances.extend(instances)
    # Persist every collected instance.
    for instance in all_instances:
        # Fields the API did not return are stored as 'Unknown'.
        # NOTE(review): the tags column receives the instance type and
        # machinetype receives the platform, as in the original statement --
        # confirm this mapping is intended.
        values = [
            instance['AccountId'],
            instance['Name'],
            instance.get('PublicIp') or 'Unknown',
            instance.get('PrivateIp') or 'Unknown',
            instance.get('InstanceType') or 'Unknown',
            instance.get('Platform') or 'Unknown',
        ]
        quoted = ",".join(_sql_quote(value) for value in values)
        sql = (
            "INSERT INTO gcpinfo(project, name, extip, intip, tags, machinetype, cloud) "
            f"VALUES ({quoted},'aws')"
        )
        executeMysqlData(1, 'write', sql)

aws防火墙信息抓取

基于Python3写的抓取aws防火墙信息

拉取 AWS 防火墙(安全组)信息。需要调用基础接口脚本里的数据库读写函数以及 boto3 库。

import boto3
from botocore.exceptions import ClientError
from BasicInterfaces import executeMysqlData
# Fetch every available region
def get_all_regions():
    """Return the region names reported by EC2 ``describe_regions``.

    A ``ClientError`` is printed and turned into an empty list.
    """
    try:
        described = boto3.client('ec2').describe_regions()
    except ClientError as e:
        print(f" Error fetching regions: {e}")
        return []
    names = []
    for entry in described['Regions']:
        names.append(entry['RegionName'])
    return names
# Fetch the security groups of one region
def get_security_groups_in_region(session, region):
    """Return every security group of ``region`` visible through ``session``.

    All pages of ``describe_security_groups`` are read. A ``ClientError`` is
    printed and turned into an empty list so one failing region does not
    abort the caller.
    """
    try:
        ec2_client = session.client('ec2', region_name=region)
        paginator = ec2_client.get_paginator('describe_security_groups')
        security_groups = []
        for page in paginator.paginate():
            security_groups.extend(page['SecurityGroups'])
        return security_groups
    except ClientError as e:
        print(f" Error describing security groups in region {region}: {e}")
        return []
# Walk every region and collect its security groups
def get_security_groups_by_region(session):
    """Return a ``{region: [security groups]}`` mapping covering every region.

    The mapping is returned only after all regions have been visited; an
    empty region list yields an empty dict.
    """
    security_groups_by_region = {}
    for region in get_all_regions():
        print(f" Processing region: {region}... ")
        security_groups_by_region[region] = get_security_groups_in_region(session, region)
    return security_groups_by_region
# Example: collect the security groups of every account and region
def collect_security_groups_by_account_and_region(target_accounts):
    """Return ``{account_id: {region: [security groups]}}`` for all role ARNs.

    Args:
        target_accounts: Iterable of role ARNs to assume, one per account.

    Accounts whose role cannot be assumed are skipped. The result is returned
    after every account has been processed and is an empty dict when none
    succeeded.
    """
    account_security_groups = {}
    for role_arn in target_accounts:
        # Tolerate stray whitespace around a configured ARN.
        role_arn = role_arn.strip()
        # The account id is the fifth colon-separated field of the role ARN.
        account_id = role_arn.split(":")[4]
        print(f" Processing account {account_id}... ")
        # Switch to the target account's role.
        session = assume_role(role_arn)
        if not session:
            continue
        account_security_groups[account_id] = get_security_groups_by_region(session)
    return account_security_groups
# AssumeRole helper
def assume_role(role_arn, session_name="CrossAccountSession"):
    """Assume ``role_arn`` through STS and return a boto3 Session for it.

    Returns:
        A session built from the temporary credentials, or ``None`` when the
        STS call raises ``ClientError`` (the error is printed).
    """
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)
    except ClientError as e:
        print(f" Error assuming role {role_arn}: {e}")
        return None
    credentials = response['Credentials']
    return boto3.session.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so the text cannot end the
    literal early.
    NOTE(review): assumes MySQL's default sql_mode (backslash escapes
    enabled) -- confirm against the target server.
    """
    escaped = str(value).replace('\\', '\\\\').replace("'", "''")
    return "'" + escaped + "'"


# Example list of target-account role ARNs
TARGET_ACCOUNTS = [
    "arn:aws:iam::<awsID>:role/<跨账号访问的角色名>",
    "arn:aws:iam::<awsID>:role/<本地账号角色>",
]
# Collect the security groups of every account and region
security_groups_by_account_and_region = collect_security_groups_by_account_and_region(TARGET_ACCOUNTS) or {}
# Record every ingress rule that is open to the whole internet.
# NOTE(review): the project column receives the region and the direction
# column receives the account id, as in the original statement -- confirm
# this mapping is intended.
# NOTE(review): rules with IpProtocol '-1' (all traffic) and IPv6 ranges are
# not reported, as in the original -- confirm that is deliberate.
for account_id, regions in security_groups_by_account_and_region.items():
    for region, security_groups in regions.items():
        for sg in security_groups:
            rulename = sg['GroupName']
            for rule in sg['IpPermissions']:
                if rule['IpProtocol'] == '-1':
                    continue
                # Rules for protocols without a port range carry no FromPort/ToPort.
                if 'FromPort' in rule and 'ToPort' in rule:
                    ports = str(rule['FromPort']) + '-' + str(rule['ToPort'])
                else:
                    ports = 'all'
                for ip_range in rule['IpRanges']:
                    if '0.0.0.0/0' not in ip_range['CidrIp']:
                        continue
                    values = ",".join([
                        _sql_quote(region),
                        _sql_quote(rulename),
                        "'allow'",
                        _sql_quote(account_id),
                        _sql_quote(rule['IpProtocol']),
                        _sql_quote(ports),
                    ])
                    sql = (
                        "INSERT INTO gcpfirewall (cloud,project,rulename,action,direction,protocol,ports) "
                        f"VALUES('aws',{values})"
                    )
                    executeMysqlData(1, 'write', sql)

抓取 GCP IAM 信息并判断权限是否开大

基于Python3写的抓取gcp IAM信息并判断权限是否开大

抓取 GCP IAM 信息并判断权限是否开大。需要调用基础接口脚本里的数据库读写函数以及 google cloud 库。

from google.cloud import resourcemanager_v3
from google.cloud import recommender_v1
from google.oauth2 import service_account
from BasicInterfaces import executeMysqlData
import json,time
# Service-account credentials loaded from a JSON key file; they are passed to
# every Google Cloud client created in this script.
credentials = service_account.Credentials.from_service_account_file('<xxx.json>')
# Module-level Resource Manager client, used to read project IAM policies.
resource_manager_client = resourcemanager_v3.ProjectsClient(credentials= credentials)
def list_iam_recommendations(project_id, location="global", recommender_id="google.iam.policy.Recommender"):
    """Fetch the IAM permission recommendations of a project.

    Returns:
        A dict keyed by the member (the part after the last ``:``) whose
        value is ``{'remove': <role>, 'add': [<role>, ...]}`` with role names
        cut down to the part after the last ``/``. A later recommendation for
        the same member replaces an earlier one.
    """
    parent = f"projects/{project_id}/locations/{location}/recommenders/{recommender_id}"
    # Query the Recommender API with the module-level credentials.
    client = recommender_v1.RecommenderClient(credentials=credentials)
    suggestions = {}
    for recommendation in client.list_recommendations(parent=parent):
        overview = dict(recommendation.content.overview)
        member = overview['member'].split(':')[-1]
        added = [role.split('/')[-1] for role in overview['addedRoles']]
        removed = overview['removedRole'].split('/')[-1]
        suggestions[member] = {'remove': removed, 'add': added}
    return suggestions
def get_project_iam_policy(project_id, data=None):
    """Read a project's IAM policy and group the granted roles by account.

    Args:
        project_id: Project whose policy is read through the module-level
            ``resource_manager_client``.
        data: Optional dict to extend in place. A fresh dict is created when
            omitted, so results from one project never leak into the next
            call.

    Returns:
        ``{account: {'unknown': [role, ...]}}`` where ``account`` is the
        member with its type prefix and any ``?...`` suffix removed, and
        each role is the part after the last ``/``.
    """
    if data is None:
        data = {}
    project_name = f"projects/{project_id}"
    policy = resource_manager_client.get_iam_policy(resource=project_name)
    for binding in policy.bindings:
        role = str(binding.role).split('/')[-1]
        for member in binding.members:
            account = member.split(':')[-1].split('?')[0]
            # Accumulate every role of the account instead of keeping only the last.
            data.setdefault(account, {}).setdefault('unknown', []).append(role)
    return data


# Display name of each project mapped to its actual project id.
project_ids = {
    "<gcp显示项目名>": "<gcp实际项目名>",
}
def _sql_quote(value):
    """Render *value* as a single-quoted SQL string literal.

    Backslashes and single quotes are doubled so the text cannot end the
    literal early.
    NOTE(review): assumes MySQL's default sql_mode (backslash escapes
    enabled) -- confirm against the target server.
    """
    escaped = str(value).replace('\\', '\\\\').replace("'", "''")
    return "'" + escaped + "'"


# For every project, merge the recommender output with the plain IAM policy
# and write one gcpIamCheck row per (account, action) pair.
for project in project_ids:
    project_id = project_ids[project]
    try:
        iamRecommenderDict = list_iam_recommendations(project_id)
        # A fresh dict is passed explicitly so roles never carry over
        # between projects.
        iamDict = get_project_iam_policy(project_id, {})
        # Accounts without a recommendation keep their plain role list.
        for account, roles in iamDict.items():
            iamRecommenderDict.setdefault(account, roles)
        for account, entries in iamRecommenderDict.items():
            for action, value in entries.items():
                if action == 'unknown':
                    column, text = 'rules', ",".join(value)
                elif action == 'remove':
                    column, text = 'recommend', value
                elif action == 'add' and value:
                    # 'add' holds a list of roles; store it comma-separated.
                    column, text = 'recommend', ",".join(value)
                else:
                    continue
                sql = (
                    f"INSERT INTO gcpIamCheck (project, account, {column}, action) VALUES "
                    f"({_sql_quote(project)},{_sql_quote(account)},{_sql_quote(text)},{_sql_quote(action)})"
                )
                executeMysqlData(1, 'write', sql)
    except Exception as e:
        # Record that the project could not be analysed and say why.
        print(f"IAM check failed for project {project}: {e}")
        sql = (
            "INSERT INTO gcpIamCheck (project, account, action) VALUES "
            f"({_sql_quote(project)},'unknown','unknown')"
        )
        executeMysqlData(1, 'write', sql)