# sorel_net.py
import os
import sys
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
from malware_rl.envs.utils.ember import PEFeatureExtractor
# model_path = os.path.join(module_path, "sorelFFNN.pt")
class PENetwork(nn.Module):
    """Feed-forward network with a shared trunk and up to three output heads.

    This is a simple network loosely based on the one used in
    ALOHA: Auxiliary Loss Optimization for Hypothesis Augmentation
    (https://arxiv.org/abs/1903.05700).
    Note that it uses fewer (and smaller) layers, as well as a single layer
    for all tag predictions, so performance will suffer accordingly.

    The malware and count heads are always constructed, so their parameters
    are present in the state dict regardless of the ``use_*`` flags; the
    flags only control which outputs ``forward`` returns. The tag head is
    only constructed when ``use_tags`` is true.

    Args:
        use_malware: include the ``'malware'`` output in ``forward``.
        use_counts: include the ``'count'`` output in ``forward``.
        use_tags: build the tag head and include the ``'tags'`` output.
        n_tags: number of tag outputs; required when ``use_tags`` is true.
        feature_dimension: size of the last axis of the input.
        layer_sizes: widths of the trunk's linear layers; defaults to
            ``[512, 512, 128]``.

    Raises:
        ValueError: if ``use_tags`` is true and ``n_tags`` is ``None``, or
            if ``layer_sizes`` is empty.
    """

    def __init__(self, use_malware=True, use_counts=True, use_tags=True,
                 n_tags=None, feature_dimension=1024, layer_sizes=None):
        # Initialise nn.Module before touching any attribute so that its
        # internal registries exist when attributes are assigned.
        super().__init__()

        if use_tags and n_tags is None:
            raise ValueError("n_tags was None but we're trying to predict tags. Please include n_tags")
        if layer_sizes is None:
            layer_sizes = [512, 512, 128]
        layer_sizes = list(layer_sizes)
        if not layer_sizes:
            raise ValueError("layer_sizes must contain at least one layer size")

        self.use_malware = use_malware
        self.use_counts = use_counts
        self.use_tags = use_tags
        self.n_tags = n_tags

        dropout_p = 0.05
        layers = []
        in_features = feature_dimension
        # Each trunk stage is Linear -> LayerNorm -> ELU -> Dropout. The order
        # fixes the indices inside the Sequential and therefore the state-dict
        # keys (model_base.0.*, model_base.1.*, model_base.4.*, ...).
        for out_features in layer_sizes:
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.LayerNorm(out_features))
            layers.append(nn.ELU())
            layers.append(nn.Dropout(dropout_p))
            in_features = out_features
        self.model_base = nn.Sequential(*layers)

        trunk_width = layer_sizes[-1]
        self.malware_head = nn.Sequential(nn.Linear(trunk_width, 1),
                                          nn.Sigmoid())
        # The count head has no output activation.
        self.count_head = nn.Linear(trunk_width, 1)
        # Not used by forward(); kept so the attribute stays available.
        self.sigmoid = nn.Sigmoid()
        if self.use_tags:
            self.tag_head = nn.Sequential(nn.Linear(trunk_width, 64),
                                          nn.ELU(),
                                          nn.Linear(64, 64),
                                          nn.ELU(),
                                          nn.Linear(64, n_tags),
                                          nn.Sigmoid())

    def forward(self, data):
        """Run the trunk and every enabled head.

        Args:
            data: tensor whose last axis has ``feature_dimension`` entries.

        Returns:
            dict with a subset of the keys ``'malware'``, ``'count'`` and
            ``'tags'`` (one per enabled head), each mapped to that head's
            output tensor.
        """
        # Call the module rather than .forward() so registered hooks run.
        base_result = self.model_base(data)
        rv = {}
        if self.use_malware:
            rv['malware'] = self.malware_head(base_result)
        if self.use_counts:
            rv['count'] = self.count_head(base_result)
        if self.use_tags:
            rv['tags'] = self.tag_head(base_result)
        return rv
class SorelFFNN:
    """Malware scorer wrapping a ``PENetwork`` loaded from a checkpoint.

    Typical use is ``predict(extract(bytez))``: ``extract`` turns raw bytes
    into a feature vector via ``PEFeatureExtractor`` and ``predict`` applies
    the log post-processing and returns the network's ``'malware'`` output.

    Args:
        model_file: path (or file-like object) accepted by ``torch.load``
            that holds the ``PENetwork`` state dict.
    """

    def __init__(self, model_file):
        # The checkpoint must match this exact configuration.
        # NOTE(review): 2381 presumably equals the length of the version-2
        # feature vector produced by PEFeatureExtractor - confirm.
        self.model = PENetwork(use_malware=True, use_counts=False, use_tags=True, n_tags=11,
                               feature_dimension=2381)
        # SECURITY: torch.load unpickles the file; only load checkpoints from
        # a trusted source.
        # map_location="cpu" lets a checkpoint saved on a GPU load on a
        # CPU-only host; predict() needs CPU tensors for .numpy() anyway.
        self.model.load_state_dict(torch.load(model_file, map_location="cpu"))
        # Inference mode: disables dropout.
        self.model.eval()
        self.feature_version = 2
        self.extractor = PEFeatureExtractor(self.feature_version)

    def features_postproc_func(self, x):
        """Return a signed-log transformed copy of ``x``.

        Negative entries become ``-log(1 - x)``, positive entries become
        ``log(1 + x)`` and zeros are left unchanged. The input is not
        modified.

        Args:
            x: array-like of numbers.

        Returns:
            numpy array with the same shape as ``x``. Floating-point inputs
            keep their dtype; other dtypes are promoted to float64 so the
            logarithms are not truncated on assignment.
        """
        x1 = np.copy(x)
        if not np.issubdtype(x1.dtype, np.floating):
            x1 = x1.astype(np.float64)
        lz = x1 < 0
        gz = x1 > 0
        x1[lz] = - np.log(1 - x1[lz])
        x1[gz] = np.log(1 + x1[gz])
        return x1

    def extract(self, bytez):
        """Return the extractor's feature vector for ``bytez`` as float32.

        Args:
            bytez: raw file contents, passed straight to
                ``self.extractor.feature_vector``.

        Returns:
            numpy float32 array of features (not yet post-processed).
        """
        return np.array(self.extractor.feature_vector(bytez), dtype=np.float32)

    def predict(self, features):
        """Score features as returned by ``extract``.

        Args:
            features: array-like of raw (un-transformed) features; it is
                converted to float32 to match the model's parameters.

        Returns:
            1-D numpy array holding the flattened ``'malware'`` output.
        """
        features = np.asarray(features, dtype=np.float32)
        X = torch.from_numpy(self.features_postproc_func(features))
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            predictions = self.model(X)
        scores = predictions["malware"].detach().numpy().ravel()
        return scores