Commit 0552dd1

Merge branch 'branch-24.02' of https://github.com/rapidsai/cudf into fea-write_orc-version-and-code
vuule committed Dec 6, 2023
2 parents 5d69f6b + 42b533f commit 0552dd1
Showing 21 changed files with 998 additions and 283 deletions.
137 changes: 137 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -62,7 +62,7 @@ dependencies:
- numpy>=1.21,<1.25
- numpydoc
- nvcc_linux-64=11.8
- nvcomp==3.0.4
- nvcomp==3.0.5
- nvtx>=0.2.1
- packaging
- pandas>=1.3,<1.6.0dev0
2 changes: 1 addition & 1 deletion conda/environments/all_cuda-120_arch-x86_64.yaml
@@ -60,7 +60,7 @@ dependencies:
- numba>=0.57,<0.58
- numpy>=1.21,<1.25
- numpydoc
- nvcomp==3.0.4
- nvcomp==3.0.5
- nvtx>=0.2.1
- packaging
- pandas>=1.3,<1.6.0dev0
2 changes: 1 addition & 1 deletion conda/recipes/libcudf/conda_build_config.yaml
@@ -38,7 +38,7 @@ spdlog_version:
- ">=1.12.0,<1.13"

nvcomp_version:
- "=3.0.4"
- "=3.0.5"

zlib_version:
- ">=1.2.13"
8 changes: 5 additions & 3 deletions cpp/benchmarks/text/subword.cpp
@@ -18,8 +18,10 @@
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/file_utilities.hpp>

#include <cudf/strings/strings_column_view.hpp>

#include <nvtext/subword_tokenize.hpp>

#include <filesystem>
@@ -29,8 +31,8 @@

static std::string create_hash_vocab_file()
{
std::string dir_template{std::filesystem::temp_directory_path().string()};
if (char const* env_p = std::getenv("WORKSPACE")) dir_template = env_p;
static temp_directory const subword_tmpdir{"cudf_gbench"};
auto dir_template = subword_tmpdir.path();
std::string hash_file = dir_template + "/hash_vocab.txt";
// create a fake hashed vocab text file for this test
// this only works with words in the strings in the benchmark code below
@@ -57,7 +59,7 @@ static void BM_subword_tokenizer(benchmark::State& state)
auto const nrows = static_cast<cudf::size_type>(state.range(0));
std::vector<char const*> h_strings(nrows, "This is a test ");
cudf::test::strings_column_wrapper strings(h_strings.begin(), h_strings.end());
std::string hash_file = create_hash_vocab_file();
static std::string hash_file = create_hash_vocab_file();
std::vector<uint32_t> offsets{14};
uint32_t max_sequence_length = 64;
uint32_t stride = 48;
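
The change above swaps a hand-built temporary path (std::filesystem::temp_directory_path() or the WORKSPACE environment variable) for the RAII temp_directory helper from cudf_test/file_utilities.hpp, so the fake vocabulary file lives in a per-run directory that is cleaned up automatically. A minimal host-side sketch of such a helper, assuming POSIX mkdtemp is available; scoped_temp_dir is an illustrative name, not the actual cudf class:

#include <filesystem>
#include <stdexcept>
#include <stdlib.h>
#include <string>

// Sketch of an RAII temp-directory helper (assumption: POSIX mkdtemp).
class scoped_temp_dir {
 public:
  explicit scoped_temp_dir(std::string const& label)
  {
    // Build a mkdtemp template like <tmp>/<label>.XXXXXX under the system temp dir.
    auto tmpl    = (std::filesystem::temp_directory_path() / (label + ".XXXXXX")).string();
    char* result = mkdtemp(tmpl.data());  // fills in the XXXXXX and creates the directory
    if (result == nullptr) { throw std::runtime_error("failed to create temp directory"); }
    _path = result;
  }

  // Remove the directory and everything inside it when the helper goes out of scope.
  ~scoped_temp_dir() { std::filesystem::remove_all(_path); }

  std::string const& path() const { return _path; }

 private:
  std::string _path;
};
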
15 changes: 10 additions & 5 deletions cpp/src/copying/contiguous_split.cu
@@ -852,7 +852,11 @@ struct dst_offset_output_iterator {

dst_offset_output_iterator operator+ __host__ __device__(int i) { return {c + i}; }

void operator++ __host__ __device__() { c++; }
dst_offset_output_iterator& operator++ __host__ __device__()
{
c++;
return *this;
}

reference operator[] __device__(int i) { return dereference(c + i); }
reference operator* __device__() { return dereference(c); }
@@ -873,13 +877,14 @@ struct dst_valid_count_output_iterator {
using reference = size_type&;
using iterator_category = thrust::output_device_iterator_tag;

dst_valid_count_output_iterator operator+ __host__ __device__(int i)
dst_valid_count_output_iterator operator+ __host__ __device__(int i) { return {c + i}; }

dst_valid_count_output_iterator& operator++ __host__ __device__()
{
return dst_valid_count_output_iterator{c + i};
c++;
return *this;
}

void operator++ __host__ __device__() { c++; }

reference operator[] __device__(int i) { return dereference(c + i); }
reference operator* __device__() { return dereference(c); }

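
The operator++ overloads above now return the iterator itself rather than void, which is what generic code expects from an output iterator: it advances via ++it (or it++) and keeps writing through the result. A host-side sketch of the same idea with an illustrative stand-in type (not the cudf iterators):

#include <cstdio>
#include <initializer_list>
#include <vector>

// Output-iterator-style shim: writing through it squares the value before storing.
struct squaring_output_iterator {
  int* ptr;

  // Returning *this (instead of void) lets callers chain advances and writes,
  // e.g. *++it = v, just like the dst_offset/dst_valid_count iterators above.
  squaring_output_iterator& operator++()
  {
    ++ptr;
    return *this;
  }

  squaring_output_iterator operator+(int i) const { return {ptr + i}; }

  // Proxy returned by dereference; assignment stores the transformed value.
  struct proxy {
    int* p;
    void operator=(int v) const { *p = v * v; }
  };
  proxy operator*() const { return {ptr}; }
  proxy operator[](int i) const { return {ptr + i}; }
};

int main()
{
  std::vector<int> out(4, 0);
  squaring_output_iterator it{out.data()};
  for (int v : {1, 2, 3, 4}) {
    *it = v;  // stores v * v
    ++it;     // relies on operator++ returning the advanced iterator
  }
  for (int v : out) { std::printf("%d ", v); }  // prints: 1 4 9 16
  return 0;
}
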
31 changes: 26 additions & 5 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
@@ -289,7 +289,7 @@ class parquet_field_union_struct : public parquet_field {
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
T v;
bool const res = parquet_field_struct<T>(field(), v).operator()(cpr, field_type);
bool const res = parquet_field_struct<T>{field(), v}(cpr, field_type);
if (!res) {
val = v;
enum_val = static_cast<E>(field());
@@ -424,7 +424,7 @@ class parquet_field_optional : public parquet_field {
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
T v;
bool const res = FieldFunctor(field(), v).operator()(cpr, field_type);
bool const res = FieldFunctor{field(), v}(cpr, field_type);
if (!res) { val = v; }
return res;
}
@@ -631,6 +631,8 @@ bool CompactProtocolReader::read(ColumnChunk* c)

bool CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
@@ -641,7 +643,8 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(9, c->data_page_offset),
parquet_field_int64(10, c->index_page_offset),
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics));
parquet_field_struct(12, c->statistics),
optional_size_statistics(16, c->size_statistics));
return function_builder(this, op);
}

@@ -700,17 +703,35 @@ bool CompactProtocolReader::read(PageLocation* p)

bool CompactProtocolReader::read(OffsetIndex* o)
{
auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations));
using optional_list_i64 = parquet_field_optional<std::vector<int64_t>, parquet_field_int64_list>;

auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations),
optional_list_i64(2, o->unencoded_byte_array_data_bytes));
return function_builder(this, op);
}

bool CompactProtocolReader::read(SizeStatistics* s)
{
using optional_i64 = parquet_field_optional<int64_t, parquet_field_int64>;
using optional_list_i64 = parquet_field_optional<std::vector<int64_t>, parquet_field_int64_list>;

auto op = std::make_tuple(optional_i64(1, s->unencoded_byte_array_data_bytes),
optional_list_i64(2, s->repetition_level_histogram),
optional_list_i64(3, s->definition_level_histogram));
return function_builder(this, op);
}

bool CompactProtocolReader::read(ColumnIndex* c)
{
using optional_list_i64 = parquet_field_optional<std::vector<int64_t>, parquet_field_int64_list>;

auto op = std::make_tuple(parquet_field_bool_list(1, c->null_pages),
parquet_field_binary_list(2, c->min_values),
parquet_field_binary_list(3, c->max_values),
parquet_field_enum<BoundaryOrder>(4, c->boundary_order),
parquet_field_int64_list(5, c->null_counts));
parquet_field_int64_list(5, c->null_counts),
optional_list_i64(6, c->repetition_level_histogram),
optional_list_i64(7, c->definition_level_histogram));
return function_builder(this, op);
}

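
The new reads above lean on parquet_field_optional<T, FieldFunctor>, a decorator that parses into a temporary and only engages the target std::optional when the wrapped functor succeeds (by the convention visible above, the functors return false on success). A self-contained sketch of that pattern with toy stand-ins for the reader and the inner functor (not the cudf classes):

#include <cstdint>
#include <cstdio>
#include <optional>
#include <utility>

// Toy reader: pretend the next decoded field payload is already available.
struct Reader {
  int64_t next_value;
};

// Inner functor: parses a concrete field into `val`; returns false on success.
struct field_int64 {
  int field_id;
  int64_t& val;
  bool operator()(Reader* r, int /*field_type*/)
  {
    val = r->next_value;
    return false;  // parsed successfully
  }
};

// Decorator: parse into a temporary, and only engage the std::optional on
// success so an absent optional field stays disengaged rather than defaulted.
template <typename T, typename FieldFunctor>
struct field_optional {
  int field_id;
  std::optional<T>& val;
  bool operator()(Reader* r, int field_type)
  {
    T v{};
    bool const res = FieldFunctor{field_id, v}(r, field_type);
    if (!res) { val = std::move(v); }
    return res;
  }
};

int main()
{
  Reader r{42};
  std::optional<int64_t> unencoded_bytes;  // stays empty unless the parse succeeds
  field_optional<int64_t, field_int64>{1, unencoded_bytes}(&r, /*field_type=*/6);
  std::printf("%lld\n", static_cast<long long>(*unencoded_bytes));  // prints 42
  return 0;
}
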
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
@@ -116,6 +116,7 @@ class CompactProtocolReader {
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
bool read(SizeStatistics* s);
bool read(ColumnIndex* c);
bool read(Statistics* s);
bool read(ColumnOrder* c);
38 changes: 35 additions & 3 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
@@ -182,6 +182,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); }
return c.value();
}

@@ -210,6 +211,24 @@ size_t CompactProtocolWriter::write(OffsetIndex const& s)
{
CompactProtocolFieldWriter c(*this);
c.field_struct_list(1, s.page_locations);
if (s.unencoded_byte_array_data_bytes.has_value()) {
c.field_int_list(2, s.unencoded_byte_array_data_bytes.value());
}
return c.value();
}

size_t CompactProtocolWriter::write(SizeStatistics const& s)
{
CompactProtocolFieldWriter c(*this);
if (s.unencoded_byte_array_data_bytes.has_value()) {
c.field_int(1, s.unencoded_byte_array_data_bytes.value());
}
if (s.repetition_level_histogram.has_value()) {
c.field_int_list(2, s.repetition_level_histogram.value());
}
if (s.definition_level_histogram.has_value()) {
c.field_int_list(3, s.definition_level_histogram.value());
}
return c.value();
}

@@ -286,13 +305,26 @@ inline void CompactProtocolFieldWriter::field_int(int field, int64_t val)
current_field_value = field;
}

template <>
inline void CompactProtocolFieldWriter::field_int_list<int64_t>(int field,
std::vector<int64_t> const& val)
{
put_field_header(field, current_field_value, ST_FLD_LIST);
put_byte(static_cast<uint8_t>((std::min(val.size(), 0xfUL) << 4) | ST_FLD_I64));
if (val.size() >= 0xfUL) { put_uint(val.size()); }
for (auto const v : val) {
put_int(v);
}
current_field_value = field;
}

template <typename Enum>
inline void CompactProtocolFieldWriter::field_int_list(int field, std::vector<Enum> const& val)
{
put_field_header(field, current_field_value, ST_FLD_LIST);
put_byte((uint8_t)((std::min(val.size(), (size_t)0xfu) << 4) | ST_FLD_I32));
if (val.size() >= 0xf) put_uint(val.size());
for (auto& v : val) {
put_byte(static_cast<uint8_t>((std::min(val.size(), 0xfUL) << 4) | ST_FLD_I32));
if (val.size() >= 0xfUL) { put_uint(val.size()); }
for (auto const& v : val) {
put_int(static_cast<int32_t>(v));
}
current_field_value = field;
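
The field_int_list<int64_t> specialization above follows the Thrift compact-protocol list encoding: one header byte whose high nibble is the element count (capped at 0xf, with the full count following as a varint when it does not fit) and whose low nibble is the element-type code, then each value as a zigzag varint. A host-side sketch of that encoding under those assumptions (the helper names and the type-code constant are illustrative, not the cudf writer API):

#include <algorithm>
#include <cstdint>
#include <vector>

namespace sketch {

constexpr uint8_t ST_FLD_I64 = 6;  // compact-protocol element-type code for i64

// Unsigned varint (LEB128): 7 bits per byte, high bit marks continuation.
inline void put_uint(std::vector<uint8_t>& out, uint64_t v)
{
  while (v > 0x7f) {
    out.push_back(static_cast<uint8_t>(v | 0x80));
    v >>= 7;
  }
  out.push_back(static_cast<uint8_t>(v));
}

// Signed value: zigzag-encode, then varint (how compact protocol stores i64).
inline void put_int(std::vector<uint8_t>& out, int64_t v)
{
  put_uint(out, (static_cast<uint64_t>(v) << 1) ^ static_cast<uint64_t>(v >> 63));
}

// List header plus elements, mirroring the structure of the specialization above.
inline void put_i64_list(std::vector<uint8_t>& out, std::vector<int64_t> const& val)
{
  auto const short_size = std::min<uint64_t>(val.size(), 0xf);
  out.push_back(static_cast<uint8_t>((short_size << 4) | ST_FLD_I64));
  if (val.size() >= 0xf) { put_uint(out, val.size()); }  // long form: real size as varint
  for (auto const v : val) {
    put_int(out, v);
  }
}

}  // namespace sketch

For a three-element histogram {1, 2, 3}, this produces the header byte 0x36 (count 3 in the high nibble, type code 6 in the low nibble) followed by the zigzag varints 0x02 0x04 0x06.
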
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
@@ -51,6 +51,7 @@ class CompactProtocolWriter {
size_t write(Statistics const&);
size_t write(PageLocation const&);
size_t write(OffsetIndex const&);
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);

protected:
@@ -113,4 +114,8 @@ class CompactProtocolFieldWriter {
inline void set_current_field(int const& field);
};

template <>
inline void CompactProtocolFieldWriter::field_int_list<int64_t>(int field,
std::vector<int64_t> const& val);

} // namespace cudf::io::parquet::detail
49 changes: 25 additions & 24 deletions cpp/src/io/parquet/delta_enc.cuh
@@ -57,10 +57,6 @@ constexpr int buffer_size = 2 * block_size;
static_assert(block_size % 128 == 0);
static_assert(values_per_mini_block % 32 == 0);

using block_reduce = cub::BlockReduce<zigzag128_t, block_size>;
using warp_reduce = cub::WarpReduce<uleb128_t>;
using index_scan = cub::BlockScan<size_type, block_size>;

constexpr int rolling_idx(int index) { return rolling_index<buffer_size>(index); }

// Version of bit packer that can handle up to 64 bits values.
@@ -128,9 +124,15 @@ inline __device__ void bitpack_mini_block(
// Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input
// 128 values with validity at a time, saving them until there are enough values for a block
// to be written.
// T is the input data type (either zigzag128_t or uleb128_t).
// T is the input data type (either int32_t or int64_t).
template <typename T>
class delta_binary_packer {
public:
using U = std::make_unsigned_t<T>;
using block_reduce = cub::BlockReduce<T, delta::block_size>;
using warp_reduce = cub::WarpReduce<U>;
using index_scan = cub::BlockScan<size_type, delta::block_size>;

private:
uint8_t* _dst; // sink to dump encoded values to
T* _buffer; // buffer to store values to be encoded
@@ -140,9 +142,9 @@ class delta_binary_packer {
uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block

// pointers to shared scratch memory for the warp and block scans/reduces
delta::index_scan::TempStorage* _scan_tmp;
delta::warp_reduce::TempStorage* _warp_tmp;
delta::block_reduce::TempStorage* _block_tmp;
index_scan::TempStorage* _scan_tmp;
typename warp_reduce::TempStorage* _warp_tmp;
typename block_reduce::TempStorage* _block_tmp;

void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking

@@ -164,9 +166,9 @@ class delta_binary_packer {
}

// Signed subtraction with defined wrapping behavior.
inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b)
inline __device__ T subtract(T a, T b)
{
return static_cast<zigzag128_t>(static_cast<uleb128_t>(a) - static_cast<uleb128_t>(b));
return static_cast<T>(static_cast<U>(a) - static_cast<U>(b));
}

public:
@@ -178,9 +180,9 @@ class delta_binary_packer {
_dst = dest;
_num_values = num_values;
_buffer = buffer;
_scan_tmp = reinterpret_cast<delta::index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<delta::warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<delta::block_reduce::TempStorage*>(temp_storage);
_scan_tmp = reinterpret_cast<index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<typename warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<typename block_reduce::TempStorage*>(temp_storage);
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
@@ -193,7 +195,7 @@ class delta_binary_packer {
size_type const valid = is_valid;
size_type pos;
size_type num_valid;
delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);
index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);

if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; }
__syncthreads();
@@ -216,7 +218,7 @@ class delta_binary_packer {
inline __device__ uint8_t const* flush()
{
using cudf::detail::warp_size;
__shared__ zigzag128_t block_min;
__shared__ T block_min;

int const t = threadIdx.x;
int const warp_id = t / warp_size;
@@ -225,27 +227,26 @@ class delta_binary_packer {
if (_values_in_buffer <= 0) { return _dst; }

// Calculate delta for this thread.
size_type const idx = _current_idx + t;
zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<zigzag128_t>::max();
size_type const idx = _current_idx + t;
T const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<T>::max();

// Find min delta for the block.
auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min());
auto const min_delta = block_reduce(*_block_tmp).Reduce(delta, cub::Min());

if (t == 0) { block_min = min_delta; }
__syncthreads();

// Compute frame of reference for the block.
uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;
U const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;

// Get max normalized delta for each warp, and use that to determine how many bits to use
// for the bitpacking of this warp.
zigzag128_t const warp_max =
delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
U const warp_max = warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
__syncwarp();

if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); }
if (lane_id == 0) { _mb_bits[warp_id] = sizeof(long long) * 8 - __clzll(warp_max); }
__syncthreads();

// write block header
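
With the packer now templated on the input type (T is int32_t or int64_t and U = std::make_unsigned_t<T>), the per-block math it performs is unchanged: compute wrapping deltas, take the block minimum as the frame of reference, and size the bitpacking from the largest normalized delta. A host-side sketch of that math for a single block (it mirrors the logic above but is not the cudf kernel):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <vector>

template <typename T>
struct delta_block_stats {
  static_assert(std::is_same_v<T, int32_t> || std::is_same_v<T, int64_t>);
  using U = std::make_unsigned_t<T>;

  T min_delta{};        // frame of reference for the block
  int bit_width{};      // bits needed per normalized delta
  std::vector<U> norm;  // deltas minus the frame of reference

  static delta_block_stats compute(std::vector<T> const& values)
  {
    delta_block_stats s{};
    if (values.size() < 2) { return s; }

    // Signed subtraction with defined wrapping behavior, as in the packer above.
    auto const subtract = [](T a, T b) {
      return static_cast<T>(static_cast<U>(a) - static_cast<U>(b));
    };

    std::vector<T> deltas(values.size() - 1);
    for (std::size_t i = 1; i < values.size(); ++i) {
      deltas[i - 1] = subtract(values[i], values[i - 1]);
    }
    s.min_delta = *std::min_element(deltas.begin(), deltas.end());

    // Normalize against the frame of reference and track the largest result.
    U max_norm = 0;
    for (T const d : deltas) {
      U const n = static_cast<U>(d) - static_cast<U>(s.min_delta);
      s.norm.push_back(n);
      max_norm = std::max(max_norm, n);
    }

    // Bit width = position of the highest set bit in the largest normalized delta.
    while (max_norm != 0) {
      ++s.bit_width;
      max_norm >>= 1;
    }
    return s;
  }
};

For example, int32_t values {10, 12, 11, 14} give deltas {2, -1, 3}, min_delta = -1, normalized deltas {3, 0, 4}, and a bit width of 3.
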
(Diffs for the remaining changed files are not rendered.)
