Skip to content

Commit

Permalink
update assets Reader to use AsyncSeekForward rather than AsyncSeek
Browse files Browse the repository at this point in the history
  • Loading branch information
ldubos committed Aug 26, 2024
1 parent 1caa64d commit e4960f5
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 92 deletions.
27 changes: 24 additions & 3 deletions crates/bevy_asset/src/io/file/file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};
use async_fs::{read_dir, File};
use futures_io::AsyncSeek;
use futures_lite::StreamExt;

use std::path::Path;
use std::task;
use std::{path::Path, pin::Pin, task::Poll};

use super::{FileAssetReader, FileAssetWriter};

impl AsyncSeekForward for File {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
let offset: Result<i64, _> = offset.try_into();

if let Ok(offset) = offset {
Pin::new(&mut self).poll_seek(cx, futures_io::SeekFrom::Current(offset))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
}
}
}

impl Reader for File {}

impl AssetReader for FileAssetReader {
Expand Down
16 changes: 9 additions & 7 deletions crates/bevy_asset/src/io/file/sync_file_asset.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::Stream;

use crate::io::{
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, PathStream,
Reader, Writer,
get_meta_path, AssetReader, AssetReaderError, AssetWriter, AssetWriterError, AsyncSeekForward,
PathStream, Reader, Writer,
};

use std::{
Expand All @@ -30,14 +30,16 @@ impl AsyncRead for FileReader {
}
}

impl AsyncSeek for FileReader {
fn poll_seek(
impl AsyncSeekForward for FileReader {
fn poll_seek_forward(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
pos: std::io::SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let this = self.get_mut();
let seek = this.0.seek(pos);
let current = this.0.stream_position()?;
let seek = this.0.seek(std::io::SeekFrom::Start(current + offset));

Poll::Ready(seek)
}
}
Expand Down
40 changes: 12 additions & 28 deletions crates/bevy_asset/src/io/memory.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
use bevy_utils::HashMap;
use futures_io::{AsyncRead, AsyncSeek};
use futures_io::AsyncRead;
use futures_lite::{ready, Stream};
use parking_lot::RwLock;
use std::io::SeekFrom;
use std::{
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
task::Poll,
};

use super::AsyncSeekForward;

#[derive(Default, Debug)]
struct DirInternal {
assets: HashMap<Box<str>, Data>,
Expand Down Expand Up @@ -237,37 +238,20 @@ impl AsyncRead for DataReader {
}
}

impl AsyncSeek for DataReader {
fn poll_seek(
impl AsyncSeekForward for DataReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self
.data
.value()
.len()
.try_into()
.map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
107 changes: 61 additions & 46 deletions crates/bevy_asset/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ pub use futures_lite::AsyncWriteExt;
pub use source::*;

use bevy_utils::{BoxedFuture, ConditionalSendFuture};
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures_lite::{ready, Stream};
use std::task::Context;
use std::{
io::SeekFrom,
mem::size_of,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
task::{Context, Poll},
task::Poll,
};
use thiserror::Error;

Expand Down Expand Up @@ -81,13 +81,51 @@ pub const STACK_FUTURE_SIZE: usize = 10 * size_of::<&()>();

pub use stackfuture::StackFuture;

/// Asynchronously advances the cursor position by a specified number of bytes.
///
/// This trait is a simplified version of the [`futures_io::AsyncSeek`] trait, providing
/// support exclusively for the [`futures_io::SeekFrom::Current`] variant. It allows for relative
/// seeking from the current cursor position.
pub trait AsyncSeekForward {
/// Attempts to asynchronously seek forward by a specified number of bytes from the current cursor position.
///
/// Seeking beyond the end of the stream is allowed and the behavior for this case is defined by the implementation.
/// The new position, relative to the beginning of the stream, should be returned upon successful completion
/// of the seek operation.
///
/// If the seek operation completes successfully,
/// the new position relative to the beginning of the stream should be returned.
///
/// # Implementation
///
/// Implementations of this trait should handle [`Poll::Pending`] correctly, converting
/// [`std::io::ErrorKind::WouldBlock`] errors into [`Poll::Pending`] to indicate that the operation is not
/// yet complete and should be retried, and either internally retry or convert
/// [`std::io::ErrorKind::Interrupted`] into another error kind.
fn poll_seek_forward(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>>;
}

impl<T: ?Sized + AsyncSeekForward + Unpin> AsyncSeekForward for Box<T> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
offset: u64,
) -> Poll<futures_io::Result<u64>> {
Pin::new(&mut **self).poll_seek_forward(cx, offset)
}
}

/// A type returned from [`AssetReader::read`], which is used to read the contents of a file
/// (or virtual file) corresponding to an asset.
///
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeek`].
/// This is essentially a trait alias for types implementing [`AsyncRead`] and [`AsyncSeekForward`].
/// The only reason a blanket implementation is not provided for applicable types is to allow
/// implementors to override the provided implementation of [`Reader::read_to_end`].
pub trait Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync {
pub trait Reader: AsyncRead + AsyncSeekForward + Unpin + Send + Sync {
/// Reads the entire contents of this reader and appends them to a vec.
///
/// # Note for implementors
Expand Down Expand Up @@ -533,32 +571,20 @@ impl AsyncRead for VecReader {
}
}

impl AsyncSeek for VecReader {
fn poll_seek(
impl AsyncSeekForward for VecReader {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
self.bytes_read = new_pos as _;
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -618,32 +644,21 @@ impl<'a> AsyncRead for SliceReader<'a> {
}
}

impl<'a> AsyncSeek for SliceReader<'a> {
fn poll_seek(
impl<'a> AsyncSeekForward for SliceReader<'a> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
let result = match pos {
SeekFrom::Start(offset) => offset.try_into(),
SeekFrom::End(offset) => self.bytes.len().try_into().map(|len: i64| len - offset),
SeekFrom::Current(offset) => self
.bytes_read
.try_into()
.map(|bytes_read: i64| bytes_read + offset),
};
let result = self
.bytes_read
.try_into()
.map(|bytes_read: u64| bytes_read + offset);

if let Ok(new_pos) = result {
if new_pos < 0 {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"seek position is out of range",
)))
} else {
self.bytes_read = new_pos as _;
self.bytes_read = new_pos as _;

Poll::Ready(Ok(new_pos as _))
}
Poll::Ready(Ok(new_pos as _))
} else {
Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down
15 changes: 7 additions & 8 deletions crates/bevy_asset/src/io/processor_gated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ use crate::{
};
use async_lock::RwLockReadGuardArc;
use bevy_utils::tracing::trace;
use futures_io::{AsyncRead, AsyncSeek};
use std::io::SeekFrom;
use futures_io::AsyncRead;
use std::task::Poll;
use std::{path::Path, pin::Pin, sync::Arc};

use super::ErasedAssetReader;
use super::{AsyncSeekForward, ErasedAssetReader};

/// An [`AssetReader`] that will prevent asset (and asset metadata) read futures from returning for a
/// given path until that path has been processed by [`AssetProcessor`].
///
/// [`AssetProcessor`]: crate::processor::AssetProcessor
/// [`AssetProcessor`]: crate::processor::AssetProcessor
pub struct ProcessorGatedReader {
reader: Box<dyn ErasedAssetReader>,
source: AssetSourceId<'static>,
Expand Down Expand Up @@ -142,13 +141,13 @@ impl AsyncRead for TransactionLockedReader<'_> {
}
}

impl AsyncSeek for TransactionLockedReader<'_> {
fn poll_seek(
impl AsyncSeekForward for TransactionLockedReader<'_> {
fn poll_seek_forward(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
pos: SeekFrom,
offset: u64,
) -> Poll<std::io::Result<u64>> {
Pin::new(&mut self.reader).poll_seek(cx, pos)
Pin::new(&mut self.reader).poll_seek_forward(cx, offset)
}
}

Expand Down

0 comments on commit e4960f5

Please sign in to comment.