-
Notifications
You must be signed in to change notification settings - Fork 3
/
monitor_pico.py
177 lines (151 loc) · 5.58 KB
/
monitor_pico.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# monitor_pico.py
# Runs on a Raspberry Pico board to receive data from monitor.py
# Firmware should be V1.20 or later
# Copyright (c) 2021-2024 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Device gets a single ASCII byte defining the pin number and whether
# to increment (uppercase) or decrement (lowercase) the use count.
# Pin goes high if use count > 0 else low.
# incoming numbers are 0..21 which map onto 22 GPIO pins
import rp2
from machine import UART, Pin, Timer, freq
from time import ticks_ms, ticks_diff
# Set the CPU clock to 250MHz before anything else is configured.
freq(250_000_000)
# ****** SPI support ******
# PIO program: receive-only SPI, one byte per chip-select assertion.
# Pin indices are relative to in_base, which PIOSPI sets to GPIO 0:
# index 0 is sampled as data, index 1 is waited on as the clock and index 2
# as CS/. The jmp pin is also configured by PIOSPI as GPIO 2 (CS/).
# Bits are shifted in left (MSB first); with autopush and push_thresh=8 each
# group of 8 bits is pushed to the RX FIFO.
@rp2.asm_pio(autopush=True, in_shiftdir=rp2.PIO.SHIFT_LEFT, push_thresh=8)
def spi_in():
    label("escape")  # Entry point, also the resync target after a DUT crash
    set(x, 0)
    mov(isr, x)  # Zero after DUT crash: discard any partially received bits
    wrap_target()
    wait(1, pins, 2)  # CS/ False
    wait(0, pins, 2)  # CS/ True
    set(x, 7)  # Loop counter: the "bit" loop below runs 8 times
    label("bit")
    wait(0, pins, 1)  # Wait for clock low...
    wait(1, pins, 1)  # ...then clock high
    in_(pins, 1)  # Shift one bit from the pin at index 0 into the ISR
    jmp(pin, "escape")  # DUT crashed. On restart it sends a char with CS high.
    jmp(x_dec, "bit")  # Post decrement
    wrap()
class PIOSPI:
    """Receive-only SPI interface built on PIO state machine 0.

    The state machine runs the spi_in program with GPIO 0 as the input base
    and GPIO 2 (input, pulled up) as the jump pin.
    """

    def __init__(self):
        in_base = Pin(0)
        jmp_pin = Pin(2, Pin.IN, Pin.PULL_UP)
        sm = rp2.StateMachine(
            0,
            spi_in,
            in_shiftdir=rp2.PIO.SHIFT_LEFT,
            push_thresh=8,
            in_base=in_base,
            jmp_pin=jmp_pin,
        )
        sm.active(1)  # Start the state machine
        self._sm = sm

    def read(self):
        """Block until one character arrives and return it as an int (ord(ch)).

        If the DUT crashes, the worst case is where CS is left low: the state
        machine then hangs until the user restarts the DUT. On restart the DUT
        sends a character with CS high, which makes the PIO program jump back
        to its "escape" label and clear the input shift register.
        """
        word = self._sm.get()
        return word & 0xFF  # Only the low 8 bits carry the character
# ****** Define pins ******
# GPIO 0, 1 and 2 are reserved for the interface, so the usable output pins
# are GPIO 3..22 and 26..27.
PIN_NOS = list(range(3, 23)) + [26, 27]

# pins is indexed by the incoming ID. Each entry is a mutable record:
# [Pin instance, instance count, verbose flag]
pins = []
for pin_no in PIN_NOS:
    pins.append(
        [
            Pin(pin_no, Pin.OUT),  # Output driven according to the use count
            0,  # Instance count
            False,  # Verbose
        ]
    )
# ****** Timing *****
# GPIO 28 is the timing output: it is pulsed to flag timing events.
pin_t = Pin(28, Pin.OUT)


def _cb(_timer):
    """Timer callback: print "Timeout." while holding pin_t high."""
    pin_t.value(1)
    print("Timeout.")
    pin_t.value(0)


# Timer instance; run() arms it as a one-shot with _cb as its callback.
tim = Timer()
# ****** Monitor ******
# Modes accepted by run(). In every mode pulses on pin_t and printed reports
# only occur if an outage exceeds the threshold.
SOON = const(0)  # Pulse early, when the timer times out. Report at outage end.
LATE = const(1)  # Pulse when the outage ends. Report at outage end.
MAX = const(2)  # Pulse when an outage exceeds the prior maximum. Report only in that instance.
# WIDTH: for measuring the time between arbitrary points in code. When the
# duration between receiving 0x40 and 0x60 exceeds the previous maximum,
# pulse and report.
WIDTH = const(3)
# native reduced latency to 10μs
@micropython.native
def run(period=100, verbose=(), device="uart", vb=True):
    """Receive ident bytes from the device under test and drive the GPIO pins.

    Runs forever. Each received byte selects a pin via its low 5 bits; bit 5
    clear increments that pin's use count (pin driven high), bit 5 set
    decrements it (pin driven low when the count reaches zero). Byte 0x7A
    signals that the program under test has restarted: all pins and counts
    are cleared and timing is reset.

    Args:
        period: Either an int threshold in ms (mode defaults to SOON) or a
            (threshold_ms, mode) pair where mode is SOON, LATE, MAX or WIDTH.
        verbose: Iterable of ident numbers whose count is printed on every
            change.
        device: "uart" (UART 0 at 1Mbaud) or "spi" (PIO based receiver).
        vb: If true, print status messages.

    Raises:
        ValueError: If the mode or the device is not supported.
    """
    if isinstance(period, int):
        t_ms = period
        mode = SOON
    else:
        t_ms, mode = period
        if mode not in (SOON, LATE, MAX, WIDTH):
            raise ValueError("Invalid mode.")
    for ident in verbose:
        pins[ident][2] = True
    # A device must support a blocking read.
    if device == "uart":
        uart = UART(0, 1_000_000)  # rx on GPIO 1

        def read():
            while not uart.any():  # Prevent UART timeouts
                pass
            return ord(uart.read(1))

    elif device == "spi":
        pio = PIOSPI()

        def read():
            return pio.read()

    else:
        raise ValueError("Unsupported device:", device)

    vb and print("Awaiting communication.")
    n_pins = len(pins)  # Idents at or above this have no GPIO assigned
    h_max = 0  # Max hog duration (ms)
    h_start = -1  # Absolute hog start time: invalidate.
    while True:
        if x := read():  # Get an initial 0 on UART
            tarr = ticks_ms()  # Arrival time
            if x == 0x7A:  # Init: program under test has restarted
                vb and print("Got communication.")
                h_max = 0  # Restart timing
                h_start = -1
                for pin in pins:
                    pin[0](0)  # Clear pin
                    pin[1] = 0  # and instance counter
                continue
            if mode == WIDTH:
                if x == 0x40:  # Leading edge on ident 0
                    h_start = tarr
                elif x == 0x60 and h_start != -1:  # Trailing edge
                    dt = ticks_diff(tarr, h_start)
                    if dt > t_ms and dt > h_max:
                        h_max = dt
                        print(f"Max width {dt}ms")
                        pin_t(1)
                        pin_t(0)
            elif x == 0x40:  # hog_detect task has started.
                if mode == SOON:  # Pulse on absence of activity
                    tim.init(period=t_ms, mode=Timer.ONE_SHOT, callback=_cb)
                if h_start != -1:  # There was a prior trigger
                    dt = ticks_diff(tarr, h_start)
                    if dt > t_ms:  # Delay exceeds threshold
                        if mode != MAX:
                            print(f"Hog {dt}ms")
                        if mode == LATE:
                            pin_t(1)
                            pin_t(0)
                        if dt > h_max:
                            h_max = dt
                            print(f"Max hog {dt}ms")
                            if mode == MAX:
                                pin_t(1)
                                pin_t(0)
                h_start = tarr
            i = x & 0x1F  # Key: 0x40 (ord('@')) is pin ID 0
            if i >= n_pins:
                # A corrupt or unexpected byte must not stop the monitor with
                # an IndexError: report it and carry on.
                vb and print(f"Invalid ident {i}")
                continue
            p = pins[i]
            if x & 0x20:  # Going down
                if p[1] > 0:  # Might have restarted this script with a running client.
                    p[1] -= 1  # or might have sent trig(False) before True.
                if not p[1]:  # Instance count is zero
                    p[0](0)
            else:
                p[0](1)
                p[1] += 1
            if p[2]:  # Verbose reporting was requested for this ident
                print(f"ident {i} count {p[1]}")