-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_frozen_lake.py
82 lines (73 loc) · 2.61 KB
/
test_frozen_lake.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import numpy as np
import random
import time
import gym
from gym import wrappers
def run_episode(env, policy, episode_len=100):
    """Roll out ``policy`` in ``env`` for one episode and return its reward sum.

    Args:
        env: Environment whose ``reset()`` returns the first observation and
            whose ``step(action)`` returns a 4-tuple
            ``(observation, reward, done, info)``.
        policy: Lookup table indexed by observation that yields the action
            to take.
        episode_len: Maximum number of steps before the rollout is cut off.

    Returns:
        The sum of the rewards collected until ``done`` was reported or
        ``episode_len`` steps had been taken.
    """
    state = env.reset()
    episode_return = 0
    for _ in range(episode_len):
        # The policy is deterministic: one fixed action per observed state.
        state, step_reward, finished, _info = env.step(policy[state])
        episode_return += step_reward
        if finished:
            return episode_return
    return episode_return
def evaluate_policy(env, policy, n_episodes=100):
    """Return the average reward of ``policy`` over ``n_episodes`` rollouts.

    Args:
        env: Environment passed through to ``run_episode``.
        policy: Policy passed through to ``run_episode``.
        n_episodes: Number of independent episodes to average over.

    Returns:
        The mean of the per-episode rewards as a float.

    Raises:
        ZeroDivisionError: If ``n_episodes`` is 0.
    """
    episode_returns = (run_episode(env, policy) for _ in range(n_episodes))
    # Start from 0.0 so the running total is a float from the first addition.
    return sum(episode_returns, 0.0) / n_episodes
def gen_random_policy(n_states=16, n_actions=4):
    """Draw a uniformly random deterministic policy.

    Args:
        n_states: Number of entries in the policy table (one per state).
        n_actions: Number of available actions; each entry is drawn from
            ``0 .. n_actions - 1``.

    Returns:
        A 1-D integer ``numpy`` array of length ``n_states``.
    """
    return np.random.choice(n_actions, size=n_states)
def crossover(policy1, policy2):
    """Create a child policy by uniform crossover of two parents.

    Each entry of the child is taken from ``policy2`` with probability 0.5
    and from ``policy1`` otherwise. Neither parent is modified.

    Args:
        policy1: First parent; its length determines the child's length.
        policy2: Second parent; must be at least as long as ``policy1``.

    Returns:
        A new policy (a copy of ``policy1`` with some entries replaced).
    """
    child = policy1.copy()
    # Iterate over the real policy length rather than a fixed 16 so that
    # policies for other state-space sizes are recombined in full.
    for i in range(len(child)):
        if np.random.uniform() > 0.5:
            child[i] = policy2[i]
    return child
def mutation(policy, p=0.05, n_actions=4):
    """Return a copy of ``policy`` with entries randomly re-drawn.

    Each entry is independently replaced, with probability ``p``, by an
    action drawn uniformly from ``0 .. n_actions - 1``. The input policy is
    not modified.

    Args:
        policy: Policy table to mutate.
        p: Per-entry mutation probability.
        n_actions: Number of available actions to draw replacements from.

    Returns:
        The mutated copy of ``policy``.
    """
    mutant = policy.copy()
    # Use the policy's own length instead of a fixed 16 so that every entry
    # of a larger policy can mutate.
    for i in range(len(mutant)):
        if np.random.uniform() < p:
            mutant[i] = np.random.choice(n_actions)
    return mutant
def selection_probabilities(scores):
    """Turn policy scores into fitness-proportional selection probabilities.

    Args:
        scores: Sequence of non-negative policy scores.

    Returns:
        A float ``numpy`` array of the same length. When the scores sum to a
        positive value this is ``scores / sum(scores)``; otherwise every
        entry gets equal probability (an empty input yields an empty array).
    """
    scores = np.asarray(scores, dtype=float)
    total = scores.sum()
    if total > 0:
        return scores / total
    if scores.size == 0:
        return scores
    # No policy earned any reward: dividing by the zero sum would produce
    # NaNs, which np.random.choice rejects, so select uniformly instead.
    return np.full(scores.shape, 1.0 / scores.size)


if __name__ == '__main__':
    random.seed(1234)
    np.random.seed(1234)
    env = gym.make('FrozenLake-v0')
    env.seed(0)

    # Policy search: evolve a population of policies with a genetic algorithm.
    n_policy = 100  # population size
    n_steps = 20    # number of generations
    n_elite = 5     # top policies copied unchanged into the next generation
    start = time.time()
    policy_pop = [gen_random_policy() for _ in range(n_policy)]
    for idx in range(n_steps):
        policy_scores = [evaluate_policy(env, p) for p in policy_pop]
        print('Generation %d : max score = %0.2f' %(idx+1, max(policy_scores)))
        # Indices of the population sorted from best to worst score.
        policy_ranks = list(reversed(np.argsort(policy_scores)))
        elite_set = [policy_pop[x] for x in policy_ranks[:n_elite]]
        select_probs = selection_probabilities(policy_scores)
        # Each child is bred from two parents sampled in proportion to score.
        child_set = [crossover(
            policy_pop[np.random.choice(range(n_policy), p=select_probs)],
            policy_pop[np.random.choice(range(n_policy), p=select_probs)])
            for _ in range(n_policy - n_elite)]
        mutated_list = [mutation(p) for p in child_set]
        # Build a fresh list so the elite list is not extended in place.
        policy_pop = elite_set + mutated_list
    # Re-score the final generation to pick the policy to report and replay.
    policy_score = [evaluate_policy(env, p) for p in policy_pop]
    best_policy = policy_pop[np.argmax(policy_score)]
    end = time.time()
    print('Best policy score = %0.2f. Time taken = %4.4f'
          %(np.max(policy_score), (end-start)))

    # Evaluation: replay the best policy under the Monitor wrapper.
    env = wrappers.Monitor(env, '/tmp/frozenlake1', force=True)
    for _ in range(200):
        run_episode(env, best_policy)
    env.close()
    # NOTE(review): `api_key=...` passes the literal Ellipsis object; it looks
    # like a redacted placeholder - supply a real key before running.
    gym.upload('/tmp/frozenlake1', api_key=...)