-
Notifications
You must be signed in to change notification settings - Fork 74
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(clp-s): Add the write path for single-file archives. #563
Changes from 2 commits
3d4a2e5
402aef3
7792844
45e6ab5
a09ab6b
53434a7
3d51d22
5b67e39
eaa0982
5f774c3
b46d4c1
bec587b
816526e
250f5ba
d0167d4
ef91747
62c43bd
4c73e29
1e2be7e
4c7a50f
c698487
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -11,6 +11,7 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) { | |||||
m_id = boost::uuids::to_string(option.id); | ||||||
m_compression_level = option.compression_level; | ||||||
m_print_archive_stats = option.print_archive_stats; | ||||||
m_single_file_archive = option.single_file_archive; | ||||||
auto archive_path = boost::filesystem::path(option.archives_dir) / m_id; | ||||||
|
||||||
boost::system::error_code boost_error_code; | ||||||
|
@@ -43,13 +44,37 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) { | |||||
} | ||||||
|
||||||
void ArchiveWriter::close() { | ||||||
m_compressed_size += m_var_dict->close(); | ||||||
m_compressed_size += m_log_dict->close(); | ||||||
m_compressed_size += m_array_dict->close(); | ||||||
m_compressed_size += m_timestamp_dict->close(); | ||||||
m_compressed_size += m_schema_tree.store(m_archive_path, m_compression_level); | ||||||
m_compressed_size += m_schema_map.store(m_archive_path, m_compression_level); | ||||||
m_compressed_size += store_tables(); | ||||||
auto var_dict_compressed_size = m_var_dict->close(); | ||||||
auto log_dict_compressed_size = m_log_dict->close(); | ||||||
auto array_dict_compressed_size = m_array_dict->close(); | ||||||
auto timestamp_dict_compressed_size = m_timestamp_dict->close(); | ||||||
auto schema_tree_compressed_size = m_schema_tree.store(m_archive_path, m_compression_level); | ||||||
auto schema_map_compressed_size = m_schema_map.store(m_archive_path, m_compression_level); | ||||||
auto [table_metadata_compressed_size, table_compressed_size] = store_tables(); | ||||||
|
||||||
if (m_single_file_archive) { | ||||||
std::vector<ArchiveFileInfo> files{ | ||||||
{constants::cArchiveSchemaTreeFile, schema_tree_compressed_size}, | ||||||
{constants::cArchiveSchemaMapFile, schema_map_compressed_size}, | ||||||
{constants::cArchiveVarDictFile, var_dict_compressed_size}, | ||||||
{constants::cArchiveLogDictFile, log_dict_compressed_size}, | ||||||
{constants::cArchiveArrayDictFile, array_dict_compressed_size}, | ||||||
{constants::cArchiveTableMetadataFile, table_metadata_compressed_size}, | ||||||
{constants::cArchiveTablesFile, table_compressed_size} | ||||||
}; | ||||||
uint64_t offset = 0; | ||||||
for (auto& file : files) { | ||||||
uint64_t original_size = file.o; | ||||||
file.o = offset; | ||||||
offset += original_size; | ||||||
} | ||||||
write_single_file_archive(files, timestamp_dict_compressed_size); | ||||||
} else { | ||||||
m_compressed_size = var_dict_compressed_size + log_dict_compressed_size | ||||||
+ array_dict_compressed_size + timestamp_dict_compressed_size | ||||||
+ schema_tree_compressed_size + schema_map_compressed_size | ||||||
+ table_metadata_compressed_size + table_compressed_size; | ||||||
} | ||||||
|
||||||
if (m_metadata_db) { | ||||||
update_metadata_db(); | ||||||
|
@@ -67,6 +92,113 @@ void ArchiveWriter::close() { | |||||
m_compressed_size = 0UL; | ||||||
} | ||||||
|
||||||
void ArchiveWriter::write_single_file_archive( | ||||||
std::vector<ArchiveFileInfo> const& files, | ||||||
size_t timestamp_dict_compressed_size | ||||||
) { | ||||||
std::string archive_path = m_archive_path + constants::cArchiveFile; | ||||||
FileWriter archive_writer; | ||||||
archive_writer.open(archive_path, FileWriter::OpenMode::CreateForWriting); | ||||||
|
||||||
write_archive_metadata(archive_writer, files, timestamp_dict_compressed_size); | ||||||
size_t metadata_section_size = archive_writer.get_pos() - sizeof(ArchiveHeader); | ||||||
write_archive_files(archive_writer, files); | ||||||
m_compressed_size = archive_writer.get_pos(); | ||||||
write_archive_header(archive_writer, metadata_section_size); | ||||||
|
||||||
archive_writer.close(); | ||||||
} | ||||||
|
||||||
void ArchiveWriter::write_archive_metadata( | ||||||
FileWriter& archive_writer, | ||||||
std::vector<ArchiveFileInfo> const& files, | ||||||
size_t timestamp_dict_compressed_size | ||||||
) { | ||||||
archive_writer.seek_from_begin(sizeof(ArchiveHeader)); | ||||||
|
||||||
ZstdCompressor compressor; | ||||||
compressor.open(archive_writer, m_compression_level); | ||||||
compressor.write_numeric_value(3); // Number of packets | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
// Write archive info | ||||||
ArchiveInfoPacket archive_info{.num_segments = 1}; | ||||||
std::stringstream msgpack_buffer; | ||||||
msgpack::pack(msgpack_buffer, archive_info); | ||||||
std::string archive_info_str = msgpack_buffer.str(); | ||||||
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveInfo); | ||||||
compressor.write_numeric_value(archive_info_str.size()); | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
compressor.write_string(archive_info_str); | ||||||
|
||||||
// Write archive file info | ||||||
ArchiveFileInfoPacket archive_file_info{.files{files}}; | ||||||
msgpack_buffer.clear(); | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
msgpack::pack(msgpack_buffer, archive_file_info); | ||||||
std::string archive_file_info_str = msgpack_buffer.str(); | ||||||
compressor.write_numeric_value(ArchiveMetadataPacketType::ArchiveFileInfo); | ||||||
compressor.write_numeric_value(archive_file_info_str.size()); | ||||||
compressor.write_string(archive_file_info_str); | ||||||
|
||||||
// Write timestamp dictionary | ||||||
compressor.write_numeric_value(ArchiveMetadataPacketType::TimestampDictionary); | ||||||
compressor.write_numeric_value(timestamp_dict_compressed_size); | ||||||
std::string timestamp_dict_path = m_archive_path + constants::cArchiveTimestampDictFile; | ||||||
FileReader timestamp_dict_reader; | ||||||
timestamp_dict_reader.open(timestamp_dict_path); | ||||||
char read_buffer[cReadBlockSize]; | ||||||
while (true) { | ||||||
size_t num_bytes_read{0}; | ||||||
ErrorCode error_code | ||||||
= timestamp_dict_reader.try_read(read_buffer, cReadBlockSize, num_bytes_read); | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
if (ErrorCodeSuccess != error_code) { | ||||||
break; | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
compressor.write(read_buffer, num_bytes_read); | ||||||
} | ||||||
timestamp_dict_reader.close(); | ||||||
std::filesystem::remove(timestamp_dict_path); | ||||||
compressor.close(); | ||||||
} | ||||||
|
||||||
void ArchiveWriter::write_archive_files( | ||||||
FileWriter& archive_writer, | ||||||
std::vector<ArchiveFileInfo> const& files | ||||||
) { | ||||||
for (auto const& file : files) { | ||||||
std::string file_path = m_archive_path + file.n; | ||||||
FileReader reader; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we declare it outside the loop? |
||||||
reader.open(file_path); | ||||||
char read_buffer[cReadBlockSize]; | ||||||
while (true) { | ||||||
size_t num_bytes_read{0}; | ||||||
ErrorCode const error_code | ||||||
= reader.try_read(read_buffer, cReadBlockSize, num_bytes_read); | ||||||
if (ErrorCodeSuccess != error_code) { | ||||||
break; | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
} | ||||||
archive_writer.write(read_buffer, num_bytes_read); | ||||||
} | ||||||
reader.close(); | ||||||
boost::filesystem::remove(file_path); | ||||||
} | ||||||
} | ||||||
|
||||||
void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t metadata_section_size) { | ||||||
ArchiveHeader header{ | ||||||
.magic_number{0}, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Initialize magic number properly The magic number should not be initialized as 0 and then overwritten later. Initialize it directly in the struct: - .magic_number{0},
+ .magic_number = *reinterpret_cast<const uint32_t*>(cStructuredSFAMagicNumber), 📝 Committable suggestion
Suggested change
|
||||||
.version | ||||||
= (cArchiveMajorVersion << 24) | (cArchiveMinorVersion << 16) | cArchivePatchVersion, | ||||||
.uncompressed_size = m_uncompressed_size, | ||||||
.compressed_size = m_compressed_size, | ||||||
.reserved_padding{0}, | ||||||
.metadata_section_size = static_cast<uint32_t>(metadata_section_size), | ||||||
.compression_type = static_cast<uint16_t>(ArchiveCompressionType::Zstd), | ||||||
.padding = 0 | ||||||
}; | ||||||
std::memcpy(&header.magic_number, "ARCHIVES", sizeof(header.magic_number)); | ||||||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
archive_writer.seek_from_begin(0); | ||||||
archive_writer.write(reinterpret_cast<char const*>(&header), sizeof(header)); | ||||||
} | ||||||
|
||||||
void ArchiveWriter::append_message( | ||||||
int32_t schema_id, | ||||||
Schema const& schema, | ||||||
|
@@ -127,8 +259,7 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& | |||||
} | ||||||
} | ||||||
|
||||||
size_t ArchiveWriter::store_tables() { | ||||||
size_t compressed_size = 0; | ||||||
std::pair<size_t, size_t> ArchiveWriter::store_tables() { | ||||||
m_tables_file_writer.open( | ||||||
m_archive_path + constants::cArchiveTablesFile, | ||||||
FileWriter::OpenMode::CreateForWriting | ||||||
|
@@ -153,13 +284,13 @@ size_t ArchiveWriter::store_tables() { | |||||
} | ||||||
m_table_metadata_compressor.close(); | ||||||
|
||||||
compressed_size += m_table_metadata_file_writer.get_pos(); | ||||||
compressed_size += m_tables_file_writer.get_pos(); | ||||||
auto table_metadata_compressed_size = m_table_metadata_file_writer.get_pos(); | ||||||
auto table_compressed_size = m_tables_file_writer.get_pos(); | ||||||
|
||||||
m_table_metadata_file_writer.close(); | ||||||
m_tables_file_writer.close(); | ||||||
|
||||||
return compressed_size; | ||||||
return {table_metadata_compressed_size, table_compressed_size}; | ||||||
} | ||||||
|
||||||
void ArchiveWriter::update_metadata_db() { | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
#ifndef CLP_S_ARCHIVEDEFS_HPP | ||
#define CLP_S_ARCHIVEDEFS_HPP | ||
|
||
#include <string> | ||
|
||
#include "msgpack.hpp" | ||
|
||
namespace clp_s { | ||
// define the version | ||
constexpr uint8_t cArchiveMajorVersion = 0; | ||
constexpr uint8_t cArchiveMinorVersion = 2; | ||
constexpr uint16_t cArchivePatchVersion = 0; | ||
|
||
struct ArchiveHeader { | ||
uint8_t magic_number[8]; | ||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
uint32_t version; | ||
uint64_t uncompressed_size; | ||
uint64_t compressed_size; | ||
uint64_t reserved_padding[4]; | ||
uint32_t metadata_section_size; | ||
uint16_t compression_type; | ||
uint16_t padding; | ||
}; | ||
|
||
enum class ArchiveCompressionType : uint16_t { | ||
Zstd = 0, | ||
}; | ||
|
||
enum struct ArchiveMetadataPacketType : uint8_t { | ||
ArchiveInfo = 0, | ||
ArchiveFileInfo = 1, | ||
TimestampDictionary = 2, | ||
}; | ||
|
||
struct ArchiveInfoPacket { | ||
uint64_t num_segments; | ||
// TODO: Add more fields in the future | ||
|
||
MSGPACK_DEFINE(num_segments); | ||
}; | ||
|
||
struct ArchiveFileInfo { | ||
std::string n; // name | ||
uint64_t o; // offset | ||
|
||
MSGPACK_DEFINE(n, o); | ||
}; | ||
|
||
struct ArchiveFileInfoPacket { | ||
std::vector<ArchiveFileInfo> files; | ||
|
||
MSGPACK_DEFINE(files); | ||
gibber9809 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
} // namespace clp_s | ||
|
||
#endif // CLP_S_ARCHIVEDEFS_HPP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we intend to have a file for the header+metadata section for the multi-file archive as well, but it seems like we only write this metadata section + header for the single-file archive right now. Could we add this to the multi-file write path as well? It will make the read side much simpler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per sidebar we will add the header to multi-file archive as part of a later PR so that we don't need to start changing the read side in this PR.