nmap扫描
基于Python3写的nmap扫描接口,仅用作代理
import nmap,json, requests, os, time, re, socket
from flask import Flask, request, jsonify, send_file
from typing import Dict
from BasicInterfaces import executeMysqlData
from PyNuclei import Nuclei
'''http接口供 nmap和 gcp日志下载使用'''
nm = nmap.PortScanner()
nucleiScanner = Nuclei()
app = Flask(__name__)
'''nmap扫描功能'''
@app.route('/', methods=['POST'])
def json_post():
#使用 flask开启 http服务, 用来接收 text格式的 ip地址
ip = str(request.data, encoding='utf-8')
dataJson = {}
dataJson[ip] = {}
print ('正在扫描%s' % ip)
#开始调用nmap进行扫描,超时时间1200秒,1200秒后强制结束扫描进程
result= nm.scan(ip, arguments='-T5 - n - sT - Pn - p-', timeout = 1200)
dataJson[ip]['portstatus']={}
try:
if result['nmap']['scanstats']['uphosts'] != 'θ':
for j in result['scan'][ip]['tcp']:
if result['scan'][ip]['tcp'][j]['state'] == 'open':
print (f'发现开放端口{j},正在详细扫描该端口。。。')
doubleScan = nm.scan(ip, arguments=f'-T5 - n - sV - Pn - p {j}', timeout = 1200)
print (f'端口扫描结果为{doubleScan}')
resultDict = {}
resultDict['service'] = doubleScan['scan'][ip]['tcp'][j]['name']
resultDict['application'] = doubleScan['scan'][ip]['tcp'][j]['product']
dataJson[ip]['portstatus'][j] = resultDict
return jsonify(dataJson)
else:
return 'False'
except:
return 'False'
'''域名扫描功能'''
@app.route('/scan/<domain>')
def scan(domain: str):
result = {}
headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/6θ4.1"}, " pc": {" User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3984.108 Safari/537.36"}}
for i in headers:
scanResult = nucleiScanner.scan(
host = f"{domain}",
userAgent= headers[i],
rateLimit=150,
verbose= False,
metrics= False,
maxHostError=3θ,
stopAfter= None
)
result[i]=scanResult
return result
''' cve查询功能'''
@app.route('/versioncheck/< domainname>')
def versioncheck(domainname):
dataJson = {}
dataJson[domainname] = {}
try:
dataJson[domainname]['ip'] = socket.gethostbyname(domainname)
except:
dataJson[domainname]['ip'] = 'unknown'
url= f"https://{domainname}/version.txt"
headers = {"mobile":{"User-Agent": "Mozilla/5.θ (iPhone; CPU iPhone OS 13_2_3 like Mac OS X)AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/684.1"}, "pc": {"User-Agent":" Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"}}
for i in headers:
try:
response = requests.get(url, headers= headers[i])
except:
pass
dataJson[domainname][i]={}
if 'response' in locals():
if response.status_code == 200:
if '<' not in response.text:
response = response.text
filteredText = "\n". join([line for line in response.splitlines() if"committer" not in line])
matches = re.findall(r":\s*(.+)", filteredText)
try:
dataJson[domainname][i]['project'] = matches[θ]
dataJson[domainname][i]['version'] = matches[1]
dataJson[domainname][i]['buildtime'] = matches[2]
except:
dataJson[domainname][i]['project'] = 'unknown'
dataJson[domainname][i]['version'] = response
dataJson[domainname][i]['buildtime'] = 'unknown'
dataJson[domainname][i]['origin'] = response
return jsonify(dataJson)
@app.route('/cve', methods=['POST'])
def cveQuery():
data = request.get_json()
software = data['software']
version = data['version']
startDateFormat = data['starttime']
endDateFormat = data['endtime']
urlParameter = data['cpe']
url= f"https://services.nvd.nist.gov/rest/json/cves/2.0?cpeName={urlParameter}&lastModStartDate={startDateFormat}&lastModEndDate={endDateFormat}"
headers = {'apiKey': '92127066-eb5f-478a-8d92-4978b3f1f9a2'.}
n = 0
while 1:
response = requests.get(url, headers= headers)
if response. status_code != 484 and response.status_code != 583:
break
else:
if n <4:
print (f'获取失败, 状态码为{response.status_code}')
time.sleep(5)
n += 1
else:
return 'no result of that software!',404
response = json.loads(response. text)
resultJson = {}
resultJson['software'] = software
resultJson['version'] = version
cveList = []
for i in response['vulnerabilities']:
try:
severity = i['cve']['metrics']['cvssMetricV31'][θ]['cvssData']['baseSeverity']
except:
severity = i['cve']['metrics']['cvssMetricV2'][0]['baseSeverity']
if severity == 'HIGH' or severity == "CRITICAL":
print ('catch high vuln!')
cveID = i['cve']['id']
cveList.append(cveID)
resultJson['cve'] = cveList
return jsonify(resultJson)
'''gcp日志下载功能的一部分,用来获取所有日志文件的路径'''
def get_all_files(directory):
file_paths = []
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.relpath(os.path.join(root,file), "directory")
file_paths.append(file_path.replace("\\", "/"))
return file_paths'''gcp日志下载功能的一部分,获取所有日志文件'''
@app.route('/gcp')
def list_files():
files = get_all_files("/var/log/gcplogs") + get_all_files("/var/log/AWSLogs")
return jsonify(files)
@app.route('/repos/<path:filename>')
@app.route('/conf/<path:filename>')
@app.route('/downloadgcp/<path:filename>')
def download_file(filename):
#获取请求的路径,判断是来自 clients 还是 conf
if request.path.startswith('/repos/'):
base_dir = '/var/log/repos'
elif request.path.startswith('/conf/'):
base_dir = '/var/log/conf'
elif request.path.startswith('/downloadgcp/'):
base_dir = '/var/log'
full_path = os.path.join(base_dir, filename)
#验证文件是否存在且是否为文件
if os.path.exists(full_path) and os.path.isfile(full_path):
return send_file(full_path,as_attachment= True)
else:
return "File not found", 404
@app.route('/query', methods=['GET'])
def query_data():
data = {}
if 'filter' in request.url:
condition0fQuery = request.args.get('filter')
condition0fQuery.split('=')[θ]
condition0fQuery.split('=')[1]
if conditionOfQuery:
sql= "select name, extip, intip from gcpinfo Where " + condition0fQuery. split('=')[θ] + '=' + "'" + conditionOfQuery.split('=')[1] + "'"
for record in executeMysqlData(4,' read', sql):
data[record[θ]] = {"extip": record[1], "intip": record[2]}
return jsonify(data)
else:
return jsonify(message=' Please provide filter criteria'), 400
else:
sql= " select name, extip, intip from gcpinfo"
for record in executeMysqlData(4,' read', sql):
data[record[θ]] = {"extip": record[1], "intip": record[2]}
return jsonify(data)
app.run(host='10.170.0.2', port=1080)