forked from EmpireProject/Empire
-
-
Notifications
You must be signed in to change notification settings - Fork 584
/
Copy pathbasic_reporting.py
152 lines (132 loc) · 5.08 KB
/
basic_reporting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import csv
import io
from empire.server.common.empire import MainMenu
from empire.server.common.plugins import Plugin
from empire.server.core.db import models
from empire.server.core.db.models import PluginTaskStatus
from empire.server.core.plugin_service import PluginService
class Plugin(Plugin):
    """
    Reporting plugin that exports agent sessions, stored credentials and the
    agent task log.

    Each report is built in memory and handed to
    ``main_menu.downloadsv2.create_download_from_text``; the resulting
    download records are attached to a ``PluginTask`` row.
    """

    def onLoad(self):
        """
        Set the plugin metadata, its single ``report`` option and defaults.
        """
        self.info = {
            "Name": "basic_reporting",
            "Authors": [
                {
                    "Name": "Vincent Rose",
                    "Handle": "@vinnybod",
                    "Link": "https://github.com/vinnybod",
                }
            ],
            "Description": "Generates credentials.csv, sessions.csv, and master.log. Writes to server/data directory.",
            "Software": "",
            "Techniques": [],
            "Comments": [],
        }
        self.options = {
            "report": {
                "Description": "Reports to generate.",
                "Required": True,
                "Value": "all",
                "SuggestedValues": ["session", "credential", "log", "all"],
                "Strict": True,
            }
        }
        self.install_path = ""

    def execute(self, command, **kwargs):
        """
        Parses commands from the API

        Generates the report(s) selected by ``command["report"]`` and records
        a completed ``PluginTask`` carrying the textual summary and the
        generated downloads.

        :param command: mapping with a ``"report"`` key; one of ``"session"``,
            ``"credential"``, ``"log"`` or ``"all"``.
        :param kwargs: must contain ``"user"`` (object with an ``id``) and
            ``"db"`` (the database session used for queries and for storing
            the task).
        :raises KeyError: if ``"user"``, ``"db"`` or ``"report"`` is missing.
        """
        user = kwargs["user"]
        db = kwargs["db"]
        report = command["report"]

        # Named task_input rather than `input` so the builtin is not shadowed.
        task_input = f"Generating reports for: {report}"
        plugin_task = models.PluginTask(
            plugin_id=self.info["Name"],
            input=task_input,
            input_full=task_input,
            user_id=user.id,
            status=PluginTaskStatus.completed,
        )

        # (option value, label used in the summary line, generator method)
        generators = (
            ("session", "Session", self.session_report),
            ("credential", "Credential", self.credential_report),
            ("log", "Log", self.generate_report),
        )

        db_downloads = []
        summary = []
        for key, label, generate in generators:
            if report in (key, "all"):
                db_download = generate(db, user)
                db_downloads.append(db_download)
                summary.append(
                    f"[*] {label} report generated to {db_download.location}\n"
                )
        summary.append("[*] Execution complete.\n")

        plugin_task.output = "".join(summary)
        plugin_task.downloads = db_downloads
        db.add(plugin_task)
        # Only flushed here; this method does not commit.
        db.flush()

    def register(self, mainMenu: MainMenu):
        """
        Any modifications to the mainMenu go here - e.g.
        registering functions to be run by user commands
        """
        self.install_path = mainMenu.installPath
        self.main_menu = mainMenu
        self.plugin_service: PluginService = mainMenu.pluginsv2

    def _csv_download(self, db, user, header, rows, filename):
        """
        Serialize ``header`` plus ``rows`` as CSV and store it as a download.

        :param header: list of column titles written as the first row.
        :param rows: iterable of row sequences.
        :param filename: file name passed to the download service.
        :return: whatever ``create_download_from_text`` returns.
        """
        out = io.StringIO()
        writer = csv.writer(out)
        writer.writerow(header)
        writer.writerows(rows)
        return self.main_menu.downloadsv2.create_download_from_text(
            db, user, out.getvalue(), filename, "basic_reporting"
        )

    def session_report(self, db, user):
        """
        Build ``sessions.csv`` with one row per ``models.Agent``.
        """
        rows = (
            [agent.session_id, agent.hostname, agent.username, agent.firstseen_time]
            for agent in db.query(models.Agent).all()
        )
        return self._csv_download(
            db,
            user,
            ["SessionID", "Hostname", "User Name", "First Check-in"],
            rows,
            "sessions.csv",
        )

    def credential_report(self, db, user):
        """
        Build ``credentials.csv`` with one row per ``models.Credential``.
        """
        rows = (
            [cred.domain, cred.username, cred.host, cred.credtype, cred.password]
            for cred in db.query(models.Credential).all()
        )
        return self._csv_download(
            db,
            user,
            ["Domain", "Username", "Host", "Cred Type", "Password"],
            rows,
            "credentials.csv",
        )

    def generate_report(self, db, user):
        """
        Build ``master.log``: every ``models.AgentTask`` with its creation
        time, id, agent id, issuing user, input and output.
        """
        out = io.StringIO()
        out.write("Empire Master Taskings & Results Log by timestamp\n")
        out.write("=" * 50 + "\n\n")

        # The header promises timestamp order, so ask the database for it
        # explicitly instead of relying on the default row order.
        # NOTE(review): assumes AgentTask.created_at is a mapped column.
        tasks = (
            db.query(models.AgentTask).order_by(models.AgentTask.created_at).all()
        )
        for task in tasks:
            # Tasks without an associated user are logged with the text "None".
            username = task.user.username if task.user else "None"
            out.write(
                f"\n{xstr(task.created_at)} - {xstr(task.id)} ({xstr(task.agent_id)})> "
                f"{xstr(username)}\n {xstr(task.input)}\n {xstr(task.output)}\n"
            )

        return self.main_menu.downloadsv2.create_download_from_text(
            db, user, out.getvalue(), "master.log", "basic_reporting"
        )

    def shutdown(self):
        """
        Kills additional processes that were spawned
        """
        # If the plugin spawns a process provide a shutdown method for when Empire exits else leave it as pass
        pass
def xstr(s):
    """
    Safely cast to a string with a handler for None

    ``None`` becomes the empty string, ``bytes`` are decoded as UTF-8 and
    anything else goes through ``str()``.
    """
    if s is None:
        return ""
    if isinstance(s, bytes):
        # Undecodable bytes become U+FFFD instead of raising, so a single
        # malformed value cannot abort the caller.
        return s.decode("utf-8", errors="replace")
    return str(s)