Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp)!: Migrate archive metadata file format to MessagePack. #700

Merged
merged 14 commits into from
Feb 5, 2025
61 changes: 32 additions & 29 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "ArchiveMetadata.hpp"

#include <sys/stat.h>

#include <fmt/core.h>

#include "../Array.hpp"
#include "../FileReader.hpp"

namespace clp::streaming_archive {
ArchiveMetadata::ArchiveMetadata(
archive_format_version_t archive_format_version,
Expand All @@ -8,29 +15,29 @@ ArchiveMetadata::ArchiveMetadata(
)
: m_archive_format_version(archive_format_version),
m_creator_id(std::move(creator_id)),
m_creation_idx(creation_idx) {
if (m_creator_id.length() > UINT16_MAX) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
m_creation_idx(creation_idx) {}

auto ArchiveMetadata::create_from_file(std::string_view file_path) -> ArchiveMetadata {
FileReader file_reader{std::string(file_path)};
struct stat file_stat{};
if (auto const clp_rc = file_reader.try_fstat(file_stat);
clp::ErrorCode::ErrorCode_Success != clp_rc)
{
throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
}

clp::Array<char> buf{static_cast<size_t>(file_stat.st_size)};
if (auto const clp_rc = file_reader.try_read_exact_length(buf.data(), buf.size());
clp::ErrorCode::ErrorCode_Success != clp_rc)
{
throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
}
m_creator_id_len = m_creator_id.length();

// NOTE: We set this to the size of this metadata on disk; when adding new members that will be
// written to disk, you must update this
m_compressed_size += sizeof(m_archive_format_version) + sizeof(m_creator_id_len)
+ m_creator_id.length() + sizeof(m_creation_idx)
+ sizeof(m_uncompressed_size) + sizeof(m_begin_timestamp)
+ sizeof(m_end_timestamp) + sizeof(m_compressed_size);
}

ArchiveMetadata::ArchiveMetadata(FileReader& file_reader) {
file_reader.read_numeric_value(m_archive_format_version, false);
file_reader.read_numeric_value(m_creator_id_len, false);
file_reader.read_string(m_creator_id_len, m_creator_id, false);
file_reader.read_numeric_value(m_creation_idx, false);
file_reader.read_numeric_value(m_uncompressed_size, false);
file_reader.read_numeric_value(m_compressed_size, false);
file_reader.read_numeric_value(m_begin_timestamp, false);
file_reader.read_numeric_value(m_end_timestamp, false);
ArchiveMetadata metadata;
msgpack::object_handle const obj_handle = msgpack::unpack(buf.data(), buf.size());
msgpack::object const obj = obj_handle.get();
obj.convert(metadata);
return metadata;
}

void ArchiveMetadata::expand_time_range(epochtime_t begin_timestamp, epochtime_t end_timestamp) {
Expand All @@ -43,13 +50,9 @@ void ArchiveMetadata::expand_time_range(epochtime_t begin_timestamp, epochtime_t
}

void ArchiveMetadata::write_to_file(FileWriter& file_writer) const {
file_writer.write_numeric_value(m_archive_format_version);
file_writer.write_numeric_value(m_creator_id_len);
file_writer.write_string(m_creator_id);
file_writer.write_numeric_value(m_creation_idx);
file_writer.write_numeric_value(m_uncompressed_size + m_dynamic_uncompressed_size);
file_writer.write_numeric_value(m_compressed_size + m_dynamic_uncompressed_size);
file_writer.write_numeric_value(m_begin_timestamp);
file_writer.write_numeric_value(m_end_timestamp);
std::ostringstream buf;
msgpack::pack(buf, *this);
auto const& string_buf = buf.str();
file_writer.write(string_buf.data(), string_buf.size());
Comment on lines +53 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for the write operation.

The write operation should be checked for errors to ensure the data is written successfully.

Apply this diff to add error handling:

 void ArchiveMetadata::write_to_file(FileWriter& file_writer) const {
     std::ostringstream buf;
     msgpack::pack(buf, *this);
     auto const& string_buf = buf.str();
-    file_writer.write(string_buf.data(), string_buf.size());
+    if (auto const clp_rc = file_writer.try_write(string_buf.data(), string_buf.size());
+        clp::ErrorCode::ErrorCode_Success != clp_rc)
+    {
+        throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
+    }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::ostringstream buf;
msgpack::pack(buf, *this);
auto const& string_buf = buf.str();
file_writer.write(string_buf.data(), string_buf.size());
std::ostringstream buf;
msgpack::pack(buf, *this);
auto const& string_buf = buf.str();
if (auto const clp_rc = file_writer.try_write(string_buf.data(), string_buf.size());
clp::ErrorCode::ErrorCode_Success != clp_rc)
{
throw OperationFailed(clp_rc, __FILENAME__, __LINE__);
}

}
} // namespace clp::streaming_archive
39 changes: 33 additions & 6 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
#define CLP_STREAMING_ARCHIVE_ARCHIVEMETADATA_HPP

#include <cstdint>
#include <string_view>

#include <msgpack.hpp>

#include "../Defs.h"
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"

Expand All @@ -28,6 +30,10 @@ class ArchiveMetadata {
};

// Constructors
// We need a default constructor to convert from a msgpack::object in `create_from_file`. See
// https://github.com/msgpack/msgpack-c/wiki/v2_0_cpp_adaptor
ArchiveMetadata() = default;
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Constructs a metadata object with the given parameters
* @param archive_format_version
Expand All @@ -40,13 +46,20 @@ class ArchiveMetadata {
uint64_t creation_idx
);

// Methods
/**
* Constructs a metadata object and initializes it from the given file reader
* @param file_reader
* Reads serialized MessagePack data from a file and unpacks it into an `ArchiveMetadata`
* instance.
*
* @param file_path
* @return The created instance.
* @throw `ArchiveMetadata::OperationFailed` if `stat` fails or the file couldn't be read.
* @throw `msgpack::unpack_error` if the data cannot be unpacked into a MessagePack object.
* @throw `msgpack::type_error` if the MessagePack object can't be converted to an
* `ArchiveMetadata` instance.
*/
explicit ArchiveMetadata(FileReader& file_reader);
[[nodiscard]] static auto create_from_file(std::string_view file_path) -> ArchiveMetadata;

// Methods
[[nodiscard]] auto get_archive_format_version() const { return m_archive_format_version; }

[[nodiscard]] auto get_creator_id() const -> std::string const& { return m_creator_id; }
Expand Down Expand Up @@ -86,13 +99,27 @@ class ArchiveMetadata {
*/
void expand_time_range(epochtime_t begin_timestamp, epochtime_t end_timestamp);

/**
* Packs this instance into a MessagePack object and writes it to the open file.
*
* @param file_writer
*/
void write_to_file(FileWriter& file_writer) const;

MSGPACK_DEFINE_MAP(
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
MSGPACK_NVP("archive_format_version", m_archive_format_version),
MSGPACK_NVP("creator_id", m_creator_id),
MSGPACK_NVP("creation_idx", m_creation_idx),
MSGPACK_NVP("begin_timestamp", m_begin_timestamp),
MSGPACK_NVP("end_timestamp", m_end_timestamp),
MSGPACK_NVP("uncompressed_size", m_uncompressed_size),
MSGPACK_NVP("compressed_size", m_compressed_size)
);

private:
// Variables
archive_format_version_t m_archive_format_version{cArchiveFormatVersion::Version};
std::string m_creator_id;
uint16_t m_creator_id_len{0};
uint64_t m_creation_idx{0};
epochtime_t m_begin_timestamp{cEpochTimeMax};
epochtime_t m_end_timestamp{cEpochTimeMin};
Expand Down
3 changes: 1 addition & 2 deletions components/core/src/clp/streaming_archive/reader/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ void Archive::open(string const& path) {
string metadata_file_path = path + '/' + cMetadataFileName;
archive_format_version_t format_version{};
try {
FileReader file_reader{metadata_file_path};
ArchiveMetadata const metadata{file_reader};
auto const metadata = ArchiveMetadata::create_from_file(metadata_file_path);
format_version = metadata.get_archive_format_version();
} catch (TraceableException& traceable_exception) {
auto error_code = traceable_exception.get_error_code();
Expand Down
Loading