Skip to content

Commit

Permalink
Merge branch 'bugfix/stuck-uploads-when-congested' into 'devel'
Browse files Browse the repository at this point in the history
Throttle uploads from server side

Closes #1173

See merge request sds-dev/sd-connect/swift-browser-ui!219
  • Loading branch information
Joonatan Mäkinen committed Dec 12, 2023
2 parents 86a4f95 + 1a44230 commit 3d24636
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- (GL #1154) Fix session cookie not getting properly cleared on invalidation
- (GL #1160) Fix large uploaded file (> 5 GiB) showing wrong size in the UI
- (GL #1165) Fix download function not working reliably in Firefox
- (GL #1173) Fix new upload regressions in congested conditions

### Removed

Expand Down
2 changes: 2 additions & 0 deletions swift_browser_ui/upload/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ async def handle_upload_ws(
await upload_session.handle_begin_upload(msg_unpacked)
if msg_unpacked["command"] == "add_chunk":
await upload_session.handle_upload_chunk(msg_unpacked)
if msg_unpacked["command"] == "add_chunks":
await upload_session.handle_upload_chunks(msg_unpacked)
if msg_unpacked["command"] == "cancel":
await upload_session.handle_close()
if msg_unpacked["command"] == "finish":
Expand Down
195 changes: 124 additions & 71 deletions swift_browser_ui/upload/cryptupload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import base64
import logging
import os
import random
import secrets
import ssl
import typing
Expand All @@ -30,7 +31,8 @@
SEGMENT_CHUNKS = 81885
CHUNK_SIZE = 65564

CRYPTUPLOAD_Q_DEPTH = int(os.environ.get("SWIFTUI_UPLOAD_RUNNER_Q_DEPTH", 96))
# Use an approx 10 MiB queue for each upload by default
CRYPTUPLOAD_Q_DEPTH = int(os.environ.get("SWIFTUI_UPLOAD_RUNNER_Q_DEPTH", 160))


class FileUpload:
Expand Down Expand Up @@ -77,25 +79,36 @@ def __init__(
self.chunk_cache: typing.Dict[int, bytes] = {}
self.failed: bool = False
self.finished: bool = False
self.aborted: bool = False

# Calculate upload parameters
self.total_segments: int = -(self.total // -SEGMENT_SIZE)
self.remainder_segment: int = self.total % SEGMENT_SIZE
self.total_chunks: int = -(self.total // -CHUNK_SIZE)
self.remainder_chunks: int = -(self.remainder_segment // -CHUNK_SIZE)

self.q_cache: typing.List[asyncio.Queue] = [
asyncio.Queue(maxsize=256) for _ in range(0, self.total_segments)
]
self.tasks: typing.List[asyncio.Task] = []

LOGGER.debug(f"total: {self.total}")
LOGGER.debug(f"segments: {self.total_segments}")
LOGGER.debug(f"chunks: {self.total_chunks}")

async def wait_for_cache(self) -> bool:
    """Poll until the chunk cache has room for more chunks.

    Returns True once the cache size is at or below the configured
    queue depth, or False if the upload finished while waiting.
    """
    while True:
        if len(self.chunk_cache) <= CRYPTUPLOAD_Q_DEPTH:
            return True
        if self.finished:
            return False
        # Randomized back-off between polls; the random module is fine
        # here since this is not security sensitive.
        await asyncio.sleep(random.uniform(0.1, 0.2))  # nosec # noqa: S311

async def add_header(self, header: bytes) -> None:
"""Add header for the file."""
if not await self.a_create_container():
if (
not await self.a_create_container()
and self.socket is not None
and not self.socket.closed
):
await self.socket.send_bytes(
msgpack.packb(
{
Expand All @@ -120,35 +133,38 @@ async def add_header(self, header: bytes) -> None:
)

self.tasks = [
asyncio.create_task(self.slice_into_queue(i, self.q_cache[i]))
asyncio.create_task(self.upload_segment(i))
for i in range(0, self.total_segments)
]

await self.start_upload()

async def start_upload(self):
"""Tell the frontend to start the file upload."""
await self.socket.send_bytes(
msgpack.packb(
{
"command": "start_upload",
"container": self.container,
"object": self.path,
}
if self.socket is not None and not self.socket.closed:
await self.socket.send_bytes(
msgpack.packb(
{
"command": "start_upload",
"container": self.container,
"object": self.path,
}
)
)
)

async def retry_chunk(self, order):
"""Retry a failed chunk."""
await self.socket.send_bytes(
msgpack.packb(
{
"command": "retry_chunk",
"container": self.container,
"object": self.path,
"order": order,
}
if self.socket is not None and not self.socket.closed:
await self.socket.send_bytes(
msgpack.packb(
{
"command": "retry_chunk",
"container": self.container,
"object": self.path,
"order": order,
}
)
)
)

async def add_to_chunks(
self,
Expand All @@ -158,6 +174,7 @@ async def add_to_chunks(
"""Add a chunk to cache."""
if order in self.done_chunks or order in self.chunk_cache:
return

self.chunk_cache[order] = data

async def a_create_container(self) -> bool:
Expand All @@ -178,7 +195,7 @@ async def a_create_container(self) -> bool:
return False
return True

async def slice_into_queue(self, segment: int, q: asyncio.Queue):
async def slice_segment(self, segment: int):
"""Slice a ~5GiB segment from queue."""
seg_start = segment * SEGMENT_CHUNKS
seg_end = seg_start + SEGMENT_CHUNKS
Expand All @@ -191,49 +208,50 @@ async def slice_into_queue(self, segment: int, q: asyncio.Queue):
LOGGER.debug(f"Using {self.total_chunks} as chunk amount for last segment.")
seg_end = self.total_chunks
LOGGER.debug(f"Consuming chunks {seg_start} through {seg_end}")
LOGGER.debug(f"Waiting until first chunk in segment {segment} is available.")

while (seg_start) not in self.chunk_cache:
await asyncio.sleep(0.1)
LOGGER.debug(f"Got first chunk for segment {segment}. Starting upload...")

# Create a task for segment upload
self.tasks.append(asyncio.create_task(self.upload_segment(segment)))

# Start the upload
LOGGER.debug(f"Pushing chunks from {seg_start} until {seg_end} to queue.")
LOGGER.debug(f"Generator yielding chunks from {seg_start} until {seg_end}.")
for i in range(seg_start, seg_end):
wait_count = 0
while i not in self.chunk_cache:
if self.aborted:
LOGGER.debug(
f"Terminating slicer for segment {segment} for {self.container}/{self.path} due to upload abortion."
)
return
wait_count += 1
await asyncio.sleep(0.05)
# We can safely ignore the warning about using the random library
await asyncio.sleep(random.uniform(0.1, 0.2)) # nosec # noqa: S311
# If handler has waited for too long for the next chunk, retry
# Currently 10 seconds is considered too long
if wait_count > 2000:
# Currently 60 seconds is considered too long
if wait_count > 600:
try:
await self.retry_chunk(i)
wait_count = 0
except ConnectionResetError:
pass
self.done_chunks.add(i)
chunk = self.chunk_cache.pop(i)
await q.put(chunk)

# Queue EOF
await q.put(b"")

async def queue_generator(self, q: asyncio.Queue):
"""Consume the upload queue."""
LOGGER.debug("Starting consumption of the queue.")
chunk = await q.get()
while chunk:
yield chunk
chunk = await q.get()

# Finally yield eof
return

async def upload_segment(self, order: int) -> int:
"""Upload the segment with given ordering number."""
# Fetch the queue from storage
q = self.q_cache[order]
seg_start = order * SEGMENT_CHUNKS
# Wait until first chunk is available in cache, before starting the request
LOGGER.debug(f"Waiting until first chunk in segment {order} is available.")
while (seg_start) not in self.chunk_cache:
if self.aborted:
LOGGER.debug(
f"Terminating segment {order} for {self.container}/{self.path} early due to upload abortion."
)
return 410
# We can safely ignore the warning about using the random library
await asyncio.sleep(random.uniform(0.1, 0.2)) # nosec # noqa: S311
LOGGER.debug(f"Got first chunk for segment {order}. Starting upload...")

headers = {
"X-Auth-Token": self.token,
"Content-Type": "application/swiftclient-segment",
Expand All @@ -245,14 +263,18 @@ async def upload_segment(self, order: int) -> int:
container=f"{self.container}{common.SEGMENTS_CONTAINER}",
object_name=f"{self.path}/{self.segment_id}/{(order + 1):08d}",
),
data=self.queue_generator(q),
data=self.slice_segment(order),
headers=headers,
timeout=UPL_TIMEOUT,
ssl=ssl_context,
) as resp:
LOGGER.info(f"Segment {order} finished with status {resp.status}.")

if self.total_segments - 1 == order:
if (
self.total_segments - 1 == order
and self.socket is not None
and not self.socket.closed
):
LOGGER.info("Informing client that file was finished.")
await self.socket.send_bytes(
msgpack.packb(
Expand Down Expand Up @@ -290,28 +312,24 @@ async def finish_upload(self):

async def abort_upload(self):
"""Abort the upload."""
await self.socket.send_bytes(
msgpack.packb(
{
"command": "abort",
"container": self.container,
"object": self.path,
"reason": "cancel",
}
if self.socket is not None and not self.socket.closed:
await self.socket.send_bytes(
msgpack.packb(
{
"command": "abort",
"container": self.container,
"object": self.path,
"reason": "cancel",
}
)
)
)

for q in self.q_cache:
await q.put(b"")
self.aborted = True

try:
await asyncio.gather(*self.tasks)
except ConnectionResetError:
pass
finally:
for task in self.tasks:
if not task.done():
task.cancel()
await asyncio.gather(*self.tasks)
for task in self.tasks:
if not task.done():
task.cancel()

# Delete segments that might've been uploaded
headers = {
Expand Down Expand Up @@ -369,7 +387,12 @@ async def handle_begin_upload(self, msg: typing.Dict[str, typing.Any]) -> None:
owner_name = str(msg["owner_name"])
total = int(msg["total"])

if container in self.uploads and path in self.uploads[container] and self.ws:
if (
container in self.uploads
and path in self.uploads[container]
and self.ws is not None
and not self.ws.closed
):
await self.ws.send_bytes(
msgpack.packb(
{
Expand Down Expand Up @@ -412,6 +435,33 @@ async def handle_upload_chunk(self, msg: typing.Dict[str, typing.Any]):
bytes(msg["data"]),
)

async def handle_upload_chunks(self, msg: typing.Dict[str, typing.Any]):
    """Handle the addition of multiple new chunks.

    Caches every chunk in the message for the matching upload, then
    waits until the upload's chunk cache has space again before asking
    the client for the next batch (server-side throttling).

    :param msg: unpacked websocket message with ``container``,
        ``object`` and a ``chunks`` list of ``{order, data}`` items.
    """
    container: str = str(msg["container"])
    path: str = str(msg["object"])

    # Guard before touching the upload: chunks may arrive for an upload
    # that was never begun or was already aborted, and unguarded
    # indexing would raise KeyError here.
    if container not in self.uploads or path not in self.uploads[container]:
        return

    for chunk in msg["chunks"]:
        await self.uploads[container][path].add_to_chunks(
            int(chunk["order"]),
            bytes(chunk["data"]),
        )

    # Re-check membership after the awaits (the upload may have been
    # closed meanwhile), and only signal "next" over a live socket —
    # matching the socket guards used elsewhere in this module.
    if (
        container in self.uploads
        and path in self.uploads[container]
        and await self.uploads[container][path].wait_for_cache()
        and self.ws is not None
        and not self.ws.closed
    ):
        await self.ws.send_bytes(
            msgpack.packb(
                {
                    "command": "next",
                    "container": container,
                    "object": path,
                }
            )
        )

async def handle_finish_upload(self, msg: typing.Dict[str, typing.Any]):
"""Handle the upload end."""
container: str = str(msg["container"])
Expand All @@ -430,6 +480,9 @@ async def handle_close(self):
)
await asyncio.gather(*abort_tasks)

LOGGER.debug("Clearing upload session directory.")
self.uploads = {}


def get_encrypted_upload_session(
request: aiohttp.web.Request,
Expand Down
Loading

0 comments on commit 3d24636

Please sign in to comment.