Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp)!: Migrate archive metadata file format to MessagePack. #700

Merged
merged 14 commits into from
Feb 5, 2025
68 changes: 38 additions & 30 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "ArchiveMetadata.hpp"

#include <sys/stat.h>

#include <fmt/core.h>

#include "../Array.hpp"
#include "../FileReader.hpp"

namespace clp::streaming_archive {
ArchiveMetadata::ArchiveMetadata(
archive_format_version_t archive_format_version,
Expand All @@ -8,29 +15,29 @@ ArchiveMetadata::ArchiveMetadata(
)
: m_archive_format_version(archive_format_version),
m_creator_id(std::move(creator_id)),
m_creation_idx(creation_idx) {
if (m_creator_id.length() > UINT16_MAX) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
m_creation_idx(creation_idx) {}

auto ArchiveMetadata::create_from_file(std::string_view file_path) -> ArchiveMetadata {
FileReader file_reader{std::string(file_path)};
struct stat file_stat{};
if (auto const clp_rc = file_reader.try_fstat(file_stat);
clp::ErrorCode::ErrorCode_Success != clp_rc)
{
throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
}
m_creator_id_len = m_creator_id.length();

// NOTE: We set this to the size of this metadata on disk; when adding new members that will be
// written to disk, you must update this
m_compressed_size += sizeof(m_archive_format_version) + sizeof(m_creator_id_len)
+ m_creator_id.length() + sizeof(m_creation_idx)
+ sizeof(m_uncompressed_size) + sizeof(m_begin_timestamp)
+ sizeof(m_end_timestamp) + sizeof(m_compressed_size);
}

ArchiveMetadata::ArchiveMetadata(FileReader& file_reader) {
file_reader.read_numeric_value(m_archive_format_version, false);
file_reader.read_numeric_value(m_creator_id_len, false);
file_reader.read_string(m_creator_id_len, m_creator_id, false);
file_reader.read_numeric_value(m_creation_idx, false);
file_reader.read_numeric_value(m_uncompressed_size, false);
file_reader.read_numeric_value(m_compressed_size, false);
file_reader.read_numeric_value(m_begin_timestamp, false);
file_reader.read_numeric_value(m_end_timestamp, false);
clp::Array<char> buf{static_cast<size_t>(file_stat.st_size)};
if (auto const clp_rc = file_reader.try_read_exact_length(buf.data(), buf.size());
clp::ErrorCode::ErrorCode_Success != clp_rc)
{
throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
}

ArchiveMetadata metadata;
msgpack::object_handle const obj_handle = msgpack::unpack(buf.data(), buf.size());
msgpack::object const obj = obj_handle.get();
obj.convert(metadata);
return metadata;
}

void ArchiveMetadata::expand_time_range(epochtime_t begin_timestamp, epochtime_t end_timestamp) {
Expand All @@ -42,14 +49,15 @@ void ArchiveMetadata::expand_time_range(epochtime_t begin_timestamp, epochtime_t
}
}

void ArchiveMetadata::write_to_file(FileWriter& file_writer) const {
file_writer.write_numeric_value(m_archive_format_version);
file_writer.write_numeric_value(m_creator_id_len);
file_writer.write_string(m_creator_id);
file_writer.write_numeric_value(m_creation_idx);
file_writer.write_numeric_value(m_uncompressed_size + m_dynamic_uncompressed_size);
file_writer.write_numeric_value(m_compressed_size + m_dynamic_uncompressed_size);
file_writer.write_numeric_value(m_begin_timestamp);
file_writer.write_numeric_value(m_end_timestamp);
void ArchiveMetadata::write_to_file(FileWriter& file_writer) {
std::ostringstream buf;
msgpack::pack(buf, *this);
auto const& string_buf = buf.str();

size_t previous_metadata_size = m_metadata_size;
m_metadata_size = string_buf.size();
file_writer.write(string_buf.data(), m_metadata_size);

m_compressed_size = m_compressed_size - previous_metadata_size + m_metadata_size;
}
} // namespace clp::streaming_archive
42 changes: 35 additions & 7 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#define CLP_STREAMING_ARCHIVE_ARCHIVEMETADATA_HPP

#include <cstdint>
#include <string_view>

#include <msgpack.hpp>

#include "../Defs.h"
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"

Expand All @@ -28,6 +30,10 @@ class ArchiveMetadata {
};

// Constructors
// We need a default constructor to convert from a msgpack::object in `create_from_file`. See
// https://github.com/msgpack/msgpack-c/wiki/v2_0_cpp_adaptor
ArchiveMetadata() = default;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructs a metadata object with the given parameters
* @param archive_format_version
Expand All @@ -40,13 +46,20 @@ class ArchiveMetadata {
uint64_t creation_idx
);

// Methods
/**
* Constructs a metadata object and initializes it from the given file reader
* @param file_reader
* Reads serialized MessagePack data from a file and unpacks it into an `ArchiveMetadata`
* instance.
*
* @param file_path
* @return The created instance.
* @throw `ArchiveMetadata::OperationFailed` if `stat` fails or the file couldn't be read.
* @throw `msgpack::unpack_error` if the data cannot be unpacked into a MessagePack object.
* @throw `msgpack::type_error` if the MessagePack object can't be converted to an
* `ArchiveMetadata` instance.
*/
explicit ArchiveMetadata(FileReader& file_reader);
[[nodiscard]] static auto create_from_file(std::string_view file_path) -> ArchiveMetadata;

// Methods
[[nodiscard]] auto get_archive_format_version() const { return m_archive_format_version; }

[[nodiscard]] auto get_creator_id() const -> std::string const& { return m_creator_id; }
Expand Down Expand Up @@ -86,13 +99,27 @@ class ArchiveMetadata {
*/
void expand_time_range(epochtime_t begin_timestamp, epochtime_t end_timestamp);

void write_to_file(FileWriter& file_writer) const;
/**
* Packs this instance into a MessagePack object and writes it to the open file.
*
* @param file_writer
*/
void write_to_file(FileWriter& file_writer);

MSGPACK_DEFINE_MAP(
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
MSGPACK_NVP("archive_format_version", m_archive_format_version),
MSGPACK_NVP("creator_id", m_creator_id),
MSGPACK_NVP("creation_idx", m_creation_idx),
MSGPACK_NVP("begin_timestamp", m_begin_timestamp),
MSGPACK_NVP("end_timestamp", m_end_timestamp),
MSGPACK_NVP("uncompressed_size", m_uncompressed_size),
MSGPACK_NVP("compressed_size", m_compressed_size)
);

private:
// Variables
archive_format_version_t m_archive_format_version{cArchiveFormatVersion::Version};
std::string m_creator_id;
uint16_t m_creator_id_len{0};
uint64_t m_creation_idx{0};
epochtime_t m_begin_timestamp{cEpochTimeMax};
epochtime_t m_end_timestamp{cEpochTimeMin};
Expand All @@ -101,6 +128,7 @@ class ArchiveMetadata {
uint64_t m_dynamic_uncompressed_size{0};
// The size of the archive
uint64_t m_compressed_size{0};
uint64_t m_metadata_size{0};
uint64_t m_dynamic_compressed_size{0};
};
} // namespace clp::streaming_archive
Expand Down
3 changes: 1 addition & 2 deletions components/core/src/clp/streaming_archive/reader/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ void Archive::open(string const& path) {
string metadata_file_path = path + '/' + cMetadataFileName;
archive_format_version_t format_version{};
try {
FileReader file_reader{metadata_file_path};
ArchiveMetadata const metadata{file_reader};
auto const metadata = ArchiveMetadata::create_from_file(metadata_file_path);
format_version = metadata.get_archive_format_version();
} catch (TraceableException& traceable_exception) {
auto error_code = traceable_exception.get_error_code();
Expand Down
Loading