-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
157 lines (131 loc) · 5.35 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Copyright (c) farm-ng, inc.
#
# Licensed under the Amiga Development Kit License (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://github.com/farm-ng/amiga-dev-kit/blob/main/LICENSE
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import argparse
import asyncio
from pathlib import Path
from typing import Optional
import uvicorn
from farm_ng.core.event_client_manager import (
EventClient,
EventClientSubscriptionManager,
)
from farm_ng.core.event_service_pb2 import EventServiceConfigList
from farm_ng.core.event_service_pb2 import SubscribeRequest
from farm_ng.core.events_file_reader import proto_from_json_file
from farm_ng.core.uri_pb2 import Uri
from fastapi import FastAPI
from fastapi import WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from google.protobuf.json_format import MessageToJson
app = FastAPI()


@app.on_event("startup")
async def startup_event():
    """Start the background subscription updater when the server starts.

    Schedules ``event_manager.update_subscriptions()`` on the running event
    loop. ``event_manager`` is the module-level manager created in the
    ``__main__`` section of this file.
    """
    print("Initializing App...")
    # The event loop only holds weak references to tasks, so a task whose
    # result is discarded may be garbage collected before it finishes. Keep a
    # strong reference on the application state for the lifetime of the app.
    app.state.subscription_task = asyncio.create_task(
        event_manager.update_subscriptions()
    )
# Install the CORS middleware with a fully open policy: any origin, method and
# header is accepted (presumably so the web frontend can be served from a
# different origin than this API — confirm).
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# to store the events clients
# NOTE(review): nothing in the visible part of this file reads or writes this
# dict — confirm whether it is still needed.
clients: dict[str, EventClient] = {}
def _service_name_from_query(query: str) -> str:
    """Extract the ``service_name`` value from a uri query string.

    The query is treated as ``&``-separated ``key=value`` pairs, e.g.
    ``"service_name=gps"``. If no ``service_name`` key is present, the text
    between the first and second ``=`` is returned (the previous behaviour),
    or an empty string when the query contains no ``=`` at all.
    """
    for param in query.split("&"):
        key, _, value = param.partition("=")
        if key == "service_name":
            return value
    pieces = query.split("=")
    return pieces[1] if len(pieces) > 1 else ""


@app.get("/list_uris")
async def list_uris() -> JSONResponse:
    """Return all the uris from the event manager.

    Returns:
        JSONResponse: a 200 response whose body maps
            ``"<service_name><uri path>"`` to a dict with the ``scheme``,
            ``authority``, ``path`` and ``query`` of each subscription,
            sorted by key.
    """
    all_uris_list: EventServiceConfigList = event_manager.get_all_uris_config_list(
        config_name="all_subscription_uris"
    )

    all_uris: dict[str, dict[str, str]] = {}
    for config in all_uris_list.configs:
        # Only the aggregated config carries the subscriptions we expose.
        if config.name != "all_subscription_uris":
            continue
        for subscription in config.subscriptions:
            uri = subscription.uri
            # The query is normally just "service_name=gps"; parse it
            # defensively so a malformed or multi-parameter query cannot
            # crash the whole listing.
            service_name = _service_name_from_query(uri.query)
            all_uris[f"{service_name}{uri.path}"] = {
                "scheme": "protobuf",
                "authority": config.host,
                "path": uri.path,
                "query": uri.query,
            }

    return JSONResponse(content=dict(sorted(all_uris.items())), status_code=200)
@app.websocket("/subscribe/{service_name}/{uri_path:path}")
@app.websocket("/subscribe/{service_name}/{sub_service_name}/{uri_path:path}")
async def subscribe(
    websocket: WebSocket,
    service_name: str,
    uri_path: str,
    sub_service_name: Optional[str] = None,
    every_n: int = 1,
):
    """Stream events from an event service to a websocket client.

    Args:
        websocket (WebSocket): the websocket connection
        service_name (str): the name of the event service
        uri_path (str): the uri path to subscribe to
        sub_service_name (str, optional): the sub service name, if any
        every_n (int, optional): forwarded as ``every_n`` in the subscribe
            request. Defaults to 1.

    Usage:
        ws = new WebSocket("ws://localhost:8042/subscribe/gps/pvt")
        ws = new WebSocket("ws://localhost:8042/subscribe/oak/0/imu")
    """
    if sub_service_name:
        full_service_name = f"{service_name}/{sub_service_name}"
    else:
        full_service_name = service_name

    # The gps and oak/N streams are reached through the "amiga" client; every
    # other service uses the client registered under its own name.
    if full_service_name in ["gps", "oak/0", "oak/1", "oak/2", "oak/3"]:
        client_key = "amiga"
    else:
        client_key = full_service_name
    client: EventClient = event_manager.clients[client_key]

    await websocket.accept()

    request = SubscribeRequest(
        uri=Uri(path=f"/{uri_path}", query=f"service_name={full_service_name}"),
        every_n=every_n,
    )
    async for _, msg in client.subscribe(request, decode=True):
        # NOTE(review): MessageToJson already returns a JSON string, so
        # send_json encodes it a second time — presumably clients parse
        # twice; confirm before changing.
        await websocket.send_json(MessageToJson(msg))

    # Only reached once the subscription stream ends.
    await websocket.close()
if __name__ == "__main__":
    # Command line interface.
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=Path, required=True, help="config file")
    parser.add_argument("--port", type=int, default=8042, help="port to run the server")
    parser.add_argument("--debug", action="store_true", help="debug mode")
    args = parser.parse_args()

    # NOTE: the built react app is served by this process only when NOT
    # running in debug mode.
    if not args.debug:
        react_build_directory = Path(__file__).parent / "ts" / "dist"
        static_files = StaticFiles(
            directory=str(react_build_directory.resolve()), html=True
        )
        app.mount("/", static_files)

    # Load the complete list of service configs from the json file.
    base_config_list: EventServiceConfigList = proto_from_json_file(
        args.config, EventServiceConfigList()
    )

    # Hand only the configs with a non-zero port to the events client manager.
    service_config_list = EventServiceConfigList()
    service_config_list.configs.extend(
        [cfg for cfg in base_config_list.configs if cfg.port != 0]
    )
    event_manager = EventClientSubscriptionManager(config_list=service_config_list)

    # Run the server on all interfaces.
    uvicorn.run(app, host="0.0.0.0", port=args.port)  # noqa: S104