Skip to content

Commit

Permalink
make column_id a std::optional and simplify the API
Browse files (browse the repository at this point in the history)
  • Loading branch information
vuule committed Apr 9, 2021
1 parent 502f782 commit d5abf45
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
4 changes: 2 additions & 2 deletions cpp/src/io/orc/orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void ProtobufReader::read(StripeFooter &s, size_t maxlen)
void ProtobufReader::read(Stream &s, size_t maxlen)
{
auto op = std::make_tuple(make_field_reader(1, s.kind),
make_field_reader(2, s.column_id()),
make_field_reader(2, s.column_id),
make_field_reader(3, s.length));
function_builder(s, maxlen, op);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ size_t ProtobufWriter::write(const Stream &s)
{
ProtobufFieldWriter w(this);
w.field_uint(1, s.kind);
w.field_uint(2, s.column_id());
if (s.column_id) w.field_uint(2, *s.column_id);
w.field_uint(3, s.length);
return w.value();
}
Expand Down
38 changes: 24 additions & 14 deletions cpp/src/io/orc/orc.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,21 @@ struct FileFooter {

struct Stream {
StreamKind kind = INVALID_STREAM_KIND;
uint64_t length = 0; // the number of bytes in the file
std::optional<uint32_t> column_id; // ORC column id (different from column index in the table!)
uint64_t length = 0; // the number of bytes in the file

// 'column 0' has id 0, table columns have ids [1,...,n]
Stream(StreamKind kind, uint32_t column_id, uint64_t length=0)
: kind{kind}, length{length}, _column_id{column_id}
Stream(StreamKind kind, uint32_t column_id, uint64_t length = 0)
: kind{kind}, column_id{column_id}, length{length}
{
}
Stream() = default;

// Needs to be a non-const reference because of the `ProtobufReader`
auto &column_id() noexcept { return _column_id; }
auto const &column_id() const noexcept { return _column_id; }

// Returns index of the column in the table, if any
// Stream of the 'column 0' does not have a corresponding column in the table
thrust::optional<uint32_t> column_index() const noexcept
{
return _column_id > 0 ? thrust::optional<uint32_t>{_column_id - 1} : thrust::nullopt;
return column_id.value_or(0) > 0 ? thrust::optional<uint32_t>{*column_id - 1} : thrust::nullopt;
}

private:
// ORC column id (different from column index in the table!)
// Zero means no corresponding column in the table
uint32_t _column_id = 0;
};

struct ColumnEncoding {
Expand Down Expand Up @@ -251,6 +242,15 @@ class ProtobufReader {
return encode_field_number_base<typename T::element_type>(field_number);
}

// optional fields don't change the field number encoding
template <typename T,
typename std::enable_if_t<std::is_same<T, std::optional<typename T::value_type>>::value>
* = nullptr>
int static constexpr encode_field_number(int field_number) noexcept
{
return encode_field_number_base<typename T::value_type>(field_number);
}

uint32_t read_field_size(const uint8_t *end);

template <typename T, typename std::enable_if_t<std::is_integral<T>::value> * = nullptr>
Expand Down Expand Up @@ -303,6 +303,16 @@ class ProtobufReader {
value = std::make_unique<typename T::element_type>(std::move(contained_value));
}

// Reads a field whose destination is a std::optional. A temporary of the wrapped
// value type is filled by calling read_field on it, and the result is then moved
// into `value`, leaving the optional engaged.
template <typename T,
          std::enable_if_t<std::is_same_v<T, std::optional<typename T::value_type>>> * = nullptr>
void read_field(T &value, const uint8_t *end)
{
  using contained_t = typename T::value_type;
  contained_t decoded;
  read_field(decoded, end);
  // T is exactly std::optional<contained_t> here (guaranteed by the enable_if above).
  value = T{std::move(decoded)};
}

template <typename T>
auto read_field(T &value, const uint8_t *end) -> decltype(read(value, 0))
{
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,18 @@ size_t gather_stream_info(const size_t stripe_index,
uint64_t src_offset = 0;
uint64_t dst_offset = 0;
for (const auto &stream : stripefooter->streams) {
if (stream.column_id() >= orc2gdf.size()) {
if (!stream.column_id || *stream.column_id >= orc2gdf.size()) {
dst_offset += stream.length;
continue;
}

auto col = orc2gdf[stream.column_id()];
auto const column_id = *stream.column_id;
auto col = orc2gdf[column_id];
if (col == -1) {
// A struct-type column has no data itself, but rather child columns
// for each of its fields. There is only a PRESENT stream, which
// needs to be included for the reader.
const auto schema_type = types[stream.column_id()];
const auto schema_type = types[column_id];
if (schema_type.subtypes.size() != 0) {
if (schema_type.kind == orc::STRUCT && stream.kind == orc::PRESENT) {
for (const auto &idx : schema_type.subtypes) {
Expand All @@ -192,16 +193,16 @@ size_t gather_stream_info(const size_t stripe_index,
// NOTE: skip_count field is temporarily used to track index ordering
auto &chunk = chunks[stripe_index * num_columns + col];
const auto idx =
get_index_type_and_pos(stream.kind, chunk.skip_count, col == orc2gdf[stream.column_id()]);
get_index_type_and_pos(stream.kind, chunk.skip_count, col == orc2gdf[column_id]);
if (idx.first < gpu::CI_NUM_STREAMS) {
chunk.strm_id[idx.first] = stream_info.size();
chunk.strm_len[idx.first] = stream.length;
chunk.skip_count = idx.second;

if (idx.first == gpu::CI_DICTIONARY) {
chunk.dictionary_start = *num_dictionary_entries;
chunk.dict_len = stripefooter->columns[stream.column_id()].dictionarySize;
*num_dictionary_entries += stripefooter->columns[stream.column_id()].dictionarySize;
chunk.dict_len = stripefooter->columns[column_id].dictionarySize;
*num_dictionary_entries += stripefooter->columns[column_id].dictionarySize;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1243,7 +1243,7 @@ void writer::impl::write(table_view const &table)
ff.types[0].subtypes.resize(num_columns);
ff.types[0].fieldNames.resize(num_columns);
for (auto const &column : orc_columns) {
ff.types[column.id()].kind = column.orc_kind();
ff.types[column.id()].kind = column.orc_kind();
ff.types[0].subtypes[column.index()] = column.id();
ff.types[0].fieldNames[column.index()] = column.orc_name();
}
Expand Down

0 comments on commit d5abf45

Please sign in to comment.