from requests import urllib3
from requests.auth import HTTPBasicAuth
import json, requests, mysql.connector, re, os,smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMETextfrom base64
import b64encode
def executeMysqlData(serverID, oper= None,dataSQL= None):
if serverID == 1:
mydb = mysql.onnector.connect(
host="x.x.x.x",
user="x.x.x.x",
password="x.x.x.x",
database="x.x.x.x",
port=3306)
elif serverID == 2 :
mydb = mysql.connector.connect(
host="y.y.y.y",
user=" yyyy",
password=" yyyy",
database=" yyyy",
port=3306)
mycursor = mydb. cursor()
if oper.upper() == 'WRITE' and dataSQL.split()[θ].upper() != 'SELECT':
mycursor.execute(dataSQL)
mydb.commit()
mycursor.close()
mydb.close()
if mycursor.rowcount > θ:
return True
else:
return False
elif oper.upper() == 'READ' and dataSQL.split()[0].upper() != 'INSERT' and dataSQL.split()[θ].upper() != 'UPDATE':
mycursor. execute(dataSQL)
resultList = []
for i in mycursor:
resultList. append(i)
mycursor.close()
mydb.close()
return resultList
else:
return ' unexpected request! check and try again!'
#定义 mcafee查询 ip和机器名信息,允许传递 ip地址或者机器名或者FQDN名,返回为 json数据
def apiFromMcafee(actionId=1, input= None):
urllib3.disable.warnings()
username = 'xxx'
password = 'yyy'
if actionId == 1:
url= f'https://xxxx:8443/remote/system.find?searchText={input}&:output=json'
curl_command=f'curl -u{'"'+username+'"'}:{'"'+password+'"'} -s -k "{url}"'
data = os.popen(curl_command).read()
patternIP = re.compile(r'10.((25[θ-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){2}')
record={}
if bool(re. match(patternIP, input)) == True:
if len(data) > 20:
if data[θ:2] == 'OK':
jsonStr = json. loads(data[5:])
for i in range(0, len(jsonStr)):
ip = jsonStr[i]['EPOComputerProperties. IPAddress']
subnetSearch = "'" + patternIP. search(ip)[0] + '0/24' + "'"
querySubnetInfo = f"SELECT name from IPStatus WHERE subnet = {subnetSearch}"
if input == ip:
#这些只是为了看着好看,所以未知的统一改成 Unknown
if jsonStr[i]['EPOComputerProperties. UserName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.Username'] == 'None':
user = ' Unknown'
else:
user = jsonStr[i]['EPOComputerProperties.UserName']
if jsonStr[i]['EPOComputerProperties. IPHostName'] == 'N/A' or jsonStr[i]['EPOComputerProperties.IPHostName'] == 'None':
hostname = ' Unknown'
else:
hostname = jsonStr[i]['EPOComputerProperties.IPHostName']
queryDepartment = executeMysqlData(1,'read',querySubnetInfo)
if len(queryDepartment)== 0:
departmentName = 'Unknown'
else:
departmentName = queryDepartment[0][0]
record = {
'ip':ip,
'username':user,
'department':departmentName,
'computerName':hostname
}
break
else:
record = False
else:
record = False
else:
record = False
elif actionId == 2:
url = f'https://xxxx:8443/remote/core.executeQuery?queryId={input}&:output=json'
curl_command = f'curl -u {'"' + username + '"'}:{'"' + password + '"'} -s -k"{url}"'
response = os.popen(curl_command).read()
return response
#定义发送邮件接口的方法, subject为标题, data为内容, sendTo为收件人, type为邮件类型, 默认为 html, 如乘是纯文本, 则写 plain
def sendMail(subject, data,sendTo='xxxx@xxxx.xxx',type='html'):
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = 'xxxx@xxxx.xxx'
msg['To'] = sendTo
text_part = MIMEText(data, type)
msg.attach(text_part)
#此处填写邮件服务器接口
with smtplib.SMTP('xxxx',25) as smtp:
try:
smtp.send_message(msg)
return True
except:
return False
#定义读取ELK的方法, serverId为区分不同的ELK集群,indexData为要搜索的索引目录, bodyData为要搜索的条件, 格式为json,返回json格式的字符串
def readELK(serverId=2,indexDatα=None,bodyDatα=None,esVersion=8):
#xxxx
if serverId == 1:
apiKey = 'aaaa'
url = ' https://ssss:9200'
#wazuh的ELK
elif serverId == 3:
username = 'wazuh'
password = 'wazuh'
url = 'https://aaaa:9200'
if esVersion == 8:
headers = {" Authorization": "ApiKey" + apiKey}
resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", headers= headers, json=bodyData,verify=False)).text)
elif esVersion == 7:
resultJson = json.loads((requests. get(url + "/" + indexData + "/_search", auth=HTTPBasicAuth(username,password),json=bodyData,verify=False)).text)
print ("读取ELK数据完毕")
return resultJson
#定义读写jira的api的方法,第一个为url,也就是条件,第二个为jsonPost,如果要创建case则使用jsonPost,如果只是get的话第二项则不用写
def apiFromJira(url,jsonPost= None):
usernameForJira = passwordForJira = 'xxxx'
urllib3. disable _ warnings()
if jsonPost is None:
response = requests. get(url, verify= False, auth=(usernameForJira, passwordForJira))
else:
if url[-8:] == 'assignee':
response = requests.put(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
else:
response = requests.post(url,json=jsonPost,verify=False,auth=(usernameForJira, passwordForJira))
return response. text
def getClientFromWazuh(option=1, params='limit=10000', action='get',urlParameters=''):
urllib3. disable _ warnings()
#默认用户名和密码
username = password = 'xxxx'
if option == 1:
host = 'xxxx'
elif option == 2:
host= 'wazuh.bstops.com'
username = 'xxxx'
password = 'xxxx'
host = '10.190.18.8'
protocol = 'https'
port = 55000
login_endpoint = 'security/user/authenticate'
login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
basic_auth = f"{username}:{password}".encode()
login_headers = {
'Content-Type': 'application/json',
'Authorization': f'Basic{b64encode(basic_auth).decode()}'
}
response = requests.post(login_url, headers = login_headers, verify= False)
if response.status_code == 405:
response = requests.get(login_url, headers = login_headers, verify= False)
token = json.loads(response.content.decode())['data']['token']
requests_headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer{token}'
}
if action == 'get':
if urlParameters == '' or 'agents/' in urlParameters:
response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}?{params}&pretty=true",headers = requests_headers, verify=False)
else:
response = requests.get(f"{protocol}://{host}:{port}/{urlParameters}", headers = requests_headers,verify = False,params = params)
return response.text
elif action == 'put':
response = requests.put(f"{protocol}://{host}:{port}/{urlParameters}?agents_list=2310429&pretty=true&wait_for_complete=true",headers = requests_headers,verify = False , params = params)
return response.text
elif action == 'post':
response = requests.post(f"{protocol}://{host}:{port}/{urlParameters}", headers= requests_headers, verify = False , params = params)
return response.text