diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt
index 8e3391b77a..8af156c2ab 100644
--- a/cpp/arcticdb/CMakeLists.txt
+++ b/cpp/arcticdb/CMakeLists.txt
@@ -526,7 +526,8 @@ set(arcticdb_srcs
         version/version_core.cpp
         version/version_store_api.cpp
         version/version_utils.cpp
-        version/version_map_batch_methods.cpp)
+        version/version_map_batch_methods.cpp processing/resample_boundaries.hpp pipeline/chunking.cpp pipeline/error.hpp)
+
 add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})
@@ -753,8 +754,8 @@ if (SSL_LINK)
     find_package(OpenSSL REQUIRED)
     list(APPEND arcticdb_core_libraries OpenSSL::SSL)
     if (NOT WIN32)
-        list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
-        list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
+        #list(APPEND arcticdb_core_libraries ${KERBEROS_LIBRARY})
+        #list(APPEND arcticdb_core_includes ${KERBEROS_INCLUDE_DIR})
     endif()
 endif ()
 target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})
diff --git a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
index 105c9e37ae..689b74f542 100644
--- a/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
+++ b/cpp/arcticdb/column_store/test/ingestion_stress_test.cpp
@@ -129,7 +129,7 @@ TEST_F(IngestionStressStore, ScalarIntAppend) {
     read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
-    auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, handler_data);
+    auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ro, *handler_data);
     GTEST_COUT << "columns in res: " << read_result.frame_data.index_columns().size();
 }
@@ -217,7 +217,7 @@ TEST_F(IngestionStressStore, ScalarIntDynamicSchema) {
     read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
-    auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data);
+    auto read_result = test_store_->read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, *handler_data);
 }
 
 TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
@@ -270,6 +270,6 @@ TEST_F(IngestionStressStore, DynamicSchemaWithStrings) {
     read_query->row_filter = universal_range();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
-    auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, handler_data);
+    auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, read_options, *handler_data);
     ARCTICDB_DEBUG(log::version(), "result columns: {}", read_result.frame_data.names());
 }
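A note on the recurring `*handler_data` change in the tests above and throughout this diff: `get_type_handler_data()` now returns `std::shared_ptr<std::any>` rather than a `std::any` by value (see the `TypeHandlerDataFactory` changes to `python_handler_data.hpp`, `native_handler.hpp` and `type_handler.hpp` further down), so call sites dereference before passing it to the unchanged `std::any&` read APIs. A minimal sketch of that ownership pattern, using invented stand-in names (`ToyHandlerData`, `get_toy_handler_data`, `read_with`) rather than the real ArcticDB API:

// Illustrative sketch only, not part of the diff: handler data is heap-allocated
// and shared so it can outlive the immediate caller, while readers still take
// std::any& and mutate the shared state in place.
#include <any>
#include <memory>

struct ToyHandlerData { int counter = 0; };

std::shared_ptr<std::any> get_toy_handler_data() {          // stands in for get_type_handler_data()
    return std::make_shared<std::any>(std::make_any<ToyHandlerData>());
}

void read_with(std::any& handler_data) {                    // stands in for read_dataframe_version(..., std::any&)
    ++std::any_cast<ToyHandlerData&>(handler_data).counter; // mutate the shared state in place
}

int main() {
    auto handler_data = get_toy_handler_data();
    read_with(*handler_data);   // dereference, mirroring the `*handler_data` change in the tests
    read_with(*handler_data);   // the same state is reused across calls
}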
diff --git a/cpp/arcticdb/pipeline/chunking.cpp b/cpp/arcticdb/pipeline/chunking.cpp
new file mode 100644
index 0000000000..723ec5f9e9
--- /dev/null
+++ b/cpp/arcticdb/pipeline/chunking.cpp
@@ -0,0 +1,100 @@
+/* Copyright 2023 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+#include <arcticdb/pipeline/chunking.hpp>
+#include
+#include
+
+namespace arcticdb {
+
+ChunkIterator::ChunkIterator(
+    pipelines::index::IndexSegmentReader&& index_segment_reader,
+    std::shared_ptr<PipelineContext> pipeline_context,
+    AtomKey index_key,
+    std::shared_ptr<Store> store,
+    ReadQuery& read_query,
+    const ReadOptions& read_options,
+    std::any& handler_data,
+    DecodePathData shared_data) :
+        index_segment_reader_(std::move(index_segment_reader)),
+        pipeline_context_(std::move(pipeline_context)),
+        index_key_(index_key),
+        store_(std::move(store)),
+        handler_data_(handler_data),
+        shared_data_(std::move(shared_data)),
+        read_query_(std::make_shared<ReadQuery>(read_query)),
+        read_options_(read_options),
+        read_ahead_(ConfigsMap::instance()->get_int("Chunk.ReadaheadRows", 1)),
+        row_ranges_to_read_(pipeline_context_->fetch_index_.count()) {
+    do_read_ahead();
+}
+
+std::optional<ReadResult> ChunkIterator::next() {
+    auto release_gil = std::make_unique<py::gil_scoped_release>();
+    if (row_pos_ == row_ranges_to_read_)
+        return std::nullopt;
+
+    auto required_row_pos = row_pos_;
+    util::check(required_row_pos < results_.size(),
+                "Request for row pos that has not been scheduled: {} >= {}",
+                required_row_pos,
+                results_.size());
+    ++row_pos_;
+    do_read_ahead();
+    auto read_version = std::move(results_[required_row_pos]).get();
+    return create_python_read_result(read_version.versioned_item_, std::move(read_version.frame_and_descriptor_));
+}
+
+void ChunkIterator::do_read_ahead() {
+    while (scheduled_row_pos_ < row_pos_ + read_ahead_ && scheduled_row_pos_ < row_ranges_to_read_)
+        schedule_row_range();
+}
+
+void ChunkIterator::schedule_row_range() {
+    auto local_context = std::make_shared<PipelineContext>();
+    local_context->set_descriptor(pipeline_context_->descriptor());
+    const auto previous_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
+    auto current_row_range = previous_row_range;
+    const auto start_pos = slice_and_key_pos_;
+    while (current_row_range == previous_row_range && slice_and_key_pos_ < pipeline_context_->slice_and_keys_.size()) {
+        local_context->slice_and_keys_.emplace_back(pipeline_context_->slice_and_keys_[slice_and_key_pos_]);
+        local_context->fetch_index_.set_bit(row_pos_, pipeline_context_->fetch_index_[slice_and_key_pos_]);
+        ++slice_and_key_pos_;
+        if (slice_and_key_pos_ == pipeline_context_->slice_and_keys_.size())
+            break;
+
+        current_row_range = pipeline_context_->slice_and_keys_[slice_and_key_pos_].slice_.row_range;
+    }
+
+    const auto row_range_size = slice_and_key_pos_ - start_pos;
+    util::check(row_range_size == local_context->slice_and_keys_.size(),
+                "Expected equality of range size and slice and keys {} != {},",
+                row_range_size,
+                local_context->slice_and_keys_.size());
+
+    pipeline_context_->fetch_index_.copy_range(local_context->fetch_index_, start_pos, slice_and_key_pos_);
+    local_context->fetch_index_.resize(row_range_size);
+    local_context->norm_meta_ = pipeline_context_->norm_meta_;
+
+    auto frame = allocate_frame(local_context);
+    auto output = do_direct_read_or_process(store_,
+                                            read_query_,
+                                            read_options_,
+                                            local_context,
+                                            shared_data_,
+                                            handler_data_).thenValue(
+        [this, frame, local_context](auto&&) mutable {
+            return ReadVersionOutput{
+                VersionedItem{to_atom(index_key_)},
+                FrameAndDescriptor{frame, TimeseriesDescriptor{index_segment_reader_.tsd()}, {}, {}}};
+        });
+
+    results_.emplace_back(std::move(output));
+    ++scheduled_row_pos_;
+}
+
+} // namespace arcticdb
\ No newline at end of file
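To make the read-ahead logic above easier to follow, here is a minimal, self-contained sketch of the same pattern using std::future instead of folly::Future (ChunkIterator itself holds folly::Future<ReadVersionOutput> values and takes its depth from the Chunk.ReadaheadRows setting, default 1). All names here are invented for illustration:

// Illustrative sketch only, not part of the diff: schedule work for the next
// read_ahead_ chunks eagerly, then block only on the chunk the consumer needs.
#include <cstddef>
#include <future>
#include <optional>
#include <vector>

class ReadAheadIterator {
    std::size_t row_pos_ = 0, scheduled_pos_ = 0;
    const std::size_t total_;
    const std::size_t read_ahead_;   // assumed >= 1, matching the Chunk.ReadaheadRows default of 1
    std::vector<std::future<int>> results_;

    void schedule_one() {            // stands in for schedule_row_range()
        results_.push_back(std::async(std::launch::async,
                                      [pos = scheduled_pos_] { return static_cast<int>(pos) * 10; }));
        ++scheduled_pos_;
    }
    void do_read_ahead() {           // keep read_ahead_ chunks in flight past the cursor
        while (scheduled_pos_ < row_pos_ + read_ahead_ && scheduled_pos_ < total_)
            schedule_one();
    }

public:
    ReadAheadIterator(std::size_t total, std::size_t read_ahead)
        : total_(total), read_ahead_(read_ahead) {
        do_read_ahead();             // mirrors ChunkIterator's constructor kicking off the first reads
    }
    std::optional<int> next() {
        if (row_pos_ == total_)
            return std::nullopt;
        const auto pos = row_pos_++;
        do_read_ahead();             // top up in-flight work before blocking
        return results_[pos].get();  // block only on the chunk needed right now
    }
};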
diff --git a/cpp/arcticdb/pipeline/chunking.hpp b/cpp/arcticdb/pipeline/chunking.hpp
new file mode 100644
index 0000000000..e74540ffe6
--- /dev/null
+++ b/cpp/arcticdb/pipeline/chunking.hpp
@@ -0,0 +1,55 @@
+/* Copyright 2023 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace arcticdb {
+
+class ChunkIterator {
+    pipelines::index::IndexSegmentReader index_segment_reader_;
+    std::shared_ptr<pipelines::PipelineContext> pipeline_context_;
+    AtomKey index_key_;
+    std::shared_ptr<Store> store_;
+    std::any handler_data_;
+    DecodePathData shared_data_;
+    std::shared_ptr<pipelines::ReadQuery> read_query_;
+    ReadOptions read_options_;
+    size_t row_pos_ = 0;
+    size_t scheduled_row_pos_ = 0;
+    size_t read_ahead_;
+    const size_t row_ranges_to_read_;
+    size_t slice_and_key_pos_ = 0;
+    std::vector<folly::Future<ReadVersionOutput>> results_;
+
+public:
+    ChunkIterator(
+        pipelines::index::IndexSegmentReader&& index_segment_reader,
+        std::shared_ptr<pipelines::PipelineContext> pipeline_context,
+        AtomKey index_key,
+        std::shared_ptr<Store> store,
+        pipelines::ReadQuery& read_query,
+        const ReadOptions& read_options,
+        std::any& handler_data,
+        DecodePathData shared_data);
+
+    std::optional<ReadResult> next();
+private:
+    void do_read_ahead();
+
+    void schedule_row_range();
+
+};
+
+} // namespace arcticdb
\ No newline at end of file
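The consuming side is symmetrical: the pybind11 binding added in python_bindings.cpp later in this diff simply calls next() until it yields std::nullopt. A generic sketch of that drain loop (drain and per_chunk are illustrative names, not part of the change):

// Illustrative sketch only, not part of the diff: any iterator with a
// next() -> std::optional<T> shape, including ChunkIterator, can be drained
// like this; std::nullopt signals that every row range has been consumed.
template <typename ChunkedIterator, typename PerChunk>
void drain(ChunkedIterator& it, PerChunk&& per_chunk) {
    while (auto chunk = it.next())      // std::nullopt terminates the loop
        per_chunk(std::move(*chunk));   // e.g. hand the ReadResult to Python
}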
diff --git a/cpp/arcticdb/pipeline/error.hpp b/cpp/arcticdb/pipeline/error.hpp
new file mode 100644
index 0000000000..e604e7c085
--- /dev/null
+++ b/cpp/arcticdb/pipeline/error.hpp
@@ -0,0 +1,22 @@
+/* Copyright 2023 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+#pragma once
+
+#include <folly/Function.h>
+#include <string>
+#include
+
+namespace arcticdb {
+struct Error {
+
+    explicit Error(folly::Function<void(std::string_view)> raiser, std::string msg);
+    void throw_error();
+
+    folly::Function<void(std::string_view)> raiser_;
+    std::string msg_;
+};
+}
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/pipeline_utils.hpp b/cpp/arcticdb/pipeline/pipeline_utils.hpp
index ae2666ed66..2d59ba3911 100644
--- a/cpp/arcticdb/pipeline/pipeline_utils.hpp
+++ b/cpp/arcticdb/pipeline/pipeline_utils.hpp
@@ -46,9 +46,15 @@ inline ReadResult read_result_from_single_frame(FrameAndDescriptor& frame_and_de
     auto descriptor = std::make_shared<StreamDescriptor>(frame_and_desc.frame_.descriptor());
     pipeline_context->begin()->set_descriptor(std::move(descriptor));
     auto handler_data = TypeHandlerRegistry::instance()->get_handler_data();
-    reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, handler_data).get();
-    apply_type_handlers(frame_and_desc.frame_, handler_data);
+    reduce_and_fix_columns(pipeline_context, frame_and_desc.frame_, ReadOptions{}, *handler_data).get();
+    apply_type_handlers(frame_and_desc.frame_, *handler_data);
     return create_python_read_result(VersionedItem{key}, std::move(frame_and_desc));
 }
 
+inline void sort_by_row_range(std::vector<SliceAndKey>& slices_and_keys) {
+    std::sort(std::begin(slices_and_keys), std::end(slices_and_keys), [] (const SliceAndKey& left, const SliceAndKey& right) {
+        return std::tie(left.slice_.row_range.first, left.slice_.col_range.first) < std::tie(right.slice_.row_range.first, right.slice_.col_range.first);
+    });
+}
+
 }
\ No newline at end of file
diff --git a/cpp/arcticdb/pipeline/read_frame.cpp b/cpp/arcticdb/pipeline/read_frame.cpp
index de65763748..2dc67d8274 100644
--- a/cpp/arcticdb/pipeline/read_frame.cpp
+++ b/cpp/arcticdb/pipeline/read_frame.cpp
@@ -27,7 +27,10 @@
 #include
 #include
 #include
-
+#include
+#include
+#include
+#include
 #include
 #include
 
@@ -631,11 +634,10 @@ struct ReduceColumnTask : async::BaseTask {
 };
 
 folly::Future<folly::Unit> reduce_and_fix_columns(
-    std::shared_ptr<PipelineContext> &context,
-    SegmentInMemory &frame,
-    const ReadOptions& read_options,
-    std::any& handler_data
-) {
+        std::shared_ptr<PipelineContext> &context,
+        SegmentInMemory &frame,
+        const ReadOptions& read_options,
+        std::any& handler_data) {
     ARCTICDB_SAMPLE_DEFAULT(ReduceAndFixStringCol)
     ARCTICDB_DEBUG(log::version(), "Reduce and fix columns");
     if(frame.empty())
@@ -665,7 +667,7 @@
 }
 
 folly::Future<SegmentInMemory> fetch_data(
-    SegmentInMemory&& frame,
+    SegmentInMemory frame,
     const std::shared_ptr<PipelineContext> &context,
     const std::shared_ptr<stream::StreamSource>& ssource,
     bool dynamic_schema,
@@ -700,4 +702,642 @@
         .thenValue([frame](auto&&){ return frame; });
 }
 
+void set_output_descriptors(
+    const ProcessingUnit& proc,
+    const std::vector<std::shared_ptr<Clause>>& clauses,
+    const std::shared_ptr<PipelineContext>& pipeline_context) {
+    std::optional<std::string> index_column;
+    for (auto clause = clauses.rbegin(); clause != clauses.rend(); ++clause) {
+        bool should_break = util::variant_match(
+            (*clause)->clause_info().index_,
+            [](const KeepCurrentIndex&) { return false; },
+            [&](const KeepCurrentTopLevelIndex&) {
+                if (pipeline_context->norm_meta_->mutable_df()->mutable_common()->has_multi_index()) {
+                    const auto& multi_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->multi_index();
+                    auto name = multi_index.name();
+                    auto tz = multi_index.tz();
+                    bool fake_name{false};
+                    for (auto pos: multi_index.fake_field_pos()) {
+                        if (pos == 0) {
+                            fake_name = true;
+                            break;
+                        }
+                    }
+                    auto mutable_index =
pipeline_context->norm_meta_->mutable_df()->mutable_common()->mutable_index(); + mutable_index->set_tz(tz); + mutable_index->set_is_physically_stored(true); + mutable_index->set_name(name); + mutable_index->set_fake_name(fake_name); + } + return true; + }, + [&](const NewIndex& new_index) { + index_column = new_index; + auto mutable_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->mutable_index(); + mutable_index->set_name(new_index); + mutable_index->clear_fake_name(); + mutable_index->set_is_physically_stored(true); + return true; + }); + if (should_break) { + break; + } + } + std::optional new_stream_descriptor; + if (proc.segments_.has_value() && !proc.segments_->empty()) { + new_stream_descriptor = std::make_optional(); + new_stream_descriptor->set_index(proc.segments_->at(0)->descriptor().index()); + for (size_t idx = 0; idx < new_stream_descriptor->index().field_count(); idx++) { + new_stream_descriptor->add_field(proc.segments_->at(0)->descriptor().field(idx)); + } + } + if (new_stream_descriptor.has_value() && proc.segments_.has_value()) { + std::vector> fields; + for (const auto& segment: *proc.segments_) { + fields.push_back(segment->descriptor().fields_ptr()); + } + new_stream_descriptor = merge_descriptors(*new_stream_descriptor, + fields, + std::vector{}); + } + if (new_stream_descriptor.has_value()) { + // Finding and erasing fields from the FieldCollection contained in StreamDescriptor is O(n) in number of fields + // So maintain map from field names to types in the new_stream_descriptor to make these operations O(1) + // Cannot use set of FieldRef as the name in the output might match the input, but with a different type after processing + std::unordered_map new_fields; + for (const auto& field: new_stream_descriptor->fields()) { + new_fields.emplace(field.name(), field.type()); + } + // Columns might be in a different order to the original dataframe, so reorder here + auto original_stream_descriptor = pipeline_context->descriptor(); + StreamDescriptor final_stream_descriptor{original_stream_descriptor.id()}; + final_stream_descriptor.set_index(new_stream_descriptor->index()); + // Erase field from new_fields as we add them to final_stream_descriptor, as all fields left in new_fields + // after these operations were created by the processing pipeline, and so should be appended + // Index columns should always appear first + if (index_column.has_value()) { + const auto nh = new_fields.extract(*index_column); + internal::check(!nh.empty(), "New index column not found in processing pipeline"); + final_stream_descriptor.add_field(FieldRef{nh.mapped(), nh.key()}); + } + for (const auto& field: original_stream_descriptor.fields()) { + if (const auto nh = new_fields.extract(field.name()); nh) { + final_stream_descriptor.add_field(FieldRef{nh.mapped(), nh.key()}); + } + } + // Iterate through new_stream_descriptor->fields() rather than remaining new_fields to preserve ordering + // e.g. 
if there were two projections then users will expect the column produced by the first one to appear + // first in the output df + for (const auto& field: new_stream_descriptor->fields()) { + if (new_fields.contains(field.name())) { + final_stream_descriptor.add_field(field); + } + } + pipeline_context->set_descriptor(final_stream_descriptor); + } +} + + +std::vector generate_ranges_and_keys(PipelineContext& pipeline_context) { + std::vector res; + res.reserve(pipeline_context.slice_and_keys_.size()); + bool is_incomplete{false}; + for (auto it = pipeline_context.begin(); it != pipeline_context.end(); it++) { + if (it == pipeline_context.incompletes_begin()) { + is_incomplete = true; + } + auto& sk = it->slice_and_key(); + // Take a copy here as things like defrag need the keys in pipeline_context->slice_and_keys_ that aren't being modified at the end + auto key = sk.key(); + res.emplace_back(sk.slice(), std::move(key), is_incomplete); + } + return res; +} + +util::BitSet get_incompletes_bitset(const std::vector& all_ranges) { + util::BitSet output(all_ranges.size()); + util::BitSet::bulk_insert_iterator it(output); + for(auto&& [index, range] : folly::enumerate(all_ranges)) { + if(range.is_incomplete()) + it = index; + } + it.flush(); + return output; +} + +std::shared_ptr> columns_to_decode(const std::shared_ptr& pipeline_context) { + std::shared_ptr> res; + ARCTICDB_DEBUG(log::version(), "Creating columns list with {} bits set", pipeline_context->overall_column_bitset_ ? pipeline_context->overall_column_bitset_->count() : -1); + if(pipeline_context->overall_column_bitset_) { + res = std::make_shared>(); + auto en = pipeline_context->overall_column_bitset_->first(); + auto en_end = pipeline_context->overall_column_bitset_->end(); + while (en < en_end) { + ARCTICDB_DEBUG(log::version(), "Adding field {}", pipeline_context->desc_->field(*en).name()); + res->insert(std::string(pipeline_context->desc_->field(*en++).name())); + } + } + return res; +} + +std::vector> add_schema_check( + const std::shared_ptr &pipeline_context, + std::vector>&& segment_and_slice_futures, + util::BitSet&& incomplete_bitset, + const ProcessingConfig &processing_config) { + std::vector> res; + res.reserve(segment_and_slice_futures.size()); + for (size_t i = 0; i < segment_and_slice_futures.size(); ++i) { + auto&& fut = segment_and_slice_futures.at(i); + const bool is_incomplete = incomplete_bitset[i]; + if (is_incomplete) { + res.push_back( + std::move(fut) + .thenValueInline([pipeline_desc=pipeline_context->descriptor(), processing_config](SegmentAndSlice &&read_result) { + if (!processing_config.dynamic_schema_) { + auto check = check_schema_matches_incomplete(read_result.segment_in_memory_.descriptor(), pipeline_desc); + if (std::holds_alternative(check)) { + std::get(check).throw_error(); + } + } + return std::move(read_result); + })); + } else { + res.push_back(std::move(fut)); + } + } + return res; +} + +CheckOutcome check_schema_matches_incomplete(const StreamDescriptor& stream_descriptor_incomplete, const StreamDescriptor& pipeline_desc) { + // We need to check that the index names match regardless of the dynamic schema setting + if(!index_names_match(stream_descriptor_incomplete, pipeline_desc)) { + return Error{ + throw_error, + fmt::format("{} All staged segments must have the same index names." 
+ "{} is different than {}", + error_code_data.name_, + stream_descriptor_incomplete, + pipeline_desc) + }; + } + if (!columns_match(stream_descriptor_incomplete, pipeline_desc)) { + return Error{ + throw_error, + fmt::format("{} When static schema is used all staged segments must have the same column and column types." + "{} is different than {}", + error_code_data.name_, + stream_descriptor_incomplete, + pipeline_desc) + }; + } + return std::monostate{}; +} + +std::vector> generate_segment_and_slice_futures( + const std::shared_ptr &store, + const std::shared_ptr &pipeline_context, + const ProcessingConfig &processing_config, + std::vector&& all_ranges) { + auto incomplete_bitset = get_incompletes_bitset(all_ranges); + auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(all_ranges), columns_to_decode(pipeline_context)); + return add_schema_check(pipeline_context, std::move(segment_and_slice_futures), std::move(incomplete_bitset), processing_config); +} + +std::pair>, std::shared_ptr>> get_entity_ids_and_position_map( + std::shared_ptr& component_manager, + size_t num_segments, + std::vector>&& processing_unit_indexes) { + // Map from entity id to position in segment_and_slice_futures + auto id_to_pos = std::make_shared>(); + id_to_pos->reserve(num_segments); + + // Map from position in segment_and_slice_future_splitters to entity ids + std::vector pos_to_id; + pos_to_id.reserve(num_segments); + + auto ids = component_manager->get_new_entity_ids(num_segments); + for (auto&& [idx, id]: folly::enumerate(ids)) { + pos_to_id.emplace_back(id); + id_to_pos->emplace(id, idx); + } + + std::vector> entity_work_units; + entity_work_units.reserve(processing_unit_indexes.size()); + for (const auto& indexes: processing_unit_indexes) { + entity_work_units.emplace_back(); + entity_work_units.back().reserve(indexes.size()); + for (auto index: indexes) { + entity_work_units.back().emplace_back(pos_to_id[index]); + } + } + + return std::make_pair(std::move(entity_work_units), std::move(id_to_pos)); +} + +void add_slice_to_component_manager( + EntityId entity_id, + pipelines::SegmentAndSlice& segment_and_slice, + std::shared_ptr component_manager, + EntityFetchCount fetch_count) { + ARCTICDB_DEBUG(log::memory(), "Adding entity id {}", entity_id); + component_manager->add_entity( + entity_id, + std::make_shared(std::move(segment_and_slice.segment_in_memory_)), + std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), + std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), + std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), + fetch_count + ); +} + +std::shared_ptr>>> schedule_first_iteration( + std::shared_ptr component_manager, + size_t num_segments, + std::vector>&& entities_by_work_unit, + std::shared_ptr>&& segment_fetch_counts, + std::vector&& segment_and_slice_future_splitters, + std::shared_ptr>&& id_to_pos, + std::shared_ptr>>& clauses) { + // Used to make sure each entity is only added into the component manager once + auto slice_added_mtx = std::make_shared>(num_segments); + auto slice_added = std::make_shared>(num_segments, false); + auto futures = std::make_shared>>>(); + + for (auto&& entity_ids: entities_by_work_unit) { + std::vector> local_futs; + local_futs.reserve(entity_ids.size()); + for (auto id: entity_ids) { + const auto pos = id_to_pos->at(id); + auto& future_or_splitter = segment_and_slice_future_splitters[pos]; + // Some of the entities for this unit of work may be shared with other units of work + 
util::variant_match(future_or_splitter, + [&local_futs] (folly::Future& fut) { + local_futs.emplace_back(std::move(fut)); + }, + [&local_futs] (folly::FutureSplitter& splitter) { + local_futs.emplace_back(splitter.getFuture()); + }); + } + + futures->emplace_back( + folly::collect(local_futs) + .via(&async::io_executor()) // Stay on the same executor as the read so that we can inline if possible + .thenValueInline([component_manager, segment_fetch_counts, id_to_pos, slice_added_mtx, slice_added, clauses,entity_ids = std::move(entity_ids)] + (std::vector&& segment_and_slices) mutable { + for (auto&& [idx, segment_and_slice]: folly::enumerate(segment_and_slices)) { + auto entity_id = entity_ids[idx]; + auto pos = id_to_pos->at(entity_id); + std::lock_guard lock{slice_added_mtx->at(pos)}; + if (!(*slice_added)[pos]) { + ARCTICDB_DEBUG(log::version(), "Adding entity {}", entity_id); + add_slice_to_component_manager(entity_id, segment_and_slice, component_manager, segment_fetch_counts->at(pos)); + (*slice_added)[pos] = true; + } + } + return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))(); + })); + } + return futures; +} + + +size_t num_scheduling_iterations(const std::vector>& clauses) { + size_t res = 1UL; + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ != (*it)->clause_info().input_structure_) { + ++res; + } + ++it; + } + ARCTICDB_DEBUG(log::memory(), "Processing pipeline has {} scheduling stages after the initial read and process", res); + return res; +} + +void remove_processed_clauses(std::vector>& clauses) { + // Erase all the clauses we have already scheduled to run + ARCTICDB_SAMPLE_DEFAULT(RemoveProcessedClauses) + auto it = std::next(clauses.cbegin()); + while (it != clauses.cend()) { + auto prev_it = std::prev(it); + if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { + ++it; + } else { + break; + } + } + clauses.erase(clauses.cbegin(), it); +} + +folly::Future> schedule_clause_processing( + std::shared_ptr component_manager, + std::vector>&& segment_and_slice_futures, + std::vector>&& processing_unit_indexes, + std::shared_ptr>> clauses) { + // All the shared pointers as arguments to this function and created within it are to ensure that resources are + // correctly kept alive after this function returns its future + const auto num_segments = segment_and_slice_futures.size(); + + // Map from index in segment_and_slice_future_splitters to the number of calls to process in the first clause that + // will require that segment + auto segment_fetch_counts = generate_segment_fetch_counts(processing_unit_indexes, num_segments); + + auto segment_and_slice_future_splitters = split_futures(std::move(segment_and_slice_futures), *segment_fetch_counts); + + auto [entities_by_work_unit, entity_id_to_segment_pos] = get_entity_ids_and_position_map(component_manager, num_segments, std::move(processing_unit_indexes)); + + // At this point we have a set of entity ids grouped by the work units produced by the original structure_for_processing, + // and a map of those ids to the position in the vector of futures or future-splitters (which is the same order as + // originally generated from the index via the pipeline_context and ranges_and_keys), so we can add each entity id and + // its components to the component manager and schedule the first stage of work (i.e. 
from the beginning until either + // the end of the pipeline or the next required structure_for_processing + auto futures = schedule_first_iteration( + component_manager, + num_segments, + std::move(entities_by_work_unit), + std::move(segment_fetch_counts), + std::move(segment_and_slice_future_splitters), + std::move(entity_id_to_segment_pos), + clauses); + + auto entity_ids_vec_fut = folly::collect(*futures).via(&async::io_executor()); + + const auto scheduling_iterations = num_scheduling_iterations(*clauses); + for (auto i = 1UL; i < scheduling_iterations; ++i) { + entity_ids_vec_fut = std::move(entity_ids_vec_fut).thenValue([clauses, scheduling_iterations, i] (std::vector>&& entity_id_vectors) { + ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling iteration {} of {}", i, scheduling_iterations); + + util::check(!clauses->empty(), "Scheduling iteration {} has no clauses to process", scheduling_iterations); + remove_processed_clauses(*clauses); + auto next_units_of_work = clauses->front()->structure_for_processing(std::move(entity_id_vectors)); + + std::vector>> work_futures; + for(auto&& unit_of_work : next_units_of_work) { + ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling work for entity ids: {}", unit_of_work); + work_futures.emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask{*clauses, std::move(unit_of_work)})); + } + + return folly::collect(work_futures).via(&async::io_executor()); + }); + } + + return std::move(entity_ids_vec_fut).thenValueInline([](std::vector>&& entity_id_vectors) { + return flatten_entities(std::move(entity_id_vectors)); + }); +} + +/* + * Processes the slices in the given pipeline_context. + * + * Slices are processed in an order defined by the first clause in the pipeline, with slices corresponding to the same + * processing unit collected into a single ProcessingUnit. Slices contained within a single ProcessingUnit are processed + * within a single thread. + * + * The processing of a ProcessingUnit is scheduled via the Async Store. Within a single thread, the + * segments will be retrieved from storage and decompressed before being passed to a MemSegmentProcessingTask which + * will process all clauses up until a clause that requires a repartition. + */ + folly::Future> read_and_process( + const std::shared_ptr& store, + const std::shared_ptr& pipeline_context, + const std::shared_ptr& read_query, + const ReadOptions& read_options) { + auto component_manager = std::make_shared(); + ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; + for (auto& clause: read_query->clauses_) { + clause->set_processing_config(processing_config); + clause->set_component_manager(component_manager); + } + + auto ranges_and_keys = generate_ranges_and_keys(*pipeline_context); + + // Each element of the vector corresponds to one processing unit containing the list of indexes in ranges_and_keys required for that processing unit + // i.e. 
if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] + // then the structure will be {{0, 1}, {2, 3}} + std::vector> processing_unit_indexes = read_query->clauses_[0]->structure_for_processing(ranges_and_keys); + + // Start reading as early as possible + auto segment_and_slice_futures = generate_segment_and_slice_futures(store, pipeline_context, processing_config, std::move(ranges_and_keys)); + + return schedule_clause_processing( + component_manager, + std::move(segment_and_slice_futures), + std::move(processing_unit_indexes), + std::make_shared>>(read_query->clauses_)) + .via(&async::cpu_executor()) + .thenValue([component_manager, read_query, pipeline_context](std::vector&& processed_entity_ids) { + auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); + + if (std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), [](const std::shared_ptr& clause) { + return clause->clause_info().modifies_output_descriptor_; + })) { + set_output_descriptors(proc, read_query->clauses_, pipeline_context); + } + return collect_segments(std::move(proc)); + }); +} + + + +void copy_frame_data_to_buffer( + const SegmentInMemory& destination, + size_t target_index, + SegmentInMemory& source, + size_t source_index, + const RowRange& row_range, + const DecodePathData& shared_data, + std::any& handler_data) { + const auto num_rows = row_range.diff(); + if (num_rows == 0) { + return; + } + auto& src_column = source.column(static_cast(source_index)); + auto& dst_column = destination.column(static_cast(target_index)); + auto& buffer = dst_column.data().buffer(); + auto dst_rawtype_size = data_type_size(dst_column.type(), DataTypeMode::EXTERNAL); + auto offset = dst_rawtype_size * (row_range.first - destination.offset()); + auto total_size = dst_rawtype_size * num_rows; + buffer.assert_size(offset + total_size); + + auto src_data = src_column.data(); + auto dst_ptr = buffer.data() + offset; + + auto type_promotion_error_msg = fmt::format("Can't promote type {} to type {} in field {}", + src_column.type(), dst_column.type(), destination.field(target_index).name()); + if(auto handler = get_type_handler(src_column.type(), dst_column.type()); handler) { + handler->convert_type(src_column, buffer, num_rows, offset, src_column.type(), dst_column.type(), shared_data, handler_data, source.string_pool_ptr()); + } else if (is_empty_type(src_column.type().data_type())) { + dst_column.type().visit_tag([&](auto dst_desc_tag) { + util::default_initialize(dst_ptr, num_rows * dst_rawtype_size); + }); + // Do not use src_column.is_sparse() here, as that misses columns that are dense, but have fewer than num_rows values + } else if (src_column.opt_sparse_map().has_value() && has_valid_type_promotion(src_column.type(), dst_column.type())) { + details::visit_type(dst_column.type().data_type(), [&](auto dst_tag) { + using dst_type_info = ScalarTypeInfo; + util::default_initialize(dst_ptr, num_rows * dst_rawtype_size); + auto typed_dst_ptr = reinterpret_cast(dst_ptr); + details::visit_type(src_column.type().data_type(), [&](auto src_tag) { + using src_type_info = ScalarTypeInfo; + Column::for_each_enumerated(src_column, [typed_dst_ptr](auto enumerating_it) { + typed_dst_ptr[enumerating_it.idx()] = static_cast(enumerating_it.value()); + }); + }); + }); + } else if (trivially_compatible_types(src_column.type(), dst_column.type())) { + 
details::visit_type(src_column.type().data_type() ,[&src_data, &dst_ptr] (auto src_desc_tag) { + using SourceTDT = ScalarTagType; + using SourceType = typename decltype(src_desc_tag)::DataTypeTag::raw_type; + while (auto block = src_data.template next()) { + const auto row_count = block->row_count(); + memcpy(dst_ptr, block->data(), row_count * sizeof(SourceType)); + dst_ptr += row_count * sizeof(SourceType); + } + }); + } else if (has_valid_type_promotion(src_column.type(), dst_column.type())) { + details::visit_type(dst_column.type().data_type() ,[&src_data, &dst_ptr, &src_column, &type_promotion_error_msg] (auto dest_desc_tag) { + using DestinationType = typename decltype(dest_desc_tag)::DataTypeTag::raw_type; + auto typed_dst_ptr = reinterpret_cast(dst_ptr); + details::visit_type(src_column.type().data_type() ,[&src_data, &typed_dst_ptr, &type_promotion_error_msg] (auto src_desc_tag) { + using source_type_info = ScalarTypeInfo; + if constexpr(std::is_arithmetic_v && std::is_arithmetic_v) { + const auto src_cend = src_data.cend(); + for (auto src_it = src_data.cbegin(); src_it != src_cend; ++src_it) { + *typed_dst_ptr++ = static_cast(*src_it); + } + } else { + util::raise_rte(type_promotion_error_msg.c_str()); + } + }); + }); + } else { + util::raise_rte(type_promotion_error_msg.c_str()); + } +} + +struct CopyToBufferTask : async::BaseTask { + SegmentInMemory&& source_segment_; + SegmentInMemory target_segment_; + FrameSlice frame_slice_; + DecodePathData shared_data_; + std::any& handler_data_; + bool fetch_index_; + + CopyToBufferTask( + SegmentInMemory&& source_segment, + SegmentInMemory target_segment, + FrameSlice frame_slice, + DecodePathData shared_data, + std::any& handler_data, + bool fetch_index) : + source_segment_(std::move(source_segment)), + target_segment_(std::move(target_segment)), + frame_slice_(std::move(frame_slice)), + shared_data_(std::move(shared_data)), + handler_data_(handler_data), + fetch_index_(fetch_index) { + + } + + folly::Unit operator()() { + const auto index_field_count = get_index_field_count(target_segment_); + for (auto idx = 0u; idx < index_field_count && fetch_index_; ++idx) { + copy_frame_data_to_buffer(target_segment_, idx, source_segment_, idx, frame_slice_.row_range, shared_data_, handler_data_); + } + + auto field_count = frame_slice_.col_range.diff() + index_field_count; + internal::check( + field_count == source_segment_.descriptor().field_count(), + "Column range does not match segment descriptor field count in copy_segments_to_frame: {} != {}", + field_count, source_segment_.descriptor().field_count()); + + const auto& fields = source_segment_.descriptor().fields(); + for (auto field_col = index_field_count; field_col < field_count; ++field_col) { + const auto& field = fields.at(field_col); + const auto& field_name = field.name(); + auto frame_loc_opt = target_segment_.column_index(field_name); + if (!frame_loc_opt) + continue; + + copy_frame_data_to_buffer(target_segment_, *frame_loc_opt, source_segment_, field_col, frame_slice_.row_range, shared_data_, handler_data_); + } + return folly::Unit{}; + } +}; + +folly::Future copy_segments_to_frame( + const std::shared_ptr& store, + const std::shared_ptr& pipeline_context, + SegmentInMemory frame, + std::any& handler_data) { + std::vector> copy_tasks; + DecodePathData shared_data; + for (auto context_row : folly::enumerate(*pipeline_context)) { + auto &slice_and_key = context_row->slice_and_key(); + auto &segment = slice_and_key.segment(store); + + 
copy_tasks.emplace_back(async::submit_cpu_task( + CopyToBufferTask{ + std::move(segment), + frame, + context_row->slice_and_key().slice(), + shared_data, + handler_data, + context_row->fetch_index()})); + } + return folly::collect(copy_tasks).via(&async::cpu_executor()).unit(); +} + +folly::Future prepare_output_frame( + std::vector&& items, + const std::shared_ptr& pipeline_context, + const std::shared_ptr& store, + const ReadOptions& read_options, + std::any& handler_data) { + pipeline_context->clear_vectors(); + pipeline_context->slice_and_keys_ = std::move(items); + std::sort(std::begin(pipeline_context->slice_and_keys_), std::end(pipeline_context->slice_and_keys_), [] (const auto& left, const auto& right) { + return std::tie(left.slice_.row_range, left.slice_.col_range) < std::tie(right.slice_.row_range, right.slice_.col_range); + }); + adjust_slice_rowcounts(pipeline_context); + const auto dynamic_schema = opt_false(read_options.dynamic_schema_); + mark_index_slices(pipeline_context, dynamic_schema, pipeline_context->bucketize_dynamic_); + pipeline_context->ensure_vectors(); + + for(auto row : *pipeline_context) { + row.set_compacted(false); + row.set_descriptor(row.slice_and_key().segment(store).descriptor_ptr()); + row.set_string_pool(row.slice_and_key().segment(store).string_pool_ptr()); + } + + auto frame = allocate_frame(pipeline_context); + return copy_segments_to_frame(store, pipeline_context, frame, handler_data).thenValue([frame](auto&&){ return frame; }); +} + +folly::Future do_direct_read_or_process( + const std::shared_ptr& store, + const std::shared_ptr& read_query, + const ReadOptions& read_options, + const std::shared_ptr& pipeline_context, + const DecodePathData& shared_data, + std::any& handler_data) { + if(!read_query->clauses_.empty()) { + ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) + util::check_rte(!pipeline_context->is_pickled(),"Cannot filter pickled data"); + return read_and_process(store, pipeline_context, read_query, read_options) + .thenValue([store, pipeline_context, &read_options, &handler_data](std::vector&& segs) { + return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); + }); + } else { + ARCTICDB_SAMPLE(MarkAndReadDirect, 0) + util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query->row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); + mark_index_slices(pipeline_context, opt_false(read_options.dynamic_schema_), pipeline_context->bucketize_dynamic_); + auto frame = allocate_frame(pipeline_context); + util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); + ARCTICDB_DEBUG(log::version(), "Fetching frame data"); + return fetch_data(std::move(frame), pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data); + } +} + } // namespace read diff --git a/cpp/arcticdb/pipeline/read_frame.hpp b/cpp/arcticdb/pipeline/read_frame.hpp index b05868533b..3bb0950930 100644 --- a/cpp/arcticdb/pipeline/read_frame.hpp +++ b/cpp/arcticdb/pipeline/read_frame.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -71,13 +72,21 @@ void mark_index_slices( bool column_groups); folly::Future fetch_data( - SegmentInMemory&& frame, + SegmentInMemory frame, const std::shared_ptr &context, const std::shared_ptr& ssource, bool dynamic_schema, DecodePathData shared_data, std::any& handler_data); +folly::Future do_direct_read_or_process( + const std::shared_ptr& store, + const std::shared_ptr& read_query, + 
const ReadOptions& read_options,
+    const std::shared_ptr<PipelineContext>& pipeline_context,
+    const DecodePathData& shared_data,
+    std::any& handler_data);
+
 void decode_into_frame_static(
     SegmentInMemory &frame,
     PipelineContextRow &context,
@@ -104,7 +113,19 @@ StreamDescriptor get_filtered_descriptor(
 
 size_t get_index_field_count(const SegmentInMemory& frame);
 
+folly::Future<std::vector<EntityId>> schedule_clause_processing(
+    std::shared_ptr<ComponentManager> component_manager,
+    std::vector<folly::Future<pipelines::SegmentAndSlice>>&& segment_and_slice_futures,
+    std::vector<std::vector<size_t>>&& processing_unit_indexes,
+    std::shared_ptr<std::vector<std::shared_ptr<Clause>>> clauses);
+
+folly::Future<std::vector<SliceAndKey>> read_and_process(
+    const std::shared_ptr<Store>& store,
+    const std::shared_ptr<PipelineContext>& pipeline_context,
+    const std::shared_ptr<ReadQuery>& read_query,
+    const ReadOptions& read_options
+);
+
+CheckOutcome check_schema_matches_incomplete(const StreamDescriptor& stream_descriptor_incomplete, const StreamDescriptor& pipeline_context);
 
 } // namespace arcticdb::pipelines
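check_schema_matches_incomplete returns a CheckOutcome (the std::variant<Error, std::monostate> alias added to schema_checks.hpp later in this diff) rather than throwing, which is what lets add_schema_check in read_frame.cpp run the validation inside a folly continuation and defer the raise to a controlled point. A minimal sketch of that detect-then-raise split, with invented stand-ins (ToyError, ToyCheckOutcome, check, validate) rather than the real types:

// Illustrative sketch only, not part of the diff: the check produces a value
// describing the failure; the caller decides where (and whether) to throw.
#include <functional>
#include <stdexcept>
#include <string>
#include <variant>

struct ToyError {                                  // stands in for arcticdb::Error
    std::function<void(std::string)> raiser_;
    std::string msg_;
    void throw_error() { raiser_(msg_); }
};
using ToyCheckOutcome = std::variant<ToyError, std::monostate>;

ToyCheckOutcome check(bool columns_match) {        // stands in for check_schema_matches_incomplete
    if (!columns_match)
        return ToyError{[](std::string msg) { throw std::runtime_error(msg); },
                        "All staged segments must have the same column and column types."};
    return std::monostate{};                       // monostate means the check passed
}

void validate(bool columns_match) {                // mirrors the pattern in add_schema_check
    auto outcome = check(columns_match);
    if (std::holds_alternative<ToyError>(outcome))
        std::get<ToyError>(outcome).throw_error();
}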
diff --git a/cpp/arcticdb/processing/resample_boundaries.hpp b/cpp/arcticdb/processing/resample_boundaries.hpp
new file mode 100644
index 0000000000..89317a1cd1
--- /dev/null
+++ b/cpp/arcticdb/processing/resample_boundaries.hpp
@@ -0,0 +1,101 @@
+#pragma once
+
+namespace arcticdb {
+static consteval timestamp one_day_in_nanoseconds() {
+    return timestamp(24) * 60 * 60 * 1'000'000'000;
+}
+
+template <typename T>
+requires std::integral<T>
+[[nodiscard]] static T python_mod(T a, T b) {
+    return (a % b + b) % b;
+}
+
+/// @param ts in nanoseconds
+[[nodiscard]] static timestamp start_of_day_nanoseconds(timestamp ts) {
+    return ts - python_mod(ts, one_day_in_nanoseconds());
+}
+
+/// @param ts in nanoseconds
+[[nodiscard]] static timestamp end_of_day_nanoseconds(timestamp ts) {
+    const timestamp start_of_day = start_of_day_nanoseconds(ts);
+    const bool is_midnight = start_of_day == ts;
+    if (is_midnight) {
+        return ts;
+    }
+    return start_of_day + one_day_in_nanoseconds();
+}
+
+[[nodiscard]] static std::pair<timestamp, timestamp> compute_first_last_dates(
+    timestamp start,
+    timestamp end,
+    timestamp rule,
+    ResampleBoundary closed_boundary_arg,
+    timestamp offset,
+    const ResampleOrigin& origin
+) {
+    // Origin value formula from Pandas:
+    // https://github.com/pandas-dev/pandas/blob/68d9dcab5b543adb3bfe5b83563c61a9b8afae77/pandas/core/resample.py#L2564
+    auto [origin_ns, origin_adjusted_start] = util::variant_match(
+        origin,
+        [start](timestamp o) -> std::pair<timestamp, timestamp> { return {o, start}; },
+        [&](const std::string& o) -> std::pair<timestamp, timestamp> {
+            if (o == "epoch") {
+                return { 0, start };
+            } else if (o == "start") {
+                return { start, start };
+            } else if (o == "start_day") {
+                return { start_of_day_nanoseconds(start), start };
+            } else if (o == "end_day" || o == "end") {
+                const timestamp origin_last = o == "end" ? end : end_of_day_nanoseconds(end);
+                const timestamp bucket_count = (origin_last - start) / rule + (closed_boundary_arg == ResampleBoundary::LEFT);
+                const timestamp local_origin_ns = origin_last - bucket_count * rule;
+                return { local_origin_ns, local_origin_ns };
+            } else {
+                user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>(
+                    "Invalid origin value {}. Supported values are: \"start\", \"start_day\", \"end\", \"end_day\", \"epoch\" or timestamp in nanoseconds",
+                    o);
+            }
+        }
+    );
+    origin_ns += offset;
+
+    const timestamp ns_to_prev_offset_start = python_mod(origin_adjusted_start - origin_ns, rule);
+    const timestamp ns_to_prev_offset_end = python_mod(end - origin_ns, rule);
+
+    if (closed_boundary_arg == ResampleBoundary::RIGHT) {
+        return {
+            ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start - rule,
+            ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end
+        };
+    } else {
+        return {
+            ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start,
+            ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end + rule
+        };
+    }
+}
+
+std::vector<timestamp> generate_buckets(
+    timestamp start,
+    timestamp end,
+    std::string_view rule,
+    ResampleBoundary closed_boundary_arg,
+    timestamp offset,
+    const ResampleOrigin& origin
+) {
+    const timestamp rule_ns = [](std::string_view rule) {
+        py::gil_scoped_acquire acquire_gil;
+        return python_util::pd_to_offset(rule);
+    }(rule);
+    const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset, origin);
+    const auto bucket_boundary_count = (end_with_offset - start_with_offset) / rule_ns + 1;
+    std::vector<timestamp> res;
+    res.reserve(bucket_boundary_count);
+    for (auto boundary = start_with_offset; boundary <= end_with_offset; boundary += rule_ns) {
+        res.push_back(boundary);
+    }
+    return res;
+}
+
+} // namespace arcticdb
\ No newline at end of file
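A worked example of what compute_first_last_dates and generate_buckets produce, reduced to toy numbers (left-closed buckets, origin "epoch", offset 0; toy_buckets is an invented helper that mirrors the LEFT branch above, not the real API):

// Illustrative sketch only, not part of the diff. With origin = "epoch",
// offset = 0, rule = 10ns, closed = LEFT, start = 13, end = 57:
//   python_mod(13 - 0, 10) = 3  -> first boundary 13 - 3 = 10
//   python_mod(57 - 0, 10) = 7  -> last boundary 57 + (10 - 7) = 60
// so the bucket boundaries are 10, 20, 30, 40, 50, 60.
#include <cassert>
#include <cstdint>
#include <vector>

using ts = std::int64_t;
static ts python_mod(ts a, ts b) { return (a % b + b) % b; } // same helper as above

std::vector<ts> toy_buckets(ts start, ts end, ts rule) {     // left-closed, epoch origin
    const ts first = start - python_mod(start, rule);
    const ts last = python_mod(end, rule) > 0 ? end + (rule - python_mod(end, rule)) : end + rule;
    std::vector<ts> res;
    for (ts b = first; b <= last; b += rule)
        res.push_back(b);
    return res;
}

int main() {
    assert((toy_buckets(13, 57, 10) == std::vector<ts>{10, 20, 30, 40, 50, 60}));
}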
diff --git a/cpp/arcticdb/processing/test/test_parallel_processing.cpp b/cpp/arcticdb/processing/test/test_parallel_processing.cpp
index f914f5587c..1a39f998ab 100644
--- a/cpp/arcticdb/processing/test/test_parallel_processing.cpp
+++ b/cpp/arcticdb/processing/test/test_parallel_processing.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include
 
 using namespace arcticdb;
 using namespace arcticdb::pipelines;
diff --git a/cpp/arcticdb/python/python_handler_data.hpp b/cpp/arcticdb/python/python_handler_data.hpp
index 7b44331cd4..0c882274fe 100644
--- a/cpp/arcticdb/python/python_handler_data.hpp
+++ b/cpp/arcticdb/python/python_handler_data.hpp
@@ -35,8 +35,8 @@ struct PythonHandlerData {
 };
 
 struct PythonHandlerDataFactory : public TypeHandlerDataFactory {
-    std::any get_data() const override {
-        return {PythonHandlerData{}};
+    std::shared_ptr<std::any> get_data() const override {
+        return std::make_shared<std::any>(std::make_any<PythonHandlerData>());
     }
 };
diff --git a/cpp/arcticdb/storage/file/file_store.hpp b/cpp/arcticdb/storage/file/file_store.hpp
index 4b6e04e436..8f44cfe199 100644
--- a/cpp/arcticdb/storage/file/file_store.hpp
+++ b/cpp/arcticdb/storage/file/file_store.hpp
@@ -104,7 +104,7 @@ void write_dataframe_to_file_internal(
     single_file_store->finalize(storage::KeyData{offset, serialized_key.size()});
 }
 
-version_store::ReadVersionOutput read_dataframe_from_file_internal(
+ReadVersionOutput read_dataframe_from_file_internal(
     const StreamId& stream_id,
     const std::string& path,
     const std::shared_ptr<pipelines::ReadQuery>& read_query,
diff --git a/cpp/arcticdb/storage/test/test_memory_storage.cpp b/cpp/arcticdb/storage/test/test_memory_storage.cpp
index 894076118a..8be7e9342d 100644
--- a/cpp/arcticdb/storage/test/test_memory_storage.cpp
+++ b/cpp/arcticdb/storage/test/test_memory_storage.cpp
@@ -30,6 +30,6 @@ TEST(InMemory, ReadTwice) {
     auto read_query = std::make_shared<ReadQuery>();
     register_native_handler_data_factory();
     auto handler_data = get_type_handler_data();
-    auto read_result1 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data);
-    auto read_result2 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data);
+    auto read_result1 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data);
+    auto read_result2 = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data);
 }
\ No newline at end of file
diff --git a/cpp/arcticdb/util/native_handler.hpp b/cpp/arcticdb/util/native_handler.hpp
index 3fcdbbc580..0d5485f3f4 100644
--- a/cpp/arcticdb/util/native_handler.hpp
+++ b/cpp/arcticdb/util/native_handler.hpp
@@ -8,8 +8,8 @@ struct NativeHandlerData {
 };
 
 struct NativeHandlerDataFactory : public TypeHandlerDataFactory {
-    std::any get_data() const override {
-        return {NativeHandlerData{}};
+    std::shared_ptr<std::any> get_data() const override {
+        return std::make_shared<std::any>(std::make_any<NativeHandlerData>());
     }
 };
diff --git a/cpp/arcticdb/util/type_handler.hpp b/cpp/arcticdb/util/type_handler.hpp
index 5a5569a4bb..2042290d8d 100644
--- a/cpp/arcticdb/util/type_handler.hpp
+++ b/cpp/arcticdb/util/type_handler.hpp
@@ -87,7 +87,7 @@ using TypeHandler = folly::Poly<ITypeHandler>;
 
 class TypeHandlerDataFactory {
 public:
-    virtual std::any get_data() const = 0;
+    virtual std::shared_ptr<std::any> get_data() const = 0;
     virtual ~TypeHandlerDataFactory() = default;
 };
 
@@ -109,7 +109,7 @@ class TypeHandlerRegistry {
         handler_data_factory_ = std::move(data);
     }
 
-    std::any get_handler_data() {
+    std::shared_ptr<std::any> get_handler_data() {
         util::check(static_cast<bool>(handler_data_factory_), "No type handler set");
         return handler_data_factory_->get_data();
     }
@@ -132,7 +132,7 @@ inline std::shared_ptr<TypeHandler> get_type_handler(TypeDescriptor source, Type
     return TypeHandlerRegistry::instance()->get_handler(target);
 }
 
-inline std::any get_type_handler_data() {
+inline std::shared_ptr<std::any> get_type_handler_data() {
     return TypeHandlerRegistry::instance()->get_handler_data();
 }
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index 99c1a245af..187804367e 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -18,9 +18,12 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
+#include
+#include
 
 namespace arcticdb::version_store {
 
@@ -342,7 +345,7 @@ IndexRange LocalVersionedEngine::get_index_range(
     return index::get_index_segment_range(version->key_, store());
 }
 
-std::variant<VersionedItem, StreamId> get_version_identifier(
+std::variant<VersionedItem, StreamId> check_have_version_if_required(
     const StreamId& stream_id,
     const VersionQuery& version_query,
     const ReadOptions& read_options,
@@ -370,10 +373,52 @@ ReadVersionOutput LocalVersionedEngine::read_dataframe_version_internal(
     std::any& handler_data) {
     py::gil_scoped_release release_gil;
     auto version = get_version_to_read(stream_id, version_query);
-    const auto identifier = get_version_identifier(stream_id, version_query, read_options, version);
+    const auto identifier = check_have_version_if_required(stream_id, version_query, read_options, version);
     return read_frame_for_version(store(), identifier, read_query, read_options, handler_data).get();
 }
 
+ChunkIterator LocalVersionedEngine::read_dataframe_chunked_internal(
+    const StreamId &stream_id,
+    const VersionQuery& version_query,
+    ReadQuery& read_query,
+    const ReadOptions& read_options,
+    std::any& handler_data,
+    DecodePathData shared_data) {
+    auto version = get_version_to_read(stream_id, version_query);
+    const auto identifier = check_have_version_if_required(stream_id, version_query, read_options, version);
+    util::check(std::holds_alternative<VersionedItem>(identifier), "Chunking not implemented for recursive normalizers");
+    const auto& index_key = std::get<VersionedItem>(identifier);
+    auto [index_segment_reader, slice_and_keys] =
index::read_index_to_vector(store(), index_key.key_); + const bool dynamic_schema = opt_false(read_options.dynamic_schema_); + const bool bucketize_dynamic = index_segment_reader.bucketize_dynamic(); + + auto pipeline_context = std::make_shared(); + pipeline_context->stream_id_ = version->key_.id(); + read_indexed_keys_to_pipeline(store(), pipeline_context, *version, read_query, read_options); + + auto queries = get_column_bitset_and_query_functions( + read_query, + pipeline_context, + dynamic_schema, + bucketize_dynamic); + + pipeline_context->slice_and_keys_ = filter_index(index_segment_reader, combine_filter_functions(queries)); + pipelines::sort_by_row_range(pipeline_context->slice_and_keys_); + + generate_filtered_field_descriptors(pipeline_context, read_query.columns); + mark_index_slices(pipeline_context, dynamic_schema, bucketize_dynamic); + + return ChunkIterator( + std::move(index_segment_reader), + pipeline_context, + version->key_, + store(), + read_query, + read_options, + handler_data, + shared_data); +} + folly::Future LocalVersionedEngine::get_descriptor( AtomKey&& k){ const auto key = std::move(k); @@ -1066,7 +1111,7 @@ std::vector LocalVersionedEngine::batch_read_keys(const std:: std::vector> res; res.reserve(keys.size()); for (const auto& index_key: keys) { - res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared(), ReadOptions{}, handler_data)); + res.emplace_back(read_frame_for_version(store(), {index_key}, std::make_shared(), ReadOptions{}, *handler_data)); } Allocator::instance()->trim(); return folly::collect(res).get(); diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index da140ceec4..30584289ff 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -188,6 +189,14 @@ class LocalVersionedEngine : public VersionedEngine { arcticdb::proto::descriptors::UserDefinedMetadata&& user_meta ); + ChunkIterator read_dataframe_chunked_internal( + const StreamId &stream_id, + const VersionQuery& version_query, + ReadQuery& read_query, + const ReadOptions& read_options, + std::any& handler_data, + DecodePathData shared_data); + folly::Future, std::optional>> get_metadata( std::optional&& key); diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index b839159843..5e8868df99 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -22,108 +22,12 @@ #include #include #include +#include #include namespace arcticdb::version_store { -static consteval timestamp one_day_in_nanoseconds() { - return timestamp(24) * 60 * 60 * 1'000'000'000; -} - -template -requires std::integral -[[nodiscard]] static T python_mod(T a, T b) { - return (a % b + b) % b; -} - -/// @param ts in nanoseconds -[[nodiscard]] static timestamp start_of_day_nanoseconds(timestamp ts) { - return ts - python_mod(ts, one_day_in_nanoseconds()); -} - -/// @param ts in nanoseconds -[[nodiscard]] static timestamp end_of_day_nanoseconds(timestamp ts) { - const timestamp start_of_day = start_of_day_nanoseconds(ts); - const bool is_midnnight = start_of_day == ts; - if (is_midnnight) { - return ts; - } - return start_of_day + one_day_in_nanoseconds(); -} - -[[nodiscard]] static std::pair compute_first_last_dates( - timestamp start, - timestamp end, - timestamp rule, - ResampleBoundary closed_boundary_arg, - 
timestamp offset, - const ResampleOrigin& origin -) { - // Origin value formula from Pandas: - // https://github.com/pandas-dev/pandas/blob/68d9dcab5b543adb3bfe5b83563c61a9b8afae77/pandas/core/resample.py#L2564 - auto [origin_ns, origin_adjusted_start] = util::variant_match( - origin, - [start](timestamp o) -> std::pair {return {o, start}; }, - [&](const std::string& o) -> std::pair { - if (o == "epoch") { - return { 0, start }; - } else if (o == "start") { - return { start, start }; - } else if (o == "start_day") { - return { start_of_day_nanoseconds(start), start }; - } else if (o == "end_day" || o == "end") { - const timestamp origin_last = o == "end" ? end: end_of_day_nanoseconds(end); - const timestamp bucket_count = (origin_last - start) / rule + (closed_boundary_arg == ResampleBoundary::LEFT); - const timestamp origin_ns = origin_last - bucket_count * rule; - return { origin_ns, origin_ns }; - } else { - user_input::raise( - "Invalid origin value {}. Supported values are: \"start\", \"start_day\", \"end\", \"end_day\", \"epoch\" or timestamp in nanoseconds", - o); - } - } - ); - origin_ns += offset; - - const timestamp ns_to_prev_offset_start = python_mod(origin_adjusted_start - origin_ns, rule); - const timestamp ns_to_prev_offset_end = python_mod(end - origin_ns, rule); - - if (closed_boundary_arg == ResampleBoundary::RIGHT) { - return { - ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start - rule, - ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end - }; - } else { - return { - ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start, - ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end + rule - }; - } -} - -std::vector generate_buckets( - timestamp start, - timestamp end, - std::string_view rule, - ResampleBoundary closed_boundary_arg, - timestamp offset, - const ResampleOrigin& origin -) { - const timestamp rule_ns = [](std::string_view rule) { - py::gil_scoped_acquire acquire_gil; - return python_util::pd_to_offset(rule); - }(rule); - const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset, origin); - const auto bucket_boundary_count = (end_with_offset - start_with_offset) / rule_ns + 1; - std::vector res; - res.reserve(bucket_boundary_count); - for (auto boundary = start_with_offset; boundary <= end_with_offset; boundary += rule_ns) { - res.push_back(boundary); - } - return res; -} - template void declare_resample_clause(py::module& version) { const char* class_name = closed_boundary == ResampleBoundary::LEFT ? 
"ResampleClauseLeftClosed" : "ResampleClauseRightClosed"; @@ -226,9 +130,9 @@ void register_bindings(py::module &version, py::exception read_query){ + [] (StreamId sid, const std::string(path), std::shared_ptr read_query) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - return adapt_read_df(read_dataframe_from_file(sid, path, read_query, handler_data)); + return adapt_read_df(read_dataframe_from_file(sid, path, read_query, *handler_data)); }); using FrameDataWrapper = arcticdb::pipelines::FrameDataWrapper; @@ -253,6 +157,16 @@ void register_bindings(py::module &version, py::exception(version, "ChunkIterator") + .def("next", + [] (ChunkIterator& self) -> std::optional { + auto result = self.next(); + if(result) + return adapt_read_df(std::move(*result)); + else + return std::nullopt; + }); + py::enum_(version, "VersionRequestType", R"pbdoc( Enum of possible version request types passed to as_of. )pbdoc") @@ -541,6 +455,12 @@ void register_bindings(py::module &version, py::exception(), "Sort the index of a time series whose segments are internally sorted") + .def("read_chunked", + [&](PythonVersionStore& v, StreamId sid, const VersionQuery& version_query, std::shared_ptr read_query, const ReadOptions& read_options) { + auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); + return v.read_dataframe_chunked(sid, version_query, *read_query, read_options, *handler_data); + }, + py::call_guard(), "Get an iterator to a dataframe that returns in the chunked order") .def("append", &PythonVersionStore::append, py::call_guard(), "Append a dataframe to the most recent version") @@ -665,7 +585,7 @@ void register_bindings(py::module &version, py::exception read_query, const ReadOptions& read_options) { auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, handler_data)); + return adapt_read_df(v.read_dataframe_version(sid, version_query, read_query, read_options, *handler_data)); }, py::call_guard(), "Read the specified version of the dataframe from the store") @@ -764,7 +684,7 @@ void register_bindings(py::module &version, py::exception>& read_queries, const ReadOptions& read_options){ auto handler_data = TypeHandlerRegistry::instance()->get_handler_data(); - return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data)); + return python_util::adapt_read_dfs(v.batch_read(stream_ids, version_queries, read_queries, read_options, *handler_data)); }, py::call_guard(), "Read a dataframe from the store") .def("batch_read_keys", diff --git a/cpp/arcticdb/version/read_version_output.hpp b/cpp/arcticdb/version/read_version_output.hpp new file mode 100644 index 0000000000..b6cd47e8e1 --- /dev/null +++ b/cpp/arcticdb/version/read_version_output.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + +namespace arcticdb { + +struct ReadVersionOutput { + ReadVersionOutput() = delete; + ReadVersionOutput(VersionedItem&& versioned_item, FrameAndDescriptor&& frame_and_descriptor): + versioned_item_(std::move(versioned_item)), + frame_and_descriptor_(std::move(frame_and_descriptor)) {} + + ARCTICDB_MOVE_ONLY_DEFAULT(ReadVersionOutput) + + VersionedItem versioned_item_; + FrameAndDescriptor frame_and_descriptor_; +}; + +} \ No newline at end of file diff --git a/cpp/arcticdb/version/schema_checks.hpp b/cpp/arcticdb/version/schema_checks.hpp index 2a81e12ac4..9a02169c55 100644 --- 
a/cpp/arcticdb/version/schema_checks.hpp +++ b/cpp/arcticdb/version/schema_checks.hpp @@ -1,11 +1,20 @@ #pragma once +#include +#include +#include +#include #include #include #include namespace arcticdb { +using CheckOutcome = std::variant; +using StaticSchemaCompactionChecks = folly::Function; +using CompactionWrittenKeys = std::vector; +using CompactionResult = std::variant; + enum NormalizationOperation : uint8_t { APPEND, UPDATE, diff --git a/cpp/arcticdb/version/test/test_sparse.cpp b/cpp/arcticdb/version/test/test_sparse.cpp index 4182fae934..34c074bd22 100644 --- a/cpp/arcticdb/version/test/test_sparse.cpp +++ b/cpp/arcticdb/version/test/test_sparse.cpp @@ -88,7 +88,7 @@ TEST_F(SparseTestStore, SimpleRoundtrip) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), 2); @@ -180,7 +180,7 @@ TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), 2); @@ -231,7 +231,7 @@ TEST_F(SparseTestStore, DenseToSparse) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; @@ -279,7 +279,7 @@ TEST_F(SparseTestStore, SimpleRoundtripStrings) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame = read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), 2); @@ -334,7 +334,7 @@ TEST_F(SparseTestStore, Multiblock) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; for(size_t i = 0; i < num_rows; i += 2) { @@ -387,7 +387,7 @@ TEST_F(SparseTestStore, Segment) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto 
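schema_checks.hpp now owns the CheckOutcome/CompactionResult aliases that move out of version_core.hpp later in this diff. Their template arguments were lost in extraction, but check_schema_matches_incomplete further down returns std::monostate{} on success and an Error otherwise, which suggests the shape sketched here; this is an assumption-labelled reconstruction of the pattern, not the exact declarations.

#include <cstdio>
#include <stdexcept>
#include <string>
#include <variant>

// Assumed shape: the real Error holds a folly::Function raiser; a plain
// runtime_error stands in for it here.
struct Error {
    std::string msg_;
    void throw_error() const { throw std::runtime_error(msg_); }
};
using CheckOutcome = std::variant<std::monostate, Error>;

CheckOutcome check_positive(int x) {
    if (x <= 0)
        return Error{"value must be positive"};
    return std::monostate{};  // success carries no payload
}

int main() {
    const auto outcome = check_positive(-1);
    // Callers decide when (or whether) to turn the Error into an exception.
    if (std::holds_alternative<Error>(outcome))
        std::puts(std::get<Error>(outcome).msg_.c_str());
}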
handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; for(size_t i = 0; i < num_rows; i += 2) { @@ -447,7 +447,7 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), num_rows); @@ -508,7 +508,7 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), num_rows); ASSERT_EQ(frame.descriptor().field_count(), 2); @@ -564,7 +564,7 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) { read_query->row_filter = IndexRange(timestamp{3000}, timestamp{6999}); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame =read_result.frame_data.frame();; ASSERT_EQ(frame.row_count(), 4000); @@ -618,7 +618,7 @@ TEST_F(SparseTestStore, Compact) { read_query->row_filter = universal_range(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame = read_result.frame_data.frame(); ASSERT_EQ(frame.row_count(), num_rows); @@ -676,7 +676,7 @@ TEST_F(SparseTestStore, CompactWithStrings) { auto read_query = std::make_shared(); read_query->row_filter = universal_range(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, handler_data); + auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options, *handler_data); const auto& frame = read_result.frame_data.frame(); ASSERT_EQ(frame.row_count(), num_rows); diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index c5bc5abf87..2c184f7281 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -189,7 +189,7 
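A mechanical change recurs through these test hunks: get_type_handler_data() evidently now returns a pointer-like handle that owns the std::any, so every call site dereferences it (*handler_data) before passing it to a std::any& parameter. A minimal sketch of the shape of that change; the shared_ptr return type is an assumption inferred from the added dereferences.

#include <any>
#include <cassert>
#include <memory>

// Hypothetical registry mirroring the call sites above: the handle owns the
// std::any, while readers take it by mutable reference.
std::shared_ptr<std::any> get_type_handler_data() {
    return std::make_shared<std::any>(42);
}

int read_with(std::any& handler_data) {
    return std::any_cast<int>(handler_data);
}

int main() {
    auto handler_data = get_type_handler_data();
    // Before this diff: read_with(handler_data) -- no longer matches std::any&.
    assert(read_with(*handler_data) == 42);
}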
@@ TEST_F(VersionStoreTest, SortMerge) { } wrapper.aggregator_.commit(); - data.emplace_back( SegmentToInputFrameAdapter{std::move(wrapper.segment())}); + data.emplace_back(std::move(wrapper.segment())); } std::mt19937 mt{42}; std::shuffle(data.begin(), data.end(), mt); @@ -259,7 +259,7 @@ TEST_F(VersionStoreTest, CompactIncompleteDynamicSchema) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = test_store_->read_dataframe_version(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data); const auto& seg = read_result.frame_data.frame(); count = 0; @@ -368,7 +368,7 @@ TEST_F(VersionStoreTest, StressBatchReadUncompressed) { register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); read_options.set_batch_throw_on_error(true); - auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options, handler_data); + auto latest_versions = test_store_->batch_read(symbols, std::vector(10), read_queries, read_options, *handler_data); for(auto&& [idx, version] : folly::enumerate(latest_versions)) { auto expected = get_test_simple_frame(std::get(version).item.symbol(), 10, idx); bool equal = expected.segment_ == std::get(version).frame_data.frame(); @@ -535,7 +535,7 @@ TEST(VersionStore, UpdateWithin) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data); const auto& seg = read_result.frame_and_descriptor_.frame_; for(auto i = 0u; i < num_rows; ++i) { @@ -575,7 +575,7 @@ TEST(VersionStore, UpdateBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data); const auto& seg = read_result.frame_and_descriptor_.frame_; for(auto i = 0u; i < num_rows + update_range.diff(); ++i) { @@ -615,7 +615,7 @@ TEST(VersionStore, UpdateAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data); const auto& seg = read_result.frame_and_descriptor_.frame_; for(auto i = 0u; i < num_rows + update_range.diff(); ++i) { @@ -656,7 +656,7 @@ TEST(VersionStore, UpdateIntersectBefore) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, 
*handler_data); const auto &seg = read_result.frame_and_descriptor_.frame_; for (auto i = 0u; i < num_rows + 5; ++i) { @@ -697,7 +697,7 @@ TEST(VersionStore, UpdateIntersectAfter) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, ReadOptions{}, *handler_data); const auto &seg = read_result.frame_and_descriptor_.frame_; for (auto i = 0u; i < num_rows + 5; ++i) { @@ -748,7 +748,7 @@ TEST(VersionStore, UpdateWithinSchemaChange) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, *handler_data); const auto &seg = read_result.frame_and_descriptor_.frame_; for (auto i = 0u;i < num_rows; ++i) { @@ -808,7 +808,7 @@ TEST(VersionStore, UpdateWithinTypeAndSchemaChange) { auto read_query = std::make_shared(); register_native_handler_data_factory(); auto handler_data = get_type_handler_data(); - auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, handler_data); + auto read_result = version_store.read_dataframe_version_internal(symbol, VersionQuery{}, read_query, read_options, *handler_data); const auto &seg = read_result.frame_and_descriptor_.frame_; for (auto i = 0u;i < num_rows; ++i) { @@ -852,4 +852,29 @@ TEST(VersionStore, TestWriteAppendMapHead) { auto [next_key, total_rows] = read_head(version_store._test_get_store(), symbol); ASSERT_EQ(next_key, key); ASSERT_EQ(total_rows, num_rows); +} + +TEST(VersionStore, ChunkIterator) { + using namespace arcticdb; + + auto version_store = get_test_engine(); + StreamId stream_id{"test_chunk"}; + VersionId v_id{0}; + const size_t rows = 1000000; + + IndexPartialKey pk{stream_id, v_id}; + auto wrapper = get_test_simple_frame(stream_id, rows, 0); + auto copy_frame = wrapper.frame_; + version_store.write_versioned_dataframe_internal(stream_id, std::move(wrapper.frame_), false, false, false); + DecodePathData shared_data; + register_native_handler_data_factory(); + auto handler_data = get_type_handler_data(); + ReadQuery read_query; + auto iterator = version_store.read_dataframe_chunked_internal(stream_id, VersionQuery{}, read_query, ReadOptions{}, *handler_data, shared_data); + + size_t count = 0UL; + while (auto chunk = iterator.next()) { + ++count; + } + ASSERT_EQ(count, 10); } \ No newline at end of file diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index e81118a16b..1a2d2b9f7e 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -26,7 +26,7 @@ #include #include #include -#include +#include namespace arcticdb::version_store { @@ -511,417 +511,6 @@ folly::Future read_multi_key( }); } -void add_slice_to_component_manager( - EntityId entity_id, - pipelines::SegmentAndSlice& segment_and_slice, - std::shared_ptr component_manager, - EntityFetchCount fetch_count) { - ARCTICDB_DEBUG(log::memory(), "Adding entity id {}", entity_id); - component_manager->add_entity( - entity_id, - 
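The new ChunkIterator test above drains the iterator with while (auto chunk = iterator.next()) and expects exactly ten chunks from a 1,000,000-row write. That expectation only holds if each chunk corresponds to one row slice under the default slicing policy, assumed here to be 100,000 rows per slice.

#include <cassert>
#include <cstddef>

int main() {
    const std::size_t rows = 1'000'000;
    const std::size_t rows_per_slice = 100'000;  // assumed library default
    // One chunk per row slice: 1,000,000 / 100,000 == 10, matching the ASSERT_EQ.
    assert(rows / rows_per_slice == 10);
}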
std::make_shared(std::move(segment_and_slice.segment_in_memory_)), - std::make_shared(std::move(segment_and_slice.ranges_and_key_.row_range_)), - std::make_shared(std::move(segment_and_slice.ranges_and_key_.col_range_)), - std::make_shared(std::move(segment_and_slice.ranges_and_key_.key_)), - fetch_count - ); -} - -size_t num_scheduling_iterations(const std::vector>& clauses) { - size_t res = 1UL; - auto it = std::next(clauses.cbegin()); - while (it != clauses.cend()) { - auto prev_it = std::prev(it); - if ((*prev_it)->clause_info().output_structure_ != (*it)->clause_info().input_structure_) { - ++res; - } - ++it; - } - ARCTICDB_DEBUG(log::memory(), "Processing pipeline has {} scheduling stages after the initial read and process", res); - return res; -} - -void remove_processed_clauses(std::vector>& clauses) { - // Erase all the clauses we have already scheduled to run - ARCTICDB_SAMPLE_DEFAULT(RemoveProcessedClauses) - auto it = std::next(clauses.cbegin()); - while (it != clauses.cend()) { - auto prev_it = std::prev(it); - if ((*prev_it)->clause_info().output_structure_ == (*it)->clause_info().input_structure_) { - ++it; - } else { - break; - } - } - clauses.erase(clauses.cbegin(), it); -} - -std::pair>, std::shared_ptr>> get_entity_ids_and_position_map( - std::shared_ptr& component_manager, - size_t num_segments, - std::vector>&& processing_unit_indexes) { - // Map from entity id to position in segment_and_slice_futures - auto id_to_pos = std::make_shared>(); - id_to_pos->reserve(num_segments); - - // Map from position in segment_and_slice_future_splitters to entity ids - std::vector pos_to_id; - pos_to_id.reserve(num_segments); - - auto ids = component_manager->get_new_entity_ids(num_segments); - for (auto&& [idx, id]: folly::enumerate(ids)) { - pos_to_id.emplace_back(id); - id_to_pos->emplace(id, idx); - } - - std::vector> entity_work_units; - entity_work_units.reserve(processing_unit_indexes.size()); - for (const auto& indexes: processing_unit_indexes) { - entity_work_units.emplace_back(); - entity_work_units.back().reserve(indexes.size()); - for (auto index: indexes) { - entity_work_units.back().emplace_back(pos_to_id[index]); - } - } - - return std::make_pair(std::move(entity_work_units), std::move(id_to_pos)); -} - -std::shared_ptr>>> schedule_first_iteration( - std::shared_ptr component_manager, - size_t num_segments, - std::vector>&& entities_by_work_unit, - std::shared_ptr>&& segment_fetch_counts, - std::vector&& segment_and_slice_future_splitters, - std::shared_ptr>&& id_to_pos, - std::shared_ptr>>& clauses) { - // Used to make sure each entity is only added into the component manager once - auto slice_added_mtx = std::make_shared>(num_segments); - auto slice_added = std::make_shared>(num_segments, false); - auto futures = std::make_shared>>>(); - - for (auto&& entity_ids: entities_by_work_unit) { - std::vector> local_futs; - local_futs.reserve(entity_ids.size()); - for (auto id: entity_ids) { - const auto pos = id_to_pos->at(id); - auto& future_or_splitter = segment_and_slice_future_splitters[pos]; - // Some of the entities for this unit of work may be shared with other units of work - util::variant_match(future_or_splitter, - [&local_futs] (folly::Future& fut) { - local_futs.emplace_back(std::move(fut)); - }, - [&local_futs] (folly::FutureSplitter& splitter) { - local_futs.emplace_back(splitter.getFuture()); - }); - } - - futures->emplace_back( - folly::collect(local_futs) - .via(&async::io_executor()) // Stay on the same executor as the read so that we can inline if possible 
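In the removed schedule_first_iteration, a segment future consumed by exactly one unit of work is moved out directly, while one shared across several units is wrapped in a folly::FutureSplitter so each consumer gets its own future off a single underlying read. A standalone sketch of that fan-out; the inline executor is an illustrative stand-in for the real I/O executor.

#include <cassert>
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <folly/futures/FutureSplitter.h>

int main() {
    // One underlying computation (stand-in for a segment read)...
    folly::FutureSplitter<int> splitter(
        folly::makeFuture(42).via(&folly::InlineExecutor::instance()));
    // ...fanned out so each unit of work gets its own future.
    auto fut_a = splitter.getFuture();
    auto fut_b = splitter.getFuture();
    assert(std::move(fut_a).get() == 42);
    assert(std::move(fut_b).get() == 42);
}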
- .thenValueInline([component_manager, segment_fetch_counts, id_to_pos, slice_added_mtx, slice_added, clauses,entity_ids = std::move(entity_ids)] - (std::vector&& segment_and_slices) mutable { - for (auto&& [idx, segment_and_slice]: folly::enumerate(segment_and_slices)) { - auto entity_id = entity_ids[idx]; - auto pos = id_to_pos->at(entity_id); - std::lock_guard lock{slice_added_mtx->at(pos)}; - if (!(*slice_added)[pos]) { - ARCTICDB_DEBUG(log::version(), "Adding entity {}", entity_id); - add_slice_to_component_manager(entity_id, segment_and_slice, component_manager, segment_fetch_counts->at(pos)); - (*slice_added)[pos] = true; - } - } - return async::MemSegmentProcessingTask(*clauses, std::move(entity_ids))(); - })); - } - return futures; -} - -folly::Future> schedule_clause_processing( - std::shared_ptr component_manager, - std::vector>&& segment_and_slice_futures, - std::vector>&& processing_unit_indexes, - std::shared_ptr>> clauses) { - // All the shared pointers as arguments to this function and created within it are to ensure that resources are - // correctly kept alive after this function returns its future - const auto num_segments = segment_and_slice_futures.size(); - - // Map from index in segment_and_slice_future_splitters to the number of calls to process in the first clause that - // will require that segment - auto segment_fetch_counts = generate_segment_fetch_counts(processing_unit_indexes, num_segments); - - auto segment_and_slice_future_splitters = split_futures(std::move(segment_and_slice_futures), *segment_fetch_counts); - - auto [entities_by_work_unit, entity_id_to_segment_pos] = get_entity_ids_and_position_map(component_manager, num_segments, std::move(processing_unit_indexes)); - - // At this point we have a set of entity ids grouped by the work units produced by the original structure_for_processing, - // and a map of those ids to the position in the vector of futures or future-splitters (which is the same order as - // originally generated from the index via the pipeline_context and ranges_and_keys), so we can add each entity id and - // its components to the component manager and schedule the first stage of work (i.e. 
from the beginning until either - // the end of the pipeline or the next required structure_for_processing - auto futures = schedule_first_iteration( - component_manager, - num_segments, - std::move(entities_by_work_unit), - std::move(segment_fetch_counts), - std::move(segment_and_slice_future_splitters), - std::move(entity_id_to_segment_pos), - clauses); - - auto entity_ids_vec_fut = folly::collect(*futures).via(&async::io_executor()); - - const auto scheduling_iterations = num_scheduling_iterations(*clauses); - for (auto i = 1UL; i < scheduling_iterations; ++i) { - entity_ids_vec_fut = std::move(entity_ids_vec_fut).thenValue([clauses, scheduling_iterations, i] (std::vector>&& entity_id_vectors) { - ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling iteration {} of {}", i, scheduling_iterations); - - util::check(!clauses->empty(), "Scheduling iteration {} has no clauses to process", scheduling_iterations); - remove_processed_clauses(*clauses); - auto next_units_of_work = clauses->front()->structure_for_processing(std::move(entity_id_vectors)); - - std::vector>> work_futures; - for(auto&& unit_of_work : next_units_of_work) { - ARCTICDB_RUNTIME_DEBUG(log::memory(), "Scheduling work for entity ids: {}", unit_of_work); - work_futures.emplace_back(async::submit_cpu_task(async::MemSegmentProcessingTask{*clauses, std::move(unit_of_work)})); - } - - return folly::collect(work_futures).via(&async::io_executor()); - }); - } - - return std::move(entity_ids_vec_fut).thenValueInline([](std::vector>&& entity_id_vectors) { - return flatten_entities(std::move(entity_id_vectors)); - }); -} - -void set_output_descriptors( - const ProcessingUnit& proc, - const std::vector>& clauses, - const std::shared_ptr& pipeline_context) { - std::optional index_column; - for (auto clause = clauses.rbegin(); clause != clauses.rend(); ++clause) { - bool should_break = util::variant_match( - (*clause)->clause_info().index_, - [](const KeepCurrentIndex&) { return false; }, - [&](const KeepCurrentTopLevelIndex&) { - if (pipeline_context->norm_meta_->mutable_df()->mutable_common()->has_multi_index()) { - const auto& multi_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->multi_index(); - auto name = multi_index.name(); - auto tz = multi_index.tz(); - bool fake_name{false}; - for (auto pos: multi_index.fake_field_pos()) { - if (pos == 0) { - fake_name = true; - break; - } - } - auto mutable_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->mutable_index(); - mutable_index->set_tz(tz); - mutable_index->set_is_physically_stored(true); - mutable_index->set_name(name); - mutable_index->set_fake_name(fake_name); - } - return true; - }, - [&](const NewIndex& new_index) { - index_column = new_index; - auto mutable_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->mutable_index(); - mutable_index->set_name(new_index); - mutable_index->clear_fake_name(); - mutable_index->set_is_physically_stored(true); - return true; - }); - if (should_break) { - break; - } - } - std::optional new_stream_descriptor; - if (proc.segments_.has_value() && !proc.segments_->empty()) { - new_stream_descriptor = std::make_optional(); - new_stream_descriptor->set_index(proc.segments_->at(0)->descriptor().index()); - for (size_t idx = 0; idx < new_stream_descriptor->index().field_count(); idx++) { - new_stream_descriptor->add_field(proc.segments_->at(0)->descriptor().field(idx)); - } - } - if (new_stream_descriptor.has_value() && proc.segments_.has_value()) { - std::vector> fields; - for (const auto& 
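The scheduling loop above chains one continuation per remaining iteration, and the iteration count comes from num_scheduling_iterations: walk adjacent clause pairs and start a new stage wherever a clause's output structure differs from its successor's input structure. A toy version of that count; the enum and the {input, output} pairs are illustrative.

#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

enum class Structure { RowSlice, Hash, All };

// One stage per run of clauses whose structures line up; a new scheduling
// iteration begins wherever output structure != the next clause's input.
std::size_t num_scheduling_iterations(const std::vector<std::pair<Structure, Structure>>& clauses) {
    std::size_t res = 1;
    for (std::size_t i = 1; i < clauses.size(); ++i)
        if (clauses[i - 1].second != clauses[i].first)
            ++res;
    return res;
}

int main() {
    // Filter and projection run in one stage; the group-by restructures,
    // so it adds a second scheduling iteration.
    const std::vector<std::pair<Structure, Structure>> pipeline = {
        {Structure::RowSlice, Structure::RowSlice},  // filter
        {Structure::RowSlice, Structure::RowSlice},  // projection
        {Structure::Hash,     Structure::All},       // group-by
    };
    assert(num_scheduling_iterations(pipeline) == 2);
}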
segment: *proc.segments_) { - fields.push_back(segment->descriptor().fields_ptr()); - } - new_stream_descriptor = merge_descriptors(*new_stream_descriptor, - fields, - std::vector{}); - } - if (new_stream_descriptor.has_value()) { - // Finding and erasing fields from the FieldCollection contained in StreamDescriptor is O(n) in number of fields - // So maintain map from field names to types in the new_stream_descriptor to make these operations O(1) - // Cannot use set of FieldRef as the name in the output might match the input, but with a different type after processing - std::unordered_map new_fields; - for (const auto& field: new_stream_descriptor->fields()) { - new_fields.emplace(field.name(), field.type()); - } - // Columns might be in a different order to the original dataframe, so reorder here - auto original_stream_descriptor = pipeline_context->descriptor(); - StreamDescriptor final_stream_descriptor{original_stream_descriptor.id()}; - final_stream_descriptor.set_index(new_stream_descriptor->index()); - // Erase field from new_fields as we add them to final_stream_descriptor, as all fields left in new_fields - // after these operations were created by the processing pipeline, and so should be appended - // Index columns should always appear first - if (index_column.has_value()) { - const auto nh = new_fields.extract(*index_column); - internal::check(!nh.empty(), "New index column not found in processing pipeline"); - final_stream_descriptor.add_field(FieldRef{nh.mapped(), nh.key()}); - } - for (const auto& field: original_stream_descriptor.fields()) { - if (const auto nh = new_fields.extract(field.name()); nh) { - final_stream_descriptor.add_field(FieldRef{nh.mapped(), nh.key()}); - } - } - // Iterate through new_stream_descriptor->fields() rather than remaining new_fields to preserve ordering - // e.g. if there were two projections then users will expect the column produced by the first one to appear - // first in the output df - for (const auto& field: new_stream_descriptor->fields()) { - if (new_fields.contains(field.name())) { - final_stream_descriptor.add_field(field); - } - } - pipeline_context->set_descriptor(final_stream_descriptor); - } -} - -std::shared_ptr> columns_to_decode(const std::shared_ptr& pipeline_context) { - std::shared_ptr> res; - ARCTICDB_DEBUG(log::version(), "Creating columns list with {} bits set", pipeline_context->overall_column_bitset_ ? 
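The removed descriptor rebuild keeps a map of pipeline-output field names and extracts entries as it walks: index column first, then surviving columns in their original dataframe order, then pipeline-created columns in pipeline order. The same ordering logic on plain strings, as a simplification; the real code maps names to types and rebuilds a StreamDescriptor.

#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

std::vector<std::string> reorder_fields(
        const std::vector<std::string>& original,
        const std::vector<std::string>& processed,
        const std::string& index) {
    std::unordered_map<std::string, bool> remaining;
    for (const auto& name : processed)
        remaining.emplace(name, true);

    std::vector<std::string> out;
    if (auto nh = remaining.extract(index); !nh.empty())
        out.push_back(nh.key());  // index column always leads
    for (const auto& name : original)  // original dataframe order next
        if (auto nh = remaining.extract(name); !nh.empty())
            out.push_back(nh.key());
    for (const auto& name : processed)  // pipeline order for new columns
        if (remaining.count(name))
            out.push_back(name);
    return out;
}

int main() {
    const auto out = reorder_fields({"ts", "a", "b"}, {"b", "proj", "ts"}, "ts");
    assert((out == std::vector<std::string>{"ts", "b", "proj"}));
}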
pipeline_context->overall_column_bitset_->count() : -1); - if(pipeline_context->overall_column_bitset_) { - res = std::make_shared>(); - auto en = pipeline_context->overall_column_bitset_->first(); - auto en_end = pipeline_context->overall_column_bitset_->end(); - while (en < en_end) { - ARCTICDB_DEBUG(log::version(), "Adding field {}", pipeline_context->desc_->field(*en).name()); - res->insert(std::string(pipeline_context->desc_->field(*en++).name())); - } - } - return res; -} - -std::vector generate_ranges_and_keys(PipelineContext& pipeline_context) { - std::vector res; - res.reserve(pipeline_context.slice_and_keys_.size()); - bool is_incomplete{false}; - for (auto it = pipeline_context.begin(); it != pipeline_context.end(); it++) { - if (it == pipeline_context.incompletes_begin()) { - is_incomplete = true; - } - auto& sk = it->slice_and_key(); - // Take a copy here as things like defrag need the keys in pipeline_context->slice_and_keys_ that aren't being modified at the end - auto key = sk.key(); - res.emplace_back(sk.slice(), std::move(key), is_incomplete); - } - return res; -} - -util::BitSet get_incompletes_bitset(const std::vector& all_ranges) { - util::BitSet output(all_ranges.size()); - util::BitSet::bulk_insert_iterator it(output); - for(auto&& [index, range] : folly::enumerate(all_ranges)) { - if(range.is_incomplete()) - it = index; - } - it.flush(); - return output; -} - -std::vector> add_schema_check( - const std::shared_ptr &pipeline_context, - std::vector>&& segment_and_slice_futures, - util::BitSet&& incomplete_bitset, - const ProcessingConfig &processing_config) { - std::vector> res; - res.reserve(segment_and_slice_futures.size()); - for (size_t i = 0; i < segment_and_slice_futures.size(); ++i) { - auto&& fut = segment_and_slice_futures.at(i); - const bool is_incomplete = incomplete_bitset[i]; - if (is_incomplete) { - res.push_back( - std::move(fut) - .thenValueInline([pipeline_desc=pipeline_context->descriptor(), processing_config](SegmentAndSlice &&read_result) { - if (!processing_config.dynamic_schema_) { - auto check = check_schema_matches_incomplete(read_result.segment_in_memory_.descriptor(), pipeline_desc); - if (std::holds_alternative(check)) { - std::get(check).throw_error(); - } - } - return std::move(read_result); - })); - } else { - res.push_back(std::move(fut)); - } - } - return res; -} - -std::vector> generate_segment_and_slice_futures( - const std::shared_ptr &store, - const std::shared_ptr &pipeline_context, - const ProcessingConfig &processing_config, - std::vector&& all_ranges) { - auto incomplete_bitset = get_incompletes_bitset(all_ranges); - auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(all_ranges), columns_to_decode(pipeline_context)); - return add_schema_check(pipeline_context, std::move(segment_and_slice_futures), std::move(incomplete_bitset), processing_config); -} - -/* - * Processes the slices in the given pipeline_context. - * - * Slices are processed in an order defined by the first clause in the pipeline, with slices corresponding to the same - * processing unit collected into a single ProcessingUnit. Slices contained within a single ProcessingUnit are processed - * within a single thread. - * - * The processing of a ProcessingUnit is scheduled via the Async Store. Within a single thread, the - * segments will be retrieved from storage and decompressed before being passed to a MemSegmentProcessingTask which - * will process all clauses up until a clause that requires a repartition. 
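A toy model of the grouping the next comment describes: column slices belonging to the same row range collapse into one processing unit, producing the {{0, 1}, {2, 3}} shape for two row slices split across two column slices. The real structure_for_processing is clause-specific; this models only the default row-slice grouping.

#include <cassert>
#include <cstddef>
#include <map>
#include <utility>
#include <vector>

using RowRange = std::pair<std::size_t, std::size_t>;

std::vector<std::vector<std::size_t>> structure_by_row_slice(const std::vector<RowRange>& ranges) {
    // Group slice indexes by row range so all column slices of one row slice
    // land in the same processing unit.
    std::map<RowRange, std::vector<std::size_t>> grouped;
    for (std::size_t idx = 0; idx < ranges.size(); ++idx)
        grouped[ranges[idx]].push_back(idx);

    std::vector<std::vector<std::size_t>> res;
    for (auto& entry : grouped)
        res.push_back(std::move(entry.second));
    return res;
}

int main() {
    // Two row slices, each split across two column slices.
    const std::vector<RowRange> ranges = {{0, 10}, {0, 10}, {10, 20}, {10, 20}};
    const auto units = structure_by_row_slice(ranges);
    assert(units.size() == 2);
    assert((units[0] == std::vector<std::size_t>{0, 1}));
    assert((units[1] == std::vector<std::size_t>{2, 3}));
}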
- */ -folly::Future> read_and_process( - const std::shared_ptr& store, - const std::shared_ptr& pipeline_context, - const std::shared_ptr& read_query, - const ReadOptions& read_options - ) { - auto component_manager = std::make_shared(); - ProcessingConfig processing_config{opt_false(read_options.dynamic_schema_), pipeline_context->rows_}; - for (auto& clause: read_query->clauses_) { - clause->set_processing_config(processing_config); - clause->set_component_manager(component_manager); - } - - auto ranges_and_keys = generate_ranges_and_keys(*pipeline_context); - - // Each element of the vector corresponds to one processing unit containing the list of indexes in ranges_and_keys required for that processing unit - // i.e. if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3] - // then the structure will be {{0, 1}, {2, 3}} - std::vector> processing_unit_indexes = read_query->clauses_[0]->structure_for_processing(ranges_and_keys); - - // Start reading as early as possible - auto segment_and_slice_futures = generate_segment_and_slice_futures(store, pipeline_context, processing_config, std::move(ranges_and_keys)); - - return schedule_clause_processing( - component_manager, - std::move(segment_and_slice_futures), - std::move(processing_unit_indexes), - std::make_shared>>(read_query->clauses_)) - .via(&async::cpu_executor()) - .thenValue([component_manager, read_query, pipeline_context](std::vector&& processed_entity_ids) { - auto proc = gather_entities, std::shared_ptr, std::shared_ptr>(*component_manager, std::move(processed_entity_ids)); - - if (std::any_of(read_query->clauses_.begin(), read_query->clauses_.end(), [](const std::shared_ptr& clause) { - return clause->clause_info().modifies_output_descriptor_; - })) { - set_output_descriptors(proc, read_query->clauses_, pipeline_context); - } - return collect_segments(std::move(proc)); - }); -} - void add_index_columns_to_query(const ReadQuery& read_query, const TimeseriesDescriptor& desc) { if (read_query.columns.has_value()) { auto index_columns = stream::get_index_columns_from_descriptor(desc); @@ -1093,12 +682,10 @@ bool read_incompletes_to_pipeline( } if (dynamic_schema) { - pipeline_context->staged_descriptor_ = - merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns); + pipeline_context->staged_descriptor_ = merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns); if (pipeline_context->desc_) { const std::array fields_ptr = {pipeline_context->desc_->fields_ptr()}; - pipeline_context->desc_ = - merge_descriptors(*pipeline_context->staged_descriptor_, fields_ptr, read_query.columns); + pipeline_context->desc_ = merge_descriptors(*pipeline_context->staged_descriptor_, fields_ptr, read_query.columns); } else { pipeline_context->desc_ = pipeline_context->staged_descriptor_; } @@ -1188,179 +775,6 @@ void check_incompletes_index_ranges_dont_overlap(const std::shared_ptr(source_index)); - auto& dst_column = destination.column(static_cast(target_index)); - auto& buffer = dst_column.data().buffer(); - auto dst_rawtype_size = data_type_size(dst_column.type(), DataTypeMode::EXTERNAL); - auto offset = dst_rawtype_size * (row_range.first - destination.offset()); - auto total_size = dst_rawtype_size * num_rows; - buffer.assert_size(offset + total_size); - - auto src_data = src_column.data(); - auto dst_ptr = buffer.data() + offset; - - auto type_promotion_error_msg = fmt::format("Can't promote type {} to type {} in field {}", 
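The removed copy_frame_data_to_buffer addresses the destination with offset = dst_rawtype_size * (row_range.first - destination.offset()): global row numbers are rebased against the frame's starting row before scaling by element size, and the write is bounds-checked against the buffer. A toy model of that addressing with fixed-width integers; all names are illustrative.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

void copy_slice(std::vector<std::int64_t>& dest_frame,
                std::size_t dest_offset,               // frame's first global row
                const std::vector<std::int64_t>& src_slice,
                std::size_t row_first) {               // slice's first global row
    const std::size_t rawtype_size = sizeof(std::int64_t);
    // Rebase the slice's global start row against the frame start, then scale.
    const std::size_t byte_offset = rawtype_size * (row_first - dest_offset);
    // Mirrors buffer.assert_size(offset + total_size).
    assert(byte_offset + rawtype_size * src_slice.size() <= rawtype_size * dest_frame.size());
    std::memcpy(reinterpret_cast<char*>(dest_frame.data()) + byte_offset,
                src_slice.data(),
                rawtype_size * src_slice.size());
}

int main() {
    std::vector<std::int64_t> frame(6, 0);   // frame covers global rows 100..105
    copy_slice(frame, 100, {7, 8, 9}, 102);  // slice covers global rows 102..104
    assert(frame[2] == 7 && frame[4] == 9);
}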
- src_column.type(), dst_column.type(), destination.field(target_index).name()); - if(auto handler = get_type_handler(src_column.type(), dst_column.type()); handler) { - handler->convert_type(src_column, buffer, num_rows, offset, src_column.type(), dst_column.type(), shared_data, handler_data, source.string_pool_ptr()); - } else if (is_empty_type(src_column.type().data_type())) { - dst_column.type().visit_tag([&](auto dst_desc_tag) { - util::default_initialize(dst_ptr, num_rows * dst_rawtype_size); - }); - // Do not use src_column.is_sparse() here, as that misses columns that are dense, but have fewer than num_rows values - } else if (src_column.opt_sparse_map().has_value() && has_valid_type_promotion(src_column.type(), dst_column.type())) { - details::visit_type(dst_column.type().data_type(), [&](auto dst_tag) { - using dst_type_info = ScalarTypeInfo; - util::default_initialize(dst_ptr, num_rows * dst_rawtype_size); - auto typed_dst_ptr = reinterpret_cast(dst_ptr); - details::visit_type(src_column.type().data_type(), [&](auto src_tag) { - using src_type_info = ScalarTypeInfo; - Column::for_each_enumerated(src_column, [typed_dst_ptr](auto enumerating_it) { - typed_dst_ptr[enumerating_it.idx()] = static_cast(enumerating_it.value()); - }); - }); - }); - } else if (trivially_compatible_types(src_column.type(), dst_column.type())) { - details::visit_type(src_column.type().data_type() ,[&src_data, &dst_ptr] (auto src_desc_tag) { - using SourceTDT = ScalarTagType; - using SourceType = typename decltype(src_desc_tag)::DataTypeTag::raw_type; - while (auto block = src_data.template next()) { - const auto row_count = block->row_count(); - memcpy(dst_ptr, block->data(), row_count * sizeof(SourceType)); - dst_ptr += row_count * sizeof(SourceType); - } - }); - } else if (has_valid_type_promotion(src_column.type(), dst_column.type())) { - details::visit_type(dst_column.type().data_type() ,[&src_data, &dst_ptr, &src_column, &type_promotion_error_msg] (auto dest_desc_tag) { - using DestinationType = typename decltype(dest_desc_tag)::DataTypeTag::raw_type; - auto typed_dst_ptr = reinterpret_cast(dst_ptr); - details::visit_type(src_column.type().data_type() ,[&src_data, &typed_dst_ptr, &type_promotion_error_msg] (auto src_desc_tag) { - using source_type_info = ScalarTypeInfo; - if constexpr(std::is_arithmetic_v && std::is_arithmetic_v) { - const auto src_cend = src_data.cend(); - for (auto src_it = src_data.cbegin(); src_it != src_cend; ++src_it) { - *typed_dst_ptr++ = static_cast(*src_it); - } - } else { - util::raise_rte(type_promotion_error_msg.c_str()); - } - }); - }); - } else { - util::raise_rte(type_promotion_error_msg.c_str()); - } -} - -struct CopyToBufferTask : async::BaseTask { - SegmentInMemory&& source_segment_; - SegmentInMemory target_segment_; - FrameSlice frame_slice_; - DecodePathData shared_data_; - std::any& handler_data_; - bool fetch_index_; - - CopyToBufferTask( - SegmentInMemory&& source_segment, - SegmentInMemory target_segment, - FrameSlice frame_slice, - DecodePathData shared_data, - std::any& handler_data, - bool fetch_index) : - source_segment_(std::move(source_segment)), - target_segment_(std::move(target_segment)), - frame_slice_(std::move(frame_slice)), - shared_data_(std::move(shared_data)), - handler_data_(handler_data), - fetch_index_(fetch_index) { - - } - - folly::Unit operator()() { - const auto index_field_count = get_index_field_count(target_segment_); - for (auto idx = 0u; idx < index_field_count && fetch_index_; ++idx) { - copy_frame_data_to_buffer(target_segment_, 
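The widening branch above visits the destination type, then the source type, and copies element by element through static_cast, raising the type-promotion error for non-arithmetic combinations. The same idea in miniature, with the runtime type dispatch reduced to template parameters.

#include <cassert>
#include <cstdint>
#include <type_traits>
#include <vector>

// Simplified version of the removed widening copy: every source value is
// static_cast to the (wider) destination type, element by element.
template <typename Src, typename Dst>
std::vector<Dst> promote_copy(const std::vector<Src>& src) {
    static_assert(std::is_arithmetic_v<Src> && std::is_arithmetic_v<Dst>,
                  "only arithmetic promotions are supported, mirroring the raise above");
    std::vector<Dst> dst;
    dst.reserve(src.size());
    for (const auto v : src)
        dst.push_back(static_cast<Dst>(v));
    return dst;
}

int main() {
    const std::vector<std::int32_t> narrow = {1, 2, 3};
    const auto wide = promote_copy<std::int32_t, double>(narrow);
    assert(wide.size() == 3 && wide[2] == 3.0);
}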
idx, source_segment_, idx, frame_slice_.row_range, shared_data_, handler_data_); - } - - auto field_count = frame_slice_.col_range.diff() + index_field_count; - internal::check( - field_count == source_segment_.descriptor().field_count(), - "Column range does not match segment descriptor field count in copy_segments_to_frame: {} != {}", - field_count, source_segment_.descriptor().field_count()); - - const auto& fields = source_segment_.descriptor().fields(); - for (auto field_col = index_field_count; field_col < field_count; ++field_col) { - const auto& field = fields.at(field_col); - const auto& field_name = field.name(); - auto frame_loc_opt = target_segment_.column_index(field_name); - if (!frame_loc_opt) - continue; - - copy_frame_data_to_buffer(target_segment_, *frame_loc_opt, source_segment_, field_col, frame_slice_.row_range, shared_data_, handler_data_); - } - return folly::Unit{}; - } -}; - -folly::Future copy_segments_to_frame( - const std::shared_ptr& store, - const std::shared_ptr& pipeline_context, - SegmentInMemory frame, - std::any& handler_data) { - std::vector> copy_tasks; - DecodePathData shared_data; - for (auto context_row : folly::enumerate(*pipeline_context)) { - auto &slice_and_key = context_row->slice_and_key(); - auto &segment = slice_and_key.segment(store); - - copy_tasks.emplace_back(async::submit_cpu_task( - CopyToBufferTask{ - std::move(segment), - frame, - context_row->slice_and_key().slice(), - shared_data, - handler_data, - context_row->fetch_index()})); - } - return folly::collect(copy_tasks).via(&async::cpu_executor()).unit(); -} - -folly::Future prepare_output_frame( - std::vector&& items, - const std::shared_ptr& pipeline_context, - const std::shared_ptr& store, - const ReadOptions& read_options, - std::any& handler_data) { - pipeline_context->clear_vectors(); - pipeline_context->slice_and_keys_ = std::move(items); - std::sort(std::begin(pipeline_context->slice_and_keys_), std::end(pipeline_context->slice_and_keys_), [] (const auto& left, const auto& right) { - return std::tie(left.slice_.row_range, left.slice_.col_range) < std::tie(right.slice_.row_range, right.slice_.col_range); - }); - adjust_slice_rowcounts(pipeline_context); - const auto dynamic_schema = opt_false(read_options.dynamic_schema_); - mark_index_slices(pipeline_context, dynamic_schema, pipeline_context->bucketize_dynamic_); - pipeline_context->ensure_vectors(); - - for(auto row : *pipeline_context) { - row.set_compacted(false); - row.set_descriptor(row.slice_and_key().segment(store).descriptor_ptr()); - row.set_string_pool(row.slice_and_key().segment(store).string_pool_ptr()); - } - - auto frame = allocate_frame(pipeline_context); - return copy_segments_to_frame(store, pipeline_context, frame, handler_data).thenValue([frame](auto&&){ return frame; }); -} AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key) { // Note that we use the creation timestamp and content hash of the related index key @@ -1510,31 +924,6 @@ ColumnStats get_column_stats_info_impl( } } -folly::Future do_direct_read_or_process( - const std::shared_ptr& store, - const std::shared_ptr& read_query, - const ReadOptions& read_options, - const std::shared_ptr& pipeline_context, - const DecodePathData& shared_data, - std::any& handler_data) { - if(!read_query->clauses_.empty()) { - ARCTICDB_SAMPLE(RunPipelineAndOutput, 0) - util::check_rte(!pipeline_context->is_pickled(),"Cannot filter pickled data"); - return read_and_process(store, pipeline_context, read_query, read_options) - .thenValue([store, 
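prepare_output_frame, removed above, sorts the slices by (row_range, col_range) via std::tie so segments are laid down in frame order, row slices first and column slices as the tie-breaker. The comparator in miniature:

#include <algorithm>
#include <cassert>
#include <tuple>
#include <utility>
#include <vector>

struct Slice {
    std::pair<int, int> row_range;
    std::pair<int, int> col_range;
};

int main() {
    std::vector<Slice> slices = {{{10, 20}, {0, 5}}, {{0, 10}, {5, 9}}, {{0, 10}, {0, 5}}};
    // Lexicographic ordering: row range first, column range breaks ties.
    std::sort(slices.begin(), slices.end(), [](const Slice& l, const Slice& r) {
        return std::tie(l.row_range, l.col_range) < std::tie(r.row_range, r.col_range);
    });
    assert((slices.front().row_range == std::pair<int, int>{0, 10}));
    assert((slices.front().col_range == std::pair<int, int>{0, 5}));
}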
pipeline_context, &read_options, &handler_data](std::vector&& segs) { - return prepare_output_frame(std::move(segs), pipeline_context, store, read_options, handler_data); - }); - } else { - ARCTICDB_SAMPLE(MarkAndReadDirect, 0) - util::check_rte(!(pipeline_context->is_pickled() && std::holds_alternative(read_query->row_filter)), "Cannot use head/tail/row_range with pickled data, use plain read instead"); - mark_index_slices(pipeline_context, opt_false(read_options.dynamic_schema_), pipeline_context->bucketize_dynamic_); - auto frame = allocate_frame(pipeline_context); - util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__); - ARCTICDB_DEBUG(log::version(), "Fetching frame data"); - return fetch_data(std::move(frame), pipeline_context, store, opt_false(read_options.dynamic_schema_), shared_data, handler_data); - } -} - VersionedItem collate_and_write( const std::shared_ptr& store, const std::shared_ptr& pipeline_context, @@ -1965,7 +1354,7 @@ void set_row_id_if_index_only( folly::Future read_frame_for_version( const std::shared_ptr& store, const std::variant& version_info, - const std::shared_ptr& read_query , + const std::shared_ptr& read_query, const ReadOptions& read_options, std::any& handler_data) { using namespace arcticdb::pipelines; @@ -2012,7 +1401,7 @@ folly::Future read_frame_for_version( ARCTICDB_DEBUG(log::version(), "Fetching data to frame"); DecodePathData shared_data; - return version_store::do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data) + return do_direct_read_or_process(store, read_query, read_options, pipeline_context, shared_data, handler_data) .thenValue([res_versioned_item, pipeline_context, &read_options, &handler_data, read_query, shared_data](auto&& frame) mutable { ARCTICDB_DEBUG(log::version(), "Reduce and fix columns"); return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data) @@ -2049,29 +1438,4 @@ bool is_segment_unsorted(const SegmentInMemory& segment) { return segment.descriptor().sorted() == SortedValue::DESCENDING || segment.descriptor().sorted() == SortedValue::UNSORTED; } -CheckOutcome check_schema_matches_incomplete(const StreamDescriptor& stream_descriptor_incomplete, const StreamDescriptor& pipeline_desc) { - // We need to check that the index names match regardless of the dynamic schema setting - if(!index_names_match(stream_descriptor_incomplete, pipeline_desc)) { - return Error{ - throw_error, - fmt::format("{} All staged segments must have the same index names." - "{} is different than {}", - error_code_data.name_, - stream_descriptor_incomplete, - pipeline_desc) - }; - } - if (!columns_match(stream_descriptor_incomplete, pipeline_desc)) { - return Error{ - throw_error, - fmt::format("{} When static schema is used all staged segments must have the same column and column types." 
-                "{} is different than {}",
-                error_code_data.name_,
-                stream_descriptor_incomplete,
-                pipeline_desc)
-        };
-    }
-    return std::monostate{};
-}
-
 }
diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp
index f9e0ae9294..c7f1e51ce3 100644
--- a/cpp/arcticdb/version/version_core.hpp
+++ b/cpp/arcticdb/version/version_core.hpp
@@ -23,6 +23,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
@@ -32,25 +34,13 @@
 using namespace arcticdb::entity;
 using namespace arcticdb::pipelines;
 struct CompactIncompleteOptions {
-    bool prune_previous_versions_;
-    bool append_;
-    bool convert_int_to_float_;
-    bool via_iteration_;
-    bool sparsify_;
-    bool validate_index_{true}; // Default value as unused in sort_merge
-    bool delete_staged_data_on_failure_{false};
-};
-
-struct ReadVersionOutput {
-    ReadVersionOutput() = delete;
-    ReadVersionOutput(VersionedItem&& versioned_item, FrameAndDescriptor&& frame_and_descriptor):
-        versioned_item_(std::move(versioned_item)),
-        frame_and_descriptor_(std::move(frame_and_descriptor)) {}
-
-    ARCTICDB_MOVE_ONLY_DEFAULT(ReadVersionOutput)
-
-    VersionedItem versioned_item_;
-    FrameAndDescriptor frame_and_descriptor_;
+    bool prune_previous_versions_ = false;
+    bool append_ = false;
+    bool convert_int_to_float_ = false;
+    bool via_iteration_ = false;
+    bool sparsify_ = false;
+    bool validate_index_ = true; // Default value as unused in sort_merge
+    bool delete_staged_data_on_failure_ = false;
 };
 VersionedItem write_dataframe_impl(
@@ -132,12 +122,6 @@ folly::Future read_multi_key(
     const SegmentInMemory& index_key_seg,
     std::any& handler_data);
-folly::Future> schedule_clause_processing(
-    std::shared_ptr component_manager,
-    std::vector>&& segment_and_slice_futures,
-    std::vector>&& processing_unit_indexes,
-    std::shared_ptr>> clauses);
-
 FrameAndDescriptor read_segment_impl(
     const std::shared_ptr& store,
     const VariantKey& key);
@@ -243,19 +227,7 @@ std::optional get_delete_keys_on_failure(
 namespace arcticdb {
-struct Error {
-
-    explicit Error(folly::Function raiser, std::string msg);
-    void throw_error();
-    folly::Function raiser_;
-    std::string msg_;
-};
-
-using CheckOutcome = std::variant;
-using StaticSchemaCompactionChecks = folly::Function;
-using CompactionWrittenKeys = std::vector;
-using CompactionResult = std::variant;
 void remove_written_keys(Store* store, CompactionWrittenKeys&& written_keys);
@@ -341,8 +313,6 @@ template ();
+    return read_dataframe_chunked_internal(stream_id, version_query, read_query, read_options, handler_data, shared_data);
+}
+
 VersionedItem PythonVersionStore::write_versioned_composite_data(
     const StreamId& stream_id,
     const py::object &metastruct,
diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp
index 9539856985..2c152babab 100644
--- a/cpp/arcticdb/version/version_store_api.hpp
+++ b/cpp/arcticdb/version/version_store_api.hpp
@@ -304,6 +304,13 @@ class PythonVersionStore : public LocalVersionedEngine {
     const std::vector& version_queries,
     const ReadOptions& read_options);
+    ChunkIterator read_dataframe_chunked(
+        const StreamId &stream_id,
+        const VersionQuery& version_query,
+        ReadQuery& read_query,
+        const ReadOptions& read_options,
+        std::any& handler_data);
+
     std::set list_streams(
         const std::optional& snap_name = std::nullopt,
         const std::optional &regex = std::nullopt,
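The read_chunked binding earlier in this diff exposes ChunkIterator.next() as a std::optional, which pybind11 renders in Python as either a value or None. A toy pybind11 module sketching that surface, plus the __iter__/__next__ sugar a binding could layer on top; the class, module name, and payload type are all illustrative, not the real API.

#include <optional>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>  // std::optional <-> None conversion

namespace py = pybind11;

// Toy stand-in for an iterator that yields a fixed number of chunks.
struct CountingIterator {
    int pos_ = 0, end_ = 0;
    explicit CountingIterator(int end) : end_(end) {}
    std::optional<int> next() {
        if (pos_ == end_)
            return std::nullopt;  // exhausted: surfaces as None in Python
        return pos_++;
    }
};

PYBIND11_MODULE(chunk_demo, m) {
    py::class_<CountingIterator>(m, "CountingIterator")
        .def(py::init<int>())
        // Same shape as the ChunkIterator binding: a value or None.
        .def("next", [](CountingIterator& self) { return self.next(); })
        // Optional sugar adapting next()/None to Python's iterator protocol.
        .def("__iter__", [](CountingIterator& self) -> CountingIterator& { return self; },
             py::return_value_policy::reference_internal)
        .def("__next__", [](CountingIterator& self) {
            if (auto v = self.next())
                return *v;
            throw py::stop_iteration();
        });
}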