-
Notifications
You must be signed in to change notification settings - Fork 80
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
3a2e712
commit fa57a80
Showing
12 changed files
with
1,211,754 additions
and
0 deletions.
There are no files selected for viewing
117 changes: 117 additions & 0 deletions
117
core/services/ardupilot_manager/autopilot/environment.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
|
||
import abc | ||
from typing import List, Optional, Type | ||
|
||
from loguru import logger | ||
|
||
from autopilot.exceptions import InvalidEnvironmentImplementation | ||
from autopilot.firmware import AutopilotFirmware | ||
from flight_controller import FlightController | ||
from mavlink_proxy.Endpoint import Endpoint, EndpointType | ||
from mavlink_proxy.exceptions import EndpointAlreadyExists | ||
from mavlink_proxy.Manager import Manager as MavlinkManager | ||
|
||
|
||
class AutopilotEnvironment(abc.ABC): | ||
def __init__(self, owner: str) -> None: | ||
self.owner = owner | ||
self.mavlink_manager: Optional[MavlinkManager] = None | ||
|
||
@classmethod | ||
def all(cls) -> List[str]: | ||
return [subclass.__name__ for subclass in cls.__subclasses__()] | ||
|
||
@classmethod | ||
def get(cls, name: str) -> Type["AutopilotEnvironment"]: | ||
for subclass in cls.__subclasses__(): | ||
if subclass.__name__ == name: | ||
return subclass() | ||
raise InvalidEnvironmentImplementation(f"{name} is not a valid environment implementation class") | ||
|
||
async def start_mavlink_manager(self, device: Endpoint) -> None: | ||
default_endpoints = [ | ||
Endpoint( | ||
name="GCS Server Link", | ||
owner=self.owner, | ||
connection_type=EndpointType.UDPServer, | ||
place="0.0.0.0", | ||
argument=14550, | ||
persistent=True, | ||
enabled=False, | ||
), | ||
Endpoint( | ||
name="GCS Client Link", | ||
owner=self.owner, | ||
connection_type=EndpointType.UDPClient, | ||
place="192.168.2.1", | ||
argument=14550, | ||
persistent=True, | ||
enabled=True, | ||
), | ||
Endpoint( | ||
name="MAVLink2RestServer", | ||
owner=self.owner, | ||
connection_type=EndpointType.UDPServer, | ||
place="127.0.0.1", | ||
argument=14001, | ||
persistent=True, | ||
protected=True, | ||
), | ||
Endpoint( | ||
name="MAVLink2Rest", | ||
owner=self.owner, | ||
connection_type=EndpointType.UDPClient, | ||
place="127.0.0.1", | ||
argument=14000, | ||
persistent=True, | ||
protected=True, | ||
overwrite_settings=True, | ||
), | ||
Endpoint( | ||
name="Internal Link", | ||
owner=self.owner, | ||
connection_type=EndpointType.TCPServer, | ||
place="127.0.0.1", | ||
argument=5777, | ||
persistent=True, | ||
protected=True, | ||
overwrite_settings=True, | ||
), | ||
Endpoint( | ||
name="Ping360 Heading", | ||
owner=self.owner, | ||
connection_type=EndpointType.UDPServer, | ||
place="0.0.0.0", | ||
argument=14660, | ||
persistent=True, | ||
protected=True, | ||
), | ||
] | ||
for endpoint in default_endpoints: | ||
try: | ||
self.mavlink_manager.add_endpoint(endpoint) | ||
except EndpointAlreadyExists: | ||
pass | ||
except Exception as error: | ||
logger.warning(str(error)) | ||
await self.mavlink_manager.start(device) | ||
|
||
@abc.abstractmethod | ||
async def start(self) -> None: | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
async def stop(self) -> None: | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
async def restart(self) -> None: | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
async def setup(self) -> None: | ||
raise NotImplementedError | ||
|
||
@abc.abstractmethod | ||
def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]: | ||
raise NotImplementedError |
4 changes: 4 additions & 0 deletions
4
core/services/ardupilot_manager/autopilot/environments/linux.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from autopilot.environment import AutopilotEnvironment | ||
|
||
class LinuxAutopilotEnvironment(AutopilotEnvironment): | ||
pass |
44 changes: 44 additions & 0 deletions
44
core/services/ardupilot_manager/autopilot/environments/serial.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from typing import Optional | ||
from autopilot.environment import AutopilotEnvironment | ||
from autopilot.firmware import AutopilotFirmware | ||
from flight_controller import FlightController | ||
from mavlink_proxy.Endpoint import Endpoint, EndpointType | ||
|
||
class SerialEnvironment(AutopilotEnvironment): | ||
def __init__(self, owner: str, board: FlightController) -> None: | ||
self.board: FlightController = board | ||
super().__init__(owner) | ||
|
||
async def start(self) -> None: | ||
if not self.board.path: | ||
raise ValueError(f"Could not find device path for board {self.board.name}.") | ||
|
||
baudrate = 115200 | ||
is_px4 = "px4" in self.board.name.lower() | ||
if is_px4: | ||
baudrate = 57600 | ||
await self.start_mavlink_manager( | ||
Endpoint( | ||
name="Master", | ||
owner=self.owner, | ||
connection_type=EndpointType.Serial, | ||
place=self.board.path, | ||
argument=baudrate, | ||
protected=True, | ||
) | ||
) | ||
if is_px4: | ||
# PX4 needs at least one initial heartbeat to start sending data | ||
await self.vehicle_manager.burst_heartbeat() | ||
|
||
def stop(self) -> None: | ||
raise NotImplementedError | ||
|
||
def restart(self) -> None: | ||
raise NotImplementedError | ||
|
||
def setup(self) -> None: | ||
raise NotImplementedError | ||
|
||
def boards(self, firmware: AutopilotFirmware) -> Optional[FlightController]: | ||
raise NotImplementedError |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
|
||
class InvalidEnvironmentImplementation(ValueError): | ||
"""Required platform implementation class does not exists""" | ||
|
||
class InvalidFirmwareImplementation(ValueError): | ||
"""Required firmware implementation class does not exists""" | ||
|
||
class InvalidAutopilotManifestData(ValueError): | ||
"""Invalid manifest data""" | ||
|
||
class InvalidAutopilotManifestURL(ValueError): | ||
"""Invalid manifest URL""" | ||
|
||
class ManifestDataFetchFailed(RuntimeError): | ||
"""Failed to fetch manifest data""" | ||
|
||
class BackendIsOffline(RuntimeError): | ||
"""Backend is offline""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from typing import List, Optional, Type | ||
|
||
from autopilot.exceptions import InvalidFirmwareImplementation | ||
from flight_controller import FlightController | ||
|
||
|
||
class AutopilotFirmware: | ||
@classmethod | ||
def all(cls) -> List[str]: | ||
return [subclass.__name__ for subclass in cls.__subclasses__()] | ||
|
||
@classmethod | ||
def get(cls, name: str) -> Optional[Type["AutopilotFirmware"]]: | ||
for subclass in cls.__subclasses__(): | ||
if subclass.__name__ == name: | ||
return subclass() | ||
raise InvalidFirmwareImplementation(f"{name} is not a valid firmware implementation class") | ||
|
||
def supported_boards(self) -> List[FlightController]: | ||
raise NotImplementedError |
5 changes: 5 additions & 0 deletions
5
core/services/ardupilot_manager/autopilot/firmwares/ardupilot/ardupilot.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from autopilot.firmware import AutopilotFirmware | ||
|
||
|
||
class ArdupilotFirmware(AutopilotFirmware): | ||
pass |
Oops, something went wrong.