nmap扫描

基于Python3写的nmap扫描接口,仅用作代理

基于Python3写的nmap扫描接口,仅用作代理

import nmap,json, requests, os, time, re, socket
from flask import Flask, request, jsonify, send_file
from typing import Dict
from BasicInterfaces import executeMysqlData
from PyNuclei import Nuclei
'''http接口供 nmap和 gcp日志下载使用'''
nm = nmap.PortScanner()
nucleiScanner = Nuclei()
app = Flask(__name__)
'''nmap扫描功能'''
@app.route('/', methods=['POST'])
def json_post():
    #使用 flask开启 http服务, 用来接收 text格式的 ip地址
    ip = str(request.data, encoding='utf-8')
    dataJson = {}
    dataJson[ip] = {}
    print ('正在扫描%s' % ip)
    #开始调用nmap进行扫描,超时时间1200秒,1200秒后强制结束扫描进程
    result= nm.scan(ip, arguments='-T5 - n - sT - Pn - p-', timeout = 1200)
    dataJson[ip]['portstatus']={}
    try:
        if result['nmap']['scanstats']['uphosts'] != 'θ':
            for j in result['scan'][ip]['tcp']:
                if result['scan'][ip]['tcp'][j]['state'] == 'open':
                    print (f'发现开放端口{j},正在详细扫描该端口。。。')
                    doubleScan = nm.scan(ip, arguments=f'-T5 - n - sV - Pn - p {j}', timeout = 1200)
                    print (f'端口扫描结果为{doubleScan}')
                    resultDict = {}
                    resultDict['service'] = doubleScan['scan'][ip]['tcp'][j]['name']
                    resultDict['application'] = doubleScan['scan'][ip]['tcp'][j]['product']
                    dataJson[ip]['portstatus'][j] = resultDict
                    return jsonify(dataJson)
                else:
                    return 'False'
    except:
        return 'False'
'''域名扫描功能'''
@app.route('/scan/<domain>')
def scan(domain: str):
    result = {}
    headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/6θ4.1"}, " pc": {" User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3984.108 Safari/537.36"}}
    for i in headers:
        scanResult = nucleiScanner.scan(
            host = f"{domain}",
            userAgent= headers[i],
            rateLimit=150,
            verbose= False,
            metrics= False,
            maxHostError=3θ,
            stopAfter= None
            )
        result[i]=scanResult
    return result
''' cve查询功能'''
@app.route('/versioncheck/< domainname>')
def versioncheck(domainname):
    dataJson = {}
    dataJson[domainname] = {}
    try:
        dataJson[domainname]['ip'] = socket.gethostbyname(domainname)
    except:
        dataJson[domainname]['ip'] = 'unknown'
    url= f"https://{domainname}/version.txt"
    headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X)AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/684.1"}, "pc":  {"User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"}}
    for i in headers:
        try:
            response = requests.get(url, headers= headers[i])
        except:
            pass
        dataJson[domainname][i]={}
        if 'response' in locals():
            if response.status_code == 200:
                if '<' not in response.text:
                    response = response.text
                    filteredText = "\n". join([line for line in response.splitlines() if"committer" not in  line])
                    matches = re.findall(r":\s*(.+)", filteredText)
                    try:
                        dataJson[domainname][i]['project'] = matches[θ]
                        dataJson[domainname][i]['version'] = matches[1]
                        dataJson[domainname][i]['buildtime'] = matches[2]
                    except:
                        dataJson[domainname][i]['project'] = 'unknown'
                        dataJson[domainname][i]['version'] = response
                        dataJson[domainname][i]['buildtime'] = 'unknown'
                    dataJson[domainname][i]['origin'] = response
    return jsonify(dataJson)
@app.route('/cve', methods=['POST'])
def cveQuery():
    data = request.get_json()
    software = data['software']
    version = data['version']
    startDateFormat = data['starttime']
    endDateFormat = data['endtime']
    urlParameter = data['cpe']
    url= f"https://services.nvd.nist.gov/rest/json/cves/2.0?cpeName={urlParameter}&lastModStartDate={startDateFormat}&lastModEndDate={endDateFormat}"
    headers = {'apiKey': '92127066-eb5f-478a-8d92-4978b3f1f9a2'.}
    n = 0
    while 1:
        response = requests.get(url, headers= headers)
        if response. status_code != 484 and response.status_code != 583:
            break
        else:
            if n <4: 
                print (f'获取失败, 状态码为{response.status_code}')
                time.sleep(5)
                n += 1
            else:
                return 'no result of that software!',404
    response = json.loads(response. text)
    resultJson = {}
    resultJson['software'] = software
    resultJson['version'] = version
    cveList = []
    for i in response['vulnerabilities']:
        try:
            severity = i['cve']['metrics']['cvssMetricV31'][θ]['cvssData']['baseSeverity']
        except:
            severity = i['cve']['metrics']['cvssMetricV2'][0]['baseSeverity']
        if severity == 'HIGH' or severity == "CRITICAL":
            print ('catch high vuln!')
            cveID = i['cve']['id']
            cveList.append(cveID)
    resultJson['cve'] = cveList
    return jsonify(resultJson)
'''gcp日志下载功能的一部分,用来获取所有日志文件的路径'''
def get_all_files(directory):
    file_paths = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.relpath(os.path.join(root,file), "directory")
            file_paths.append(file_path.replace("\\", "/"))
    return file_paths'''gcp日志下载功能的一部分,获取所有日志文件'''
@app.route('/gcp')
def list_files():
    files = get_all_files("/var/log/gcplogs") + get_all_files("/var/log/AWSLogs")
    return jsonify(files)
@app.route('/repos/<path:filename>')
@app.route('/conf/<path:filename>')
@app.route('/downloadgcp/<path:filename>')
def download_file(filename):
    #获取请求的路径,判断是来自 clients 还是 conf
    if request.path.startswith('/repos/'):
        base_dir = '/var/log/repos'
    elif request.path.startswith('/conf/'):
        base_dir = '/var/log/conf'
    elif request.path.startswith('/downloadgcp/'):
        base_dir = '/var/log'
    full_path = os.path.join(base_dir, filename)
    #验证文件是否存在且是否为文件
    if os.path.exists(full_path) and os.path.isfile(full_path):
        return send_file(full_path,as_attachment= True)
    else:
        return "File not found", 404
@app.route('/query', methods=['GET'])
def query_data():
    data = {}
    if 'filter' in request.url:
        condition0fQuery = request.args.get('filter')
        condition0fQuery.split('=')[θ]
        condition0fQuery.split('=')[1]
        if conditionOfQuery:
            sql= "select name, extip, intip from gcpinfo Where " + condition0fQuery. split('=')[θ] + '=' + "'" + conditionOfQuery.split('=')[1] + "'"
            for record in executeMysqlData(4,' read', sql):
                data[record[θ]] = {"extip": record[1], "intip": record[2]}
            return jsonify(data)
        else:
            return jsonify(message=' Please provide filter criteria'), 400
    else:
        sql= " select name, extip, intip from gcpinfo"
        for record in executeMysqlData(4,' read', sql):
            data[record[θ]] = {"extip": record[1], "intip": record[2]}
        return jsonify(data)
app.run(host='10.170.0.2', port=1080)