Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++] Modify ManagedQuery to perform async queries #1953

Merged
merged 3 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions libtiledbsoma/src/soma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ ManagedQuery::ManagedQuery(
reset();
}

void ManagedQuery::close() {
if (query_future_.valid()) {
query_future_.wait();
}
array_->close();
}

void ManagedQuery::reset() {
query_ = std::make_unique<Query>(schema_->context(), *array_);
subarray_ = std::make_unique<Subarray>(schema_->context(), *array_);
query_ = std::make_unique<Query>(*ctx_, *array_);
subarray_ = std::make_unique<Subarray>(*ctx_, *array_);

subarray_range_set_ = false;
subarray_range_empty_ = {};
Expand Down Expand Up @@ -145,21 +152,24 @@ void ManagedQuery::submit_write() {
query_->submit();
}

std::shared_ptr<ArrayBuffers> ManagedQuery::submit_read() {
void ManagedQuery::submit_read() {
query_submitted_ = true;
query_future_ = std::async(std::launch::async, [&]() {
LOG_DEBUG("[ManagedQuery] submit thread start");
query_->submit();
LOG_DEBUG("[ManagedQuery] submit thread done");
});
}

std::shared_ptr<ArrayBuffers> ManagedQuery::results() {
if (is_empty_query()) {
return buffers_;
}

query_->submit();

// Poll status until query is not INPROGRESS
Query::Status status;
do {
status = query_->query_status();
} while (status == Query::Status::INPROGRESS);
LOG_DEBUG(fmt::format("[ManagedQuery] [{}] Waiting for query", name_));
query_future_.wait();
nguyenv marked this conversation as resolved.
Show resolved Hide resolved

LOG_DEBUG(fmt::format(
"[ManagedQuery] [{}] Query status = {}", name_, (int)status));
auto status = query_->query_status();

if (status == Query::Status::FAILED) {
throw TileDBSOMAError(
Expand Down
22 changes: 19 additions & 3 deletions libtiledbsoma/src/soma/managed_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#ifndef MANAGED_QUERY_H
#define MANAGED_QUERY_H

#include <future>
#include <stdexcept> // for windows: error C2039: 'runtime_error': is not a member of 'std'
#include <unordered_set>

Expand Down Expand Up @@ -68,6 +69,13 @@ class ManagedQuery {
ManagedQuery(ManagedQuery&&) = default;
~ManagedQuery() = default;

/**
* @brief Close the array after waiting for any asynchronous queries to
* complete.
*
*/
void close();

/**
* @brief Reset the state of this ManagedQuery object to prepare for a new
* query, while holding the array open.
Expand Down Expand Up @@ -373,11 +381,17 @@ class ManagedQuery {
}

/**
* @brief Submit and return results from the query.
* @brief Submit the query.
*
*/
void submit_read();

/**
* @brief Return results from the query.
*
* @return std::shared_ptr<ArrayBuffers>
*/
std::shared_ptr<ArrayBuffers> submit_read();
std::shared_ptr<ArrayBuffers> results();

/**
* @brief Submit the write query.
Expand Down Expand Up @@ -469,8 +483,10 @@ class ManagedQuery {

// True if the query has been submitted
bool query_submitted_ = false;
};

// Future for asyncronous query
std::future<void> query_future_;
};
}; // namespace tiledbsoma

#endif
10 changes: 7 additions & 3 deletions libtiledbsoma/src/soma/soma_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ void SOMAArray::open(
}

void SOMAArray::close() {
arr_->close();
// Close the array through the managed query to ensure any pending queries
// are completed.
mq_->close();
}

void SOMAArray::reset(
Expand Down Expand Up @@ -234,16 +236,18 @@ std::optional<std::shared_ptr<ArrayBuffers>> SOMAArray::read_next() {
if (mq_->is_empty_query()) {
if (first_read_next_) {
first_read_next_ = false;
return mq_->submit_read();
return mq_->results();
} else {
return std::nullopt;
}
}

first_read_next_ = false;

mq_->submit_read();

// Return the results, possibly incomplete
return mq_->submit_read();
return mq_->results();
}

void SOMAArray::write(std::shared_ptr<ArrayBuffers> buffers) {
Expand Down
9 changes: 6 additions & 3 deletions libtiledbsoma/test/unit_managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ TEST_CASE("ManagedQuery: Basic execution test") {
auto mq = ManagedQuery(array, ctx);
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand All @@ -144,7 +145,8 @@ TEST_CASE("ManagedQuery: Select test") {
mq.select_points<std::string>("d0", {"a"});
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand All @@ -166,7 +168,8 @@ TEST_CASE("ManagedQuery: Validity test") {
auto mq = ManagedQuery(array, ctx);
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand Down
Loading