Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: volume detection #75

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A Python library to control ffmepg from asyncio for [Home Assistant](https://www.home-assistant.io).

- Emulate webcam from any video input source for HA
- Analyse a video/audio stream for noise or motion detection
- Analyse a video/audio stream for volume, noise or motion detection
- Grab image from a stream

Be carfull that you protect function calls to this library with `asyncio.shield`.
91 changes: 89 additions & 2 deletions haffmpeg/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import re
from time import time
from typing import Callable, Coroutine, Optional
from decimal import Decimal

import async_timeout

from .core import FFMPEG_STDOUT, HAFFmpegWorker
from .timeout import asyncio_timeout
Expand All @@ -12,7 +15,7 @@


class SensorNoise(HAFFmpegWorker):
"""Implement a noise detection on a autio stream."""
"""Implement a noise detection on a audio stream."""

STATE_NONE = 0
STATE_NOISE = 1
Expand Down Expand Up @@ -42,7 +45,7 @@ def open_sensor(
output_dest: Optional[str] = None,
extra_cmd: Optional[str] = None,
) -> Coroutine:
"""Open FFmpeg process for read autio stream.
"""Open FFmpeg process for read audio stream.

Return a coroutine.
"""
Expand Down Expand Up @@ -116,6 +119,90 @@ async def _worker_process(self) -> None:
_LOGGER.warning("Unknown data from queue!")


class SensorVolumeBase(HAFFmpegWorker):
"""Implement volume sensor on audio stream."""

def __init__(self, ffmpeg_bin: str, callback: Callable, re_state: re.Pattern):
"""Init volume sensor."""
super().__init__(ffmpeg_bin)
self._callback = callback
self._time_duration = 1
self._re_state: re.Pattern = re_state

def set_options(self, time_duration: int = 1) -> None:
"""Set option parameters for volume sensor."""
self._time_duration = time_duration

def open_sensor(
self,
input_source: str,
output_dest: Optional[str] = None,
extra_cmd: Optional[str] = None,
) -> Coroutine:
"""Open FFmpeg process for read audio stream.

Return a coroutine.
"""
command = ["-vn", "-filter:a", "volumedetect"]

# run ffmpeg, read output
return self.start_worker(
cmd=command,
input_source=input_source,
output=output_dest,
extra_cmd=extra_cmd,
)

async def _worker_process(self) -> None:
"""This function extracts mean/max dB into the state sensor."""
state: Decimal = -100
timeout = self._time_duration

self._loop.call_soon(self._callback, False)

# process queue data
while True:
try:
_LOGGER.debug("Reading mean: %ddB state with timeout: %s", state, timeout)
with async_timeout.timeout(timeout):
data = await self._queue.get()
timeout = None
if data is None:
self._loop.call_soon(self._callback, None)
return
except asyncio.TimeoutError:
_LOGGER.debug("Blocking timeout")
self._loop.call_soon(self._callback, None)
timeout = None
continue

match = self._re_state.search(data)
if match:
state = Decimal(match[1])
self._loop.call_soon(self._callback, state)
continue

_LOGGER.warning("Unknown data from queue!")


class MeanSensorVolume(SensorVolumeBase):
"""Implement a mean volume sensor."""

def __init__(self, ffmpeg_bin: str, callback: Callable):
"""Init mean volume sensor."""
re_state = re.compile(r"mean_volume: (\-\d{1,3}(\.\d{1,2})?) dB")
super().__init__(ffmpeg_bin, callback, re_state)


class MaxSensorVolume(SensorVolumeBase):
"""Implement a max volume sensor."""

def __init__(self, ffmpeg_bin: str, callback: Callable):
"""Init max volume sensor."""
re_state = re.compile(r"max_volume: (\-\d{1,3}(\.\d{1,2})?) dB")
super().__init__(ffmpeg_bin, callback, re_state)


class SensorMotion(HAFFmpegWorker):
"""Implement motion detection with ffmpeg scene detection."""

Expand Down
46 changes: 46 additions & 0 deletions test/sensor_max_volume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging

import click

from haffmpeg.sensor import MaxSensorVolume

logging.basicConfig(level=logging.DEBUG)


@click.command()
@click.option("--ffmpeg", "-f", default="ffmpeg", help="FFmpeg binary")
@click.option("--source", "-s", help="Input file for ffmpeg")
@click.option("--output", "-o", default=None, help="Output ffmpeg target")
@click.option(
"--duration",
"-d",
default=1,
type=int,
help="Time duration to detect volume",
)
@click.option("--extra", "-e", help="Extra ffmpeg command line arguments")
def cli(ffmpeg, source, output, duration, extra):
"""FFMPEG max volume detection."""

def callback(state):
print("Max volume is: %s" % str(state))

async def run():

sensor = MaxSensorVolume(ffmpeg_bin=ffmpeg, callback=callback)
sensor.set_options(time_duration=duration)
await sensor.open_sensor(
input_source=source, output_dest=output, extra_cmd=extra
)
try:
while True:
await asyncio.sleep(0.1)
finally:
await sensor.close()

asyncio.run(run())


if __name__ == "__main__":
cli()
46 changes: 46 additions & 0 deletions test/sensor_mean_volume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import asyncio
import logging

import click

from haffmpeg.sensor import MeanSensorVolume

logging.basicConfig(level=logging.DEBUG)


@click.command()
@click.option("--ffmpeg", "-f", default="ffmpeg", help="FFmpeg binary")
@click.option("--source", "-s", help="Input file for ffmpeg")
@click.option("--output", "-o", default=None, help="Output ffmpeg target")
@click.option(
"--duration",
"-d",
default=1,
type=int,
help="Time duration to detect volume",
)
@click.option("--extra", "-e", help="Extra ffmpeg command line arguments")
def cli(ffmpeg, source, output, duration, extra):
"""FFMPEG mean volume detection."""

def callback(state):
print("Mean volume is: %s" % str(state))

async def run():

sensor = MeanSensorVolume(ffmpeg_bin=ffmpeg, callback=callback)
sensor.set_options(time_duration=duration)
await sensor.open_sensor(
input_source=source, output_dest=output, extra_cmd=extra
)
try:
while True:
await asyncio.sleep(0.1)
finally:
await sensor.close()

asyncio.run(run())


if __name__ == "__main__":
cli()