Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot diff merge operations #142

Merged
merged 12 commits into from
Sep 28, 2021
8 changes: 8 additions & 0 deletions include/faabric/util/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <string>
#include <vector>

#include <faabric/util/macros.h>

namespace faabric::util {
std::vector<uint8_t> stringToBytes(const std::string& str);

Expand Down Expand Up @@ -42,4 +44,10 @@ size_t readBytesOf(const std::vector<uint8_t>& container,
std::copy_n(container.data() + offset, sizeof(T), outStart);
return offset + sizeof(T);
}

// Returns a byte vector of length sizeof(T) holding a copy of the in-memory
// bytes of `val`. The argument is taken by value, so the bytes are read from
// the function's own copy.
template<typename T>
std::vector<uint8_t> valueToBytes(T val)
{
    auto* first = BYTES(&val);
    auto* last = first + sizeof(T);
    return std::vector<uint8_t>(first, last);
}
}
46 changes: 44 additions & 2 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,47 @@
#pragma once

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace faabric::util {

struct SnapshotDiff
// How the bytes covered by a snapshot diff or merge region are interpreted
// when the diff is merged.
//
// The enumerator values are passed as plain integers in the `dataType` field
// of SnapshotDiffChunk and compared against on the receiving side, so
// reordering the enumerators changes the values that go over the wire.
enum SnapshotDataType
{
// Uninterpreted bytes
Raw,
// A single integer (the snapshot server reads it as an int32_t)
Int
};

// How a diff is combined with the existing snapshot contents.
//
// The enumerator values are passed as plain integers in the `mergeOp` field
// of SnapshotDiffChunk, so reordering the enumerators changes the values that
// go over the wire.
//
// For Int regions, the diff payload is not always the final value: Sum sends
// the amount to add, Subtract the amount to subtract, and Product the factor
// to multiply by. Max and Min send the updated value as-is.
enum SnapshotMergeOperation
{
// Copy the diff bytes over the destination
Overwrite,
// destination += diff value
Sum,
// destination *= diff value
Product,
// destination -= diff value
Subtract,
// destination = max(destination, diff value)
Max,
// destination = min(destination, diff value)
Min
};

// Describes a span of the snapshot that must be merged with a specific data
// type and operation rather than treated as an ordinary byte-wise diff.
// A default-constructed region is empty, Raw and Overwrite.
struct SnapshotMergeRegion
{
// Byte offset of the region from the start of the snapshot data
uint32_t offset = 0;
// Length of the region in bytes
size_t length = 0;
// How the bytes in the region are interpreted
SnapshotDataType dataType = SnapshotDataType::Raw;
// How a change in the region is merged into the snapshot
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
};

class SnapshotDiff
{
public:
uint32_t offset = 0;
size_t size = 0;
const uint8_t* data = nullptr;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

SnapshotDiff() = default;

SnapshotDiff(uint32_t offsetIn, const uint8_t* dataIn, size_t sizeIn)
{
Expand All @@ -26,11 +58,21 @@ class SnapshotData
uint8_t* data = nullptr;
int fd = 0;

SnapshotData() = default;

std::vector<SnapshotDiff> getDirtyPages();

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);

void applyDiff(size_t diffOffset, const uint8_t* diffData, size_t diffLen);
void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation);

private:
// Note - we care about the order of this map, as we iterate through it in
// order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
};
}
2 changes: 2 additions & 0 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ table SnapshotDeleteRequest {

// A single diff to apply to a snapshot.
table SnapshotDiffChunk {
// Byte offset into the snapshot at which the diff applies
offset:int;
// Integer value of faabric::util::SnapshotDataType for this diff
dataType:int;
// Integer value of faabric::util::SnapshotMergeOperation for this diff
mergeOp:int;
// The diff payload bytes
data:[ubyte];
}

Expand Down
4 changes: 3 additions & 1 deletion src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ void SnapshotClient::pushSnapshotDiffs(
std::vector<flatbuffers::Offset<SnapshotDiffChunk>> diffsFbVector;
for (const auto& d : diffs) {
auto dataOffset = mb.CreateVector<uint8_t>(d.data, d.size);
auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset);

auto chunk = CreateSnapshotDiffChunk(
mb, d.offset, d.dataType, d.operation, dataOffset);
diffsFbVector.push_back(chunk);
}

Expand Down
60 changes: 58 additions & 2 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <faabric/transport/macros.h>
#include <faabric/util/func.h>
#include <faabric/util/logging.h>
#include <faabric/util/snapshot.h>

#include <sys/mman.h>

Expand Down Expand Up @@ -117,9 +118,64 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize)
faabric::snapshot::getSnapshotRegistry();
faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str());

// Copy diffs to snapshot
// Apply diffs to snapshot
for (const auto* r : *r->chunks()) {
snap.applyDiff(r->offset(), r->data()->data(), r->data()->size());
uint8_t* dest = snap.data + r->offset();
switch (r->dataType()) {
case (faabric::util::SnapshotDataType::Raw): {
switch (r->mergeOp()) {
case (faabric::util::SnapshotMergeOperation::Overwrite): {
std::memcpy(dest, r->data()->data(), r->data()->size());
break;
}
default: {
SPDLOG_ERROR("Unsupported raw merge operation: {}",
r->mergeOp());
throw std::runtime_error(
"Unsupported raw merge operation");
}
}
break;
}
case (faabric::util::SnapshotDataType::Int): {
const auto* value =
reinterpret_cast<const int32_t*>(r->data()->data());
auto* destValue = reinterpret_cast<int32_t*>(dest);
switch (r->mergeOp()) {
case (faabric::util::SnapshotMergeOperation::Sum): {
*destValue += *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Subtract): {
*destValue -= *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Product): {
*destValue *= *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Min): {
*destValue = std::min(*destValue, *value);
break;
}
case (faabric::util::SnapshotMergeOperation::Max): {
*destValue = std::max(*destValue, *value);
break;
}
default: {
SPDLOG_ERROR("Unsupported int merge operation: {}",
r->mergeOp());
throw std::runtime_error(
"Unsupported int merge operation");
}
}
break;
}
default: {
SPDLOG_ERROR("Unsupported data type: {}", r->dataType());
throw std::runtime_error("Unsupported merge data type");
}
}
}

// Send response
Expand Down
128 changes: 119 additions & 9 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/snapshot.h>

namespace faabric::util {

// TODO - this mutex would be better as an instance variable on the
// SnapshotData class, but std::mutex can't be copy-constructed.
static std::mutex snapMx;

std::vector<SnapshotDiff> SnapshotData::getDirtyPages()
{
if (data == nullptr || size == 0) {
Expand Down Expand Up @@ -31,27 +37,126 @@ std::vector<SnapshotDiff> SnapshotData::getDirtyPages()
std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
size_t updatedSize)
{
// Work out which pages have changed in the comparison
// Work out which pages have changed
size_t nThisPages = getRequiredHostPages(size);
std::vector<int> dirtyPageNumbers =
getDirtyPageNumbers(updated, nThisPages);

// Get iterator over merge regions
std::map<uint32_t, SnapshotMergeRegion>::iterator mergeIt =
mergeRegions.begin();

// Get byte-wise diffs _within_ the dirty pages
//
// NOTE - this will cause diffs to be split across pages if they hit a page
// boundary, but we can be relatively confident that variables will be
// page-aligned so this shouldn't be a problem
//
// For each byte we encounter, we have the following possible scenarios:
//
// 1. the byte is dirty, and is the start of a new diff
// 2. the byte is dirty, but the byte before was also dirty, so we
// are inside a diff
// 3. the byte is not dirty but the previous one was, so we've reached the
// end of a diff
// 4. the last byte of the page is dirty, so we've also come to the end of
// a diff
// 5. the byte is dirty, but is within a special merge region, in which
// case we need to add a diff for that whole region, then skip
// to the next byte after that region
std::vector<SnapshotDiff> diffs;
for (int i : dirtyPageNumbers) {
int pageOffset = i * HOST_PAGE_SIZE;

// Iterate through each byte of the page
bool diffInProgress = false;
int diffStart = 0;
int offset = pageOffset;
for (int b = 0; b < HOST_PAGE_SIZE; b++) {
offset = pageOffset + b;
bool isDirtyByte = *(data + offset) != *(updated + offset);
if (isDirtyByte && !diffInProgress) {

bool isInMergeRegion =
mergeIt != mergeRegions.end() &&
offset >= mergeIt->second.offset &&
offset < (mergeIt->second.offset + mergeIt->second.length);

if (isDirtyByte && isInMergeRegion) {
SnapshotMergeRegion region = mergeIt->second;

// Set up the diff
const uint8_t* updatedValue = updated + region.offset;
const uint8_t* originalValue = data + region.offset;

SnapshotDiff diff(region.offset, updatedValue, region.length);
diff.dataType = region.dataType;
diff.operation = region.operation;

// Modify diff data for certain operations
switch (region.dataType) {
case (SnapshotDataType::Int): {
int originalInt =
*(reinterpret_cast<const int*>(originalValue));
int updatedInt =
*(reinterpret_cast<const int*>(updatedValue));

switch (region.operation) {
case (SnapshotMergeOperation::Sum): {
// Sums must send the value to be _added_, and
// not the final result
updatedInt -= originalInt;
break;
}
case (SnapshotMergeOperation::Subtract): {
// Subtractions must send the value to be
// subtracted, not the result
updatedInt = originalInt - updatedInt;
break;
}
case (SnapshotMergeOperation::Product): {
// Products must send the value to be
// multiplied, not the result
updatedInt /= originalInt;
break;
}
case (SnapshotMergeOperation::Max):
case (SnapshotMergeOperation::Min):
// Min and max don't need to change
break;
default: {
SPDLOG_ERROR(
"Unhandled integer merge operation: {}",
region.operation);
throw std::runtime_error(
"Unhandled integer merge operation");
}
}

// TODO - somehow avoid casting away the const here?
// Modify the memory in-place here
std::memcpy((uint8_t*)updatedValue,
BYTES(&updatedInt),
sizeof(int32_t));

break;
}
default: {
SPDLOG_ERROR("Merge region for unhandled data type: {}",
region.dataType);
throw std::runtime_error(
"Merge region for unhandled data type");
}
}

// Add the diff to the list
diffs.emplace_back(diff);

// Bump the loop variable to the end of this region (note that
// the loop itself will increment onto the next)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, I think that offset + length should point to the first byte after the region, and then the for increment would skip one byte.

A test that would check this would consist of two consecutive regions with ints that need to be added. If I am not mistaken that test should now fail.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, great spot on both of these comments. There was an off-by-one error that meant that the second of two adjacent merge regions would be lost if both had changed. Have added the relevant -1s and a test.

b = (region.offset - pageOffset) + (region.length - 1);

// Move onto the next merge region
++mergeIt;
} else if (isDirtyByte && !diffInProgress) {
// Diff starts here if it's different and diff not in progress
diffInProgress = true;
diffStart = offset;
Expand Down Expand Up @@ -81,12 +186,17 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
return diffs;
}

void SnapshotData::applyDiff(size_t diffOffset,
const uint8_t* diffData,
size_t diffLen)
void SnapshotData::addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation)
{
uint8_t* dest = data + diffOffset;
std::memcpy(dest, diffData, diffLen);
SnapshotMergeRegion region{ .offset = offset,
.length = length,
.dataType = dataType,
.operation = operation };
// Locking as this may be called in bursts by multiple threads
faabric::util::UniqueLock lock(snapMx);
mergeRegions[offset] = region;
}

}
Loading