-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
executable file
·313 lines (270 loc) · 12.2 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python3
from __future__ import annotations
import asyncio
import logging
import sys
from typing import Any, Callable, Coroutine, Iterable, Sequence
from decouple import UndefinedValueError, config
from pydantic import IPvAnyInterface, ValidationError
from scpi import Commands, split_line
from _version import __version__
from config_parser import parse_log_level, parse_wavemeter_config
from scpi_protocol import ScpiException, UnexpectedNumberOfParameterException, create_scpi_protocol
from wmControl.wavemeter import Wavemeter
from wmControl.wlmConst import WavemeterServerInitialized, WavemeterServerShutdown
# Select the wavemeter data library (wlmData) that matches the host platform and the configured connection type.
# CONNECTION_TYPE defaults to "REMOTE". On any platform other than Windows or Linux no library is selected and
# dll_path stays None.
dll_path = None
if sys.platform in ("win32", "linux"):
    use_local_connection = config("CONNECTION_TYPE", default="REMOTE") == "LOCAL"
    if sys.platform == "win32":
        # A local connection uses the library found in the Windows system directory, otherwise the copy shipped in
        # ./wmControl is used.
        dll_path = "C:/Windows/System32/wlmData.dll" if use_local_connection else "./wmControl/wlmData.dll"
    elif use_local_connection:
        # There is no local library to talk to on Linux
        raise ValueError("Cannot connect locally using Linux.")
    else:
        dll_path = "./wmControl/libwlmData.so"
async def read_stream(reader: asyncio.StreamReader, job_queue: asyncio.Queue[bytes]) -> None:
    """
    Reads the requests of a client from the stream and forwards them to the job queue.

    Each line received (including its line terminator) is put into the queue as raw bytes. If the queue is bounded and
    full, the coroutine waits until there is room again, which in turn stops reading from the client. The coroutine
    returns when the end of the stream is reached, i.e. when the client has closed the connection.

    Parameter
    ---------
    reader: asyncio.StreamReader
        Reader of client connection.
    job_queue: asyncio.Queue
        Queue receiving the raw request lines read from the stream.
    """
    logger = logging.getLogger(__name__)
    request: bytes
    # Commands are separated by a newline. Iterating over a StreamReader yields one line at a time and terminates on
    # its own once the client closed the connection, so there is no need to test for an empty read.
    async for request in reader:
        logger.debug("Received '%s' from client.", request)
        await job_queue.put(request)
async def write_stream(
    writer: asyncio.StreamWriter, protocol: Commands, job_queue: asyncio.Queue[bytes], device_timeout: float
) -> None:
    """
    The main worker of a client connection. It takes the raw requests from the job queue, parses them as SCPI
    commands, executes them and writes the replies to the client. It runs until it is cancelled or an unhandled
    exception is raised.

    Error handling per request line:
    * Lines that cannot be decoded or split into SCPI commands are dropped.
    * An unknown command name or a timeout drops the remaining commands of that line.
    * A command that does not support the requested access ("get" for queries, "set" otherwise) is skipped.
    * A ScpiException is reported to the client as its string representation followed by a newline.

    Parameter
    ---------
    writer: asyncio.StreamWriter
        Writer of client connection.
    protocol: Commands
        Lookup table of the supported commands indexed by the command name. The entries are accessed using the keys
        "get", "set", "decode", "encode" and (optionally) "doc".
    job_queue: asyncio.Queue
        Queue holding the raw request lines received from the client.
    device_timeout: float
        Currently not used by this function.
    """
    log = logging.getLogger(__name__)

    while True:
        raw_request = await job_queue.get()
        try:
            request_line = raw_request.decode().rstrip()
        except UnicodeDecodeError:
            # TODO: reply with an error
            continue

        # Split the line into the individual SCPI commands
        try:
            commands = split_line(request_line)
        except KeyError:
            # TODO: reply with an error
            log.info("Received unknown request: '%s'.", request_line)
            continue

        for command in commands:
            try:
                entry = protocol[command.name]
            except KeyError:
                # TODO: reply with an error
                log.info("Unknown request received: '%s'.", commands)
                break
            log.debug("Received SCPI request: %s", entry.get("doc", entry))

            # Queries are served by the getter, everything else by the setter
            access = "get" if command.query else "set"
            try:
                handler = entry[access]
            except KeyError:
                # TODO: reply with an error
                continue

            reply: str
            try:
                try:
                    # Arguments (if any) are decoded before they are handed to the handler
                    call = handler(entry["decode"](command.args)) if command.args else handler()
                except TypeError:
                    # The handler signature does not match the arguments supplied
                    raise UnexpectedNumberOfParameterException from None
                reply = await call
            except ScpiException as scpi_error:
                # Return a SCPI error
                writer.write(f"{scpi_error}\n".encode())
                continue
            except TimeoutError:
                log.debug("Timeout error while querying the wavemeter. Dropping request.")
                break

            if not command.query:
                # Only queries are answered
                continue
            writer.write((entry["encode"](reply) + "\n").encode())
            await writer.drain()
def create_client_handler(
    wavemeter: Wavemeter,
) -> Callable[[asyncio.StreamReader, asyncio.StreamWriter], Coroutine[Any, Any, None]]:
    """
    A closure to inject the wavemeter into the client callback handler.

    Parameters
    ----------
    wavemeter: Wavemeter
        The wavemeter managed by this handler

    Returns
    -------
    Callable
        The client_connected_cb callback that can be passed to asyncio.start_server().
    """

    async def client_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """
        The client handler exclusively serves a single client and passes the commands to the wavemeter while returning
        the results to the client. The input (reader) and output (writer) is separated by a queue to minimize any
        lag introduced by delays with the wavemeter communication.

        The handler terminates as soon as one of its two workers terminates: The reader finishes when the client
        closes the connection and the writer only ever finishes with an error. The remaining worker is cancelled and
        the connection is closed.

        Parameter
        ---------
        reader : asyncio.StreamReader
            Handles the incoming traffic.
        writer : asyncio.StreamWriter
            Pushes the outbound traffic to the client.
        """
        logger = logging.getLogger(__name__)
        # Limit the size of the job queue to create backpressure on the input
        job_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=5)
        # Create the protocol before any task is spawned, so a failure here cannot leave an orphaned task behind
        protocol = create_scpi_protocol(wavemeter)
        workers: set[asyncio.Task] = {
            # Read the inputs from the client
            asyncio.create_task(read_stream(reader, job_queue=job_queue)),
            # Execute commands and send back the results
            asyncio.create_task(write_stream(writer, protocol, job_queue, device_timeout=2.0)),
        }

        try:
            # Waiting for *all* workers would block forever after a client disconnect, because the writer waits on
            # the job queue indefinitely. Requests that are still queued when the reader finishes are discarded.
            done, _ = await asyncio.wait(workers, return_when=asyncio.FIRST_COMPLETED)
            for completed_task in done:
                if completed_task.cancelled():
                    # If the task was canceled, there is no further action needed.
                    continue
                task_exc = completed_task.exception()
                if task_exc is not None:
                    logger.error("Error while serving client.", exc_info=task_exc)
        finally:
            logger.debug("Shutting down client handler.")
            # Cancel all remaining tasks and wait for them to wind down
            unfinished = [worker for worker in workers if not worker.done()]
            for worker in unfinished:
                worker.cancel()
            try:
                results = await asyncio.gather(*unfinished, return_exceptions=True)
                for result in results:
                    # CancelledError is not derived from Exception, so only real errors are reported
                    if isinstance(result, Exception):
                        logger.error("Error while serving client.", exc_info=result)
            finally:
                try:
                    await writer.drain()
                except OSError:
                    # ConnectionErrors, the client is already gone
                    pass
                finally:
                    # Always release the transport, even if flushing the buffer failed
                    writer.close()
                try:
                    await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
                except (OSError, asyncio.TimeoutError):
                    # asyncio.TimeoutError is only an OSError (the builtin TimeoutError) from Python 3.11 onwards
                    pass

    return client_handler
async def monitor_wavemeter(wavemeter: Wavemeter):
    """
    Watch the events of the wavemeter and return when the wavemeter server is no longer available.

    The coroutine returns when either a WavemeterServerShutdown event or a WavemeterServerInitialized event with a
    value of 0 is received. It also returns if the event stream ends. All other events are ignored.

    Parameter
    ---------
    wavemeter: Wavemeter
        The wavemeter, whose events are monitored.
    """
    async for event in wavemeter.read_events():
        server_shut_down = isinstance(event, WavemeterServerShutdown)
        # NOTE(review): a value of 0 presumably signals that the server is not initialized (any more) -- confirm
        server_uninitialized = isinstance(event, WavemeterServerInitialized) and event.value == 0
        if server_shut_down or server_uninitialized:
            return
async def create_wm_server(product_id: int, interface: str | Sequence[str] | None, port: int) -> None:
    """
    Run a wavemeter SCPI server. The server listens at the given port and passes the commands to the wavemeter with
    the given product_id. Whenever the wavemeter monitor or the server stops, the wavemeter connection and the server
    are torn down and set up again, so this coroutine does not return on its own.

    Parameter
    ---------
    product_id: int
        Version of the WM. Works like a serial number just not named like it.
    interface: str or Sequence[str] or None
        The interface to listen on. If a str is given, the server is bound to that interface. If a sequence is given,
        the server is bound to the interfaces given. If set to None, the server is bound to all available interfaces.
    port: int
        The port number to listen at.
    """
    assert isinstance(port, int) and port > 0
    log = logging.getLogger(__name__)

    while True:
        # Activate wavemeter.
        async with Wavemeter(product_id, dll_path=dll_path) as wavemeter:
            server = await asyncio.start_server(
                client_connected_cb=create_client_handler(wavemeter), host=interface, port=port
            )
            monitor_task = asyncio.create_task(monitor_wavemeter(wavemeter))
            async with server:
                log.info(
                    "Serving wavemeter %i on %s",
                    wavemeter.product_id,
                    ", ".join(f"{sock.getsockname()[0]}:{sock.getsockname()[1]}" for sock in server.sockets),
                )
                client_task = asyncio.create_task(server.serve_forever())

                # Run until either the monitor reports that the wavemeter is gone or the server stops
                finished, unfinished = await asyncio.wait(
                    {monitor_task, client_task}, return_when=asyncio.FIRST_COMPLETED
                )
                # Stop whatever is still running before starting over
                for leftover_task in unfinished:
                    leftover_task.cancel()
                try:
                    await asyncio.gather(*unfinished)
                except asyncio.CancelledError:
                    pass

                if monitor_task in finished:
                    # Wait a moment before the wavemeter is set up again by the next iteration
                    await asyncio.sleep(0.5)
                    log.info("Restarting the wavemeter GUI for wavemeter %i.", wavemeter.product_id)
def _interface_to_host(interface: IPvAnyInterface | Sequence[IPvAnyInterface] | None) -> str | list[str] | None:
    """Convert the configured interface(s) to the host argument: None, a single IP string or a list of IP strings."""
    if interface is None:
        return None
    try:
        members = iter(interface)
    except TypeError:
        # Not an iterable, so it is a single interface
        return str(interface.ip)
    return [str(member.ip) for member in members]


async def main(wavemeter_config: Iterable[tuple[int, IPvAnyInterface | Sequence[IPvAnyInterface] | None, int]]):
    """
    Start one SCPI server per configured wavemeter and run them until all of them have terminated.

    Parameter
    ---------
    wavemeter_config: Iterable of tuple
        The configuration of the servers. Each entry consists of the wavemeter id, the interface(s) to bind to and the
        port to listen at. If the interface is None, no host is passed on to the server. A single interface is passed
        on as a string containing its IP address and a sequence of interfaces as a list of such strings.
    """
    logger = logging.getLogger(__name__)
    server_list: set[asyncio.Task] = set()
    wavemeters_configured: set[int] = set()

    logger.warning("#################################################")
    logger.warning("Starting SCPI daemon v%s...", __version__)
    logger.warning("#################################################")

    for wavemeter_id, interface, port in wavemeter_config:
        # Each interface is converted individually. Calling str() on the whole generator expression would produce
        # the repr of the generator instead of the IP addresses.
        host = _interface_to_host(interface)
        server_list.add(asyncio.create_task(create_wm_server(wavemeter_id, host, port)))
        wavemeters_configured.add(wavemeter_id)

    logger.info("Wavemeter configurations found for: %s.", ",".join(map(str, sorted(wavemeters_configured))))

    await asyncio.gather(*server_list)
# Set up logging. The log level is read from APPLICATION_LOG_LEVEL and defaults to INFO.
# Alternative format including a timestamp and the log level:
# format="%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s"
logging.basicConfig(
    level=config("APPLICATION_LOG_LEVEL", default=logging.INFO, cast=parse_log_level),
    format="%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
_logger = logging.getLogger(__name__)

# 536: Quips B WS-6 192.168.1.240
# 4711: Quips B WS-8 192.168.1.240
# 4734: Quips C WS-8 192.168.1.45

# The servers are only started if the wavemeter configuration was read and validated successfully. Otherwise the
# problem is logged and the script ends.
try:
    wavemeters = parse_wavemeter_config(config("WAVEMETERS"))
except UndefinedValueError:
    _logger.error("No wavemeters defined. Check the 'WAVEMETERS' environment variable.")
except ValidationError as validation_exc:
    _logger.error(f"Invalid wavemeter configuration: {validation_exc}")
else:
    try:
        asyncio.run(main(wavemeters))
    except KeyboardInterrupt:
        # Ctrl+C is the regular way to stop the daemon
        pass
    finally:
        _logger.warning("#################################################")
        _logger.warning("Stopping SCPI daemon...")
        _logger.warning("#################################################")