Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 297 #298

Merged
merged 7 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions custom_components/mqtt_vacuum_camera/camera.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""
Camera
Version: v2024.12.0
Version: v2024.12.1
"""

from __future__ import annotations

import asyncio
from asyncio import gather, get_event_loop
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from io import BytesIO
Expand Down Expand Up @@ -248,8 +247,9 @@ async def handle_obstacle_view(self, event):

if self._shared.camera_mode == CameraModes.OBSTACLE_VIEW:
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(f"Camera Mode Change to {self._shared.camera_mode}")
self._should_poll = True
return
return await self.async_update()

if (
self._shared.obstacles_data
Expand All @@ -258,6 +258,7 @@ async def handle_obstacle_view(self, event):
_LOGGER.debug(f"Received event: {event.event_type}, Data: {event.data}")
if event.data.get("entity_id") == self.entity_id:
self._shared.camera_mode = CameraModes.OBSTACLE_DOWNLOAD
_LOGGER.debug(f"Camera Mode Change to {self._shared.camera_mode}")
self._should_poll = False # Turn off polling
coordinates = event.data.get("coordinates")
if coordinates:
Expand Down Expand Up @@ -288,11 +289,14 @@ async def handle_obstacle_view(self, event):
)
self._should_poll = True # Turn on polling
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
return None
if temp_image is not None:
try:
# Open the downloaded image with PIL
pil_img = Image.open(temp_image)
pil_img = await self.async_open_image(temp_image)

# Resize the image if resize_to is provided
pil_img.thumbnail((self._image_w, self._image_h))
Expand All @@ -304,15 +308,24 @@ async def handle_obstacle_view(self, event):
f"{self._file_name}: Error processing image: {e}"
)
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
self._should_poll = True # Turn on polling
return None

self.Image = await self.hass.async_create_task(
self.run_async_pil_to_bytes(pil_img)
)
self._shared.camera_mode = CameraModes.OBSTACLE_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
else:
self._shared.camera_mode = CameraModes.MAP_VIEW
_LOGGER.debug(
f"Camera Mode Change to {self._shared.camera_mode}"
)
self._should_poll = True # Turn on polling
else:
_LOGGER.debug("No nearby obstacle found.")
Expand All @@ -326,9 +339,9 @@ async def handle_obstacle_view(self, event):
async def _async_find_nearest_obstacle(x, y, obstacles):
"""Find the nearest obstacle to the given coordinates."""
nearest_obstacle = None
min_distance = float("inf") # Start with a very large distance
min_distance = 500 # Start with a very large distance
_LOGGER.debug(
f"Finding the nearest {min_distance} obstacle to coordinates: {x}, {y}"
f"Finding in the nearest {min_distance} pixels obstacle to coordinates: {x}, {y}"
)

for obstacle in obstacles:
Expand All @@ -345,10 +358,26 @@ async def _async_find_nearest_obstacle(x, y, obstacles):

return nearest_obstacle

async def async_open_image(self, file_path) -> Image.Image:
"""
Asynchronously open an image file using a thread pool.
Args:
file_path (str): Path to the image file.

Returns:
Image.Image: Opened PIL image.
"""
executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"{self._file_name}_camera"
)
loop = asyncio.get_running_loop()
pil_img = await loop.run_in_executor(executor, Image.open, file_path)
return pil_img

sca075 marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
async def download_image(url: str, storage_path: str, filename: str):
"""
Asynchronously download an image using threading to avoid blocking.
Asynchronously download an image without blocking.

Args:
url (str): The URL to download the image from.
Expand All @@ -358,38 +387,23 @@ async def download_image(url: str, storage_path: str, filename: str):
Returns:
str: The full path to the saved image or None if the download fails.
"""
# Ensure the storage path exists
os.makedirs(storage_path, exist_ok=True)

obstacle_file = os.path.join(storage_path, filename)

async def blocking_download():
"""Run the blocking download in a separate thread."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
with open(obstacle_file, "wb") as f:
f.write(await response.read())
_LOGGER.debug(
f"Image downloaded successfully: {obstacle_file}"
)
return obstacle_file
else:
_LOGGER.warning(
f"Failed to download image: {response.status}"
)
return None
except Exception as e:
_LOGGER.error(f"Error downloading image: {e}")
return None

executor = ThreadPoolExecutor(max_workers=3) # Limit to 3 workers

# Run the blocking I/O in a thread
return await asyncio.get_running_loop().run_in_executor(
executor, asyncio.run, blocking_download()
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
with open(obstacle_file, "wb") as f:
f.write(await response.read())
_LOGGER.debug(f"Image downloaded successfully: {obstacle_file}")
return obstacle_file
else:
_LOGGER.warning(f"Failed to download image: {response.status}")
return None
except Exception as e:
_LOGGER.error(f"Error downloading image: {e}")
return None

@property
def should_poll(self) -> bool:
Expand Down Expand Up @@ -482,7 +496,7 @@ async def async_update(self):
f"{self._file_name}: Camera image data update available: {process_data}"
)
try:
parsed_json, is_a_test = await self._process_parsed_json()
parsed_json, is_a_test = await self._process_parsed_json(True)
except ValueError:
self._vac_json_available = "Error"
pass
Expand Down Expand Up @@ -666,7 +680,7 @@ async def run_async_pil_to_bytes(self, pil_img):
pil_img_list = [pil_img for _ in range(num_processes)]
loop = get_event_loop()

with concurrent.futures.ThreadPoolExecutor(
with ThreadPoolExecutor(
max_workers=1, thread_name_prefix=f"{self._file_name}_camera"
) as executor:
tasks = [
Expand Down
10 changes: 8 additions & 2 deletions custom_components/mqtt_vacuum_camera/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,20 @@ def compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list:
Compose JSON with obstacle details including the image link.
"""
obstacle_links = []
if not obstacles or not vacuum_host_ip:
_LOGGER.debug(
f"Obstacle links: no obstacles: "
f"{obstacles} and / or ip: {vacuum_host_ip} to link."
)
return None

for obstacle in obstacles:
# Extract obstacle details
label = obstacle.get("label", "")
points = obstacle.get("points", {})
image_id = obstacle.get("id", "None")

if label and points and image_id:
if label and points and image_id and vacuum_host_ip:
# Append formatted obstacle data
if image_id != "None":
# Compose the link
Expand All @@ -246,6 +252,6 @@ def compose_obstacle_links(vacuum_host_ip: str, obstacles: list) -> list:
}
)

_LOGGER.debug(f"Obstacle links: {obstacle_links}")
_LOGGER.debug(f"Obstacle links: linked data complete.")

return obstacle_links
71 changes: 39 additions & 32 deletions custom_components/mqtt_vacuum_camera/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,35 +143,42 @@ async def async_update_sensor_data(self, sensor_data):
"""
Update the sensor data format before sending to the sensors.
"""

if sensor_data:
# Assume sensor_data is a dictionary or transform it into the expected format
battery_level = await self.connector.get_battery_level()
vacuum_state = await self.connector.get_vacuum_status()
vacuum_room = self.shared.current_room
if not vacuum_room:
vacuum_room = {"in_room": "Unsupported"}
last_run_stats = sensor_data.get("last_run_stats", {})
last_loaded_map = sensor_data.get("last_loaded_map", {})
formatted_data = {
"mainBrush": sensor_data.get("mainBrush", 0),
"sideBrush": sensor_data.get("sideBrush", 0),
"filter": sensor_data.get("filter", 0),
"currentCleanTime": sensor_data.get("currentCleanTime", 0),
"currentCleanArea": sensor_data.get("currentCleanArea", 0),
"cleanTime": sensor_data.get("cleanTime", 0),
"cleanArea": sensor_data.get("cleanArea", 0),
"cleanCount": sensor_data.get("cleanCount", 0),
"battery": battery_level,
"state": vacuum_state,
"last_run_start": last_run_stats.get("startTime", 0),
"last_run_end": last_run_stats.get("endTime", 0),
"last_run_duration": last_run_stats.get("duration", 0),
"last_run_area": last_run_stats.get("area", 0),
"last_bin_out": sensor_data.get("last_bin_out", 0),
"last_bin_full": sensor_data.get("last_bin_full", 0),
"last_loaded_map": last_loaded_map.get("name", "NoMap"),
"robot_in_room": vacuum_room.get("in_room", "Unsupported"),
}
return formatted_data
return SENSOR_NO_DATA
try:
if sensor_data:
# Assume sensor_data is a dictionary or transform it into the expected format
battery_level = await self.connector.get_battery_level()
vacuum_state = await self.connector.get_vacuum_status()
vacuum_room = self.shared.current_room
last_run_stats = sensor_data.get("last_run_stats", {})
last_loaded_map = sensor_data.get("last_loaded_map", {})

if not vacuum_room:
vacuum_room = {"in_room": "Unsupported"}
if last_loaded_map == {}:
last_loaded_map = {"name", "Default"}

formatted_data = {
"mainBrush": sensor_data.get("mainBrush", 0),
"sideBrush": sensor_data.get("sideBrush", 0),
"filter": sensor_data.get("filter", 0),
"currentCleanTime": sensor_data.get("currentCleanTime", 0),
"currentCleanArea": sensor_data.get("currentCleanArea", 0),
"cleanTime": sensor_data.get("cleanTime", 0),
"cleanArea": sensor_data.get("cleanArea", 0),
"cleanCount": sensor_data.get("cleanCount", 0),
"battery": battery_level,
"state": vacuum_state,
"last_run_start": last_run_stats.get("startTime", 0),
"last_run_end": last_run_stats.get("endTime", 0),
"last_run_duration": last_run_stats.get("duration", 0),
"last_run_area": last_run_stats.get("area", 0),
"last_bin_out": sensor_data.get("last_bin_out", 0),
"last_bin_full": sensor_data.get("last_bin_full", 0),
"last_loaded_map": last_loaded_map.get("name", "Default"),
"robot_in_room": vacuum_room.get("in_room"),
}
return formatted_data
return SENSOR_NO_DATA
except Exception as err:
_LOGGER.warning(f"Error processing sensor data: {err}")
return SENSOR_NO_DATA
2 changes: 1 addition & 1 deletion custom_components/mqtt_vacuum_camera/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
"iot_class": "local_polling",
"issue_tracker": "https://github.com/sca075/mqtt_vacuum_camera/issues",
"requirements": ["pillow>=10.3.0,<=11.0.0", "numpy"],
"version": "2024.12.0"
"version": "2024.12.1"
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ async def async_message_received(self, msg) -> None:
# When IPV4 and IPV6 are available, use IPV4
if vacuum_host_ip.split(",").__len__() > 1:
self._shared.vacuum_ips = vacuum_host_ip.split(",")[0]
else:
# Use IPV4 when no IPV6 without split
self._shared.vacuum_ips = vacuum_host_ip
_LOGGER.debug(f"Vacuum IPs: {self._shared.vacuum_ips}")

async def async_subscribe_to_topics(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,18 @@ async def async_draw_obstacle(
obstacle_objects.append(obstacle_obj)

# Store obstacle data in shared data
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_ips, obstacle_objects
)
if self.img_h.shared.vacuum_ips:
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_ips, obstacle_objects
)
elif self.img_h.shared.vacuum_api: # Fall back to API usage if no IP.
self.img_h.shared.obstacles_data = compose_obstacle_links(
self.img_h.shared.vacuum_api.split("http://")[1], obstacle_objects
)

# Draw obstacles on the map
if obstacle_objects:
_LOGGER.debug(f"{self.file_name} All obstacle detected: {obstacle_objects}")
_LOGGER.debug(f"{self.file_name} Obstacle detected.")
self.img_h.draw.draw_obstacles(np_array, obstacle_objects, color_no_go)

return np_array
Expand Down
Loading