Skip to content

Commit

Permalink
[Arrow] Move ArrowUtil to its own file (#13990)
Browse files Browse the repository at this point in the history
Nothing major, this is just a chore.
For probably over 80% of our classes, each class lives in its own file.
When I see `ArrowUtil`, I would like to navigate automatically to the
correct file, but the current structure breaks that path assumption.
  • Loading branch information
Mytherin authored Sep 19, 2024
2 parents 320a99e + 73adab8 commit 03dd0df
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/common/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_library_unity(
OBJECT
arrow_appender.cpp
arrow_converter.cpp
arrow_util.cpp
arrow_merge_event.cpp
arrow_query_result.cpp
arrow_wrapper.cpp
Expand Down
1 change: 1 addition & 0 deletions src/common/arrow/arrow_merge_event.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "duckdb/common/arrow/arrow_merge_event.hpp"
#include "duckdb/common/arrow/arrow_util.hpp"
#include "duckdb/storage/storage_info.hpp"

namespace duckdb {
Expand Down
60 changes: 60 additions & 0 deletions src/common/arrow/arrow_util.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#include "duckdb/common/arrow/arrow_util.hpp"
#include "duckdb/common/arrow/arrow_appender.hpp"
#include "duckdb/common/types/data_chunk.hpp"

namespace duckdb {

//! Pulls up to `batch_size` rows out of `scan_state` and appends them to an ArrowAppender.
//! `count` is reset to 0 and then receives the number of rows appended.
//! `out` is only assigned (from appender.Finalize()) when at least one row was appended.
//! Returns false when LoadNextChunk fails; in that case `error` holds whatever LoadNextChunk stored in it,
//! replaced by scan_state.GetError() when scan_state.HasError() is true. Returns true otherwise.
bool ArrowUtil::TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t batch_size, ArrowArray *out,
                              idx_t &count, ErrorData &error) {
	count = 0;
	ArrowAppender appender(scan_state.Types(), batch_size, std::move(options));

	// Begin with the rows that are still unread in the current chunk, if any
	const auto leftover = scan_state.RemainingInChunk();
	if (leftover != 0) {
		const idx_t take = MinValue(leftover, batch_size);
		count += take;
		auto &chunk = scan_state.CurrentChunk();
		const auto from = scan_state.CurrentOffset();
		appender.Append(chunk, from, from + take, chunk.size());
		scan_state.IncreaseOffset(take);
	}

	// Keep loading chunks until the batch is full or the scan runs dry
	while (count < batch_size) {
		const bool loaded = scan_state.LoadNextChunk(error);
		if (!loaded) {
			if (scan_state.HasError()) {
				error = scan_state.GetError();
			}
			return false;
		}
		if (scan_state.ChunkIsEmpty()) {
			// The load succeeded but produced an empty chunk
			break;
		}
		auto &chunk = scan_state.CurrentChunk();
		if (scan_state.Finished() || chunk.size() == 0) {
			break;
		}
		// Rows still missing from the batch, capped by what this chunk has left
		const auto take = MinValue(batch_size - count, scan_state.RemainingInChunk());
		appender.Append(chunk, 0, take, chunk.size());
		count += take;
		scan_state.IncreaseOffset(take);
	}

	if (count == 0) {
		// Nothing was appended: leave `out` untouched
		return true;
	}
	*out = appender.Finalize();
	return true;
}

//! Wrapper around TryFetchChunk that does not report failure through a return value.
//! Forwards `scan_state`, `options`, `chunk_size` and `out` to TryFetchChunk; when that call returns false,
//! the collected ErrorData is raised through error.Throw(). Returns the row count reported by TryFetchChunk.
idx_t ArrowUtil::FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out) {
	ErrorData error;
	// Initialized here so the returned value never depends on the callee having written to it first
	idx_t result_count = 0;
	if (!TryFetchChunk(scan_state, std::move(options), chunk_size, out, result_count, error)) {
		error.Throw();
	}
	return result_count;
}

} // namespace duckdb
54 changes: 1 addition & 53 deletions src/common/arrow/arrow_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "duckdb/common/arrow/arrow_wrapper.hpp"
#include "duckdb/common/arrow/arrow_util.hpp"
#include "duckdb/common/arrow/arrow_converter.hpp"

#include "duckdb/common/assert.hpp"
Expand Down Expand Up @@ -176,57 +177,4 @@ ResultArrowArrayStreamWrapper::ResultArrowArrayStreamWrapper(unique_ptr<QueryRes
stream.get_last_error = ResultArrowArrayStreamWrapper::MyStreamGetLastError;
}

//! Appends up to `batch_size` rows from `scan_state` to an ArrowAppender and, if any rows were appended,
//! stores the finalized array in `out`. `count` receives the number of rows appended.
//! Returns false if LoadNextChunk fails (with `error` set), true otherwise.
bool ArrowUtil::TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t batch_size, ArrowArray *out,
idx_t &count, ErrorData &error) {
count = 0;
ArrowAppender appender(scan_state.Types(), batch_size, std::move(options));
auto remaining_tuples_in_chunk = scan_state.RemainingInChunk();
if (remaining_tuples_in_chunk) {
// We start by scanning the non-finished current chunk
idx_t cur_consumption = MinValue(remaining_tuples_in_chunk, batch_size);
count += cur_consumption;
auto &current_chunk = scan_state.CurrentChunk();
appender.Append(current_chunk, scan_state.CurrentOffset(), scan_state.CurrentOffset() + cur_consumption,
current_chunk.size());
scan_state.IncreaseOffset(cur_consumption);
}
// Load further chunks until the batch holds `batch_size` rows or the scan yields nothing more
while (count < batch_size) {
if (!scan_state.LoadNextChunk(error)) {
// Prefer the error recorded on the scan state when there is one
if (scan_state.HasError()) {
error = scan_state.GetError();
}
return false;
}
if (scan_state.ChunkIsEmpty()) {
// The scan was successful, but an empty chunk was returned
break;
}
auto &current_chunk = scan_state.CurrentChunk();
if (scan_state.Finished() || current_chunk.size() == 0) {
break;
}
// The amount we still need to append into this chunk
auto remaining = batch_size - count;

// The amount remaining, capped by the amount left in the current chunk
auto to_append_to_batch = MinValue(remaining, scan_state.RemainingInChunk());
appender.Append(current_chunk, 0, to_append_to_batch, current_chunk.size());
count += to_append_to_batch;
scan_state.IncreaseOffset(to_append_to_batch);
}
// `out` is left untouched when no rows were appended
if (count > 0) {
*out = appender.Finalize();
}
return true;
}

//! Calls TryFetchChunk with the given arguments; if it returns false, raises the collected error via
//! error.Throw(). Otherwise returns the row count that TryFetchChunk reported.
idx_t ArrowUtil::FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out) {
ErrorData error;
// Written by TryFetchChunk, which resets its count argument to 0 before doing anything else
idx_t result_count;
if (!TryFetchChunk(scan_state, std::move(options), chunk_size, out, result_count, error)) {
error.Throw();
}
return result_count;
}

} // namespace duckdb
31 changes: 31 additions & 0 deletions src/include/duckdb/common/arrow/arrow_util.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// duckdb/common/arrow/arrow_util.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once
#include "duckdb/common/arrow/arrow.hpp"
#include "duckdb/main/chunk_scan_state.hpp"
#include "duckdb/main/client_properties.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/error_data.hpp"

namespace duckdb {

class QueryResult;
class DataChunk;

//! Static helpers that fetch rows from a ChunkScanState into an ArrowArray
class ArrowUtil {
public:
//! Appends up to `chunk_size` rows from `scan_state` and stores the resulting array in `out` when at least one
//! row was appended. `result_count` receives the number of rows appended.
//! Returns false when loading a chunk fails, with the failure stored in `error`.
static bool TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out,
idx_t &result_count, ErrorData &error);
//! Like TryFetchChunk, but raises the error via ErrorData::Throw() on failure and returns the row count
static idx_t FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out);

private:
//! NOTE(review): no definition of TryFetchNext is visible next to the two methods above - confirm it is still needed
static bool TryFetchNext(QueryResult &result, unique_ptr<DataChunk> &out, ErrorData &error);
};

} // namespace duckdb
14 changes: 0 additions & 14 deletions src/include/duckdb/common/arrow/arrow_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@
#pragma once
#include "duckdb/common/arrow/arrow.hpp"
#include "duckdb/common/helper.hpp"
#include "duckdb/common/error_data.hpp"
#include "duckdb/main/chunk_scan_state.hpp"
#include "duckdb/main/client_properties.hpp"

//! Here we have the internal duckdb classes that interact with Arrow's Internal Header (i.e., duckdb/common/arrow/arrow.hpp)
namespace duckdb {
class QueryResult;
class DataChunk;

class ArrowSchemaWrapper {
public:
Expand Down Expand Up @@ -59,13 +54,4 @@ class ArrowArrayStreamWrapper {
}
};

//! Static helpers that fetch rows from a ChunkScanState into an ArrowArray
class ArrowUtil {
public:
//! Appends up to `chunk_size` rows from `scan_state` and stores the resulting array in `out` when at least one
//! row was appended. `result_count` receives the number of rows appended.
//! Returns false when loading a chunk fails, with the failure stored in `error`.
static bool TryFetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out,
idx_t &result_count, ErrorData &error);
//! Like TryFetchChunk, but raises the error via ErrorData::Throw() on failure and returns the row count
static idx_t FetchChunk(ChunkScanState &scan_state, ClientProperties options, idx_t chunk_size, ArrowArray *out);

private:
//! NOTE(review): no definition of TryFetchNext is visible next to the two methods above - confirm it is still needed
static bool TryFetchNext(QueryResult &result, unique_ptr<DataChunk> &out, ErrorData &error);
};
} // namespace duckdb
1 change: 1 addition & 0 deletions src/include/duckdb/execution/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/pair.hpp"
#include "duckdb/common/reference_map.hpp"
#include "duckdb/main/query_result.hpp"
#include "duckdb/execution/task_error_manager.hpp"
#include "duckdb/parallel/pipeline.hpp"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma once

#include "duckdb/execution/physical_operator.hpp"
#include "duckdb/main/query_result.hpp"
#include "duckdb/common/enums/statement_type.hpp"

namespace duckdb {
Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/transaction/duck_transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "duckdb/transaction/transaction.hpp"
#include "duckdb/common/reference_map.hpp"
#include "duckdb/common/error_data.hpp"

namespace duckdb {
class CheckpointLock;
Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/transaction/local_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "duckdb/storage/table/table_index_list.hpp"
#include "duckdb/storage/table/table_statistics.hpp"
#include "duckdb/storage/optimistic_data_writer.hpp"
#include "duckdb/common/error_data.hpp"
#include "duckdb/common/reference_map.hpp"

namespace duckdb {
Expand Down
2 changes: 1 addition & 1 deletion src/include/duckdb/transaction/transaction_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "duckdb/common/common.hpp"
#include "duckdb/common/mutex.hpp"
#include "duckdb/common/vector.hpp"

#include "duckdb/common/error_data.hpp"
#include "duckdb/common/atomic.hpp"

namespace duckdb {
Expand Down
1 change: 1 addition & 0 deletions tools/pythonpkg/src/include/duckdb_python/pyresult.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "duckdb_python/numpy/numpy_result_conversion.hpp"
#include "duckdb.hpp"
#include "duckdb/main/chunk_scan_state.hpp"
#include "duckdb_python/pybind11/pybind_wrapper.hpp"
#include "duckdb_python/python_objects.hpp"
#include "duckdb_python/pybind11/dataframe.hpp"
Expand Down
1 change: 1 addition & 0 deletions tools/pythonpkg/src/pyresult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "duckdb_python/arrow/arrow_array_stream.hpp"
#include "duckdb/common/arrow/arrow.hpp"
#include "duckdb/common/arrow/arrow_util.hpp"
#include "duckdb/common/arrow/arrow_converter.hpp"
#include "duckdb/common/arrow/arrow_wrapper.hpp"
#include "duckdb/common/arrow/result_arrow_wrapper.hpp"
Expand Down

0 comments on commit 03dd0df

Please sign in to comment.