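"""Train a stable-baselines3 PPO agent on a malware_rl gym environment and
measure how often its mutated samples evade the chosen target model.

The agent is trained on "{target}-train-v0", saved under saved_models/, and
then run for --num-episodes episodes on "{target}-test-v0"; an episode that
ends with reward >= 10.0 is counted as an evasion.
"""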
import os
import random
import sys
import argparse
import gym
import numpy as np
from gym import wrappers
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import torch as th
import malware_rl
parser = argparse.ArgumentParser()
parser.add_argument('--target', choices=['ember', 'sorel', 'sorelFFNN', 'AV1'], default='ember', help='target to test')
parser.add_argument('--seed', type=int, default=26871, help='random seed')
parser.add_argument('--num-episodes', type=int, default=300, help='number of episodes to run')
parser.add_argument('--num-queries', type=int, default=4096, help='number of queries to run')
args = parser.parse_args()
target = args.target
seed = args.seed
num_episodes = args.num_episodes
num_queries = args.num_queries
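# Seed Python's and NumPy's RNGs so a run is reproducible for a given --seed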
random.seed(seed)
np.random.seed(seed)
module_path = os.path.split(os.path.abspath(sys.modules[__name__].__file__))[0]
outdir = os.path.join(module_path, "data/logs/ppo-agent-results")
# Setting up the environment
# env = make_vec_env(f"{target}-train-v0", n_envs=1)
env = gym.make(f"{target}-train-v0")
env = wrappers.Monitor(env, directory=outdir, force=True)
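# The Monitor wrapper records episode lengths and total steps, which are reported at the end of the run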
env.seed(seed)
# Setting up testing parameters and holding variables
done = False
reward = 0
evasions = 0
evasion_history = {}
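# evasion_history maps each evaded sample's sha256 to the info dict returned on its final step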
# Train the agent
policy_kwargs = dict(activation_fn=th.nn.Tanh,
                     net_arch=dict(pi=[64, 64], vf=[64, 64]))
agent = PPO("MlpPolicy",
            env,
            gamma=0.854,
            n_epochs=10,
            verbose=1,
            n_steps=128,
            learning_rate=0.00138,
            max_grad_norm=0.4284,
            tensorboard_log=f"./ppo_{target}_tensorboard/",
            policy_kwargs=policy_kwargs)
            # device='cpu')
# Total timesteps should be a multiple of n_envs * n_steps (here 1 * 128; the default --num-queries of 4096 is 32 rollouts)
agent.learn(total_timesteps=num_queries)
agent.save(f"saved_models/ppo-only-{target}-train-v0-{seed}")
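# To reuse a previously saved agent instead of retraining, load it here: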
# agent.load(f"saved_models/ppo-only-{target}-train-v0-{seed}")
print("[*] Evaluation phase begins")
eval_env = gym.make(f"{target}-test-v0")
eval_env = wrappers.Monitor(eval_env, directory=outdir, force=True)
eval_env.seed(seed)
# Test the agent in the eval environment
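# An episode that terminates with reward >= 10.0 is counted as a successful evasion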
for i in range(num_episodes):
    ob = eval_env.reset()
    sha256 = eval_env.sha256
    while True:
        # MlpPolicy is non-recurrent, so predict only needs the current observation
        action, _ = agent.predict(ob)
        ob, reward, done, ep_history = eval_env.step(action)
        if done and reward >= 10.0:
            evasions += 1
            evasion_history[sha256] = ep_history
            break
        elif done:
            break
# Exclude binaries the environment skipped from the evasion-rate denominator
total_detected = (i+1) - eval_env.skipped
# Output metrics/evaluation stuff
evasion_rate = (evasions / total_detected) * 100
mean_action_count = np.mean(eval_env.get_episode_lengths())
print("History:", evasion_history)
print("Skipped episodes:", eval_env.skipped)
print("True number of episodes:", i+1)
print(f"{evasion_rate}% samples evaded model.")
print(f"Average of {mean_action_count} moves to evade model.")
print(f"Total steps in the training envirnoment: {env.get_total_steps()}")
print(f"Total steps in the eval envirnoment: {eval_env.get_total_steps()}")