forked from stratosphereips/StratosphereLinuxIPS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metadata_manager.py
186 lines (164 loc) · 6.76 KB
/
metadata_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
from slips_files.common.imports import *
import socket
import psutil
import sys
import redis
import time
import os
import shutil
import json
from datetime import datetime
class MetadataManager:
    """
    Helper owned by the main Slips object that records information about
    the current run: the host IP, the metadata directory in the output
    folder, analysis start/end dates and running statistics in the db.
    """

    def __init__(self, main):
        """
        :param main: the main Slips object. The methods below read
            main.db, main.conf, main.args, main.version, main.input_type,
            main.input_information and call main.print() and
            main.terminate_slips() on it.
        """
        self.main = main

    def get_host_ip(self):
        """
        Recognize the IP address of the machine

        :return: the local address the OS picks to reach 1.1.1.1,
            or None when the socket operations fail
        """
        try:
            # the context manager closes the socket even when connect()
            # or getsockname() raises
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(('1.1.1.1', 80))
                return s.getsockname()[0]
        except socket.error:
            # not connected to the internet
            return None

    def get_pid_using_port(self, port):
        """
        Returns the PID of the process using the given port
        or None if no process is using it

        :param port: port number, anything int() accepts
        """
        port = int(port)
        for conn in psutil.net_connections():
            if conn.laddr.port != port:
                continue
            if conn.pid is None:
                # psutil couldn't determine the owner of this socket.
                # psutil.Process(None) would silently resolve to the
                # current process and report our own PID, so skip it
                continue
            return psutil.Process(conn.pid).pid
        return None

    def store_host_ip(self):
        """
        Store the host IP address if input type is interface

        Keeps retrying every 10 seconds for as long as the db rejects
        the value with a redis DataError.

        :return: the stored host IP, or None when slips isn't running
            on an interface or a growing zeek dir
        """
        running_on_interface = (
            '-i' in sys.argv or self.main.db.is_growing_zeek_dir()
        )
        if not running_on_interface:
            return

        host_ip = self.get_host_ip()
        while True:
            try:
                self.main.db.set_host_ip(host_ip)
                break
            except redis.exceptions.DataError:
                self.main.print(
                    'Not Connected to the internet. Reconnecting in 10s.'
                )
                time.sleep(10)
                host_ip = self.get_host_ip()
        return host_ip

    def add_metadata(self):
        """
        Create a metadata dir output/metadata/ that has a copy of
        slips.conf, whitelist.conf, and an info.txt with the slips
        version, input, branch, commit and start date

        :return: path of the written info.txt, or None when
            self.enable_metadata is falsy
        """
        if not self.enable_metadata:
            return

        metadata_dir = os.path.join(self.main.args.output, 'metadata')
        try:
            os.mkdir(metadata_dir)
        except FileExistsError:
            # the dir is reused and the files in it will be overwritten
            pass

        # Add a copy of slips.conf
        config_file = self.main.args.config or 'config/slips.conf'
        shutil.copy(config_file, metadata_dir)

        # Add a copy of whitelist.conf
        whitelist = self.main.conf.whitelist_path()
        shutil.copy(whitelist, metadata_dir)

        now = utils.convert_format(datetime.now(), utils.alerts_format)

        self.info_path = os.path.join(metadata_dir, 'info.txt')
        with open(self.info_path, 'w') as f:
            f.write(f'Slips version: {self.main.version}\n'
                    f'File: {self.main.input_information}\n'
                    f'Branch: {self.main.db.get_branch()}\n'
                    f'Commit: {self.main.db.get_commit()}\n'
                    f'Slips start date: {now}\n'
                    )

        print(f'[Main] Metadata added to {metadata_dir}')
        return self.info_path

    def set_analysis_end_date(self):
        """
        Add the analysis end date to the metadata file and
        the db for the web interface to display

        :return: the formatted end date
        """
        self.enable_metadata = self.main.conf.enable_metadata()
        end_date = utils.convert_format(datetime.now(), utils.alerts_format)
        self.main.db.set_input_metadata({'analysis_end': end_date})

        # info_path only exists once add_metadata() wrote info.txt,
        # check for it explicitly instead of swallowing exceptions
        info_path = getattr(self, 'info_path', None)
        if self.enable_metadata and info_path:
            # add slips end date in the metadata dir
            with open(info_path, 'a') as f:
                f.write(f'Slips end date: {end_date}\n')
        return end_date

    def set_input_metadata(self):
        """
        save info about name, size, analysis start date in the db
        """
        now = utils.convert_format(datetime.now(), utils.alerts_format)
        to_ignore = self.main.conf.get_disabled_modules(self.main.input_type)

        info = {
            'slips_version': self.main.version,
            'name': self.main.input_information,
            'analysis_start': now,
            'disabled_modules': json.dumps(to_ignore),
            'output_dir': self.main.args.output,
            'input_type': self.main.input_type,
            'evidence_detection_threshold':
                self.main.conf.evidence_detection_threshold(),
        }

        if hasattr(self.main, 'zeek_dir'):
            info['zeek_dir'] = self.main.zeek_dir

        # '-' is stored when there's no input file on disk to measure
        size_in_mb = '-'
        filepath = self.main.args.filepath
        if filepath not in (False, None) and os.path.exists(filepath):
            size = os.stat(filepath).st_size
            size_in_mb = format(float(size) / (1024 * 1024), '.2f')
        info['size_in_MB'] = size_in_mb

        # the analysis end date is not part of this dict,
        # set_analysis_end_date() adds it to the db later
        self.main.db.set_input_metadata(info)

    def check_if_port_is_in_use(self, port):
        """
        Check whether the given port can be bound on localhost.
        When it can't, tell the user how to proceed and terminate slips.

        :param port: port number to probe
        :return: False if the port is free (or is 6379), True if it's
            already in use and terminate_slips() returned
        """
        if port == 6379:
            # even if it's already in use, slips will override it
            return False

        try:
            # the context manager releases the probe socket right away
            # so the port is free again for whoever needs it next
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind(("localhost", port))
            return False
        except OSError:
            print(f"[Main] Port {port} is already in use by another process."
                  f" Choose another port using -P <portnumber> \n"
                  f"Or kill your open redis ports using: ./slips.py -k ")
            self.main.terminate_slips()
            # only reached if terminate_slips() returns
            return True

    def update_slips_running_stats(self):
        """
        updates the number of processed ips, slips internal time,
        and modified tws so far in the db

        :return: tuple of (number of modified profiles, modified profiles)
        """
        slips_internal_time = float(self.main.db.getSlipsInternalTime()) + 1

        # Get the amount of modified profiles since we last checked
        modified_profiles, last_modified_tw_time = (
            self.main.db.getModifiedProfilesSince(slips_internal_time)
        )
        modified_ips_in_the_last_tw = len(modified_profiles)
        self.main.db.set_input_metadata(
            {'modified_ips_in_the_last_tw': modified_ips_in_the_last_tw}
        )

        # Get the time of last modified timewindow and set it as
        # the new slips internal time
        if last_modified_tw_time != 0:
            self.main.db.set_slips_internal_time(last_modified_tw_time)
        return modified_ips_in_the_last_tw, modified_profiles

    def enable_metadata(self):
        """
        Read the enable_metadata option from the config and create the
        metadata dir when it's enabled
        """
        # NOTE(review): this assignment replaces this method with a bool
        # on the instance, so calling enable_metadata() a second time
        # raises TypeError, and until it's assigned, the check in
        # add_metadata() sees the (truthy) bound method. Kept as is
        # because other code may read the attribute. Consider renaming
        # the method or the flag.
        self.enable_metadata = self.main.conf.enable_metadata()

        if self.enable_metadata:
            self.info_path = self.add_metadata()