Skip to content

Commit

Permalink
Extract file receiver from BackupManager.async_receive_backup to util
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery committed Dec 4, 2024
1 parent 37853fd commit c2d54f2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 39 deletions.
41 changes: 2 additions & 39 deletions homeassistant/components/backup/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io
import json
from pathlib import Path
from queue import SimpleQueue
import shutil
import tarfile
from tempfile import TemporaryDirectory
Expand Down Expand Up @@ -44,7 +43,7 @@
LOGGER,
)
from .models import AgentBackup, Folder
from .util import read_backup
from .util import read_backup, receieve_file


@dataclass(frozen=True, kw_only=True, slots=True)
Expand Down Expand Up @@ -383,48 +382,12 @@ async def async_receive_backup(
contents: aiohttp.BodyPartReader,
) -> None:
"""Receive and store a backup file from upload."""
queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = (
SimpleQueue()
)
temp_dir_handler = await self.hass.async_add_executor_job(TemporaryDirectory)
target_temp_file = Path(
temp_dir_handler.name, contents.filename or "backup.tar"
)

def _sync_queue_consumer() -> None:
with target_temp_file.open("wb") as file_handle:
while True:
if (_chunk_future := queue.get()) is None:
break
_chunk, _future = _chunk_future
if _future is not None:
self.hass.loop.call_soon_threadsafe(_future.set_result, None)
file_handle.write(_chunk)

fut: asyncio.Future[None] | None = None
try:
fut = self.hass.async_add_executor_job(_sync_queue_consumer)
megabytes_sending = 0
while chunk := await contents.read_chunk(BUF_SIZE):
megabytes_sending += 1
if megabytes_sending % 5 != 0:
queue.put_nowait((chunk, None))
continue

chunk_future = self.hass.loop.create_future()
queue.put_nowait((chunk, chunk_future))
await asyncio.wait(
(fut, chunk_future),
return_when=asyncio.FIRST_COMPLETED,
)
if fut.done():
# The executor job failed
break

queue.put_nowait(None) # terminate queue consumer
finally:
if fut is not None:
await fut
await receieve_file(self.hass, contents, target_temp_file)

def _copy_and_cleanup(
local_file_paths: list[Path], backup: AgentBackup
Expand Down
47 changes: 47 additions & 0 deletions homeassistant/components/backup/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

from __future__ import annotations

import asyncio
from pathlib import Path
from queue import SimpleQueue
import tarfile
from typing import cast

import aiohttp

from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonObjectType, json_loads_object

from .const import BUF_SIZE
Expand Down Expand Up @@ -57,3 +62,45 @@ def read_backup(backup_path: Path) -> AgentBackup:
protected=cast(bool, data.get("protected", False)),
size=backup_path.stat().st_size,
)


async def receieve_file(
hass: HomeAssistant, contents: aiohttp.BodyPartReader, path: Path
) -> None:
"""Receive a file from a stream and write it to a file."""
queue: SimpleQueue[tuple[bytes, asyncio.Future[None] | None] | None] = SimpleQueue()

def _sync_queue_consumer() -> None:
with path.open("wb") as file_handle:
while True:
if (_chunk_future := queue.get()) is None:
break
_chunk, _future = _chunk_future
if _future is not None:
hass.loop.call_soon_threadsafe(_future.set_result, None)

Check warning on line 80 in homeassistant/components/backup/util.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/backup/util.py#L80

Added line #L80 was not covered by tests
file_handle.write(_chunk)

fut: asyncio.Future[None] | None = None
try:
fut = hass.async_add_executor_job(_sync_queue_consumer)
megabytes_sending = 0
while chunk := await contents.read_chunk(BUF_SIZE):
megabytes_sending += 1
if megabytes_sending % 5 != 0:
queue.put_nowait((chunk, None))
continue

chunk_future = hass.loop.create_future()
queue.put_nowait((chunk, chunk_future))
await asyncio.wait(

Check warning on line 95 in homeassistant/components/backup/util.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/backup/util.py#L93-L95

Added lines #L93 - L95 were not covered by tests
(fut, chunk_future),
return_when=asyncio.FIRST_COMPLETED,
)
if fut.done():

Check warning on line 99 in homeassistant/components/backup/util.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/backup/util.py#L99

Added line #L99 was not covered by tests
# The executor job failed
break

Check warning on line 101 in homeassistant/components/backup/util.py

View check run for this annotation

Codecov / codecov/patch

homeassistant/components/backup/util.py#L101

Added line #L101 was not covered by tests

queue.put_nowait(None) # terminate queue consumer
finally:
if fut is not None:
await fut

0 comments on commit c2d54f2

Please sign in to comment.