Resampling MVP (#1495)
Closes #1010
alexowens90 authored May 30, 2024
1 parent 53ff421 commit 81ee498
Showing 33 changed files with 3,039 additions and 119 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -8,6 +8,7 @@ __pycache__/
.vscode/
.vs/
.project
+.idea

*.so
*.a
13 changes: 10 additions & 3 deletions cpp/arcticdb/CMakeLists.txt
@@ -251,7 +251,7 @@ set(arcticdb_srcs
pipeline/value_set.hpp
pipeline/write_frame.hpp
pipeline/write_options.hpp
-processing/aggregation.hpp
+processing/aggregation_utils.hpp
processing/component_manager.hpp
processing/operation_dispatch.hpp
processing/operation_dispatch_binary.hpp
@@ -263,6 +263,8 @@ set(arcticdb_srcs
processing/clause.hpp
processing/expression_context.hpp
processing/expression_node.hpp
+processing/sorted_aggregation.hpp
+processing/unsorted_aggregation.hpp
storage/constants.hpp
storage/common.hpp
storage/config_resolvers.hpp
@@ -426,7 +428,7 @@ set(arcticdb_srcs
pipeline/write_frame.cpp
python/normalization_checks.cpp
processing/processing_unit.cpp
-processing/aggregation.cpp
+processing/aggregation_utils.cpp
processing/clause.cpp
processing/component_manager.cpp
processing/expression_node.cpp
@@ -437,6 +439,8 @@ set(arcticdb_srcs
processing/operation_dispatch_binary_gt.cpp
processing/operation_dispatch_binary_lt.cpp
processing/operation_dispatch_binary_operator.cpp
+processing/sorted_aggregation.cpp
+processing/unsorted_aggregation.cpp
python/python_to_tensor_frame.cpp
storage/config_resolvers.cpp
storage/failure_simulation.cpp
@@ -787,6 +791,7 @@ if(${TEST})
processing/test/test_filter_and_project_sparse.cpp
processing/test/test_has_valid_type_promotion.cpp
processing/test/test_operation_dispatch.cpp
+processing/test/test_resample.cpp
processing/test/test_set_membership.cpp
processing/test/test_signed_unsigned_comparison.cpp
processing/test/test_type_comparison.cpp
@@ -932,12 +937,14 @@ if(${TEST})
column_store/test/rapidcheck_column_data_random_accessor.cpp
column_store/test/rapidcheck_column_map.cpp
column_store/test/test_chunked_buffer.cpp
+processing/test/rapidcheck_resample.cpp
stream/test/stream_test_common.cpp
util/test/rapidcheck_decimal.cpp
util/test/rapidcheck_generators.cpp
util/test/rapidcheck_string_pool.cpp
util/test/rapidcheck_main.cpp
-version/test/rapidcheck_version_map.cpp)
+version/test/rapidcheck_version_map.cpp
+)

add_executable(arcticdb_rapidcheck_tests ${rapidcheck_srcs})
install(TARGETS arcticdb_rapidcheck_tests RUNTIME
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/task_scheduler.hpp
@@ -150,7 +150,7 @@ class TaskScheduler {
using CPUSchedulerType = folly::FutureExecutor<folly::CPUThreadPoolExecutor>;
using IOSchedulerType = folly::FutureExecutor<folly::IOThreadPoolExecutor>;

-explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
+explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
cpu_thread_count_(cpu_thread_count ? *cpu_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumCPUThreads", get_default_num_cpus())),
io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", std::min(100, (int) (cpu_thread_count_ * 1.5)))),
cpu_exec_(cpu_thread_count_, std::make_shared<InstrumentedNamedFactory>("CPUPool")) ,
18 changes: 14 additions & 4 deletions cpp/arcticdb/async/tasks.hpp
@@ -25,6 +25,7 @@
#include <arcticdb/codec/codec.hpp>

#include <type_traits>
+#include <ranges>

namespace arcticdb::async {

@@ -405,12 +406,21 @@ struct MemSegmentProcessingTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(MemSegmentProcessingTask)

Composite<EntityIds> operator()() {
-        for(const auto& clause : clauses_) {
-            entity_ids_ = clause->process(std::move(entity_ids_));
-
-            if(clause->clause_info().requires_repartition_)
+        // TODO: Replace with commented out code once C++20 is reinstated
+        // std::ranges::reverse_view reversed_clauses{clauses_};
+        // for (const auto& clause: reversed_clauses) {
+        //     entity_ids_ = clause->process(std::move(entity_ids_));
+        //
+        //     if(clause->clause_info().requires_repartition_)
+        //         break;
+        // }
+        for (auto clause = clauses_.crbegin(); clause != clauses_.crend(); ++clause) {
+            entity_ids_ = (*clause)->process(std::move(entity_ids_));
+
+            if((*clause)->clause_info().requires_repartition_)
                break;
        }
+        // end TODO
return std::move(entity_ids_);
}

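A side note on the TODO in MemSegmentProcessingTask::operator() above: the pre-C++20 reverse-iterator loop and the commented-out std::ranges code traverse the clauses in the same order. A minimal, self-contained sketch of the two styles (illustrative only, not ArcticDB code):

#include <iostream>
#include <ranges>
#include <vector>

int main() {
    const std::vector<int> clauses{1, 2, 3, 4};

    // C++20 style, as in the commented-out TODO code: a non-copying reversed view.
    for (int c : std::views::reverse(clauses))
        std::cout << c << ' ';              // prints: 4 3 2 1
    std::cout << '\n';

    // Pre-C++20 equivalent used by the patch: const reverse iterators.
    for (auto it = clauses.crbegin(); it != clauses.crend(); ++it)
        std::cout << *it << ' ';            // prints: 4 3 2 1
    std::cout << '\n';
}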
19 changes: 19 additions & 0 deletions cpp/arcticdb/column_store/chunked_buffer.hpp
@@ -214,6 +214,25 @@ class ChunkedBufferImpl {
return res;
}

+    // Trim reduces the size of the chunked buffer to the specified size by dropping blocks that are wholly
+    // unneeded, i.e. no allocation/memcpy is involved, only deallocation.
+    // Use in performance-critical code where a column is being created, the final size is unknown at construction
+    // time, but a maximum size is known: create the Column with a chunked buffer that is presized in blocks. This
+    // unlocks ColumnDataIterator usage (more performant than repeated calls to Column::push_back). Once the column
+    // is created and the number of elements is known, use this to drop the unneeded blocks.
+    void trim(size_t requested_size) {
+        internal::check<ErrorCode::E_ASSERTION_FAILURE>(requested_size <= bytes_,
+                                                        "Cannot trim ChunkedBuffer with {} bytes to {} bytes",
+                                                        bytes_,
+                                                        requested_size);
+        while (bytes_ - last_block().bytes() >= requested_size) {
+            bytes_ -= last_block().bytes();
+            free_last_block();
+        }
+        last_block().resize(last_block().bytes() - (bytes_ - requested_size));
+        bytes_ = requested_size;
+    }

struct BlockAndOffset {
MemBlock* block_;
size_t offset_;
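A toy model of the trim semantics above (SimpleChunkedBuffer here is hypothetical, not the real ChunkedBufferImpl): whole tail blocks are deallocated while they lie entirely past the requested size, and the surviving last block is then shrunk to hit the exact byte count.

#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for ChunkedBufferImpl: a list of variable-size blocks.
struct SimpleChunkedBuffer {
    std::vector<std::vector<char>> blocks_;
    std::size_t bytes_ = 0;

    void add_block(std::size_t n) { blocks_.emplace_back(n); bytes_ += n; }

    // Mirrors the trim() above: deallocate wholly-unneeded tail blocks, then
    // resize the (new) last block so that the total equals requested_size.
    void trim(std::size_t requested_size) {
        assert(requested_size <= bytes_);
        while (bytes_ - blocks_.back().size() >= requested_size) {
            bytes_ -= blocks_.back().size();
            blocks_.pop_back();  // whole block freed, no memcpy
        }
        blocks_.back().resize(blocks_.back().size() - (bytes_ - requested_size));
        bytes_ = requested_size;
    }
};

int main() {
    SimpleChunkedBuffer buf;
    buf.add_block(64); buf.add_block(64); buf.add_block(64);  // presized: 192 bytes
    buf.trim(70);  // third block freed outright; second shrunk from 64 to 6 bytes
    assert(buf.bytes_ == 70 && buf.blocks_.size() == 2);
}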
12 changes: 8 additions & 4 deletions cpp/arcticdb/column_store/column.hpp
@@ -768,17 +768,19 @@ class Column {
// One sparse, one dense. Use the enumerating forward iterator over the sparse column as it is more efficient than random access
auto right_accessor = random_accessor<right_input_tdt>(&right_input_data);
const auto right_column_row_count = right_input_column.row_count();
+const auto left_input_data_cend = left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
for (auto left_it = left_input_data.cbegin<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-left_it != left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && left_it->idx() < right_column_row_count;
+left_it != left_input_data_cend && left_it->idx() < right_column_row_count;
++left_it) {
*output_it++ = f(left_it->value(), right_accessor.at(left_it->idx()));
}
} else if (!left_input_column.is_sparse() && right_input_column.is_sparse()) {
// One sparse, one dense. Use the enumerating forward iterator over the sparse column as it is more efficient than random access
auto left_accessor = random_accessor<left_input_tdt>(&left_input_data);
const auto left_column_row_count = left_input_column.row_count();
+const auto right_input_data_cend = right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
for (auto right_it = right_input_data.cbegin<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-right_it != right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && right_it->idx() < left_column_row_count;
+right_it != right_input_data_cend && right_it->idx() < left_column_row_count;
++right_it) {
*output_it++ = f(left_accessor.at(right_it->idx()), right_it->value());
}
@@ -871,8 +873,9 @@ class Column {
initialise_output_bitset(left_input_column.sparse_map(), sparse_missing_value_output, output_bitset);
auto right_accessor = random_accessor<right_input_tdt>(&right_input_data);
const auto right_column_row_count = right_input_column.row_count();
+const auto left_input_data_cend = left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
for (auto left_it = left_input_data.cbegin<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-left_it != left_input_data.cend<left_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && left_it->idx() < right_column_row_count;
+left_it != left_input_data_cend && left_it->idx() < right_column_row_count;
++left_it) {
if(f(left_it->value(), right_accessor.at(left_it->idx()))) {
inserter = left_it->idx();
@@ -883,8 +886,9 @@ class Column {
initialise_output_bitset(right_input_column.sparse_map(), sparse_missing_value_output, output_bitset);
auto left_accessor = random_accessor<left_input_tdt>(&left_input_data);
const auto left_column_row_count = left_input_column.row_count();
+const auto right_input_data_cend = right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
for (auto right_it = right_input_data.cbegin<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>();
-right_it != right_input_data.cend<right_input_tdt, IteratorType::ENUMERATED, IteratorDensity::SPARSE>() && right_it->idx() < left_column_row_count;
+right_it != right_input_data_cend && right_it->idx() < left_column_row_count;
++right_it) {
if(f(left_accessor.at(right_it->idx()), right_it->value())) {
inserter = right_it->idx();
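Both hunks in this file make the same micro-optimisation: the templated cend() call is hoisted out of the loop condition so it is constructed once rather than re-evaluated on every iteration. A generic sketch of the pattern (make_cend is a made-up stand-in for the enumerated sparse cend() used above):

#include <vector>

// Made-up stand-in for a non-trivial end-iterator factory such as
// ColumnData::cend<TDT, IteratorType::ENUMERATED, IteratorDensity::SPARSE>().
template <typename T>
typename std::vector<T>::const_iterator make_cend(const std::vector<T>& v) {
    return v.cend();  // imagine measurable construction cost here
}

template <typename T, typename F>
void for_each_hoisted(const std::vector<T>& v, F f) {
    const auto cend = make_cend(v);  // evaluated once, before the loop
    for (auto it = v.cbegin(); it != cend; ++it)
        f(*it);
}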
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/test/test_memory_segment.cpp
@@ -210,7 +210,7 @@ TEST(MemSegment, StdFindIf) {
auto num_rows = 100u;
auto frame_wrapper = get_test_timeseries_frame("modify", num_rows, 0);
auto &segment = frame_wrapper.segment_;
-auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index<TimeseriesIndex>() == 50; });
+const auto it = std::find_if(std::begin(segment), std::end(segment), [] (SegmentInMemory::Row& row) { return row.template index<TimeseriesIndex>() == 50; });
auto val_it = it->begin();
ASSERT_EQ(it->index<TimeseriesIndex>(), 50);
std::advance(val_it, 1);
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/column_stats.cpp
@@ -1,7 +1,7 @@
#include <pipeline/column_stats.hpp>

#include <arcticdb/processing/aggregation_interface.hpp>
-#include <arcticdb/processing/aggregation.hpp>
+#include <arcticdb/processing/unsorted_aggregation.hpp>
#include <arcticdb/entity/type_utils.hpp>
#include <arcticdb/util/preconditions.hpp>

8 changes: 8 additions & 0 deletions cpp/arcticdb/pipeline/frame_slice.hpp
@@ -188,6 +188,14 @@ struct RangesAndKey {
RangesAndKey() = delete;
ARCTICDB_MOVE_COPY_DEFAULT(RangesAndKey)

+friend bool operator==(const RangesAndKey& left, const RangesAndKey& right) {
+    return left.row_range_ == right.row_range_ && left.col_range_ == right.col_range_ && left.key_ == right.key_;
+}
+
+bool operator!=(const RangesAndKey& right) const {
+    return !(*this == right);
+}

RowRange row_range_;
ColRange col_range_;
entity::AtomKey key_;
27 changes: 27 additions & 0 deletions cpp/arcticdb/processing/aggregation_utils.cpp
@@ -0,0 +1,27 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <arcticdb/entity/type_utils.hpp>
#include <arcticdb/processing/aggregation_utils.hpp>

namespace arcticdb {

void add_data_type_impl(entity::DataType data_type, std::optional<entity::DataType>& current_data_type) {
if (current_data_type.has_value()) {
auto common_type = has_valid_common_type(entity::TypeDescriptor(*current_data_type, 0),
entity::TypeDescriptor(data_type, 0));
schema::check<ErrorCode::E_UNSUPPORTED_COLUMN_TYPE>(
common_type.has_value(),
"Cannot perform aggregation on column, incompatible types present: {} and {}",
entity::TypeDescriptor(*current_data_type, 0), entity::TypeDescriptor(data_type, 0));
current_data_type = common_type->data_type();
} else {
current_data_type = data_type;
}
}

} // namespace arcticdb
16 changes: 16 additions & 0 deletions cpp/arcticdb/processing/aggregation_utils.hpp
@@ -0,0 +1,16 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <arcticdb/entity/types.hpp>

namespace arcticdb {

void add_data_type_impl(entity::DataType data_type, std::optional<entity::DataType>& current_data_type);

} // namespace arcticdb
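To see what add_data_type_impl does: it folds each incoming column type into a running output type, widening via the common-type rules and raising a schema error when no common type exists. A toy illustration of that fold (promote() is a made-up stand-in for has_valid_common_type, which applies ArcticDB's real promotion rules):

#include <algorithm>
#include <iostream>
#include <optional>
#include <stdexcept>

// Toy type lattice: enumerator order encodes numeric widening for this sketch.
enum class DataType { INT8, INT16, INT32, FLOAT64, UTF8 };

// Made-up stand-in for has_valid_common_type(): numerics widen to the larger
// type; the string type only matches itself.
std::optional<DataType> promote(DataType a, DataType b) {
    if (a == DataType::UTF8 || b == DataType::UTF8)
        return a == b ? std::optional<DataType>{a} : std::nullopt;
    return std::max(a, b);
}

// Same shape as add_data_type_impl: fold one column's type into the running type.
void add_data_type(DataType data_type, std::optional<DataType>& current) {
    if (current.has_value()) {
        const auto common = promote(*current, data_type);
        if (!common.has_value())
            throw std::runtime_error("incompatible types in aggregation");
        current = *common;
    } else {
        current = data_type;  // first column seen fixes the initial type
    }
}

int main() {
    std::optional<DataType> out;
    add_data_type(DataType::INT8, out);     // out == INT8
    add_data_type(DataType::INT32, out);    // widens: out == INT32
    add_data_type(DataType::FLOAT64, out);  // widens: out == FLOAT64
    std::cout << (out == DataType::FLOAT64) << '\n';  // prints 1
}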