Skip to content

Commit

Permalink
[c++] Modify ManagedQuery to perform async queries (#1953)
Browse files Browse the repository at this point in the history
* Addition of async queries

* Modify Python CI workflow

* Check validity of `query_future_`
  • Loading branch information
nguyenv authored and github-actions[bot] committed Dec 6, 2023
1 parent 5408287 commit acc145e
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 21 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/python-ci-single.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ jobs:
build:
runs-on: ${{ inputs.os }}
steps:
- name: Show matrix OS
run: echo "inputs.os:" ${{ inputs.os }}

- name: Linux CPU info
if: ${{ inputs.os == 'ubuntu-22.04' }}
run: cat /proc/cpuinfo

- name: MacOS CPU info
if: ${{ inputs.os == 'macos-12' }}
run: sysctl -a | grep cpu
- name: Select XCode version
if: inputs.is_mac
uses: maxim-lobanov/setup-xcode@v1
Expand Down
39 changes: 27 additions & 12 deletions libtiledbsoma/src/soma/managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,16 @@ ManagedQuery::ManagedQuery(
reset();
}

void ManagedQuery::close() {
if (query_future_.valid()) {
query_future_.wait();
}
array_->close();
}

void ManagedQuery::reset() {
query_ = std::make_unique<Query>(schema_->context(), *array_);
subarray_ = std::make_unique<Subarray>(schema_->context(), *array_);
query_ = std::make_unique<Query>(*ctx_, *array_);
subarray_ = std::make_unique<Subarray>(*ctx_, *array_);

subarray_range_set_ = false;
subarray_range_empty_ = {};
Expand Down Expand Up @@ -145,21 +152,29 @@ void ManagedQuery::submit_write() {
query_->submit();
}

std::shared_ptr<ArrayBuffers> ManagedQuery::submit_read() {
void ManagedQuery::submit_read() {
query_submitted_ = true;
query_future_ = std::async(std::launch::async, [&]() {
LOG_DEBUG("[ManagedQuery] submit thread start");
query_->submit();
LOG_DEBUG("[ManagedQuery] submit thread done");
});
}

std::shared_ptr<ArrayBuffers> ManagedQuery::results() {
if (is_empty_query()) {
return buffers_;
}

query_->submit();

// Poll status until query is not INPROGRESS
Query::Status status;
do {
status = query_->query_status();
} while (status == Query::Status::INPROGRESS);
if (query_future_.valid()) {
LOG_DEBUG(fmt::format("[ManagedQuery] [{}] Waiting for query", name_));
query_future_.wait();
} else {
throw TileDBSOMAError(
fmt::format("[ManagedQuery] [{}] 'query_future_' invalid", name_));
}

LOG_DEBUG(fmt::format(
"[ManagedQuery] [{}] Query status = {}", name_, (int)status));
auto status = query_->query_status();

if (status == Query::Status::FAILED) {
throw TileDBSOMAError(
Expand Down
22 changes: 19 additions & 3 deletions libtiledbsoma/src/soma/managed_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#ifndef MANAGED_QUERY_H
#define MANAGED_QUERY_H

#include <future>
#include <stdexcept> // for windows: error C2039: 'runtime_error': is not a member of 'std'
#include <unordered_set>

Expand Down Expand Up @@ -68,6 +69,13 @@ class ManagedQuery {
ManagedQuery(ManagedQuery&&) = default;
~ManagedQuery() = default;

/**
* @brief Close the array after waiting for any asynchronous queries to
* complete.
*
*/
void close();

/**
* @brief Reset the state of this ManagedQuery object to prepare for a new
* query, while holding the array open.
Expand Down Expand Up @@ -373,11 +381,17 @@ class ManagedQuery {
}

/**
* @brief Submit and return results from the query.
* @brief Submit the query.
*
*/
void submit_read();

/**
* @brief Return results from the query.
*
* @return std::shared_ptr<ArrayBuffers>
*/
std::shared_ptr<ArrayBuffers> submit_read();
std::shared_ptr<ArrayBuffers> results();

/**
* @brief Submit the write query.
Expand Down Expand Up @@ -469,8 +483,10 @@ class ManagedQuery {

// True if the query has been submitted
bool query_submitted_ = false;
};

// Future for asyncronous query
std::future<void> query_future_;
};
}; // namespace tiledbsoma

#endif
10 changes: 7 additions & 3 deletions libtiledbsoma/src/soma/soma_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ void SOMAArray::open(
}

void SOMAArray::close() {
arr_->close();
// Close the array through the managed query to ensure any pending queries
// are completed.
mq_->close();
}

void SOMAArray::reset(
Expand Down Expand Up @@ -234,16 +236,18 @@ std::optional<std::shared_ptr<ArrayBuffers>> SOMAArray::read_next() {
if (mq_->is_empty_query()) {
if (first_read_next_) {
first_read_next_ = false;
return mq_->submit_read();
return mq_->results();
} else {
return std::nullopt;
}
}

first_read_next_ = false;

mq_->submit_read();

// Return the results, possibly incomplete
return mq_->submit_read();
return mq_->results();
}

void SOMAArray::write(std::shared_ptr<ArrayBuffers> buffers) {
Expand Down
9 changes: 6 additions & 3 deletions libtiledbsoma/test/unit_managed_query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ TEST_CASE("ManagedQuery: Basic execution test") {
auto mq = ManagedQuery(array, ctx);
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand All @@ -144,7 +145,8 @@ TEST_CASE("ManagedQuery: Select test") {
mq.select_points<std::string>("d0", {"a"});
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand All @@ -166,7 +168,8 @@ TEST_CASE("ManagedQuery: Validity test") {
auto mq = ManagedQuery(array, ctx);
mq.setup_read();

auto results = mq.submit_read();
mq.submit_read();
auto results = mq.results();
REQUIRE(mq.results_complete());

auto num_cells = mq.total_num_cells();
Expand Down

0 comments on commit acc145e

Please sign in to comment.