diff --git a/include/nng/supplemental/nanolib/parquet.h b/include/nng/supplemental/nanolib/parquet.h
index 7aa006576..f06ebc6f1 100644
--- a/include/nng/supplemental/nanolib/parquet.h
+++ b/include/nng/supplemental/nanolib/parquet.h
@@ -66,6 +66,8 @@ parquet_data_packet *parquet_find_data_packet(conf_parquet *conf, char *filename
 
 parquet_data_packet **parquet_find_data_packets(conf_parquet *conf, char **filenames, uint64_t *keys, uint32_t len);
 
+parquet_data_packet **parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/supplemental/nanolib/parquet/parquet.cc b/src/supplemental/nanolib/parquet/parquet.cc
index 132934b7d..c8376c6ae 100644
--- a/src/supplemental/nanolib/parquet/parquet.cc
+++ b/src/supplemental/nanolib/parquet/parquet.cc
@@ -726,6 +726,7 @@ parquet_write_launcher(conf_parquet *conf)
 static void
 get_range(const char *name, uint64_t range[2])
 {
+	// filename layout: {prefix}_{md5}-{start_key}~{end_key}.parquet
 	const char *start = strrchr(name, '-');
 	sscanf(start, "-%ld~%ld.parquet", &range[0], &range[1]);
 	return;
@@ -943,9 +944,8 @@ get_keys_indexes(
 	int16_t definition_level;
 	int16_t repetition_level;
 
-	// FIXME:
+	int index = 0;
 	for (const auto &key : keys) {
-		int i = 0;
 		bool found = false;
 		while (int64_reader->HasNext()) {
 			int64_t value;
@@ -954,16 +954,12 @@
 			    &repetition_level, &value, &values_read);
 			if (1 == rows_read && 1 == values_read) {
 				if (((uint64_t) value) == key) {
-					if (index_vector.empty()) {
-						index_vector.push_back(i);
-					} else {
-						index_vector.push_back(i);
-					}
+					index_vector.push_back(index++);
 					found = true;
 					break;
 				}
 			}
-			i++;
+			index++;
 		}
 		if (!found) {
 			index_vector.push_back(-1);
@@ -1175,3 +1171,160 @@ parquet_find_data_packets(
 
 	return packets;
 }
+
+static vector<parquet_data_packet *>
+parquet_read_span(conf_parquet *conf, const char *filename, uint64_t keys[2])
+{
+	conf = g_conf;
+	vector<parquet_data_packet *> ret_vec;
+	std::string path_int64 = "key";
+	std::string path_str = "data";
+	parquet::ReaderProperties reader_properties =
+	    parquet::default_reader_properties();
+
+	parquet_read_set_property(reader_properties, conf);
+	vector<int> index_vector(2);
+
+	// Create a ParquetReader instance
+	std::string exception_msg = "";
+	try {
+		std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+		    parquet::ParquetFileReader::OpenFile(
+		        filename, false, reader_properties);
+
+		// Get the File MetaData
+		std::shared_ptr<parquet::FileMetaData> file_metadata =
+		    parquet_reader->metadata();
+
+		// Get the number of RowGroups and Columns
+		int num_row_groups = file_metadata->num_row_groups();
+		int num_columns = file_metadata->num_columns();
+		assert(num_row_groups == 1);
+		assert(num_columns == 2);
+
+		for (int r = 0; r < num_row_groups; ++r) {
+			// Get the RowGroup Reader
+			std::shared_ptr<parquet::RowGroupReader>
+			    row_group_reader = parquet_reader->RowGroup(r);
+			int64_t values_read = 0;
+			int64_t rows_read = 0;
+			std::shared_ptr<parquet::ColumnReader> column_reader;
+
+			// Column 0 holds the keys: locate the row indexes
+			// of the start and end key.
+			column_reader = row_group_reader->Column(0);
+			parquet::Int64Reader *int64_reader =
+			    static_cast<parquet::Int64Reader *>(
+			        column_reader.get());
+
+			index_vector = get_keys_indexes(
+			    int64_reader, vector<uint64_t>(keys, keys + 2));
+			if (-1 == index_vector[0] || -1 == index_vector[1]) {
+				ret_vec.push_back(NULL);
+				return ret_vec;
+			}
+
+			// Get the Column Reader for the ByteArray column
+			column_reader = row_group_reader->Column(1);
+			auto ba_reader =
+			    dynamic_pointer_cast<parquet::ByteArrayReader>(
+			        column_reader);
+
+			// Position the reader on the row of the start key
+			// (row indexes from get_keys_indexes are 0-based).
+			if (ba_reader->HasNext()) {
+				ba_reader->Skip(index_vector[0]);
+			}
+
+			if (ba_reader->HasNext()) {
+				int64_t batch_size =
+				    index_vector[1] - index_vector[0] + 1;
+				std::vector<parquet::ByteArray> values(
+				    batch_size);
+				std::vector<int16_t> def_levels(batch_size);
+				rows_read = ba_reader->ReadBatch(batch_size,
+				    def_levels.data(), nullptr, values.data(),
+				    &values_read);
+				if (batch_size == rows_read &&
+				    batch_size == values_read) {
+					for (int64_t b = 0; b < batch_size;
+					     b++) {
+						parquet_data_packet *pack =
+						    (parquet_data_packet *)
+						        malloc(sizeof(
+						            parquet_data_packet));
+						pack->data =
+						    (uint8_t *) malloc(
+						        values[b].len *
+						        sizeof(uint8_t));
+						memcpy(pack->data,
+						    values[b].ptr,
+						    values[b].len);
+						pack->size = values[b].len;
+						ret_vec.push_back(pack);
+					}
+				}
+			}
+
+		}
+
+	} catch (const std::exception &e) {
+		exception_msg = e.what();
+		log_error("exception_msg=[%s]", exception_msg.c_str());
+	}
+
+	return ret_vec;
+}
+
+typedef enum {
+	START_KEY,
+	END_KEY
+} key_type;
+
+static uint64_t
+get_key(const char *filename, key_type type)
+{
+	uint64_t range[2] = { 0 };
+	uint64_t res = 0;
+
+	get_range(filename, range);
+	switch (type) {
+	case START_KEY:
+		res = range[0];
+		break;
+	case END_KEY:
+		res = range[1];
+		break;
+	default:
+		break;
+	}
+	return res;
+}
+
+parquet_data_packet **
+parquet_find_data_span_packets(conf_parquet *conf, uint64_t start_key, uint64_t end_key, uint32_t *size)
+{
+	vector<parquet_data_packet *> ret_vec;
+	parquet_data_packet **packets = NULL;
+	uint32_t len = 0;
+
+	const char **filenames = parquet_find_span(start_key, end_key, &len);
+
+	for (uint32_t i = 0; i < len; i++) {
+		// Clamp the requested span to each file; assumes
+		// parquet_find_span() returns filenames sorted by key range
+		// in ascending order.
+		uint64_t keys[2];
+		keys[0] = i == 0 ? start_key
+		                 : get_key(filenames[i], START_KEY);
+		keys[1] = i == len - 1 ? end_key
+		                       : get_key(filenames[i], END_KEY);
+		auto tmp = parquet_read_span(conf, filenames[i], keys);
+		ret_vec.insert(ret_vec.end(), tmp.begin(), tmp.end());
+	}
+
+	if (!ret_vec.empty()) {
+		// Allocate one slot per collected packet, not per file.
+		packets = (parquet_data_packet **) malloc(
+		    sizeof(parquet_data_packet *) * ret_vec.size());
+		copy(ret_vec.begin(), ret_vec.end(), packets);
+		*size = ret_vec.size();
+	}
+
+	return packets;
+}
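
For review context, a minimal caller-side sketch of the new parquet_find_data_span_packets() API (not part of the patch). It assumes the caller owns the returned array and each packet's data buffer and releases them with free(), mirroring the malloc() calls above; the dump_span helper, the conf pointer, and the key values are illustrative placeholders. Note that individual entries may be NULL when a key was not found in a file.

#include "nng/supplemental/nanolib/parquet.h"

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical helper: print and release every payload whose key falls
 * within [start, end]. */
static void
dump_span(conf_parquet *conf, uint64_t start, uint64_t end)
{
	uint32_t size = 0;
	parquet_data_packet **packets =
	    parquet_find_data_span_packets(conf, start, end, &size);

	if (packets == NULL) {
		printf("no packets in [%llu, %llu]\n",
		    (unsigned long long) start, (unsigned long long) end);
		return;
	}
	for (uint32_t i = 0; i < size; i++) {
		if (packets[i] == NULL) {
			continue; /* key missing in this file */
		}
		printf("packet %u: %u bytes\n", i, packets[i]->size);
		free(packets[i]->data); /* assumes caller owns the buffers */
		free(packets[i]);
	}
	free(packets);
}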