forked from alybun/cve-alerts
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
152 lines (119 loc) · 5.74 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import requests
import json
import time
from datetime import datetime, timedelta
# Webhook endpoint that alerts are POSTed to; fill this in before running.
DISCORD_WEBHOOK_URL: str = ""

# Endpoint of the NIST NVD CVE API (version 2.0) that is polled for new CVEs.
NIST_API_URL: str = "https://services.nvd.nist.gov/rest/json/cves/2.0/"

# CVE ids that have already been sent to Discord (id -> True), kept for the
# lifetime of the process so the same CVE is never announced twice.
posted_cves: dict = dict()
def get_recent_cves(hours=3, timeout=30):
    """Fetch every CVE that NVD published within the last ``hours`` hours.

    Pages through the NVD CVE API using ``startIndex`` / ``resultsPerPage``
    until a page comes back shorter than the page size.

    Args:
        hours: Width of the look-back window in hours, ending at the current
            UTC time. Defaults to 3.
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled connection cannot hang the bot indefinitely.

    Returns:
        A list with the raw entries of each page's "vulnerabilities" array.
        When a request fails or returns an unparsable body, paging stops and
        whatever was collected so far (possibly an empty list) is returned;
        no exception is propagated to the caller.
    """
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(hours=hours)
    page_size = 2000  # Largest page the NVD CVE API serves (see NVD API docs)
    params = {
        "pubStartDate": start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "pubEndDate": end_date.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "resultsPerPage": page_size,
        "startIndex": 0,
    }
    all_cves = []
    while True:
        print(f"[{datetime.utcnow()}] Sending request to NIST API with params: {params}", flush=True)
        try:
            response = requests.get(NIST_API_URL, params=params, timeout=timeout)
            response.raise_for_status()  # Raises exception for non-2xx responses
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            print(f"[{datetime.utcnow()}] Error fetching data from NIST API: {e}", flush=True)
            break

        vulnerabilities = data.get("vulnerabilities", [])
        print(f"[{datetime.utcnow()}] Received {len(vulnerabilities)} CVEs from NIST API.", flush=True)
        all_cves.extend(vulnerabilities)

        # A short page means there is nothing left to fetch.
        if len(vulnerabilities) < page_size:
            break
        # Otherwise advance to the next page.
        params["startIndex"] += page_size
    return all_cves
# Names already resolved, keyed by CWE number. Only successful lookups are
# stored, so a transient API failure is retried on the next call.
_CWE_NAME_CACHE = {}


def get_cwe_name(weakness):
    """Look up the human-readable name of a CWE through the MITRE CWE API.

    Args:
        weakness: A CWE identifier such as "CWE-79". Only the part after the
            last '-' is sent to the API; a string without '-' is sent as-is.

    Returns:
        The "Name" of the first entry in the response's "Weaknesses" list, or
        'Unknown' when the request fails, the body is not valid JSON, or the
        response carries no name.
    """
    # Extract the number part from the CWE string
    cwe_number = weakness.split('-')[-1] if '-' in weakness else weakness
    if cwe_number in _CWE_NAME_CACHE:
        return _CWE_NAME_CACHE[cwe_number]

    url = f"https://cwe-api.mitre.org/api/v1/cwe/weakness/{cwe_number}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error fetching CWE name for {weakness}: {e}", flush=True)
        return 'Unknown'

    # Tolerate a missing or empty "Weaknesses" list instead of indexing blindly.
    entries = data.get('Weaknesses') if isinstance(data, dict) else None
    first = entries[0] if entries else {}
    name = (first.get('Name') if isinstance(first, dict) else None) or 'Unknown'
    if name != 'Unknown':
        _CWE_NAME_CACHE[cwe_number] = name
    return name
# Discord rejects webhook messages whose "content" is longer than this.
_DISCORD_CONTENT_LIMIT = 2000
# Seconds to wait on each webhook request before giving up.
_DISCORD_TIMEOUT_SECONDS = 30
# Total tries per message when Discord answers HTTP 429 (rate limited).
_DISCORD_MAX_ATTEMPTS = 3


def _pick_description(descriptions):
    """Return the English description text, else the first non-empty one, else a placeholder."""
    texts = [(d.get('lang'), d.get('value')) for d in descriptions or [] if d.get('value')]
    for lang, text in texts:
        if lang == 'en':
            return text
    return texts[0][1] if texts else 'No description available'


def _pick_base_score(metrics):
    """Return the base score of the newest CVSS version present, or 'N/A'."""
    for key in ('cvssMetricV40', 'cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2'):
        entries = (metrics or {}).get(key)
        if entries:
            return entries[0].get('cvssData', {}).get('baseScore', 'N/A')
    return 'N/A'


def _describe_weaknesses(weaknesses):
    """Return a comma-separated, de-duplicated list of weakness labels."""
    labels = []
    seen = set()
    for weakness in weaknesses or []:
        entries = weakness.get('description') or [{}]
        cwe = entries[0].get('value', '')
        # Several sources may list the same CWE; look it up and show it once.
        if not cwe or cwe in seen:
            continue
        seen.add(cwe)
        if cwe.startswith('CWE-'):
            labels.append(f"{cwe} - {get_cwe_name(cwe)}")
        else:
            labels.append(cwe)
    return ", ".join(labels) or "No weaknesses specified"


def _build_discord_content(cve):
    """Return ``(cve_id, content)`` with content trimmed to Discord's length limit."""
    record = cve['cve']
    cve_id = record.get('id', 'N/A')
    published_date = record.get('published', 'N/A')
    base_score = _pick_base_score(record.get('metrics'))
    description = _pick_description(record.get('descriptions'))

    # NVD API 2.0 vendor comment entries carry their text under "comment";
    # "value" is still honoured as a fallback.
    comment_texts = [
        entry.get('comment') or entry.get('value') or ''
        for entry in record.get('vendorComments') or []
    ]
    vendor_comments_str = ", ".join(filter(None, comment_texts)) or "No vendor comments available"
    weaknesses_str = _describe_weaknesses(record.get('weaknesses'))

    def render(description_text):
        return (
            f"# {cve_id}\n"
            f"**Base Score**: {base_score}\n"
            f"**Published Date**: {published_date}\n"
            f"**CVE Link**: https://nvd.nist.gov/vuln/detail/{cve_id}\n"
            f"**Description**: {description_text}\n"
            f"**Vendor Comments**: {vendor_comments_str}\n"
            f"**Weaknesses**: {weaknesses_str}\n"
        )

    content = render(description)
    overflow = len(content) - _DISCORD_CONTENT_LIMIT
    if overflow > 0:
        # Shorten the description first: it is the field that is usually long,
        # and the remaining fields stay intact.
        ellipsis = "..."
        keep = max(len(description) - overflow - len(ellipsis), 0)
        content = render(description[:keep].rstrip() + ellipsis)
    # Hard cap in case the other fields alone exceed the limit.
    return cve_id, content[:_DISCORD_CONTENT_LIMIT]


def _retry_delay_seconds(response):
    """Return how long to wait after an HTTP 429, from Retry-After, clamped to 0-60 s."""
    try:
        delay = float(response.headers.get('Retry-After', 1))
    except (TypeError, ValueError):
        delay = 1.0
    return min(max(delay, 0.0), 60.0)


def send_discord_message(cve):
    """Send one CVE alert to the Discord webhook.

    Args:
        cve: One entry of the NVD response's "vulnerabilities" list, i.e. a
            dict whose 'cve' key holds the record. Missing optional fields
            (descriptions, metrics, vendor comments, weaknesses) are replaced
            by placeholder text.

    Returns:
        None. Delivery problems are logged, not raised; an HTTP 429 is
        retried after the delay given in the Retry-After header.

    Raises:
        KeyError: If ``cve`` has no 'cve' key.
    """
    cve_id, content = _build_discord_content(cve)
    payload = {
        "content": content,
        # CVE text is untrusted: never let it trigger @everyone/@here/role pings.
        "allowed_mentions": {"parse": []},
    }

    print(f"[{datetime.utcnow()}] Sending notification to Discord for CVE: {cve_id}", flush=True)
    for attempt in range(1, _DISCORD_MAX_ATTEMPTS + 1):
        try:
            response = requests.post(
                DISCORD_WEBHOOK_URL,
                json=payload,
                timeout=_DISCORD_TIMEOUT_SECONDS,
            )
            if response.status_code == 429 and attempt < _DISCORD_MAX_ATTEMPTS:
                delay = _retry_delay_seconds(response)
                print(f"[{datetime.utcnow()}] Rate limited by Discord for CVE {cve_id}; retrying in {delay} seconds.", flush=True)
                time.sleep(delay)
                continue
            response.raise_for_status()  # Raises exception for non-2xx responses
        except requests.RequestException as e:
            print(f"[{datetime.utcnow()}] Failed to send Discord message for CVE {cve_id}: {e}", flush=True)
            return
        print(f"[{datetime.utcnow()}] Notification sent for CVE {cve_id}: {response.status_code}", flush=True)
        return
def main():
    """Run the polling loop: fetch recent CVEs, announce unseen ones, sleep, repeat.

    Each pass calls get_recent_cves(), hands every record whose id is not yet
    in posted_cves to send_discord_message(), records the id, and then sleeps
    for 3600 seconds. The loop never terminates on its own.
    """
    print(f"[{datetime.utcnow()}] Starting bot. Checking CVEs published in the last 3 hours.", flush=True)
    while True:
        # Pull everything published inside the look-back window.
        recent = get_recent_cves()
        if not recent:
            print(f"[{datetime.utcnow()}] No new CVEs found.", flush=True)

        for entry in recent:
            cve_id = entry['cve']['id']
            # Already announced earlier in this process: skip it.
            if cve_id in posted_cves:
                continue
            send_discord_message(entry)
            posted_cves[cve_id] = True  # Remember it so it is not sent again

        # Wait one hour (3600 seconds) before polling again.
        print(f"[{datetime.utcnow()}] Sleeping for 3600 seconds before checking for new CVEs.", flush=True)
        time.sleep(3600)


if __name__ == "__main__":
    main()