forked from adangert/JoustMania
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplayer.py
232 lines (197 loc) · 7.48 KB
/
player.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import abc
import asyncio
import collections
import enum
import functools
import itertools
import math
import time
import typing
import psmove
import common
from numpy import linalg
NUM_WARNING_FLASHES=5
WARNING_FLASH_DURATION=0.1
RAINBOW_PHASE_DURATION=0.1
class EventType(enum.Flag):
SENSOR = enum.auto()
BUTTON_DOWN = enum.auto()
BUTTON_UP = enum.auto()
# TODO: Add trigger events
class ControllerEvent(abc.ABC):
@abc.abstractproperty
def type(self): raise NotImplemented()
class SensorEvent(ControllerEvent):
"""Base class for controller events."""
def __init__(self, acceleration, jerk, gyroscope):
self.acceleration = acceleration
self.jerk = jerk
self.gyroscope = gyroscope
@property
def type(self):
return EventType.SENSOR
@property
def acceleration_magnitude(self):
return linalg.norm(self.acceleration)
@property
def jerk_magnitude(self):
return linalg.norm(self.jerk)
class ButtonDownEvent(ControllerEvent):
"""Sent when a player first presses a button."""
def __init__(self, button: common.Button):
self.button = button
@property
def type(self):
return EventType.BUTTON_DOWN
class ButtonUpEvent(ControllerEvent):
"""Sent when a player first releases a button."""
def __init__(self, player, button: common.Button):
super().__init__(player)
self.button = button
@property
def type(self):
return EventType.BUTTON_UP
class ControllerState:
"""The state of inputs on a controller at one point in time."""
def __init__(self, move):
self.buttons = common.Button(move.get_buttons())
self.trigger = move.get_trigger()
self.acceleration = tuple(move.get_accelerometer_frame(psmove.Frame_SecondHalf))
self.gyroscope = tuple(move.get_gyroscope_frame(psmove.Frame_SecondHalf))
self.ts = time.time()
@property
def acceleration_magnitude(self):
return math.sqrt(sum([ v*v for v in self.acceleration ]))
def get_events_from_state_diff(self, prev_state):
acc_diff = map(lambda a, b: a - b, self.acceleration, prev_state.acceleration)
time_diff = self.ts - prev_state.ts
jerk = tuple([ e / time_diff for e in acc_diff ])
yield SensorEvent(self.acceleration, jerk, self.gyroscope)
new_buttons = self.buttons & (~prev_state.buttons)
for button in common.Button:
if button in new_buttons:
yield ButtonDownEvent(button)
released_buttons = prev_state.buttons & (~self.buttons)
for button in common.Button:
if button in released_buttons:
yield ButtonDownEvent(button)
# TODO: Break this out into a util library if it seems useful.
def with_lock(lock):
"""Decorator that makes a coroutine hold a lock during execution"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async with lock:
return await func(*args, **kwargs)
return wrapper
return decorator
class Player:
def __init__(self, move):
self.move_ = move
self.color_ = common.Color.WHITE
self.effect_lock_ = asyncio.Lock()
self.warn_ = None
self.effect_ = None
self.previous_state_ = ControllerState(move)
self.flush_events_()
def flush_events_(self):
#why do we ever want to flush the events????
#it seems like this is wasted data?
while self.move_.poll(): pass
def get_events(self) -> typing.Iterator[ControllerEvent]:
"""Returns an iterator over events currently pending on the controller."""
self.flush_events_()
state = ControllerState(self.move_)
for ev in state.get_events_from_state_diff(self.previous_state_):
ev.player = self
yield ev
self.previous_state_ = state
# TODO: The moves need to be occasionally prodded to keep their leds lit.
# If we make the piparty loop async, move this logic in there as a task.
self.move_.update_leds()
def set_player_color(self, color: common.Color):
"""Set's the player's color -- this is the default color we return to during play."""
self.color_ = color
self.set_color_(color)
def set_color_(self, color: common.Color):
"""Changes the controller's indicator to the specified color."""
self.move_.set_leds(*color.rgb_bytes())
self.move_.update_leds()
def set_rumble(self, value):
self.move_.set_rumble(value)
# This is apparently needed to flush the instruction out.
self.move_.update_leds()
def set_effect_(self, future):
self.effect_ = asyncio.ensure_future(future)
return self.effect_
def cancel_effect(self):
if self.effect_ and not self.effect_.done():
self.effect_.cancel()
def warn(self):
"""Issues a warning to the player."""
if self.warn_:
return
@with_lock(self.effect_lock_)
async def run():
try:
for i in range(NUM_WARNING_FLASHES):
self.set_color_(common.Color.BLACK)
self.set_rumble(90)
await asyncio.sleep(WARNING_FLASH_DURATION)
self.set_color_(self.color_)
self.set_rumble(0)
await asyncio.sleep(WARNING_FLASH_DURATION)
finally:
self.set_color_(self.color_)
self.set_rumble(0)
self.warn_ = None
self.warn_ = self.set_effect_(run())
def show_rainbow(self, duration_seconds: float):
"""Shows the victory rainbow."""
if self.warn_:
self.warn_.cancel()
@with_lock(self.effect_lock_)
async def cycle_colors():
try:
for color in itertools.cycle(common.PLAYER_COLORS):
self.set_color_(color)
await asyncio.sleep(RAINBOW_PHASE_DURATION)
finally:
self.set_color_(self.color_)
async def run():
try:
await asyncio.wait_for(cycle_colors(), duration_seconds)
except asyncio.TimeoutError:
pass
return self.set_effect_(run())
def show_death(self):
"""Lets the player know they have died."""
if self.warn_:
self.warn_.cancel()
@with_lock(self.effect_lock_)
async def run():
try:
self.set_rumble(110)
self.set_color_(common.Color.RED)
await asyncio.sleep(3)
finally:
self.set_color_(common.Color.BLACK)
self.set_rumble(0)
self.set_effect_(run())
def __str__(self):
return '<Player %s %s>' % (self.move_, self.color_)
class PlayerCollection:
"""The set of players in a round of the game."""
def __init__(self, players):
self.players = players
self.active_players = set(players)
def kill_player(self, player: Player):
self.active_players.remove(player)
return player.show_death()
def active_player_events(self, event_type: EventType):
# consider randomizing this so players don't get an advantage by being first in the list.
for player in list(self.active_players):
yield from (e for e in player.get_events() if e.type in event_type)
def cancel_effects(self):
for player in self.players:
player.cancel_effect()