diff --git a/CHANGELOG.md b/CHANGELOG.md index e499cc0e14..7af48e037b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,10 @@ If possible, provide tooling that performs the changes, e.g. a shell-script. #### Core * Added traits for "metaprogramming" with `seqan3::type_list` and type packs. +#### Input/Output + +* Asynchronous input (background file reading) supported via seqan3::view::async_input_buffer. + ## API changes * **Customising for third party types has changes slightly:** diff --git a/include/seqan3/contrib/parallel/buffer_queue.hpp b/include/seqan3/contrib/parallel/buffer_queue.hpp index 683748b076..41288332b7 100644 --- a/include/seqan3/contrib/parallel/buffer_queue.hpp +++ b/include/seqan3/contrib/parallel/buffer_queue.hpp @@ -64,9 +64,9 @@ class buffer_queue buffer_queue() : buffer_queue{0u} {} buffer_queue(buffer_queue const &) = delete; - buffer_queue(buffer_queue &&) = default; + buffer_queue(buffer_queue &&) = delete; buffer_queue & operator=(buffer_queue const &) = delete; - buffer_queue & operator=(buffer_queue &&) = default; + buffer_queue & operator=(buffer_queue &&) = delete; ~buffer_queue() = default; // you can set the initial capacity here diff --git a/include/seqan3/range/detail/inherited_iterator_base.hpp b/include/seqan3/range/detail/inherited_iterator_base.hpp index 4fa2e5fb82..c4cfc8d205 100644 --- a/include/seqan3/range/detail/inherited_iterator_base.hpp +++ b/include/seqan3/range/detail/inherited_iterator_base.hpp @@ -46,8 +46,13 @@ namespace seqan3::detail * \snippet test/unit/range/detail/inherited_iterator_base_test.cpp inherited_iterator_base def */ template -class inherited_iterator_base : public std::conditional_t, empty_type, base_t> +class inherited_iterator_base : public std::conditional_t || !std::semiregular, + empty_type, + base_t> { +private: + //!\brief Whether this iterator inherits or wraps. + static constexpr bool wrap_base = std::is_pointer_v || !std::semiregular; public: /*!\name Associated types * \brief All are derived from the base_t. @@ -86,7 +91,7 @@ class inherited_iterator_base : public std::conditional_t) //!\cond - requires !std::is_pointer_v + requires !wrap_base //!\endcond : base_t{std::move(it)} {} @@ -94,7 +99,7 @@ class inherited_iterator_base : public std::conditional_t + requires wrap_base //!\endcond : member{std::move(it)} {} @@ -181,11 +186,17 @@ ame Arithmetic operators return *this_derived(); } + //!\brief Post-increment of non-copyable iterators returns void. + constexpr void operator++(int) noexcept(noexcept(++std::declval())) + { + ++(*this_derived()); + } + //!\brief Post-increment, return previous iterator state. constexpr derived_t operator++(int) noexcept(noexcept(++std::declval()) && noexcept(derived_t(std::declval()))) //!\cond - requires std::input_iterator + requires std::semiregular //!\endcond { derived_t cpy{*this_to_base()}; @@ -292,16 +303,34 @@ ame Arithmetic operators * \{ */ //!\brief Dereference operator returns element currently pointed at. - constexpr reference operator*() const noexcept(noexcept(*std::declval())) + constexpr reference operator*() noexcept(noexcept(*std::declval())) //!\cond - requires std::input_iterator + requires std::readable + //!\endcond + { + return **this_to_base(); + } + + //!\brief Dereference operator returns element currently pointed at. + constexpr decltype(auto) operator*() const noexcept(noexcept(*std::declval())) + //!\cond + requires std::readable //!\endcond { return **this_to_base(); } //!\brief Return pointer to this iterator. - constexpr pointer operator->() const noexcept(noexcept(*std::declval())) + constexpr pointer operator->() noexcept(noexcept(*std::declval())) + //!\cond + requires std::input_iterator + //!\endcond + { + return &*this_to_base(); + } + + //!\brief Return pointer to this iterator. + constexpr decltype(auto) operator->() const noexcept(noexcept(*std::declval())) //!\cond requires std::input_iterator //!\endcond @@ -310,7 +339,7 @@ ame Arithmetic operators } //!\brief Return underlying container value currently pointed at. - constexpr decltype(auto) operator[](std::make_signed_t const n) const + constexpr decltype(auto) operator[](std::make_signed_t const n) noexcept(noexcept(*std::declval()) && noexcept(std::declval() + 3)) //!\cond requires std::random_access_iterator @@ -318,11 +347,21 @@ ame Arithmetic operators { return *(*this_derived() + n); } + + //!\brief Return underlying container value currently pointed at. + constexpr decltype(auto) operator[](std::make_signed_t const n) const + noexcept(noexcept(*std::declval()) && noexcept(std::declval() + 3)) + //!\cond + requires std::random_access_iterator + //!\endcond + { + return *(*this_derived() + n); + } //!\} private: //!\brief If the base is a pointer, we wrap it instead of inheriting. - std::conditional_t, base_t, empty_type> member; + std::conditional_t member; //!\brief Befriend the derived type so it can access the private members. friend derived_t; @@ -342,7 +381,7 @@ ame Arithmetic operators //!\brief Cast this to base type. constexpr base_t * this_to_base() { - if constexpr (std::is_pointer_v) + if constexpr (wrap_base) return &member; else return static_cast(this); @@ -351,7 +390,7 @@ ame Arithmetic operators //!\copydoc this_to_base constexpr base_t const * this_to_base() const { - if constexpr (std::is_pointer_v) + if constexpr (wrap_base) return &member; else return static_cast(this); diff --git a/include/seqan3/range/view/async_input_buffer.hpp b/include/seqan3/range/view/async_input_buffer.hpp new file mode 100644 index 0000000000..e142929d78 --- /dev/null +++ b/include/seqan3/range/view/async_input_buffer.hpp @@ -0,0 +1,503 @@ +// ----------------------------------------------------------------------------------------------------- +// Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin +// Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik +// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License +// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md +// ----------------------------------------------------------------------------------------------------- + +/*!\file + * \author Hannes Hauswedell + * \brief Provides seqan3::view::async_input_buffer. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +//----------------------------------------------------------------------------- +// This is the path a value takes when using this view: +// urange +// → async_input_buffer_view.buffer [size n] +// → async_input_buffer_iterator.cached_value [size 1] +// → user +//----------------------------------------------------------------------------- + +namespace seqan3::detail +{ + +/*!\brief The type returned by seqan3::view::async_input_buffer. + * \tparam urng_t The underlying range type. + * \implements std::ranges::InputRange + * \ingroup view + */ +template +class async_input_buffer_view : public std::ranges::view_interface> +{ +private: + static_assert(std::ranges::input_range, + "The range parameter to async_input_buffer_view must be at least an std::ranges::InputRange."); + static_assert(std::ranges::view, + "The range parameter to async_input_buffer_view must model std::ranges::View."); + static_assert(std::movable>, + "The range parameter to async_input_buffer_view must have a value_type that is std::Movable."); + static_assert(std::constructible_from, std::remove_reference_t> &&>, + "The range parameter to async_input_buffer_view must have a value_type that is constructible by a moved " + "value of its reference type."); + + //!\brief The iterator type for the underlying range. + using urng_iterator_type = std::ranges::iterator_t; + + //!\brief Buffer and thread and shared between copies of this type. + struct state + { + //!\brief The underlying range. + urng_t urange; + + //!\brief The buffer queue. + contrib::fixed_buffer_queue> buffer; + + //!\brief Thread that rebuffers in the background. + std::thread producer; + }; + + //!\brief Shared holder of the state. + std::shared_ptr state_ptr = nullptr; + + //!\brief The iterator of the seqan3::detail::async_input_buffer_view. + class async_input_buffer_iterator; + +public: + /*!\name Constructor, destructor, and assignment. + * \{ + */ + async_input_buffer_view() = default; //!< Defaulted. + async_input_buffer_view(async_input_buffer_view const &) = default; //!< Defaulted. + async_input_buffer_view(async_input_buffer_view &&) = default; //!< Defaulted. + async_input_buffer_view & operator=(async_input_buffer_view const &) = default; //!< Defaulted. + async_input_buffer_view & operator=(async_input_buffer_view &&) = default; //!< Defaulted. + ~async_input_buffer_view() = default; //!< Defaulted. + + //!\brief Construction from the underlying view. + async_input_buffer_view(urng_t _urng, size_t const buffer_size) + { + auto deleter = [] (state * p) + { + if (p != nullptr) + { + p->buffer.close(); + p->producer.join(); + delete p; + } + }; + + state_ptr = std::shared_ptr(new state{std::move(_urng), + contrib::fixed_buffer_queue>{buffer_size}, + std::thread{}}, // thread is set/started below, needs rest of state + deleter); + + auto runner = [&state = *state_ptr] () + { + for (auto && val : state.urange) + if (state.buffer.wait_push(std::move(val)) == contrib::queue_op_status::closed) + break; + + state.buffer.close(); + }; + + state_ptr->producer = std::thread{runner}; + } + + //!\brief Construction from std::ranges::ViewableRange. + template + //!\cond + requires !std::same_as, async_input_buffer_view> && // prevent recursive instantiation + std::ranges::viewable_range && + std::constructible_from>> + //!\endcond + async_input_buffer_view(other_urng_t && _urng, size_t const buffer_size) : + async_input_buffer_view{std::view::all(_urng), buffer_size} + {} + //!\} + + /*!\name Iterators + * \{ + */ + /*!\brief Returns an iterator to the current begin of the underlying range. + * + * \details + * + * ### Thread-Safety + * + * It is thread-safe to call this function. Subsequent calls to begin will result in different + * iterators that are each valid individually. It is thread-safe to operate on different iterators + * from different threads (however it is not thread-safe to operate on a single iterator from different + * threads). + */ + async_input_buffer_iterator begin() + { + assert(state_ptr != nullptr); + return {state_ptr->buffer}; + } + + //!\brief Const-qualified async_input_buffer_view::begin() is deleted, because iterating changes the view. + async_input_buffer_iterator begin() const = delete; + + //!\copydoc async_input_buffer_view::begin() const + async_input_buffer_iterator cbegin() const = delete; + + //!\brief Returns a sentinel. + std::ranges::default_sentinel_t end() + { + return std::ranges::default_sentinel; + } + + //!\brief Const-qualified async_input_buffer_view::end() is deleted, because iterating changes the view. + std::ranges::default_sentinel_t end() const = delete; + + //!\copydoc async_input_buffer_view::end() const + std::ranges::default_sentinel_t cend() const = delete; + //!\} +}; + +//!\brief The iterator of the seqan3::detail::async_input_buffer_view. +template +class async_input_buffer_view::async_input_buffer_iterator +{ + //!\brief The sentinel type to compare to. + using sentinel_type = std::ranges::default_sentinel_t; + + //!\brief The pointer to the associated view. + contrib::fixed_buffer_queue> * buffer_ptr = nullptr; + + //!\brief The cached value this iterator holds. + value_type_t cached_value; + + //!\brief Whether this iterator is at end (the buffer is empty and closed). + bool at_end = false; + +public: + + /*!\name Associated types + * \{ + */ + //!\brief Difference type. + using difference_type = difference_type_t; + //!\brief Value type. + using value_type = value_type_t; + //!\brief Pointer type. + using pointer = value_type *; + //!\brief Reference type. + using reference = value_type &; + //!\brief Iterator category. + using iterator_category = void; + //!\brief Iterator concept. + using iterator_concept = std::input_iterator_tag; + //!\} + + /*!\name Construction, destruction and assignment + * \{ + */ + async_input_buffer_iterator() noexcept = default; //!< Defaulted. + //TODO: delete: + async_input_buffer_iterator(async_input_buffer_iterator const & rhs) noexcept = default; //!< Defaulted. + async_input_buffer_iterator(async_input_buffer_iterator && rhs) noexcept = default; //!< Defaulted. + //TODO: delete: + async_input_buffer_iterator & operator=(async_input_buffer_iterator const & rhs) noexcept = default; //!< Defaulted. + async_input_buffer_iterator & operator=(async_input_buffer_iterator && rhs) noexcept = default; //!< Defaulted. + ~async_input_buffer_iterator() noexcept = default; //!< Defaulted. + + //!\brief Constructing from the underlying seqan3::async_input_buffer_view. + async_input_buffer_iterator(contrib::fixed_buffer_queue> & buffer) noexcept : + buffer_ptr{&buffer} + { + ++(*this); // cache first value + } + //!\} + + /*!\name Access operations + * \{ + */ + //!\brief Dereferences the cached iterator. + reference operator*() noexcept + { + return cached_value; + } + + //!\brief Dereferences the cached iterator. + value_type const & operator*() const noexcept + { + return cached_value; + } + + //!\brief Returns pointer to the pointed-to object. + pointer operator->() const noexcept + { + return std::addressof(cached_value); + } + //!\} + + /*!\name Iterator operations + * \{ + */ + //!\brief Pre-increment. + async_input_buffer_iterator & operator++() noexcept + { + if (at_end) // TODO unlikely + return *this; + + assert(buffer_ptr != nullptr); + + if (buffer_ptr->wait_pop(cached_value) == contrib::queue_op_status::closed) + at_end = true; + + return *this; + } + + //!\brief Post-increment. + void operator++(int) noexcept + { + ++(*this); + } + //!\} + + /*!\name Comparison operators + * \{ + */ + //!\brief Compares for equality with sentinel. + friend constexpr bool operator==(async_input_buffer_iterator const & lhs, + std::ranges::default_sentinel_t const &) noexcept + { + return lhs.at_end; + } + + //!\copydoc operator== + friend constexpr bool operator==(std::ranges::default_sentinel_t const &, + async_input_buffer_iterator const & rhs) noexcept + { + return rhs == std::ranges::default_sentinel_t{}; + } + + //!\brief Compares for inequality with sentinel. + friend constexpr bool operator!=(async_input_buffer_iterator const & lhs, + std::ranges::default_sentinel_t const &) noexcept + { + return !(lhs == std::ranges::default_sentinel_t{}); + } + + //!\copydoc operator!= + friend constexpr bool operator!=(std::ranges::default_sentinel_t const &, + async_input_buffer_iterator const & rhs) noexcept + { + return rhs != std::ranges::default_sentinel_t{}; + } + //!\} +}; + +/*!\name Deduction guide. + * \relates seqan3::detail::async_input_buffer_view + * \{ + */ + +//!\brief Deduces the async_input_buffer_view from the underlying range if it is a std::ranges::ViewableRange. +template +async_input_buffer_view(urng_t &&, size_t const buffer_size) -> async_input_buffer_view>; +//!\} + +// ============================================================================ +// async_input_buffer_fn (adaptor definition +// ============================================================================ + +//!\brief Definition of the range adaptor object type for seqan3::view::async_input_buffer. +struct async_input_buffer_fn +{ + //!\brief Store the argument and return a range adaptor closure object. + constexpr auto operator()(size_t const buffer_size) const + { + return detail::adaptor_from_functor{*this, buffer_size}; + } + + /*!\brief Directly return an instance of the view, initialised with the given parameters. + * \param[in] urange The underlying range. + * \param[in] buffer_size The frame that should be used for translation. + * \returns A range of translated sequence(s). + */ + template + constexpr auto operator()(urng_t && urange, size_t const buffer_size) const + { + static_assert(std::ranges::input_range, + "The range parameter to view::async_input_buffer must be at least an std::ranges::InputRange."); + static_assert(std::ranges::viewable_range, + "The range parameter to view::async_input_buffer cannot be a temporary of a non-view range."); + static_assert(std::movable>, + "The range parameter to view::async_input_buffer must have a value_type that is std::Movable."); + static_assert(std::constructible_from, std::remove_reference_t> &&>, + "The range parameter to view::async_input_buffer must have a value_type that is constructible by a moved " + "value of its reference type."); + + if (buffer_size == 0) + throw std::invalid_argument{"The buffer_size parameter to view::async_input_buffer must be > 0."}; + + return detail::async_input_buffer_view{std::forward(urange), buffer_size}; + } +}; + +} // seqan3::detail + +//----------------------------------------------------------------------------- +// View shortcut for functor. +//----------------------------------------------------------------------------- + +namespace seqan3::view +{ +/*!\name General purpose views + * \{ + */ + +/*!\brief A view adapter that returns a concurrent-queue-like view over the underlying range. + * \tparam urng_t The type of the range being processed. See below for requirements. + * \param[in,out] urange The range being processed. + * \param[in] buffer_size Size of the buffer. Choose the size (> 0) depending on the expected work per element. + * \returns A view that pre-fetches elements from the underlying range and provides a thread-safe interface. + * See below for the properties of the returned range. + * \ingroup view + * + * \details + * + * **Header** + * ```cpp + * #include + * ``` + * + * ### Summary + * + * This view spawns a background thread that pre-fetches elements from the underlying range and stores them in a + * concurrent queue. Iterating over this view then pops elements out of the queue and returns them. + * This is primarily useful if dereferencing/incrementing the iterator of the underlying range + * is expensive, e.g. with SeqAn files which lazily perform I/O. + * + * Another advantage of this view is that multiple iterators can be created that are safe to iterate individually, + * even from different threads, i.e. you can use multiple threads to iterate safely over a single-pass input view + * with the added benefit of background pre-fetching. + * + * In technical terms: this view facilitates a single-producer, multi-consumer design; it's a range interface over + * a concurrent queue. + * + * ### Size of the buffer + * + * The `buffer_size` parameter should be chosen depending on the expected work per element, e.g. if the underlying + * range is an input file over short reads, a buffer size of 100 or 1000 could be beneficial; if on the other hand + * the file contains genome-sized sequences, it would be better to buffer only a single sequence (buffering 100 + * sequences would result in the entire file being preloaded and likely consuming significant memory). + * + * ### Range consumption + * + * This view always moves elements from the underlying range into its buffer which means that the elements in + * the underlying range will be invalidated! For underlying ranges that are single-pass, this makes no difference, but + * it might be unexpected for multi-pass ranges (std::ranges::forward_range). + * + * Typically this adaptor is used when you want to consume the entire underlying range. Destructing + * this view before all elements have been read will also stop the thread that moves object from the underlying + * range. + * **In general, it is not safe to access the underlying range in other contexts once it has been passed + * to seqan3::view::async_input_buffer.** + * + * Note that in addition to the buffer of the view, every iterator has its own one-element-buffer. Dereferencing + * the iterator returns a reference to the element in the buffer, usually you will want to move this element out + * of the buffer with std::move std::ranges::iter_move. Incrementing the iterator refills the buffer from the queue + * inside the view (which in turn is then refilled from the underlying range). + * + * ### View properties + * + * | concepts and reference type | `urng_t` (underlying range type) | `rrng_t` (returned range type) | + * |-------------------------------------------|:---------------------------------:|:---------------------------------:| + * | std::ranges::input_range | *required* | *preserved* | + * | std::ranges::forward_range | | *lost* | + * | std::ranges::bidirectional_range | | *lost* | + * | std::ranges::random_access_range | | *lost* | + * | std::ranges::contiguous_range | | *lost* | + * | | | | + * | std::ranges::viewable_range | *required* | *guaranteed* | + * | std::ranges::view | | *guaranteed* | + * | std::ranges::sized_range | | *lost* | + * | std::ranges::common_range | | *lost* | + * | std::ranges::output_range | | *lost* | + * | seqan3::const_iterable_range | | *lost* | + * | | | | + * | std::ranges::range_reference_t | | `seqan3::value_type_t &` | + * | | | | + * | std::iterator_traits \::iterator_category | | *none* | + * + * See the \link view view submodule documentation \endlink for detailed descriptions of the view properties. + * + * ### Thread safety + * + * The following operations are **thread-safe**: + * + * * calling `.begin()` and `.end()` on the view returned by this adaptor; + * * calling operators on the different iterator objects. + * + * Calling operators on the same iterator object from different threads is not safe, i.e. you can pass the view + * to different threads by reference, and have each of those threads call `begin()` on the view and then + * perform operations (dereference, increment...) on that iterator from the respective thread; but you + * cannot call `begin()` in a parent thread, pass the iterator to different threads and operate on that + * concurrently. + * + * ### Example + * + * \include test/snippet/range/view/async_input_buffer.cpp + * + * Running the snippet could yield the following output: + * + * ``` + * Thread: 0x80116bf00 Seq: seq2 + * Thread: 0x80116bf00 Seq: seq3 + * Thread: 0x80116ba00 Seq: seq1 + * Thread: 0x80116bf00 Seq: seq4 + * Thread: 0x80116bf00 Seq: seq6 + * Thread: 0x80116ba00 Seq: seq5 + * Thread: 0x80116bf00 Seq: seq7 + * Thread: 0x80116ba00 Seq: seq8 + * Thread: 0x80116bf00 Seq: seq9 + * Thread: 0x80116bf00 Seq: seq11 + * Thread: 0x80116bf00 Seq: seq12 + * Thread: 0x80116ba00 Seq: seq10 + * ``` + * This shows that indeed elements from the underlying range are processed non-sequentially, that there are two threads + * and that work is "balanced" between them (one thread processed more element than the other, because its "work" + * per item happened to be smaller). + * + * Note that you might encounter jumbled output if by chance two threads write to the stream at the exact same time. + * + * If you remove the line starting with `auto f1 = ...` you will get sequential processing: + * ``` + * Thread: 0x80116aa00 Seq: seq1 + * Thread: 0x80116aa00 Seq: seq2 + * Thread: 0x80116aa00 Seq: seq3 + * Thread: 0x80116aa00 Seq: seq4 + * Thread: 0x80116aa00 Seq: seq5 + * Thread: 0x80116aa00 Seq: seq6 + * Thread: 0x80116aa00 Seq: seq7 + * Thread: 0x80116aa00 Seq: seq8 + * Thread: 0x80116aa00 Seq: seq9 + * Thread: 0x80116aa00 Seq: seq10 + * Thread: 0x80116aa00 Seq: seq11 + * Thread: 0x80116aa00 Seq: seq12 + * ``` + * + * Note that even if you have a single processing thread, using this view can still improve performance measurably, + * because loading of the elements into the buffer (which reads input from disk) happens in a background thread. + * + * \hideinitializer + */ +inline constexpr auto async_input_buffer = detail::async_input_buffer_fn{}; + +//!\} +} // namespace seqan3::view diff --git a/test/snippet/range/view/async_input_buffer.cpp b/test/snippet/range/view/async_input_buffer.cpp new file mode 100644 index 0000000000..7c1cd1c266 --- /dev/null +++ b/test/snippet/range/view/async_input_buffer.cpp @@ -0,0 +1,66 @@ +#include // std::rand +#include // std::async +#include // std::string + +#include // seqan3::debug_stream +#include // seqan3::sequence_file_input +#include // seqan3::view::async_input_buffer + +std::string fasta_file = +R"(> seq1 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq2 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq3 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq4 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq5 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq6 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq7 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq8 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq9 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq10 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq11 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq12 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +)"; + +int main() +{ + // initialise random number generator, only needed for demonstration purposes + std::srand(std::time(nullptr)); + + // create an input file from the string above + seqan3::sequence_file_input fin{std::istringstream{fasta_file}, seqan3::format_fasta{}}; + + // create the async buffer around the input file + // spawns a background thread that tries to keep four records in the buffer + auto v = fin | seqan3::view::async_input_buffer(4); + + // create a lambda function that iterates over the async buffer when called + // (the buffer gets dynamically refilled as soon as possible) + auto worker = [&v] () + { + for (auto & record : v) + { + // pretend we are doing some work + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 1000)); + // print current thread and sequence ID + seqan3::debug_stream << "Thread: " << std::this_thread::get_id() << '\t' + << "Seq: " << seqan3::get(record) << '\n'; + + } + }; + + // launch two threads and pass the lambda function to both + auto f0 = std::async(std::launch::async, worker); + auto f1 = std::async(std::launch::async, worker); +} diff --git a/test/unit/range/view/CMakeLists.txt b/test/unit/range/view/CMakeLists.txt index 693984768c..e21106a26a 100644 --- a/test/unit/range/view/CMakeLists.txt +++ b/test/unit/range/view/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectories() seqan3_test(adaptor_base_test.cpp) seqan3_test(view_all_test.cpp) +seqan3_test(view_async_input_buffer_test.cpp) seqan3_test(view_char_to_test.cpp) seqan3_test(view_complement_test.cpp) seqan3_test(view_convert_test.cpp) diff --git a/test/unit/range/view/view_async_input_buffer_test.cpp b/test/unit/range/view/view_async_input_buffer_test.cpp new file mode 100644 index 0000000000..bfbf001822 --- /dev/null +++ b/test/unit/range/view/view_async_input_buffer_test.cpp @@ -0,0 +1,124 @@ +// ----------------------------------------------------------------------------------------------------- +// Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin +// Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik +// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License +// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md +// ----------------------------------------------------------------------------------------------------- + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "../iterator_test_template.hpp" + +using namespace seqan3; + +using iterator_type = std::ranges::iterator_t< + decltype(std::declval&>() | view::async_input_buffer(3))>; + +template <> +struct iterator_fixture : public ::testing::Test +{ + using iterator_tag = std::input_iterator_tag; + static constexpr bool const_iterable = false; + + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + std::vector expected_range{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + decltype(view::async_input_buffer(vec, 3)) test_range = view::async_input_buffer(vec, 3); +}; + +using test_type = ::testing::Types; +INSTANTIATE_TYPED_TEST_CASE_P(iterator_fixture, iterator_fixture, test_type); + +TEST(async_input_buffer, in_out) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v = vec | view::async_input_buffer(3); + + EXPECT_TRUE(std::ranges::equal(vec, v)); +} + +TEST(async_input_buffer, in_out_empty) +{ + std::vector vec{}; + + auto v = vec | view::async_input_buffer(3); + + EXPECT_TRUE(v.begin() == v.end()); +} + +TEST(async_input_buffer, buffer_size_zero) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + EXPECT_THROW(vec | view::async_input_buffer(0), std::invalid_argument); +} + +TEST(async_input_buffer, buffer_size_huge) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v = vec | view::async_input_buffer(100000); + + EXPECT_TRUE(std::ranges::equal(vec, v)); +} + +TEST(async_input_buffer, destruct_with_full_buffer) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v0 = vec | view::single_pass_input; + + { + auto v1 = v0 | view::async_input_buffer(5); + + // consume five elements (construction already consumes one) + auto b = std::ranges::begin(v1); + ++b; ++b; ++b; ++b; + + /* Give time to rebuffer next five elements so the queue will not be empty. + * This is not required for this test to be successful, but it is the only + * way destruction with non-empty buffer is at least likely to happen. + * And we want it to happen to make sure we don't dead-lock on it. + */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } // thread sync at destruction of v1; tests working destruction with full buffer + + EXPECT_GE(std::ranges::distance(v0), 17); // total of at most 10 chars consumed +} + +TEST(async_input_buffer, combinability) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + std::vector cmp{"ACGTACGTAC"_dna4}; + + auto adapt = view::async_input_buffer(5) | view::take(10); + + auto v = vec | adapt; + + EXPECT_TRUE(std::ranges::equal(cmp, v)); +} + +TEST(async_input_buffer, concepts) +{ + std::vector vec; + + auto v1 = vec | view::async_input_buffer(1); + + EXPECT_TRUE(std::ranges::input_range); + EXPECT_FALSE(std::ranges::forward_range); + EXPECT_FALSE(std::ranges::random_access_range); + EXPECT_FALSE(std::ranges::sized_range); + EXPECT_FALSE(const_iterable_range); + EXPECT_TRUE(std::ranges::view); +}