-
Notifications
You must be signed in to change notification settings - Fork 569
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a Feature Extractor for the Drakvuf Sandbox #2143
Changes from all commits
a408629
603d623
90ef348
1e8735a
d2cdccf
840f59f
9e13362
2e408d8
a73d16f
b28e0d0
c05b973
70d03eb
8d4f3c7
bf12ce8
84d68a4
00349d5
53439c7
2663fa6
3bea6e7
15a5efd
0c0c4d0
04ae280
e54f38f
cb7babc
5284ec0
21d50e0
885f216
3b2b022
1e4ed12
b7f4058
0f1750c
4749f24
37f82cb
c45aaa0
aeea39b
9b5dffc
c862f12
cea64d3
718d6ff
32c7a53
7248c0a
de43d1e
3cd5cde
454cd2d
f9d5c4a
6617fc0
8e7bc75
93240f5
c08c5bf
6e0a9eb
2bb7f3c
c0e9150
897e98b
e786552
4cab975
2576aa1
b5047a2
e26072e
d9e3ca1
3e3be41
729679d
3fb0eaf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: [package root]/LICENSE.txt | ||
# Unless required by applicable law or agreed to in writing, software distributed under the License | ||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and limitations under the License. | ||
|
||
import logging | ||
from typing import Tuple, Iterator | ||
|
||
from capa.features.insn import API, Number | ||
from capa.features.common import String, Feature | ||
from capa.features.address import Address | ||
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle | ||
from capa.features.extractors.drakvuf.models import Call | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def extract_call_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: | ||
""" | ||
This method extracts the given call's features (such as API name and arguments), | ||
and returns them as API, Number, and String features. | ||
|
||
args: | ||
ph: process handle (for defining the extraction scope) | ||
th: thread handle (for defining the extraction scope) | ||
ch: call handle (for defining the extraction scope) | ||
|
||
yields: | ||
Feature, address; where Feature is either: API, Number, or String. | ||
""" | ||
call: Call = ch.inner | ||
|
||
# list similar to disassembly: arguments right-to-left, call | ||
yelhamer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for arg_value in reversed(call.arguments.values()): | ||
try: | ||
yield Number(int(arg_value, 0)), ch.address | ||
except ValueError: | ||
# DRAKVUF automatically resolves the contents of memory addresses, (e.g. Arg1="0xc6f217efe0:\"ntdll.dll\""). | ||
# For those cases we yield the entire string as it, since yielding the address only would | ||
# likely not provide any matches, and yielding just the memory contentswould probably be misleading, | ||
# but yielding the entire string would be helpful for an analyst looking at the verbose output | ||
yield String(arg_value), ch.address | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again, if I understand the code correctly, and this iterates over arguments from apimon, arg_value won't be a string. Instead, parsed values look like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yelhamer please comment or address There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm now yielding the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yelhamer can you show some examples from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @williballenthin I meant that for
With this in mind I think I might just revert to just yielding |
||
|
||
yield API(call.name), ch.address | ||
|
||
|
||
def extract_features(ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> Iterator[Tuple[Feature, Address]]: | ||
for handler in CALL_HANDLERS: | ||
for feature, addr in handler(ph, th, ch): | ||
yield feature, addr | ||
|
||
|
||
CALL_HANDLERS = (extract_call_features,) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: [package root]/LICENSE.txt | ||
# Unless required by applicable law or agreed to in writing, software distributed under the License | ||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and limitations under the License. | ||
|
||
import logging | ||
from typing import Dict, List, Tuple, Union, Iterator | ||
|
||
import capa.features.extractors.drakvuf.call | ||
import capa.features.extractors.drakvuf.file | ||
import capa.features.extractors.drakvuf.thread | ||
import capa.features.extractors.drakvuf.global_ | ||
import capa.features.extractors.drakvuf.process | ||
from capa.features.common import Feature, Characteristic | ||
from capa.features.address import NO_ADDRESS, Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress, _NoAddress | ||
from capa.features.extractors.base_extractor import ( | ||
CallHandle, | ||
SampleHashes, | ||
ThreadHandle, | ||
ProcessHandle, | ||
DynamicFeatureExtractor, | ||
) | ||
from capa.features.extractors.drakvuf.models import Call, DrakvufReport | ||
from capa.features.extractors.drakvuf.helpers import index_calls | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DrakvufExtractor(DynamicFeatureExtractor): | ||
def __init__(self, report: DrakvufReport): | ||
super().__init__( | ||
# DRAKVUF currently does not yield hash information about the sample in its output | ||
yelhamer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
hashes=SampleHashes(md5="", sha1="", sha256="") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should these be blank or contain an indication that this is not available/provided by the sandbox? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm I'm unsure. CAPE's extractor had one of them empty (since it doesn't report it) so I just did the same here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, it's a shame no hash at all is available... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, unfortunately DRAKVUF is primarily a full VM monitor. In DRAKVUF sandbox it's (ab)used to function as a malware sandbox, but drakmon.log is the output directly from DRAKVUF. Which is good! It makes this integration more generic (works with DRAKVUF, not just with DRAKVUF sandbox). But that purpose mismatch causes glitches like this. I think it's possible to send a PR to DRAKVUF that adds logging of sample hashes to the DRAKVUF's |
||
) | ||
|
||
self.report: DrakvufReport = report | ||
|
||
# sort the api calls to prevent going through the entire list each time | ||
self.sorted_calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = index_calls(report) | ||
|
||
# pre-compute these because we'll yield them at *every* scope. | ||
self.global_features = list(capa.features.extractors.drakvuf.global_.extract_features(self.report)) | ||
|
||
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]: | ||
# DRAKVUF currently does not yield information about the PE's address | ||
return NO_ADDRESS | ||
|
||
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: | ||
yield from self.global_features | ||
|
||
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: | ||
yield from capa.features.extractors.drakvuf.file.extract_features(self.report) | ||
|
||
def get_processes(self) -> Iterator[ProcessHandle]: | ||
yield from capa.features.extractors.drakvuf.file.get_processes(self.sorted_calls) | ||
|
||
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: | ||
yield from capa.features.extractors.drakvuf.process.extract_features(ph) | ||
|
||
def get_process_name(self, ph: ProcessHandle) -> str: | ||
return ph.inner["process_name"] | ||
|
||
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: | ||
yield from capa.features.extractors.drakvuf.process.get_threads(self.sorted_calls, ph) | ||
|
||
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: | ||
if False: | ||
# force this routine to be a generator, | ||
# but we don't actually have any elements to generate. | ||
williballenthin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
yield Characteristic("never"), NO_ADDRESS | ||
return | ||
yelhamer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: | ||
yield from capa.features.extractors.drakvuf.thread.get_calls(self.sorted_calls, ph, th) | ||
|
||
def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) -> str: | ||
call: Call = ch.inner | ||
call_name = "{}({}){}".format( | ||
call.name, | ||
", ".join(f"{arg_name}={arg_value}" for arg_name, arg_value in call.arguments.items()), | ||
(f" -> {getattr(call, 'return_value', '')}"), # SysCalls don't have a return value, while WinApi calls do | ||
) | ||
return call_name | ||
|
||
def extract_call_features( | ||
self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle | ||
) -> Iterator[Tuple[Feature, Address]]: | ||
yield from capa.features.extractors.drakvuf.call.extract_features(ph, th, ch) | ||
|
||
@classmethod | ||
def from_report(cls, report: Iterator[Dict]) -> "DrakvufExtractor": | ||
dr = DrakvufReport.from_raw_report(report) | ||
return DrakvufExtractor(report=dr) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: [package root]/LICENSE.txt | ||
# Unless required by applicable law or agreed to in writing, software distributed under the License | ||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and limitations under the License. | ||
|
||
import logging | ||
from typing import Dict, List, Tuple, Iterator | ||
|
||
from capa.features.file import Import | ||
from capa.features.common import Feature | ||
from capa.features.address import Address, ThreadAddress, ProcessAddress, AbsoluteVirtualAddress | ||
from capa.features.extractors.helpers import generate_symbols | ||
from capa.features.extractors.base_extractor import ProcessHandle | ||
from capa.features.extractors.drakvuf.models import Call, DrakvufReport | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_processes(calls: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]) -> Iterator[ProcessHandle]: | ||
""" | ||
Get all the created processes for a sample. | ||
""" | ||
for proc_addr, calls_per_thread in calls.items(): | ||
sample_call = next(iter(calls_per_thread.values()))[0] # get process name | ||
mr-tz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
yield ProcessHandle(proc_addr, inner={"process_name": sample_call.process_name}) | ||
|
||
|
||
def extract_import_names(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
""" | ||
Extract imported function names. | ||
""" | ||
if report.loaded_dlls is None: | ||
Comment on lines
+33
to
+35
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. loaded DLLs means something else to me than imports - do they mean the same thing here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding from reading the comments on the relevant drakvuf source code is that the output of this plugin includes imported functions from DLLs loaded by the PE loader, as well as the ones that might be dynamically loaded by a process. I think this because the comments say that they are hooking some windows system calls in order to do this (I believe?), and if this is the case then I feel like this plugin is providing an extensive list of imports which includes static ones as well as dynamic ones that malware might try to load discretely which is why I added this here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you please add this documentation to the code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I just noticed that Drakvuf reports the imported functions for each process. Should I extract the imported functions in the process scope instead? this way if a user is analyzing only a specific process then they wouldn't get false results from an import originating from another process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the file scope extractors we're only interested in the imports of the target file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an artifact of the static analysis module and likely differs in dynamic analysis and across sandboxes - so maybe we need a new way to handle these? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This thread needs to be resolved. At the very least, I think we should only yield the imports for the input file. Optionally, if we can come up with some good motivation and test cases, then we could also extend the sandbox extractor API to cover the recursively imported DLLs/names. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can confirm that DRAKVUF outputs only execution trace (including loaded DLLs and imported functions) and doesn't concern itself with static analysis. Can I help with resolving it somehow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yelhamer please emit only the import names from the target DLL or none at all. I understand that there's maybe another way to interpret these imports (such as all imports seen in the address space), but this would be inconsistent with other feature extractors, and will be difficult to keep straight and reason about. I suspect that these import features won't be commonly used, so emitting none at all is usually going to be fine. If we can come up with some specific problematic cases, then we can reassess. |
||
return | ||
dlls = report.loaded_dlls | ||
|
||
for dll in dlls: | ||
dll_base_name = dll.name.split("\\")[-1] | ||
for function_name, function_address in dll.imports.items(): | ||
for name in generate_symbols(dll_base_name, function_name, include_dll=True): | ||
yield Import(name), AbsoluteVirtualAddress(function_address) | ||
|
||
|
||
def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
for handler in FILE_HANDLERS: | ||
for feature, addr in handler(report): | ||
yield feature, addr | ||
|
||
|
||
FILE_HANDLERS = ( | ||
# TODO(yelhamer): extract more file features from other DRAKVUF plugins | ||
# https://github.com/mandiant/capa/issues/2169 | ||
extract_import_names, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: [package root]/LICENSE.txt | ||
# Unless required by applicable law or agreed to in writing, software distributed under the License | ||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and limitations under the License. | ||
|
||
import logging | ||
from typing import Tuple, Iterator | ||
|
||
from capa.features.common import OS, FORMAT_PE, ARCH_AMD64, OS_WINDOWS, Arch, Format, Feature | ||
from capa.features.address import NO_ADDRESS, Address | ||
from capa.features.extractors.drakvuf.models import DrakvufReport | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def extract_format(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
# DRAKVUF sandbox currently supports only Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html | ||
yield Format(FORMAT_PE), NO_ADDRESS | ||
|
||
|
||
def extract_os(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
# DRAKVUF sandbox currently supports only PE files: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html | ||
yield OS(OS_WINDOWS), NO_ADDRESS | ||
|
||
|
||
def extract_arch(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
# DRAKVUF sandbox currently supports only x64 Windows as the guest: https://drakvuf-sandbox.readthedocs.io/en/latest/usage/getting_started.html | ||
yield Arch(ARCH_AMD64), NO_ADDRESS | ||
|
||
|
||
def extract_features(report: DrakvufReport) -> Iterator[Tuple[Feature, Address]]: | ||
for global_handler in GLOBAL_HANDLER: | ||
for feature, addr in global_handler(report): | ||
yield feature, addr | ||
|
||
|
||
GLOBAL_HANDLER = ( | ||
mr-tz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
extract_format, | ||
extract_os, | ||
extract_arch, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at: [package root]/LICENSE.txt | ||
# Unless required by applicable law or agreed to in writing, software distributed under the License | ||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and limitations under the License. | ||
|
||
import itertools | ||
from typing import Dict, List | ||
|
||
from capa.features.address import ThreadAddress, ProcessAddress | ||
from capa.features.extractors.drakvuf.models import Call, DrakvufReport | ||
|
||
|
||
def index_calls(report: DrakvufReport) -> Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]]: | ||
# this method organizes calls into processes and threads, and then sorts them based on | ||
# timestamp so that we can address individual calls per index (CallAddress requires call index) | ||
result: Dict[ProcessAddress, Dict[ThreadAddress, List[Call]]] = {} | ||
for call in itertools.chain(report.syscalls, report.apicalls): | ||
if call.pid == 0: | ||
# DRAKVUF captures api/native calls from all processes running on the system. | ||
# we ignore the pid 0 since it's a system process and it's unlikely for it to | ||
# be hijacked or so on, in addition to capa addresses not supporting null pids | ||
yelhamer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
continue | ||
proc_addr = ProcessAddress(pid=call.pid, ppid=call.ppid) | ||
thread_addr = ThreadAddress(process=proc_addr, tid=call.tid) | ||
if proc_addr not in result: | ||
result[proc_addr] = {} | ||
if thread_addr not in result[proc_addr]: | ||
result[proc_addr][thread_addr] = [] | ||
|
||
result[proc_addr][thread_addr].append(call) | ||
|
||
for proc, threads in result.items(): | ||
for thread in threads: | ||
result[proc][thread].sort(key=lambda call: call.timestamp) | ||
|
||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great!