Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add camera support #336

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions hatasmota/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""Tasmota camera."""

from __future__ import annotations

from collections.abc import Awaitable
from dataclasses import dataclass
import logging
from typing import Any

from aiohttp import ClientResponse, ClientSession

from .const import CONF_DEEP_SLEEP, CONF_IP, CONF_MAC
from .entity import (
TasmotaAvailability,
TasmotaAvailabilityConfig,
TasmotaEntity,
TasmotaEntityConfig,
)
from .utils import (
config_get_state_offline,
config_get_state_online,
get_topic_command_state,
get_topic_tele_will,
)

_LOGGER = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class TasmotaCameraConfig(TasmotaAvailabilityConfig, TasmotaEntityConfig):
"""Tasmota camera configuation."""

ip_address: str

@classmethod
def from_discovery_message(cls, config: dict, platform: str) -> TasmotaCameraConfig:
"""Instantiate from discovery message."""
return cls(
endpoint="camera",
idx=0,
friendly_name=None,
mac=config[CONF_MAC],
platform=platform,
poll_payload="",
poll_topic=get_topic_command_state(config),
availability_topic=get_topic_tele_will(config),
availability_offline=config_get_state_offline(config),
availability_online=config_get_state_online(config),
deep_sleep_enabled=config[CONF_DEEP_SLEEP],
ip_address=config[CONF_IP],
)


class TasmotaCamera(TasmotaAvailability, TasmotaEntity):
"""Representation of a Tasmota camera."""

_cfg: TasmotaCameraConfig

def __init__(self, **kwds: Any):
"""Initialize."""
self._sub_state: dict | None = None
super().__init__(**kwds)

async def subscribe_topics(self) -> None:
"""Subscribe to topics."""

availability_topics = self.get_availability_topics()
topics = {**availability_topics}

self._sub_state = await self._mqtt_client.subscribe(
self._sub_state,
topics,
)

async def unsubscribe_topics(self) -> None:
"""Unsubscribe to all MQTT topics."""
self._sub_state = await self._mqtt_client.unsubscribe(self._sub_state)

def get_still_image_stream(
self, websession: ClientSession
) -> Awaitable[ClientResponse]:
"""Get the io stream to read the static image."""
still_image_url = f"http://{self._cfg.ip_address}/snapshot.jpg"
return websession.get(still_image_url)

def get_mjpeg_stream(self, websession: ClientSession) -> Awaitable[ClientResponse]:
"""Get the io stream to read the mjpeg stream."""
mjpeg_url = f"http://{self._cfg.ip_address}:81/cam.mjpeg"
return websession.get(mjpeg_url)
1 change: 1 addition & 0 deletions hatasmota/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
CONF_FRIENDLYNAME: Final = "fn"
CONF_FULLTOPIC: Final = "ft"
CONF_IFAN: Final = "if"
CONF_CAM: Final = "cam"
CONF_IP: Final = "ip"
CONF_HOSTNAME: Final = "hn"
CONF_MAC: Final = "mac"
Expand Down
22 changes: 22 additions & 0 deletions hatasmota/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

from . import config_validation as cv
from .button import TasmotaButtonTrigger, TasmotaButtonTriggerConfig
from .camera import TasmotaCamera, TasmotaCameraConfig
from .const import (
CONF_BATTERY,
CONF_BUTTON,
CONF_CAM,
CONF_DEEP_SLEEP,
CONF_DEVICENAME,
CONF_FRIENDLYNAME,
Expand Down Expand Up @@ -121,6 +123,7 @@
CONF_FULLTOPIC: cv.string,
CONF_HOSTNAME: cv.string,
vol.Optional(CONF_IFAN, default=0): cv.bit, # Added in Tasmota 9.0.0.4
vol.Optional(CONF_CAM, default=0): cv.bit,
CONF_IP: cv.string,
CONF_LIGHT_SUBTYPE: cv.positive_int,
CONF_LINK_RGB_CT: cv.bit,
Expand Down Expand Up @@ -334,6 +337,21 @@ def get_binary_sensor_entities(
return entities


def get_camera_entities(
discovery_msg: dict,
) -> list[tuple[TasmotaCameraConfig | None, DiscoveryHashType]]:
"""Generate camera configuration."""
camera_entities: list[tuple[TasmotaCameraConfig | None, DiscoveryHashType]] = []

entity = None
discovery_hash = (discovery_msg[CONF_MAC], "cam", "cam", 0)
if CONF_CAM in discovery_msg and discovery_msg[CONF_CAM]:
entity = TasmotaCameraConfig.from_discovery_message(discovery_msg, "camera")
camera_entities.append((entity, discovery_hash))

return camera_entities


def get_cover_entities(
discovery_msg: dict,
) -> list[tuple[TasmotaShutterConfig | None, DiscoveryHashType]]:
Expand Down Expand Up @@ -476,6 +494,8 @@ def get_entities_for_platform(
entities: list[tuple[TasmotaEntityConfig | None, DiscoveryHashType]] = []
if platform == "binary_sensor":
entities.extend(get_binary_sensor_entities(discovery_msg))
elif platform == "camera":
entities.extend(get_camera_entities(discovery_msg))
elif platform == "cover":
entities.extend(get_cover_entities(discovery_msg))
elif platform == "fan":
Expand All @@ -502,6 +522,8 @@ def get_entity(
platform = config.platform
if platform == "binary_sensor":
return TasmotaSwitch(config=config, mqtt_client=mqtt_client)
if platform == "camera":
return TasmotaCamera(config=config, mqtt_client=mqtt_client)
if platform == "cover":
return TasmotaShutter(config=config, mqtt_client=mqtt_client)
if platform == "fan":
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
voluptuous>=0.12.0
aiohttp>=3.11.12