Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet find span support. #979

Merged
merged 5 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/nng/supplemental/nanolib/parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ parquet_data_packet *parquet_find_data_packet(conf_parquet *conf, char *filename

parquet_data_packet **parquet_find_data_packets(conf_parquet *conf, char **filenames, uint64_t *keys, uint32_t len);

parquet_data_packet **parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size);

#ifdef __cplusplus
}
#endif
Expand Down
169 changes: 161 additions & 8 deletions src/supplemental/nanolib/parquet/parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ parquet_write_launcher(conf_parquet *conf)
static void
get_range(const char *name, uint64_t range[2])
{
//{prefix}_{md5}-{start_key}~{end_key}.parquet
const char *start = strrchr(name, '-');
sscanf(start, "-%ld~%ld.parquet", &range[0], &range[1]);
return;
Expand Down Expand Up @@ -943,9 +944,8 @@ get_keys_indexes(
int16_t definition_level;
int16_t repetition_level;

// FIXME:
int index = 0;
for (const auto &key : keys) {
int i = 0;
bool found = false;
while (int64_reader->HasNext()) {
int64_t value;
Expand All @@ -954,16 +954,12 @@ get_keys_indexes(
&repetition_level, &value, &values_read);
if (1 == rows_read && 1 == values_read) {
if (((uint64_t) value) == key) {
if (index_vector.empty()) {
index_vector.push_back(i);
} else {
index_vector.push_back(i);
}
index_vector.push_back(index++);
found = true;
break;
}
}
i++;
index++;
}
if (!found) {
index_vector.push_back(-1);
Expand Down Expand Up @@ -1175,3 +1171,160 @@ parquet_find_data_packets(

return packets;
}

static vector<parquet_data_packet *>
parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
{
	// Read the payload ("data") column of every row whose key falls in
	// [keys[0], keys[1]] from a single parquet file written by this
	// module (one row group, two columns: key, data).  Returns one
	// heap-allocated parquet_data_packet per row; if either key is not
	// present the returned vector holds a single NULL sentinel.
	//
	// NOTE(review): the caller-supplied conf is immediately replaced by
	// the global g_conf — confirm this is intentional.
	conf = g_conf;
	vector<parquet_data_packet *> ret_vec;
	parquet::ReaderProperties reader_properties =
	    parquet::default_reader_properties();

	parquet_read_set_property(reader_properties, conf);
	vector<int> index_vector(2);

	// Create a ParquetReader instance
	std::string exception_msg = "";
	try {
		std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
		    parquet::ParquetFileReader::OpenFile(
		        filename, false, reader_properties);

		// Get the File MetaData
		std::shared_ptr<parquet::FileMetaData> file_metadata =
		    parquet_reader->metadata();

		// Files produced by the writer always contain exactly one
		// row group with the (key, data) column pair.
		int num_row_groups =
		    file_metadata
		        ->num_row_groups(); // Get the number of RowGroups
		int num_columns =
		    file_metadata->num_columns(); // Get the number of Columns
		assert(num_row_groups == 1);
		assert(num_columns == 2);

		for (int r = 0; r < num_row_groups; ++r) {

			std::shared_ptr<parquet::RowGroupReader>
			    row_group_reader = parquet_reader->RowGroup(
			        r); // Get the RowGroup Reader
			int64_t values_read = 0;
			int64_t rows_read   = 0;
			std::shared_ptr<parquet::ColumnReader> column_reader;

			// Column 0 holds the keys: translate the two span
			// keys into 0-based row indexes.
			column_reader = row_group_reader->Column(0);
			parquet::Int64Reader *int64_reader =
			    static_cast<parquet::Int64Reader *>(
			        column_reader.get());

			index_vector = get_keys_indexes(
			    int64_reader, vector<uint64_t>(keys, keys + 2));
			if (-1 == index_vector[0] || -1 == index_vector[1]) {
				ret_vec.push_back(NULL);
				return ret_vec;
			}
			// Get the Column Reader for the ByteArray column
			column_reader = row_group_reader->Column(1);
			auto ba_reader =
			    dynamic_pointer_cast<parquet::ByteArrayReader>(
			        column_reader);
			if (ba_reader == nullptr) {
				log_error("column 1 is not a byte array column");
				ret_vec.push_back(NULL);
				return ret_vec;
			}

			// Position the reader on the first row of the span.
			// get_keys_indexes() yields 0-based indexes, so skip
			// exactly index_vector[0] rows (the previous
			// `index_vector[0] - 1` skipped one row too few).
			if (index_vector[0] > 0) {
				ba_reader->Skip(index_vector[0]);
			}

			if (ba_reader->HasNext()) {
				int64_t batch_size =
				    index_vector[1] - index_vector[0] + 1;
				std::vector<parquet::ByteArray> values(
				    batch_size);
				// ReadBatch() may write up to batch_size
				// definition levels; a single int16_t out
				// variable here was a stack buffer overflow.
				std::vector<int16_t> def_levels(batch_size);
				rows_read = ba_reader->ReadBatch(batch_size,
				    def_levels.data(), nullptr, values.data(),
				    &values_read);
				if (batch_size == rows_read &&
				    batch_size == values_read) {
					for (int64_t b = 0; b < batch_size;
					     b++) {
						parquet_data_packet *pack =
						    (parquet_data_packet *)
						        malloc(sizeof(
						            parquet_data_packet));
						if (pack == NULL) {
							// Out of memory:
							// return what we have.
							break;
						}
						pack->data =
						    (uint8_t *) malloc(
						        values[b].len *
						        sizeof(uint8_t));
						if (pack->data == NULL) {
							free(pack);
							break;
						}
						memcpy(pack->data,
						    values[b].ptr,
						    values[b].len);
						pack->size = values[b].len;
						ret_vec.push_back(pack);
					}
				}
			}

		}

	} catch (const std::exception &e) {
		exception_msg = e.what();
		log_error("exception_msg=[%s]", exception_msg.c_str());
	}

	return ret_vec;
}

// Selector for get_key(): which end of the key range encoded in a
// parquet file name ({prefix}_{md5}-{start}~{end}.parquet) to extract.
typedef enum {
	START_KEY,
	END_KEY
} key_type;

// Extract one end of the key range encoded in a parquet file name,
// as parsed by get_range().  Returns 0 for an unknown selector.
static uint64_t
get_key(const char *filename, key_type type)
{
	uint64_t bounds[2] = { 0 };

	get_range(filename, bounds);

	if (type == START_KEY) {
		return bounds[0];
	}
	if (type == END_KEY) {
		return bounds[1];
	}
	return 0;
}

// Collect every packet whose key lies in [start_key, end_key] across all
// parquet files covering that span.  On success returns a malloc'ed array
// of packet pointers and stores its length in *size; returns NULL (with
// *size == 0) when nothing is found.  Caller owns the array and packets.
parquet_data_packet **
parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size)
{
	vector<parquet_data_packet *> ret_vec;
	parquet_data_packet **packets = NULL;
	uint32_t len = 0;

	if (size != NULL) {
		*size = 0; // defined output even when nothing is found
	}

	const char **filenames = parquet_find_span(start_key, end_key, &len);

	for (uint32_t i = 0; i < len; i++) {
		// Clip the first file by the requested end_key and the last
		// file by the requested start_key; interior files are read
		// over their full encoded range.  Use per-file locals so the
		// caller's bounds are not clobbered between iterations (the
		// old code overwrote start_key/end_key, so the last file was
		// clipped by a previous file's start key instead of the
		// requested one).
		// NOTE(review): this pairing assumes parquet_find_span()
		// orders files with the end_key file first — confirm.
		uint64_t keys[2];
		keys[1] = (i == 0) ? end_key
		                   : get_key(filenames[i], END_KEY);
		keys[0] = (i == len - 1) ? start_key
		                         : get_key(filenames[i], START_KEY);
		auto tmp = parquet_read_span(conf, filenames[i], keys);
		ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end());
	}

	if (!ret_vec.empty()) {
		// Size the array by the number of packets actually read.
		// The old code allocated only `len` (file count) slots and
		// overflowed the heap whenever any file contributed more
		// than one packet.
		packets = (parquet_data_packet **) malloc(
		    sizeof(parquet_data_packet *) * ret_vec.size());
		if (packets != NULL) {
			copy(ret_vec.begin(), ret_vec.end(), packets);
			if (size != NULL) {
				*size = ret_vec.size();
			}
		}
	}

	// NOTE(review): `filenames` from parquet_find_span() is not released
	// here — confirm ownership; if the callee allocates it, this leaks.

	return packets;
}
Loading