Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read in row-group order #2133

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ set(arcticdb_srcs
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/version_map_batch_methods.cpp)
version/version_map_batch_methods.cpp processing/resample_boundaries.hpp pipeline/chunking.cpp pipeline/error.hpp)


add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -753,8 +754,8 @@ if (SSL_LINK)
find_package(OpenSSL REQUIRED)
list(APPEND arcticdb_core_libraries OpenSSL::SSL)
if (NOT WIN32)
list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
#list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
#list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
endif()
endif ()
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})
Expand Down
100 changes: 100 additions & 0 deletions cpp/arcticdb/pipeline/chunking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <optional>
#include <arcticdb/pipeline/chunking.hpp>
#include <arcticdb/pipeline/read_frame.hpp>

namespace arcticdb {

ChunkIterator::ChunkIterator(
pipelines::index::IndexSegmentReader&& index_segment_reader,
std::shared_ptr<pipelines::PipelineContext> pipeline_context,
AtomKey index_key,
std::shared_ptr<Store> store,
ReadQuery& read_query,
const ReadOptions& read_options,
std::any& handler_data,
DecodePathData shared_data) :
index_segment_reader_(std::move(index_segment_reader)),
pipeline_context_(std::move(pipeline_context)),
index_key_(index_key),
store_(std::move(store)),
handler_data_(handler_data),
shared_data_(std::move(shared_data)),
read_query_(std::make_shared<ReadQuery>(read_query)),
read_options_(read_options),
read_ahead_(ConfigsMap::instance()->get_int("Chunk.ReadaheadRows", 1)),
row_ranges_to_read_(pipeline_context_->fetch_index_.count()) {
do_read_ahead();
}

std::optional<ReadResult> ChunkIterator::next() {
auto release_gil = std::make_unique<py::gil_scoped_release>();
if (row_pos_ == row_ranges_to_read_)
return std::nullopt;

auto required_row_pos = row_pos_;
util::check(required_row_pos < results_.size(),
"Request for row pos that has not been scheduled: {} >= {}",
required_row_pos,
results_.size());
++row_pos_;
do_read_ahead();
auto read_version = std::move(results_[required_row_pos]).get();
return create_python_read_result(read_version.versioned_item_, std::move(read_version.frame_and_descriptor_));
}

void ChunkIterator::do_read_ahead() {
while (scheduled_row_pos_ < row_pos_ + read_ahead_ && scheduled_row_pos_ < row_ranges_to_read_)
schedule_row_range();
}

void ChunkIterator::schedule_row_range() {
auto local_context = std::make_shared<PipelineContext>();
local_context->set_descriptor(pipeline_context_->descriptor());
const auto previous_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
auto current_row_range = previous_row_range;
const auto start_pos = slice_and_key_pos_;
while (current_row_range == previous_row_range && slice_and_key_pos_ < pipeline_context_->slice_and_keys_.size()) {
local_context->slice_and_keys_.emplace_back(pipeline_context_->slice_and_keys_[slice_and_key_pos_]);
local_context->fetch_index_.set_bit(row_pos_, pipeline_context_->fetch_index_[slice_and_key_pos_]);
++slice_and_key_pos_;
if (slice_and_key_pos_ == pipeline_context_->slice_and_keys_.size())
break;

current_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
}

const auto row_range_size = slice_and_key_pos_ - start_pos;
util::check(row_range_size == local_context->slice_and_keys_.size(),
"Expected equality of range size and slice and keys {} != {},",
row_range_size,
local_context->slice_and_keys_.size());

pipeline_context_->fetch_index_.copy_range(local_context->fetch_index_, start_pos, slice_and_key_pos_);
local_context->fetch_index_.resize(row_range_size);
local_context->norm_meta_ = pipeline_context_->norm_meta_;

auto frame = allocate_frame(local_context);
auto output = do_direct_read_or_process(store_,
read_query_,
read_options_,
local_context,
shared_data_,
handler_data_).thenValue(
[this, frame, local_context](auto&&) mutable {
return ReadVersionOutput{
VersionedItem{to_atom(index_key_)},
FrameAndDescriptor{frame, TimeseriesDescriptor{index_segment_reader_.tsd()}, {}, {}}};
});

results_.emplace_back(std::move(output));
++scheduled_row_pos_;
}

} // namespace arcticdb
55 changes: 55 additions & 0 deletions cpp/arcticdb/pipeline/chunking.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <optional>
#include <arcticdb/pipeline/index_segment_reader.hpp>
#include <arcticdb/pipeline/pipeline_context.hpp>
#include <arcticdb/version/read_version_output.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/pipeline/read_options.hpp>
#include <arcticdb/pipeline/query.hpp>

namespace arcticdb {

class ChunkIterator {
pipelines::index::IndexSegmentReader index_segment_reader_;
std::shared_ptr<pipelines::PipelineContext> pipeline_context_;
AtomKey index_key_;
std::shared_ptr<Store> store_;
std::any handler_data_;
DecodePathData shared_data_;
std::shared_ptr<pipelines::ReadQuery> read_query_;
ReadOptions read_options_;
size_t row_pos_ = 0;
size_t scheduled_row_pos_ = 0;
size_t read_ahead_;
const size_t row_ranges_to_read_;
size_t slice_and_key_pos_ = 0;
std::vector<folly::Future<ReadVersionOutput>> results_;

public:
ChunkIterator(
pipelines::index::IndexSegmentReader&& index_segment_reader,
std::shared_ptr<pipelines::PipelineContext> pipeline_context,
AtomKey index_key,
std::shared_ptr<Store> store,
pipelines::ReadQuery& read_query,
const ReadOptions& read_options,
std::any& handler_data,
DecodePathData shared_data);

std::optional<ReadResult> next();
private:
void do_read_ahead();

void schedule_row_range();

};

} // namespace arcticdb
22 changes: 22 additions & 0 deletions cpp/arcticdb/pipeline/error.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/
#pragma once

#include <vector>
#include <string>
#include <folly/Function.h>

namespace arcticdb {
struct Error {

explicit Error(folly::Function<void(std::string)> raiser, std::string msg);
void throw_error();

folly::Function<void(std::string)> raiser_;
std::string msg_;
};
}
6 changes: 6 additions & 0 deletions cpp/arcticdb/pipeline/pipeline_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc));
}

inline void sort_by_row_range(std::vector<SliceAndKey>& slices_and_keys) {
std::sort(std::begin(slices_and_keys), std::end(slices_and_keys), [] (const SliceAndKey& left, const SliceAndKey& right) {
return std::tie(left.slice_.row_range.first, left.slice_.col_range.first) < std::tie(right.slice_.row_range.first, right.slice_.col_range.first);
});
}

}
Loading
Loading