读取ELK信息并写入数据库
读取ELK信息并写入数据库 需要用到pubapi来写入数据库 不适用于所有环境,脚本只做技术储备
from BasicInterface import *  # project helpers: executeMysqlData, readELK, apiFromMcafee (kept first so the explicit imports below win on any name clash)

import json
import re
import time
from datetime import datetime, timedelta, timezone

from requests.packages import urllib3
# NOTE(review): this mutes urllib3's InsecureRequestWarning, which is normally
# raised only when HTTPS requests are made with verify=False. If the ELK/pubapi
# calls inside BasicInterface really skip certificate validation, both the alert
# feed and the database writes can be intercepted; trust the server CA
# (verify=/path/to/ca.pem or REQUESTS_CA_BUNDLE) instead of silencing the warning.
urllib3.disable_warnings()
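# Alerts are pulled once an hour for Grafana and for auditing. The search window
# is sent to Elasticsearch as ISO-8601 with an explicit +0800 (Asia/Shanghai)
# offset. An aware datetime is used instead of appending a literal "+0800" to a
# naive datetime.now(): the old form silently produced a shifted window on any
# host whose clock is not set to CST, and two separate now() calls could drift.
_now = datetime.now(timezone(timedelta(hours=8)))
time1 = _now.strftime("%Y-%m-%dT%H:%M:%S%z")                           # window end (now)
time2 = (_now - timedelta(minutes=60)).strftime("%Y-%m-%dT%H:%M:%S%z")  # window start (now - 60 min)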
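# ---------------------------------------------------------------------------
# Shared helpers (private to this script)
# ---------------------------------------------------------------------------

# Every timestamp exchanged with ELK / MySQL is expressed in Asia/Shanghai time;
# the Elasticsearch range queries carry an explicit +0800 offset.
_CST = timezone(timedelta(hours=8))
_ES_TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"          # %z renders as "+0800" for _CST
# WazuhExcept stores naive datetimes; they are taken to be CST as well.
_DB_TIME_FMT = "%Y-%m-%dT%H:%M:%S+0800"

# Wazuh "timestamp" values look like 2023-01-02T03:04:05.678+0000. As in the
# original, only the date and wall-clock seconds are stored; the offset is
# dropped (the regex also tolerates "Z" / negative offsets, which split('+')
# did not).
_WAZUH_TS_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})")

# Width of LogCollection.Context; longer text is truncated, not rejected.
_WAZUH_CONTEXT_MAX = 250
_MCAFEE_CONTEXT_MAX = 200

# Agent IP prefixes attributed to the technology department on server 3.
# Copied from the original list (one duplicated entry removed); confirm the
# 10.78.6. / 10.79.27. entries against the asset inventory.
_TECH_IP_PREFIXES = (
    "10.70.4.", "10.70.5.", "10.70.6.", "10.70.8.",
    "10.78.6.", "10.79.27.", "10.21.25.",
)

# Fixed (Infosource, Auditer, Department) for the two SA Wazuh servers.
_SA_SERVER_TAGS = {
    4: ("SaIntWazuh", "hsDanny", "SA"),
    5: ("SaExtWazuh", "hsDanny", "SA"),
}

# Accounts whose logons on the mail hosts are audited by requestWazuhAccount.
# NOTE(review): the last entry is spelled with a digit zero in the listing --
# confirm it is a deliberate alias of "administrator" and not a transcription
# error before relying on it.
_PRIVILEGED_ACCOUNTS = ["admin", "administrator", "zohoadm", "exadm", "administrat0r"]
# Workstations excluded from that search (names are redacted in the listing).
_EXCLUDED_WORKSTATIONS = ["<computername>", "<computername>"]


def _sql_str(value):
    """Return ``value`` as a single-quoted MySQL string literal.

    Agent names, account names, log lines and vendor descriptions all originate
    on the endpoints, i.e. from data an attacker who can write a log line or
    pick a username controls. They were previously interpolated into the INSERT
    raw (at best with the single quotes stripped). Backslashes and quotes are
    escaped and NUL bytes dropped so the literal cannot terminate early.
    NOTE(review): this is a mitigation, not the real fix -- if executeMysqlData
    can take bound parameters, switch to them and delete this helper.
    """
    text = "" if value is None else str(value)
    text = text.replace("\x00", "").replace("\\", "\\\\").replace("'", "''")
    return "'" + text + "'"


def _wazuh_event_time(timestamp):
    """Convert a Wazuh ``timestamp`` string into ``YYYY-MM-DD HH:MM:SS``.

    Raises ValueError when the value does not start with an ISO date-time
    (the old strptime-based code raised ValueError in that case too).
    """
    match = _WAZUH_TS_RE.match(timestamp or "")
    if match is None:
        raise ValueError(f"unrecognised Wazuh timestamp: {timestamp!r}")
    return match.group(1) + " " + match.group(2)


def _split_account(account, unknown_domain="Unknown"):
    """Split a ``DOMAIN\\user`` string into ``(user, domain)``.

    ``None``, non-string, empty and one-character values (McAfee uses a
    placeholder there) yield ``("Unknown", unknown_domain)``; a value without a
    backslash is returned as the user with ``unknown_domain`` as its domain.
    The Wazuh path passes "None" and the McAfee path "Unknown", matching the
    old fallbacks. Originally the Wazuh path left both names undefined (or
    re-used the previous hit's values) when no backslash was present.
    """
    if not isinstance(account, str) or len(account) <= 1:
        return "Unknown", unknown_domain
    if "\\" in account:
        parts = account.split("\\")
        return parts[-1], parts[0]
    return account, unknown_domain


def _wazuh_context(source):
    """Return the free-text context for a Wazuh hit (``_source``), unquoted.

    Priority, as in the original: vulnerability-detector title; ``full_log``
    unless it starts with "CVE"; Defender ``categoryName:path``; the raw Windows
    message. Where the original left the variable unset (a "CVE..." full_log
    without vulnerability data) the Windows message / full_log is used instead
    of crashing or re-using the previous hit's text.
    """
    full_log = source.get("full_log") or ""
    data = source.get("data")
    if not isinstance(data, dict):
        return full_log
    if "vulnerability" in data:
        return (data["vulnerability"] or {}).get("title") or ""
    if full_log and re.match("^CVE", full_log) is None:
        return full_log
    win = data.get("win") or {}
    eventdata = win.get("eventdata") or {}
    # Key as emitted by the Wazuh eventchannel decoder for Defender events; the
    # listing showed it with an embedded space ("category Name") -- confirm
    # against a live event.
    if "categoryName" in eventdata:
        return eventdata["categoryName"] + ":" + (eventdata.get("path") or "")
    return (win.get("system") or {}).get("message") or full_log


def _insert_wazuh_alert(server, source):
    """Insert one level>=12 Wazuh hit (its ``_source`` dict) into LogCollection.

    Returns True when executeMysqlData reports success, False otherwise.
    Server 3 derives Department from the agent IP and the auditor from the
    Windows provider name; servers 4 and 5 use the fixed _SA_SERVER_TAGS.
    """
    agent = source.get("agent") or {}
    data = source.get("data") if isinstance(source.get("data"), dict) else {}
    win = data.get("win") or {}
    eventdata = win.get("eventdata") or {}

    # Defender "Detection User" field as normalised by Wazuh; the listing showed
    # it as "detection User" -- confirm against a live event.
    user, domain = _split_account(eventdata.get("detectionUser"), "None")
    context = (_wazuh_context(source) or "")[:_WAZUH_CONTEXT_MAX]
    event_time = _wazuh_event_time(source.get("timestamp"))

    if server == 3:
        # startswith() replaces the old substring test, which would also have
        # matched e.g. "110.70.4.x".
        ip = agent.get("ip") or ""
        # Spelling of the stored value kept as-is: dashboards may filter on it.
        department = "Techology" if ip.startswith(_TECH_IP_PREFIXES) else "unknown"
        info_source = (win.get("system") or {}).get("providerName") or "wazuh"
        auditor = "Doctor" if "Microsoft-Windows-Windows Defender" in info_source else "Matteo"
    else:
        info_source, auditor, department = _SA_SERVER_TAGS[server]

    # Every value goes through _sql_str; the old server-4 branch wrapped the
    # already-quoted hostname/domain/account in a second pair of quotes.
    values = [
        _sql_str("Audit"), _sql_str(context), _sql_str(info_source),
        _sql_str(agent.get("name")), _sql_str(domain), _sql_str(user),
        _sql_str(event_time), _sql_str(auditor), _sql_str(department),
    ]
    sql = (
        "INSERT INTO LogCollection (Type, Context, Infosource, HostnameOfSource, Domain, "
        "Account, TimeOfHappened, Auditer, Department) VALUES (" + ", ".join(values) + ")"
    )
    return executeMysqlData(3, "write", sql) == True


# ---------------------------------------------------------------------------
# Collectors
# ---------------------------------------------------------------------------

def requestWazuhLv12(server, index):
    """Pull Wazuh alerts of rule level >= 12 for ``server`` and store them.

    WazuhExcept holds one exception rule per row for the server (column 2: the
    Elasticsearch field, column 1: comma-separated values to exclude, columns
    4/5: optional window start/end; when either is NULL the last 60 minutes are
    used). One query is issued per rule and every hit (at most 100 -- ``size``
    is not paged) is written to LogCollection as an 'Audit' row. Progress is
    printed; nothing is returned.

    NOTE(review): because each rule is queried on its own, an alert excluded
    by rule A is still returned -- and inserted -- by rule B's query. If the
    rules are meant to apply together they belong in a single ``must_not``.

    server: BasicInterface server id -- 3 (sec), 4 (saint) or 5 (saext).
    index:  Elasticsearch index (pattern) to search.
    Raises ValueError for any other server id (previously an unbound-variable
    NameError).
    """
    # The listing showed these as "' sec'", "' saint" and "' saext'"; the
    # leading spaces are taken to be transcription noise (the second one was
    # not even a valid string literal). They must match WazuhExcept.server.
    server_names = {3: "sec", 4: "saint", 5: "saext"}
    if server not in server_names:
        raise ValueError(f"unknown server id {server!r}")
    sql = f"select * from WazuhExcept where server={_sql_str(server_names[server])}"
    for item in executeMysqlData(1, "read", sql):
        field, raw_values, win_start, win_end = item[2], item[1], item[4], item[5]
        # A string is split on commas; anything else is passed through as before.
        values = raw_values.split(",") if isinstance(raw_values, str) else raw_values
        if win_start is None or win_end is None:
            now = datetime.now(_CST)
            gte = (now - timedelta(minutes=60)).strftime(_ES_TIME_FMT)
            lte = now.strftime(_ES_TIME_FMT)
        else:
            gte = win_start.strftime(_DB_TIME_FMT)
            lte = win_end.strftime(_DB_TIME_FMT)
        body = {
            "query": {
                "bool": {
                    "filter": [
                        {"bool": {"should": {"range": {"rule.level": {"gte": 12}}}}},
                        {"range": {"@timestamp": {"gte": gte, "lte": lte}}},
                    ],
                    "must_not": [{"terms": {field: values}}],
                }
            },
            "size": 100,
        }
        response = readELK(server, index, body, 7)
        if not response:
            print("wazuh接口调用失败,故跳过本次查询!")
            continue
        hits = response["hits"]["hits"]
        if not hits:
            print(f"本次扫描完成, {server}没有发现12级以上的告警")
            continue
        for hit in hits:
            # The old messages said "mcafee" here -- a copy/paste label.
            if _insert_wazuh_alert(server, hit["_source"]):
                print("wazuh数据写入成功")
            else:
                print("wazuh数据写入失败,请检查")


def requestWazuhAccount():
    """Audit privileged-account activity on the mail hosts for the last hour.

    Queries server 3 for events in the module-level ``time2``..``time1`` window
    whose targetUserName is one of _PRIVILEGED_ACCOUNTS, excluding logoff and
    successful-logon rules, rule 60122, NTLM (NtLmSsp) logons and the
    _EXCLUDED_WORKSTATIONS. Only hits whose agent name contains "mail" (any
    case) are stored, as ``Audit`` rows with severity and logon process as the
    context. Prints progress; returns None.
    """
    excluded = [
        {"term": {"rule.description": "Windows User Logoff."}},
        {"term": {"rule.description": "Windows logon success."}},
    ]
    excluded += [{"term": {"data.win.eventdata.workstationName": ws}} for ws in _EXCLUDED_WORKSTATIONS]
    excluded += [
        {"term": {"rule.id": "60122"}},
        {"term": {"data.win.eventdata.logonProcessName": "NtLmSsp"}},
    ]
    body = {
        "query": {
            "bool": {
                "must": [{"range": {"@timestamp": {"gte": time2, "lte": time1}}}],
                "filter": {
                    "bool": {
                        # Same shape as the original: one wrapping bool per
                        # must_not clause.
                        "must": [{"terms": {"data.win.eventdata.targetUserName": _PRIVILEGED_ACCOUNTS}}]
                                + [{"bool": {"must_not": clause}} for clause in excluded]
                    }
                },
            }
        }
    }
    # Positional call, as in requestWazuhLv12 (index name redacted in the listing).
    result = readELK(3, "<indexname>", body, 7)
    if not result:
        print("wazuh接口调用失败,故跳过本次查询!")
        return
    for hit in result["hits"]["hits"]:
        source = hit["_source"]
        agent = source.get("agent") or {}
        # The old pattern [mM][aA][iI][lL] with IGNORECASE is just "mail".
        if not re.search("mail", agent.get("name") or "", re.IGNORECASE):
            continue
        win = (source.get("data") or {}).get("win") or {}
        severity = (win.get("system") or {}).get("severityValue") or ""
        logon_process = (win.get("eventdata") or {}).get("logonProcessName") or ""
        account = (win.get("eventdata") or {}).get("targetUserName")
        sql = (
            "INSERT INTO LogCollection (Type, Context, SourceIP, Infosource, HostnameOfSource, "
            "Account, TimeOfHappened) VALUES ("
            + ", ".join([
                _sql_str("Audit"),
                _sql_str(severity + ", " + logon_process),
                _sql_str(agent.get("ip")),
                _sql_str("wazuh-特权账号登入"),
                _sql_str(agent.get("name")),
                _sql_str(account),
                _sql_str(_wazuh_event_time(source.get("timestamp"))),
            ])
            + ")"
        )
        if executeMysqlData(3, "write", sql) == True:
            print("wazuh数据写入成功")
        else:
            print("wazuh数据写入失败,请检查!")


def requestMcafee():
    """Pull the last hour's McAfee ePO events (query ids 877 and 878) and store them.

    ``apiFromMcafee(2, id)`` returns a string that starts with "OK" and carries
    a JSON list after a 4-character prefix. A missing/short/non-OK payload or an
    empty list prints the "no news" message, as before (an API error is thus
    indistinguishable from "no events" -- consider a separate message). Each
    event becomes an 'Audit' row attributed to auditor Doctor; the description
    falls back to the threat name. Prints progress; returns None.
    """
    for query_id in ("877", "878"):
        data = apiFromMcafee(2, query_id)
        events = []
        if data and data[0:2] == "OK" and len(data) > 10:
            events = json.loads(data[4:])
        if not events:
            print("mcafee近一小时没有收到告警, no news is good news~!!")
            continue
        for event in events:
            user, domain = _split_account(event.get("EPOEvents.TargetUserName"))
            # Previously a missing/None description raised inside a bare except
            # and fell through to the threat name; an explicit check does the same.
            description = event.get("EPExtendedEvent.NaturalLangDescription")
            if not description:
                description = event.get("EPOEvents.ThreatName") or ""
            description = description[:_MCAFEE_CONTEXT_MAX]
            # The old INSERT dropped the closing quote after IPHostName and
            # called a misspelled executeMysqLData; both fixed.
            sql = (
                "INSERT INTO LogCollection (Type, Context, Infosource, HostnameOfSource, Domain, "
                "Account, Auditer) VALUES ("
                + ", ".join([
                    _sql_str("Audit"),
                    _sql_str(description),
                    _sql_str("mcafee"),
                    _sql_str(event.get("EPOComputerProperties.IPHostName")),
                    _sql_str(domain),
                    _sql_str(user),
                    _sql_str("Doctor"),
                ])
                + ")"
            )
            if executeMysqlData(3, "write", sql) == True:
                print("mcafee数据写入成功")
            else:
                print("mcafee数据写入失败,请检查!")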
# Elasticsearch index to search per Wazuh server id (names redacted). The keys
# are the BasicInterface server ids understood by requestWazuhLv12.
serverlist = {
    3: {"index": "<indexname>"},
    4: {"index": "<indexname>"},
    5: {"index": "<indexname>"},
}

# One hourly pass: level-12+ alerts for every server, then the privileged-account
# audit on the mail hosts, then the McAfee ePO events.
for server, settings in serverlist.items():
    requestWazuhLv12(server, settings["index"])
requestWazuhAccount()
requestMcafee()