test_approximate_dynamic_programming.py
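"""Unit tests for rl.approximate_dynamic_programming: evaluating a simple
FlipFlop MRP exactly (evaluate_finite_mrp), by sampling (evaluate_mrp), and
against finite-horizon backward induction."""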
from numpy.testing import assert_allclose
import unittest

from rl.approximate_dynamic_programming import (evaluate_mrp,
                                                evaluate_finite_mrp)
from rl.distribution import Categorical, Choose
from rl.finite_horizon import (finite_horizon_MRP, evaluate,
                               unwrap_finite_horizon_MRP, WithTime)
from rl.function_approx import Dynamic
from rl.markov_process import FiniteMarkovRewardProcess, NonTerminal
import rl.iterate as iterate

class FlipFlop(FiniteMarkovRewardProcess[bool]):
    '''A version of FlipFlop implemented with the FiniteMarkovProcess
    machinery.
    '''

    def __init__(self, p: float):
        # From either state, flip to the other state (reward 2.0) with
        # probability p, or stay in the same state (reward 1.0) with
        # probability 1 - p.
        transition_reward_map = {
            b: Categorical({(not b, 2.0): p, (b, 1.0): 1 - p})
            for b in (True, False)
        }
        super().__init__(transition_reward_map)

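# A sanity check on the value 170 asserted in the tests below: the tests use
# FlipFlop(0.7), so the expected one-step reward from either state is
# 0.7 * 2.0 + 0.3 * 1.0 = 1.7. By symmetry both states share the same value
# v, which satisfies v = 1.7 + 0.99 * v, giving v = 1.7 / 0.01 = 170.
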
class TestEvaluate(unittest.TestCase):
    def setUp(self):
        self.finite_flip_flop = FlipFlop(0.7)

    def test_evaluate_finite_mrp(self):
        start = Dynamic({s: 0.0 for s in
                         self.finite_flip_flop.non_terminal_states})
        v = iterate.converged(
            evaluate_finite_mrp(
                self.finite_flip_flop,
                γ=0.99,
                approx_0=start
            ),
            done=lambda a, b: a.within(b, 1e-4)
        )

        self.assertEqual(len(v.values_map), 2)

        for s in v.values_map:
            self.assertLess(abs(v(s) - 170), 0.1)
    def test_evaluate_mrp(self):
        start = Dynamic({s: 0.0 for s in
                         self.finite_flip_flop.non_terminal_states})
        v = iterate.converged(
            evaluate_mrp(
                self.finite_flip_flop,
                γ=0.99,
                approx_0=start,
                non_terminal_states_distribution=Choose(
                    self.finite_flip_flop.non_terminal_states
                ),
                num_state_samples=5
            ),
            done=lambda a, b: a.within(b, 1e-4)
        )

        self.assertEqual(len(v.values_map), 2)

        # evaluate_mrp works from sampled states, so the tolerance here is
        # looser than in the exact finite test above.
        for s in v.values_map:
            self.assertLess(abs(v(s) - 170), 1.0)

        # The sampled result should also agree with the exact finite
        # evaluation to within about 1%.
        v_finite = iterate.converged(
            evaluate_finite_mrp(
                self.finite_flip_flop,
                γ=0.99,
                approx_0=start
            ),
            done=lambda a, b: a.within(b, 1e-4)
        )

        assert_allclose(v.evaluate([NonTerminal(True), NonTerminal(False)]),
                        v_finite.evaluate([NonTerminal(True),
                                           NonTerminal(False)]),
                        rtol=0.01)
    def test_compare_to_backward_induction(self):
        finite_horizon = finite_horizon_MRP(self.finite_flip_flop, 10)

        start = Dynamic({s: 0.0 for s in finite_horizon.non_terminal_states})
        v = iterate.converged(
            evaluate_finite_mrp(
                finite_horizon,
                γ=1,
                approx_0=start
            ),
            done=lambda a, b: a.within(b, 1e-4)
        )

        # 10 time steps with 2 states each give 20 non-terminal states in
        # the finite-horizon process.
        self.assertEqual(len(v.values_map), 20)

        finite_v =\
            list(evaluate(unwrap_finite_horizon_MRP(finite_horizon), gamma=1))

        # The converged approximation should match backward induction at
        # every time step, in both states.
        for time in range(10):
            self.assertAlmostEqual(
                v(NonTerminal(WithTime(state=True, time=time))),
                finite_v[time][NonTerminal(True)]
            )
            self.assertAlmostEqual(
                v(NonTerminal(WithTime(state=False, time=time))),
                finite_v[time][NonTerminal(False)]
            )
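
# Entry point so the tests can be run directly, e.g.
# `python test_approximate_dynamic_programming.py`.
if __name__ == '__main__':
    unittest.main()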