aws防火墙信息抓取

基于Python3写的抓取aws防火墙信息

拉取aws防火墙信息 需要调用到基础接口脚本里的数据库读写和boto3库

import boto3
from botocore.exceptions
import ClientError
from BasicInterfaces import executeMysqlData
#获取所有可用区域
def get_all_regions():
    try:
        ec2_client = boto3.client('ec2')
        response = ec2_client.describe_regions()
        regions = [region['RegionName']
        for region in response['Regions']]
        return regions
    except ClientError as e:
        print(f" Error fetching regions: {e}")
        return []
#获取指定区域的安全组
def get_security_groups_in_region(session, region):
    try:
        ec2_ client = session.client('ec2', region_name=region)
        response = ec2_client.describe_security_groups()
        return response['SecurityGroups']
    except ClientError as e:
        print(f" Error describing security groups in region {region}: {e}")
        return []
#遍历所有区域获取安全组
def get_security_groups_by_region(session):
    security_groups_by_region = {}
    regions = get_all_regions()
    for region in regions:
        print(f" Processing region: {region}... ")
        security_groups = get_security_groups_in_region(session, region)
        security_groups_by_region[region] = security_groups
        return security_groups_by_region
#示例:遍历所有账户和区域的安全组
def collect_security_groups_by_account_and_region(target _ accounts):
    account_security_groups = {}
    for role_arn in target_accounts:
        # 从 RoleArn 提取 Account ID
        account_id = role_arn.split(":")[4]
        print(f" Processing account {account _ id}... ")
        #切换到目标账户角色
        session = assume_role(role_arn)
        if not session:
            continue
        else:
            security_groups_by_region = get_security_groups_by_region(session)
            account_security_groups[account_id] = security_groups_by_region
            return account_security_groups
# AssumeRole 函数(参考前面代码)
def assume_role(role_arn, session_name="CrossAccountSession"):
    sts_client = boto3.client('sts')
    try:
        response = sts_client.assume_role(RoleArn= role_arn,RoleSessionName = session_name)
        credentials = response['Credentials']
        session = boto3.session.Session(
            aws_access_key_id= credentials['AccessKeyId'],
            aws_secret_access_key= credentials['SecretAccessKey'],
            aws_session_token= credentials['SessionToken']
            )
        return session
    except ClientError as e:
        print(f" Error assuming role {role _ arn}: {e}")
        return None
#示例目标账户 RoleArn 列表
TARGET_ACCOUNTS = ["arn:aws:iam::<awsID>:role/<跨账号访问的角色名>"," arn:aws:iam::<awsID>:role/<本地账号角色>"]
#获取所有账户和区域的安全组
security_groups_by_account_and_region = collect_security_groups_by_account_and_region(TARGET_ACCOUNTS)
#打印结果
for account_id,regions in security_groups_by_account_and_region.items():
    account = account_id
    for region,security_groups in regions.items():
        regionRecord = region
        for sg in security_groups:
            rulename = sg['GroupName']
            for rule in sg['IpPermissions']:
                if rule['IpProtocol'] != '-1':
                    for i in rule['IpRanges']:
                        if '0.0.0.0/0' in i['CidrIp']:
                            protocol = rule['IpProtocol']
                            fromPort = rule['FromPort']
                            toPort = rule['ToPort']
                            ports = str(fromPort)+ '-' + str(toPort)
                            sql= f"""INSERT INTO gcpfirewall (cloud,project,rulename,action,direction,protocol,ports) VALUES('aws',{"'" + regionRecord + "'"},{"'" + rulename + "'"},'allow',{"'" + account + "'"},{"'" + protocol + "'"},{"'" + ports + "'"})"""
                            executeMysqLData(1,' write', sql)

gcp IAM信息并判断权限是否开大

基于Python3写的抓取gcp IAM信息并判断权限是否开大

抓取gcp IAM信息并判断权限是否开大 需要调用到基础接口脚本里的数据库读写和google cloud库

from google.cloud import resourcemanager_v3
from google.cloud import recommender_v1
from google.oauth2 import service_account
from BasicInterfaces import executeMysqlData
import json,time
credentials = service_account.Credentials.from_service_account_file('<xxx.json>')
resource_manager_client = resourcemanager_v3.ProjectsClient(credentials= credentials)
def list_iam_recommendations(project_id, location="global", recommender_id="google.iam.policy.Recommender"):
    """获取 IAM 权限优化建议"""
    parent = f"projects/{project_id}/locations/{location}/recommenders/{recommender_id}"
    #获取建议
    recommender_client = recommender_v1.RecommenderClient(credentials= credentials)
    recommendations = recommender_client.list_recommendations(parent= parent)
    userDict = {}
    for recommenders in recommendations:
        resourcesDict = dict(recommenders.content.overview)
        user = resourcesDict['member'].split(':')[-1]
        addRoles = []
        for i in resourcesDict['addedRoles']:
            addRoles.append(i.split('/')[-1])
        userDict[user] = {'remove':resourcesDict['removedRole'].split('/')[-1],'add':addRoles}
    return userDict
def get_project_iam_policy(project_id, data={}):
    """获取项目的 IAM 策略"""
    project_name = f"projects/{project_id}"
    policy = resource_manager_client.get_iam_policy(resource= project_name)
    for binding in policy.bindings:
        for i in binding.members:
            if i not in data:
                account = i.split(':')[-1].split('?')[θ]
                data[account] = {}data[account][" unknown"] = []data[account][" unknown"]. append(str(binding. role). split('/')[-1])return dataproject _ ids = {
                    "<gcp显示项目名>":" <gcp实际项目名>",
                    }
for project in project_ids:
    try:
        iamRecommenderDict = list_iam_recommendations(project_ids[project])
        iamDict = get_project_iam_policy(project_ids[project])
        for i in iamDict:
            if i not in iamRecommenderDict:
                iamRecommenderDict[i] = iamDict[i]
        for account in iamRecommenderDict:
            for i in iamRecommenderDict[account]:
                if i == 'unknown':
                    sql= f"""INSERT INTO gcpIamCheck (project, account,rules,action) VALUES ({"'" + project  + "'"},{"'" + account + "'"},{"'" + ",".join(iamRecommenderDict[account][i])+ "'"},{"'" + i + "'"})"""
                    executeMysqlData(1,' write', sql)
                elif i == 'remove':
                    sql= f"""INSERT INTO gcpIamCheck (project, account, recommend, action) VALUES ({"'" + project + "'"},{"'" + account + "'"},{"'" + iamRecommenderDict[account][i] + "'"},{"'"+ i + "'"})"""
                    executeMysqlData(1,' write', sql)
                elif i == ' add':
                    if len(iamRecommenderDict[account][i]) != θ:
                        sql= f"""INSERT INTO gcpIamCheck (project, account, recommend, action) VALUES ({"'" + project + "'"},{"'" + account + "'"},{"'" + iamRecommenderDict[account][i] + "'"},{"'" + i + "'"})"""
                        executeMysqlData(1,' write', sql)
        iamDict = {}
        iamRecommenderDict={}
    except:
        sql= f"""INSERT INTO gcpIamCheck (project, account, action) VALUES ({"'" + project + "'"},'unknown','unknown')"""executeMysqlData(1,' write', sql)

gcp存储桶使用情况

基于Python3写的读取gcp存储桶使用情况

读取gcp存储桶使用情况 需要调用到基础接口脚本里的数据库读写和google cloud库

from google.cloud import storage
from google.oauth2 import service_account
#替换为服务账号的密钥文件路径
SERVICE_ACCOUNT_FILE = "<xxx.json>"
#初始化客户端时使用服务账号凭据
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE)
storage_client = storage.Client(credentials= credentials)
def list_buckets_and_objects():
    """列出所有存储桶及其对象"""
    buckets = storage_client.list_buckets()
    print(f"{'Bucket Name':<30}{'Total Size (GB)':>20}")
    print("-" * 50)
    for bucket in buckets:
        total_size = 0
        blobs = storage_client.list_blobs(bucket.name)
        for blob in blobs:
            total_size += blob.size
        total_size_gb = total_size / (1024 ** 3)
        print(f"{bucket.name:<30}{total_size_gb:>20.2f}")
#执行函数
list_buckets_and_objects()

gcp虚拟机信息抓取

基于Python3写的抓取gcp虚拟机信息

拉取gcp虚拟机信息 需要调用到基础接口脚本里的数据库读写和google.auth库

from google.auth import compute_engine
from google.oauth2 import service_account
from googleapiclient.discovery import build
from google.cloud import resourcemanager_v3
from BasicInterfaces import executeMysqlData
import json,requests,time
'''拉取 gcp虚拟机信息'''
#没加入拉取的项目写在这条注释后面
serviceAccount = '<xxx.json>'
credentials = service_account.Credentials.from_service_account_file(serviceAccount)
compute = build('compute', 'v1',credentials= credentials)
project_ids = {"显示名称":"实际项目名称"}
for project_id in project_ids:
    request = compute.instances().aggregatedList(project= project_ids[project_id])
    # print (request)
    while request is not None:
        try:
            response = request.execute()
        except:
            print (f" get project {project _ id} information failed! skip... ")
            break
        for name, instances_scoped_list in response['items'].items():
            for instance in instances_scoped_list.get('instances',[]):
                hostname = instance['name']
                if 'canIpForward' in instance:
                    #是否有nat
                    if instance['canIpForward'] == 'false':
                        forward = θ
                    else:
                        forward = 1
                else:
                    forward = θ
                zone = instance['zone'].split(ˈ/ˈ)[-1]
                for networkInfo in instance[ˈnetworkInterfacesˈ]:
                    if 'accessConfigs' in networkInfo:
                        #查询是否有公网ip
                        for accessConfigInfo in networkInfo['accessConfigs']:
                            if 'natIP' in accessConfigInfo:
                                extip = accessConfigInfo['natIP']
                            else:
                                extip = 'None'
                    else:
                        extip = 'None'
                    intip = networkInfo['networkIP']
                if 'labels' in instance:
                    labels = json. dumps(str(instance[' labels']).replace("'",'"'))
                else:
                    labels = """+ json. dumps({})+ """
                if 'items' in instance['tags']:
                    tagList = instance['tags']['items']
                    tag = '.'. join(tagList)
                else:
                    tag = ' None'
                id = instance['id']
                machineType = instance['machineType'].split('/')[-1]
                # print (f' project:{project _ id}, name:{hostname}, forward:{forward}, zone:{zone}, extip:{extip},intip:{intip}, labels:{labels}, tags={tag}, id={id}')
                sqlForCheckDup = f""" select * from gcpinfo where project={"'" + project_id +"'"} and name={"'" + hostname + "'"}"""
                result = executeMysqlData(1,'read',sqlForCheckDup)
                if len(result) == θ:
                    print (f'插入新数据{extip}')
                    sql= f"""insert into gcpinfo(project, name, extip, intip, forward, label, tags, machinetype,cloud) VALUES({"'" + project_id + "'"},{"'" + hostname + "'"},{"'" + extip + "'"},{"'"+ intip + "'"},{forward},{labels},{"'" + tag + "'"},{"'" + machineType + "'"},'gcp')"""
                    # print (sql)
                    executeMysqlData(1,' write', sql)
                else:
                    print (f'更新新数据{extip}:{labels}')
                    sqlForUpdate = f""" update gcpinfo set extip={"'" + extip + "'"},intip={"'" + intip+ "'"},label={labels},tags={"'" + tags + "'"},machineType={"'" + machineType + "'"} WHERE project = {"'" + project_id + "'"} and name={"'" + hostname + "'"}"""
                    executeMysqlData(1,'write',sqlForUpdate)
        request = compute.instances().aggregatedList_next(previous_request=request,previous_response=response)

blog-site

Read full post gblog_arrow_right

gcp防火墙信息抓取

基于Python3写的抓取gcp防火墙信息

拉取gcp防火墙信息 需要调用到基础接口脚本里的数据库读写和boto3库

from google.cloud import compute _v1
from google.oauth2 import service_account
from BasicInterfaces import executeMysqlData
import time
def queryFirewallRule(credentials,projectId):
    dictFirewallDetails = {}
    client = compute_v1.FirewallsClient(credentials= credentials)
    try:
        firewallRuleList = client. list(project=projectId)
    except:
        print (f'发现没有权限读取配置的 project')
        dictFirewallDetails = {"projectid":{"rights":"no"}}
        return dictFirewallDetails
    portList = []
    for firewallName in firewallRuleList:
        if "0.0.0.0" in firewallName.source_ranges or "0.0.0.0/0" in firewallName.source_ranges:
            print (f'发现存在any的规则!规则名为{firewallName.name}')
            if len(firewallName.allowed)!= 0:
                action = 'allow'
                for context in firewallName.allowed:
                    for port in context.ports:
                        portList.append(port)
                        protocol = context.I_p_protocol
            elif len(firewallName. denied) != 0:
                action = 'deny'
                for context in firewallName.denied:
                    for port in context.ports:
                        portList.append(port)
                        protocol = context.I_p_protocol
            portStr = ','.join(portList)
            dictFirewallDetails[firewallName.name] = {
                "direction":firewallName.direction, 
                "action": action,
                "port":portStr,
                "protocol": protocol
                }
            portList = []
    return dictFirewallDetails
project_ids = {"<项目显示名称>":"<项目实际名称>"}
credentials = service_account.Credentials.from_service_account_file('<xxxx.json>')
for i in project_ids:
    print (f'开始扫描 project:{i}')
    result = queryFirewallRule(credentials, project_ids[i])
    for ruleName in result:
        if 'rights' not in result[ruleName]:
            sql= f"""INSERT INTO gcpfirewall (project, rulename, action, direction, protocol, ports, cloud) VALUES ({"'" + i + "'"},{"'" + ruleName + "'"},{"'" + result[ruleName]['action'] + "'"},{"'" +result[ruleName]['port'][0:280] + "'"},'GCP')"""
        else:
            sql= f"""INSERT INTO gcpfirewall (project, rulename, cloud) VALUES({"'" + i + "'"},'unknown','GCP')"""
        executeMysqlData(1,'write', sql)

hfish告警脚本

基于Python3写的hfish告警

由于原告警是excel文件不方便阅读,故写了个比较直观的脚本,并且当时环境是部署在gcp上的,故联动gcp接口调用防火墙黑名单 需要调用到基础接口脚本里的数据库读写和google.cloud库

from requests.packages import urllib3
import time,re,json,logging
from google.cloud import compute _v1
from google. oauth2 import service_account
from BasicInterfaces import executeMysqlData
from datetime import datetime,timedelta
def listFirewallRule(credentials,projectId):
    client = compute _v1.FirewallsClient(credentials= credentials)
    firewallRuleList = client.list(project=projectId)
    query = "blocklist"
    dataJson = {}
    ipList = []
    for firewallName in firewallRuleList:
        if query in firewallName.name:
            dataJson[firewallName.name] = len(firewallName.source_ranges)
            ipList. extend(firewallName.source_ranges)
            return dataJson,ipList
            
def addSourceIPToFirewallRule(credentials,projectId, firewallRuleName,ipaddressForAdd):
    client = compute_v1.FirewallsClient(credentials=credentials)
    firewall = client.get(project=projectId, firewall=firewallRuleName)
    firewall.source_ranges.append(ipaddressForAdd)
    try:
        operation = client.update(project=projectId, firewall=firewallRuleName,firewall_resource=firewall)
        operation.result()
    except Exception as e:
        return str(e).split()[-1]
def removeSourceIPToFirewallRule(credentials,projectId, firewallRuleName,ipaddressForAdd):
    client = compute _v1.FirewallsClient(credentials= credentials)
    firewall = client.get(project=projectId, firewall=firewallRuleName)
    firewall.source_ranges.remove(ipaddressForAdd)
    operation = client.update(project=projectId, firewall=firewallRuleName, firewall_resource= firewall)
    operation.result()
def newFirewallPolicy(credentials,projectId, firewallRuleName):
    client = compute_v1.FirewallsClient(credentials= credentials)
    firewall = {
        "denied": [{"I_p_protocol": "all"}],
        "direction": "INGRESS",
        "name": firewallRuleName,
        "network": "global/networks/ default",
        "priority": 999,
        "source_ranges": ["224.1.1.132/32"]
        }
    try:
        client.insert(project=projectId, firewall_resource = firewall)
        return True
    except Exception as e:
        return (str(e).split()[-2] + " " + str(e).split()[-1])
def isPublicIpv4(ip):
    pattern = r'^(?:(?:(?:[1-9]?\d|1\d\d|2[θ-4]\d|25[θ-5])\.){3}(?:[1-9]?\d|1\d\d|2[θ-4]\d|25[θ-5]))$'
    private_pattern = r'^(?:(?:10|127)\.\d{1,3}\.\d{1,3}\.\d{1,3}|(?:172\.(?:1[6-9]|2\d|3[01]))\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\
    return re.match(pattern, ip) is not None and re.match(private_pattern,ip) is None
def makeMaiLContext(data):
    table _ html = """
    <table style="border-collapse: collapse;">
        <tr bgcolor="aqua">
            <th style=" border: 1px solid black; padding: 8px;">扫描类型</th>
            <th style=" border: 1px solid black; padding: 8px;">来源IP</th>
            <th style=" border: 1px solid black; padding: 8px;">来源MAC</th>
            <th style=" border: 1px solid black; padding: 8px;">来源区域</th>
            <th style=" border: 1px solid black; padding: 8px;">目标IP</th>
            <th style=" border: 1px solid black; padding: 8px;">目标端口</th>
            <th style=" border: 1px solid black; padding: 8px;">扫描次数</th>
            <th style=" border: 1px solid black; padding: 8px;">开始时间</th>
            <th style=" border: 1px solid black; padding: 8px;">持续时间</th>
            <th style=" border: 1px solid black; padding: 8px;">攻击详情</th>
        </tr>
    """
    ports = ",".join(filter(None, data[3].split(",")))
    table _ html += f"""
    <tr>
        <td style=" border: 1px solid black; padding: 8px;">{data[6]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[θ]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[8]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[1]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[2]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{ports}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[4]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[7]}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[5].total_seconds()}</td>
        <td style=" border: 1px solid black; padding: 8px;">{data[9]}</td>
    </tr>
    """
    table_html += "</table>"
    return (table_html)
def beijing(sec, what):
    beijing_time = datetime.now()+ timedelta(hours=8)
    return beijing_time.timetuple()
"""蜜罐收集黑名单"""
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger()
logging.Formatter.converter = beijingprojectCSTId = "xxxx"
# Google Cloud项目ID
credentialsCST = service_account.Credentials.from_service_account_file('dddd.json')projectIdList = [{'projectId':projectCSTId,' credentials':credentialsCST}]
#遍历 project列表
while 1:
    logger.info('开始新一轮扫描')
    for projectInfo in projectIdList:
        firewallInfo,sourceRange = listFirewallRule(project Info['credentials'],projectInfo['projectId'])[θ],listFirewallRule(projectInfo['credentials'],projectInfo['projectId'])[1]
        for firewallName in firewallInfo:
            if firewallInfo[firewallName] < 5000:
                break
            queryScanData = "SELECT source_ip,source_ip_country,dest_ip,GROUP_CONCAT(DISTINCT dest_port),count(*) AS ipCount,TIMEDIFF(MAX(update_time),MIN(create_time)),GROUP_CONCAT(DISTINCT scan_type),create_time,source_mac,detail FROM scanners WHERE ifAlreadysent=0 GROUP BY source_ip"
            scanpata = executeMysqivata(1,' read', queryscanData)
            logger.info  ("开始扫描数据库,看是否有新的ip")
            if len(scanData) == 0:
                logger.info  ('本次扫描没有发现新的ip')
            else:
                for j in scanData:
                    logger.info (j[0])
                    if isPublicIpv4(j[0]) == True:
                        ###这边判断是否是自己的ip
                        if j[0] != 'x.x.x.x':
                            dataQuery = f"""select * from sync_ip_info where ip={"'" + j[0] + "'"}"""
                            queryDupIp = executeMysqlData(1,'read',dataQuery)
                            if len(queryDupIp) == 0:
                                resultAddIP = addSourceIPToFirewallRule(projectInfo['credentials'],projectInfo['projectId'],firewallName,j[0])
                                if resultAddIP == '5001.':
                                    logger.info  (f'防火墙规则已满,尝试创建新的规则')
                                    for i in range (1,1000):
                                        firewallRuleName = "blocklist" + str(i)
                                        resultAddPolicy = newFirewallPolicy(credentialsCST,projectCSTId,firewallRuleName)
                                        if resultAddPolicy == 'already exists':
                                            logger.info(f'防火墙{firewallRuleName}已经存在,尝试下一个。。。')
                                            i += 1
                                            else:logger.info  (f'防火墙{firewallRuleName}创建成功!')
                                            addSourceIPToFirewallRule(projectInfo['credentials'],projectInfo['projectId'],firewallName,j[0])
                                            break
                                        else:
                                            logger.info('防火墙规则写入成功')
                                            dataInsert = f"""insert into sync_ip_info(ip, sync) VALUES ({"'"+ j[0] + "'"},'honeypotserver')"""
                                            executeMysqlData(1,'write',dataInsert)
                                            logger.info("数据写入成功")
                                else:
                                    logger.info(f'该ip{j[0]}已经写入数据库!')
                            else:logger.info  (f'ip{j[0]}是公司IP,请注意检查!')
        time.sleep(60)

nmap扫描

基于Python3写的nmap扫描接口,仅用作代理

基于Python3写的nmap扫描接口,仅用作代理

import nmap,json, requests, os, time, re, socket
from flask import Flask, request, jsonify, send_file
from typing import Dict
from BasicInterfaces import executeMysqlData
from PyNuclei import Nuclei
'''http接口供 nmap和 gcp日志下载使用'''
nm = nmap.PortScanner()
nucleiScanner = Nuclei()
app = Flask(__name__)
'''nmap扫描功能'''
@app.route('/', methods=['POST'])
def json_post():
    #使用 flask开启 http服务, 用来接收 text格式的 ip地址
    ip = str(request.data, encoding='utf-8')
    dataJson = {}
    dataJson[ip] = {}
    print ('正在扫描%s' % ip)
    #开始调用nmap进行扫描,超时时间1200秒,1200秒后强制结束扫描进程
    result= nm.scan(ip, arguments='-T5 - n - sT - Pn - p-', timeout = 1200)
    dataJson[ip]['portstatus']={}
    try:
        if result['nmap']['scanstats']['uphosts'] != 'θ':
            for j in result['scan'][ip]['tcp']:
                if result['scan'][ip]['tcp'][j]['state'] == 'open':
                    print (f'发现开放端口{j},正在详细扫描该端口。。。')
                    doubleScan = nm.scan(ip, arguments=f'-T5 - n - sV - Pn - p {j}', timeout = 1200)
                    print (f'端口扫描结果为{doubleScan}')
                    resultDict = {}
                    resultDict['service'] = doubleScan['scan'][ip]['tcp'][j]['name']
                    resultDict['application'] = doubleScan['scan'][ip]['tcp'][j]['product']
                    dataJson[ip]['portstatus'][j] = resultDict
                    return jsonify(dataJson)
                else:
                    return 'False'
    except:
        return 'False'
'''域名扫描功能'''
@app.route('/scan/<domain>')
def scan(domain: str):
    result = {}
    headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/6θ4.1"}, " pc": {" User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3984.108 Safari/537.36"}}
    for i in headers:
        scanResult = nucleiScanner.scan(
            host = f"{domain}",
            userAgent= headers[i],
            rateLimit=150,
            verbose= False,
            metrics= False,
            maxHostError=3θ,
            stopAfter= None
            )
        result[i]=scanResult
    return result
''' cve查询功能'''
@app.route('/versioncheck/< domainname>')
def versioncheck(domainname):
    dataJson = {}
    dataJson[domainname] = {}
    try:
        dataJson[domainname]['ip'] = socket.gethostbyname(domainname)
    except:
        dataJson[domainname]['ip'] = 'unknown'
    url= f"https://{domainname}/version.txt"
    headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X)AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/684.1"}, "pc":  {"User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"}}
    for i in headers:
        try:
            response = requests.get(url, headers= headers[i])
        except:
            pass
        dataJson[domainname][i]={}
        if 'response' in locals():
            if response.status_code == 200:
                if '<' not in response.text:
                    response = response.text
                    filteredText = "\n". join([line for line in response.splitlines() if"committer" not in  line])
                    matches = re.findall(r":\s*(.+)", filteredText)
                    try:
                        dataJson[domainname][i]['project'] = matches[θ]
                        dataJson[domainname][i]['version'] = matches[1]
                        dataJson[domainname][i]['buildtime'] = matches[2]
                    except:
                        dataJson[domainname][i]['project'] = 'unknown'
                        dataJson[domainname][i]['version'] = response
                        dataJson[domainname][i]['buildtime'] = 'unknown'
                    dataJson[domainname][i]['origin'] = response
    return jsonify(dataJson)
@app.route('/cve', methods=['POST'])
def cveQuery():
    data = request.get_json()
    software = data['software']
    version = data['version']
    startDateFormat = data['starttime']
    endDateFormat = data['endtime']
    urlParameter = data['cpe']
    url= f"https://services.nvd.nist.gov/rest/json/cves/2.0?cpeName={urlParameter}&lastModStartDate={startDateFormat}&lastModEndDate={endDateFormat}"
    headers = {'apiKey': '92127066-eb5f-478a-8d92-4978b3f1f9a2'.}
    n = 0
    while 1:
        response = requests.get(url, headers= headers)
        if response. status_code != 484 and response.status_code != 583:
            break
        else:
            if n <4: 
                print (f'获取失败, 状态码为{response.status_code}')
                time.sleep(5)
                n += 1
            else:
                return 'no result of that software!',404
    response = json.loads(response. text)
    resultJson = {}
    resultJson['software'] = software
    resultJson['version'] = version
    cveList = []
    for i in response['vulnerabilities']:
        try:
            severity = i['cve']['metrics']['cvssMetricV31'][θ]['cvssData']['baseSeverity']
        except:
            severity = i['cve']['metrics']['cvssMetricV2'][0]['baseSeverity']
        if severity == 'HIGH' or severity == "CRITICAL":
            print ('catch high vuln!')
            cveID = i['cve']['id']
            cveList.append(cveID)
    resultJson['cve'] = cveList
    return jsonify(resultJson)
'''gcp日志下载功能的一部分,用来获取所有日志文件的路径'''
def get_all_files(directory):
    file_paths = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.relpath(os.path.join(root,file), "directory")
            file_paths.append(file_path.replace("\\", "/"))
    return file_paths'''gcp日志下载功能的一部分,获取所有日志文件'''
@app.route('/gcp')
def list_files():
    files = get_all_files("/var/log/gcplogs") + get_all_files("/var/log/AWSLogs")
    return jsonify(files)
@app.route('/repos/<path:filename>')
@app.route('/conf/<path:filename>')
@app.route('/downloadgcp/<path:filename>')
def download_file(filename):
    #获取请求的路径,判断是来自 clients 还是 conf
    if request.path.startswith('/repos/'):
        base_dir = '/var/log/repos'
    elif request.path.startswith('/conf/'):
        base_dir = '/var/log/conf'
    elif request.path.startswith('/downloadgcp/'):
        base_dir = '/var/log'
    full_path = os.path.join(base_dir, filename)
    #验证文件是否存在且是否为文件
    if os.path.exists(full_path) and os.path.isfile(full_path):
        return send_file(full_path,as_attachment= True)
    else:
        return "File not found", 404
@app.route('/query', methods=['GET'])
def query_data():
    data = {}
    if 'filter' in request.url:
        condition0fQuery = request.args.get('filter')
        condition0fQuery.split('=')[θ]
        condition0fQuery.split('=')[1]
        if conditionOfQuery:
            sql= "select name, extip, intip from gcpinfo Where " + condition0fQuery. split('=')[θ] + '=' + "'" + conditionOfQuery.split('=')[1] + "'"
            for record in executeMysqlData(4,' read', sql):
                data[record[θ]] = {"extip": record[1], "intip": record[2]}
            return jsonify(data)
        else:
            return jsonify(message=' Please provide filter criteria'), 400
    else:
        sql= " select name, extip, intip from gcpinfo"
        for record in executeMysqlData(4,' read', sql):
            data[record[θ]] = {"extip": record[1], "intip": record[2]}
        return jsonify(data)
app.run(host='10.170.0.2', port=1080)

基础接口脚本

基于Python3写的基础接口脚本

主要的功能是读写mysql数据库,读取ELK和Wazuh,发送邮件,后续会增加更多功能,至于import的库,可以自行使用pip下载

from requests import urllib3
from requests.auth import HTTPBasicAuth
import json, requests, mysql.connector, re, os,smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMETextfrom base64
import b64encode
def executeMysqlData(serverID, oper= None,dataSQL= None):
    if serverID == 1:
        mydb = mysql.onnector.connect(
            host="x.x.x.x",
            user="x.x.x.x",
            password="x.x.x.x",
            database="x.x.x.x",
            port=3306)
    elif serverID == 2 :
        mydb = mysql.connector.connect(
            host="y.y.y.y",
            user=" yyyy",
            password=" yyyy",
            database=" yyyy",
            port=3306)
    mycursor = mydb. cursor()
    if oper.upper() == 'WRITE' and dataSQL.split()[θ].upper() != 'SELECT':
        mycursor.execute(dataSQL)
        mydb.commit()
        mycursor.close()
        mydb.close()
        if mycursor.rowcount > θ:
            return True
        else:
            return False
    elif oper.upper() == 'READ' and dataSQL.split()[0].upper() != 'INSERT' and dataSQL.split()[θ].upper() != 'UPDATE':
        mycursor. execute(dataSQL)
        resultList = []
        for i in mycursor:
            resultList. append(i)
        mycursor.close()
        mydb.close()
        return resultList
    else:
        return ' unexpected request! check and try again!'
        #定义 mcafee查询 ip和机器名信息,允许传递 ip地址或者机器名或者FQDN名,返回为 json数据
        def apiFromMcafee(actionId=1, input= None):
            urllib3.disable.warnings()
            username = 'xxx'
            password = 'yyy'
            if actionId == 1:
                url= f'https://xxxx:8443/remote/system.find?searchText={input}&:output=json'
                curl_command=f'curl -u{'"'+username+'"'}:{'"'+password+'"'} -s -k "{url}"'
                data = os.popen(curl_command).read()
                patternIP = re.compile(r'10.((25[θ-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){2}')
                record={}
                if bool(re. match(patternIP, input)) == True:
                    if len(data) > 20:
                        if data[θ:2] == 'OK':
                            jsonStr = json. loads(data[5:])
                            for i in range(0, len(jsonStr)):
                                ip = jsonStr[i]['EPOComputerProperties. IPAddress']
                                subnetSearch = "'" + patternIP. search(ip)[0] + '0/24' + "'"
                                querySubnetInfo = f"SELECT name from IPStatus WHERE subnet = {subnetSearch}" 
                                if input == ip:
                                    #这些只是为了看着好看,所以未知的统一改成 Unknown
                                    if jsonStr[i]['EPOComputerProperties. UserName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.Username'] == 'None':
                                        user = ' Unknown'
                                    else:
                                        user = jsonStr[i]['EPOComputerProperties.UserName']
                                    if jsonStr[i]['EPOComputerProperties. IPHostName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.IPHostName'] == 'None':
                                        hostname = ' Unknown'
                                    else:
                                        hostname = jsonStr[i]['EPOComputerProperties.IPHostName']
                                    queryDepartment = executeMysqlData(1,'read',querySubnetInfo)
                                    if len(queryDepartment)== 0:
                                        departmentName = 'Unknown'
                                    else:
                                        departmentName = queryDepartment[0][0]
                                    record = {
                                        'ip':ip,
                                        'username':user,
                                        'department':departmentName,
                                        'computerName':hostname
                                        }
                                    break
                                else:
                                    record = False
                        else:
                            record = False
                else:
                    record = False
            elif actionId == 2:
                url = f'https://xxxx:8443/remote/core.executeQuery?queryId={input}&:output=json'
                curl_command = f'curl -u {'"' + username + '"'}:{'"' + password + '"'} -s -k"{url}"'
                response = os.popen(curl_command).read()
            return response
        #定义发送邮件接口的方法, subject为标题, data为内容, sendTo为收件人, type为邮件类型, 默认为 html, 如乘是纯文本, 则写 plain
        def sendMail(subject, data,sendTo='xxxx@xxxx.xxx',type='html'):
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = 'xxxx@xxxx.xxx'
            msg['To'] = sendTo
            text_part = MIMEText(data, type)
            msg.attach(text_part)
            #此处填写邮件服务器接口
            with smtplib.SMTP('xxxx',25) as smtp:
                try:
                    smtp.send_message(msg)
                    return True
                except:
                    return False
        #定义读取ELK的方法, serverId为区分不同的ELK集群,indexData为要搜索的索引目录, bodyData为要搜索的条件, 格式为json,返回json格式的字符串
        def readELK(serverId=2,indexDatα=None,bodyDatα=None,esVersion=8):
            #xxxx
            if serverId == 1:
                apiKey = 'aaaa'
                url = '  https://ssss:9200'
            #wazuh的ELK
            elif serverId == 3:
                username = 'wazuh'
                password = 'wazuh'
                url = 'https://aaaa:9200'
            if esVersion == 8:
                headers = {" Authorization": "ApiKey" + apiKey}
                resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", headers= headers, json=bodyData,verify=False)).text)
            elif esVersion == 7:
                resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", auth=HTTPBasicAuth(username,password),json=bodyData,verify=False)).text)
            print ("读取ELK数据完毕")
            return resultJson
        #定义读写jira的api的方法,第一个为url,也就是条件,第二个为jsonPost,如果要创建case则使用jsonPost,如果只是get的话第二项则不用写
        def apiFromJira(url,jsonPost= None):
            usernameForJira = passwordForJira = 'xxxx'
            urllib3. disable _ warnings()
            if jsonPost is None:
                response = requests. get(url, verify= False, auth=(usernameForJira, passwordForJira))
            else:
                if url[-8:] == 'assignee':
                    response = requests.put(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
                else:
                    response = requests.post(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
            return response. text
        def getClientFromWazuh(option=1, params='limit=10000', action='get',urlParameters=''):
            urllib3. disable _ warnings()
            #默认用户名和密码
            username = password = 'xxxx'
            if option == 1:
                host = 'xxxx'
            elif option == 2:
                host= 'wazuh.bstops.com'
                username = 'xxxx'
                password = 'xxxx'
            host = '10.190.18.8'
            protocol = 'https'
            port = 55000
            login_endpoint = 'security/user/authenticate'
            login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
            basic_auth = f"{username}:{password}".encode()
            login_headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Basic{b64encode(basic_auth).decode()}'
                }
            response = requests.post(login_url, headers = login_headers, verify= False)
            if response.status_code == 405:
                response = requests.get(login_url, headers = login_headers, verify= False)
            token = json.loads(response.content.decode())['data']['token']
            requests_headers = {
                'Content-Type': 'application/json',
                'Authorization': f'Bearer{token}'
                }
            if action == 'get':
                if urlParameters == '' or 'agents/' in urlParameters:
                    response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}?{params}&pretty=true",headers = requests_headers, verify=False)
                else:
                    response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}", headers = requests_headers,verify = False,params = params)
                return response.text
            elif action == 'put':
                response = requests.put(f"{protocol}://{host}:{port}/{urlParameters}?agents_list=2310429&pretty=true&wait_for_complete=true",headers = requests_headers,verify = False , params = params)
                return response.text
            elif action == 'post':
                response = requests.post(f"{protocol}://{host}:{port}/{urlParameters}", headers= requests_headers, verify = False , params = params)
                return response.text

如何使用Hugo搭建自己的静态博客

免费搭建一个静态博客

独立博客表面上是网站,但实际上我们作为内容的创作者,需要关注的只是它的内容——我们以后会经常更新的只是内容的部分。一个博客网站可以分为这几个部分:

blog-site

最常见的博客系统 WordPress 是一款动态博客系统(PHP 程序),这类系统需要在云主机商(如阿里云)那里买专门的空间或来存储和发布(存储并运行 PHP 程序)我们的博客网站。

静态博客系统

所谓静态博客系统指的是,博客网站是由一系列的静态 HTML 文件,以及对应的多媒体素材等组成,只要找一个可以存储文件的平台即可发布网站。有许多工具,可以把一篇篇博客文章的 Markdown 文件,配上博客主题之后转换为可以直接发布的 HTML 文件。

templating

静态博客系统的原理,就如同这篇教程,我创作时,写的是 Markdown 文件(readme.md),但你看到的确实是带有格式和交互的网页。因为这背后,有 GitHub 在帮我完成了从静态 Markdown 到静态 HTML 的转换。

如果要搭建一个静态博客网站,我们需要的工具有:

  • 把 Markdown 转化为 HTML 的工具(即静态网站生成工具)
  • 博客样式主题(通常与工具是配套的,一般来说找到了生成工具就找到了主题)
  • 能存储和发布静态文件的平台

常用的静态网站生成工具有 jekyll, hexohugo 等。接下来,我以 hugo 的使用为例介绍如何搭建一个静态博客网站。

Read full post gblog_arrow_right