Skip to content

Commit

Permalink
Snapshot diff merge operations (#142)
Browse files Browse the repository at this point in the history
* Added beginnings of custom diff merger

* Add merge strategies

* Added logic for handling merge regions

* Added tests for snapshot merge operations

* Add locking

* Tests for client/server snapshot merge ops

* Extensible test

* Tidying up tests as they are

* More detailed tests for server

* Fix out by one and missing break

* Tests for invalid merges

* Add test for edge cases of snapshots
  • Loading branch information
Shillaker authored Sep 28, 2021
1 parent b558ee2 commit 8c2ddef
Show file tree
Hide file tree
Showing 8 changed files with 812 additions and 14 deletions.
8 changes: 8 additions & 0 deletions include/faabric/util/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <string>
#include <vector>

#include <faabric/util/macros.h>

namespace faabric::util {
std::vector<uint8_t> stringToBytes(const std::string& str);

Expand Down Expand Up @@ -42,4 +44,10 @@ size_t readBytesOf(const std::vector<uint8_t>& container,
std::copy_n(container.data() + offset, sizeof(T), outStart);
return offset + sizeof(T);
}

// Returns a copy of the object representation of val as a byte vector of
// length sizeof(T). The bytes are taken from the by-value parameter, so the
// caller's object is never touched.
template<typename T>
std::vector<uint8_t> valueToBytes(T val)
{
    auto* firstByte = BYTES(&val);
    auto* pastLastByte = firstByte + sizeof(T);
    return std::vector<uint8_t>(firstByte, pastLastByte);
}
}
46 changes: 44 additions & 2 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,47 @@
#pragma once

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace faabric::util {

struct SnapshotDiff
// How the bytes covered by a snapshot diff or merge region are interpreted.
// The numeric values are passed as the dataType field of a SnapshotDiffChunk
// and switched on by the snapshot server, so they are spelled out explicitly.
enum SnapshotDataType
{
    // Opaque bytes
    Raw = 0,
    // An integer value
    Int = 1
};

// How a diff is combined with the existing snapshot contents. The numeric
// values are passed as the mergeOp field of a SnapshotDiffChunk and switched
// on by the snapshot server, so they are spelled out explicitly.
enum SnapshotMergeOperation
{
    Overwrite = 0,
    Sum = 1,
    Product = 2,
    Subtract = 3,
    Max = 4,
    Min = 5
};

// Describes a region of a snapshot that is merged with a specific operation.
// Kept as a plain aggregate with this member order, since it is built with
// designated initializers.
struct SnapshotMergeRegion
{
    // Byte offset of the start of the region within the snapshot
    uint32_t offset{ 0 };

    // Number of bytes the region covers
    size_t length{ 0 };

    // How the bytes in the region are interpreted
    SnapshotDataType dataType{ SnapshotDataType::Raw };

    // How changes to the region are merged
    SnapshotMergeOperation operation{ SnapshotMergeOperation::Overwrite };
};

class SnapshotDiff
{
public:
uint32_t offset = 0;
size_t size = 0;
const uint8_t* data = nullptr;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

SnapshotDiff() = default;

SnapshotDiff(uint32_t offsetIn, const uint8_t* dataIn, size_t sizeIn)
{
Expand All @@ -26,11 +58,21 @@ class SnapshotData
uint8_t* data = nullptr;
int fd = 0;

SnapshotData() = default;

std::vector<SnapshotDiff> getDirtyPages();

std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
size_t updatedSize);

void applyDiff(size_t diffOffset, const uint8_t* diffData, size_t diffLen);
void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation);

private:
// Note - we care about the order of this map, as we iterate through it in
// order of offsets
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
};
}
2 changes: 2 additions & 0 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ table SnapshotDeleteRequest {

// A single diff to apply to a snapshot, as built by the snapshot client and
// consumed by the snapshot server.
table SnapshotDiffChunk {
// Byte offset into the snapshot data at which this diff applies
offset:int;
// Numeric value of faabric::util::SnapshotDataType for this diff
dataType:int;
// Numeric value of faabric::util::SnapshotMergeOperation for this diff
mergeOp:int;
// The diff payload. For Int diffs the server reads an int32 from the start
// of these bytes.
data:[ubyte];
}

Expand Down
4 changes: 3 additions & 1 deletion src/snapshot/SnapshotClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ void SnapshotClient::pushSnapshotDiffs(
std::vector<flatbuffers::Offset<SnapshotDiffChunk>> diffsFbVector;
for (const auto& d : diffs) {
auto dataOffset = mb.CreateVector<uint8_t>(d.data, d.size);
auto chunk = CreateSnapshotDiffChunk(mb, d.offset, dataOffset);

auto chunk = CreateSnapshotDiffChunk(
mb, d.offset, d.dataType, d.operation, dataOffset);
diffsFbVector.push_back(chunk);
}

Expand Down
60 changes: 58 additions & 2 deletions src/snapshot/SnapshotServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <faabric/transport/macros.h>
#include <faabric/util/func.h>
#include <faabric/util/logging.h>
#include <faabric/util/snapshot.h>

#include <sys/mman.h>

Expand Down Expand Up @@ -117,9 +118,64 @@ SnapshotServer::recvPushSnapshotDiffs(const uint8_t* buffer, size_t bufferSize)
faabric::snapshot::getSnapshotRegistry();
faabric::util::SnapshotData& snap = reg.getSnapshot(r->key()->str());

// Copy diffs to snapshot
// Apply diffs to snapshot
for (const auto* r : *r->chunks()) {
snap.applyDiff(r->offset(), r->data()->data(), r->data()->size());
uint8_t* dest = snap.data + r->offset();
switch (r->dataType()) {
case (faabric::util::SnapshotDataType::Raw): {
switch (r->mergeOp()) {
case (faabric::util::SnapshotMergeOperation::Overwrite): {
std::memcpy(dest, r->data()->data(), r->data()->size());
break;
}
default: {
SPDLOG_ERROR("Unsupported raw merge operation: {}",
r->mergeOp());
throw std::runtime_error(
"Unsupported raw merge operation");
}
}
break;
}
case (faabric::util::SnapshotDataType::Int): {
const auto* value =
reinterpret_cast<const int32_t*>(r->data()->data());
auto* destValue = reinterpret_cast<int32_t*>(dest);
switch (r->mergeOp()) {
case (faabric::util::SnapshotMergeOperation::Sum): {
*destValue += *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Subtract): {
*destValue -= *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Product): {
*destValue *= *value;
break;
}
case (faabric::util::SnapshotMergeOperation::Min): {
*destValue = std::min(*destValue, *value);
break;
}
case (faabric::util::SnapshotMergeOperation::Max): {
*destValue = std::max(*destValue, *value);
break;
}
default: {
SPDLOG_ERROR("Unsupported int merge operation: {}",
r->mergeOp());
throw std::runtime_error(
"Unsupported int merge operation");
}
}
break;
}
default: {
SPDLOG_ERROR("Unsupported data type: {}", r->dataType());
throw std::runtime_error("Unsupported merge data type");
}
}
}

// Send response
Expand Down
128 changes: 119 additions & 9 deletions src/util/snapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
#include <faabric/util/locks.h>
#include <faabric/util/logging.h>
#include <faabric/util/macros.h>
#include <faabric/util/memory.h>
#include <faabric/util/snapshot.h>

namespace faabric::util {

// TODO - this mutex would be better as an instance variable on the
// SnapshotData class, but a std::mutex can't be copy-constructed, so for now
// a single file-level mutex is shared by all snapshots.
static std::mutex snapMx;

std::vector<SnapshotDiff> SnapshotData::getDirtyPages()
{
if (data == nullptr || size == 0) {
Expand Down Expand Up @@ -31,27 +37,126 @@ std::vector<SnapshotDiff> SnapshotData::getDirtyPages()
std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
size_t updatedSize)
{
// Work out which pages have changed in the comparison
// Work out which pages have changed
size_t nThisPages = getRequiredHostPages(size);
std::vector<int> dirtyPageNumbers =
getDirtyPageNumbers(updated, nThisPages);

// Get iterator over merge regions
std::map<uint32_t, SnapshotMergeRegion>::iterator mergeIt =
mergeRegions.begin();

// Get byte-wise diffs _within_ the dirty pages
//
// NOTE - this will cause diffs to be split across pages if they hit a page
// boundary, but we can be relatively confident that variables will be
// page-aligned so this shouldn't be a problem
//
// For each byte we encounter have the following possible scenarios:
//
// 1. the byte is dirty, and is the start of a new diff
// 2. the byte is dirty, but the byte before was also dirty, so we
// are inside a diff
// 3. the byte is not dirty but the previous one was, so we've reached the
// end of a diff
// 4. the last byte of the page is dirty, so we've also come to the end of
// a diff
// 5. the byte is dirty, but is within a special merge region, in which
// case we need to add a diff for that whole region, then skip
// to the next byte after that region
std::vector<SnapshotDiff> diffs;
for (int i : dirtyPageNumbers) {
int pageOffset = i * HOST_PAGE_SIZE;

// Iterate through each byte of the page
bool diffInProgress = false;
int diffStart = 0;
int offset = pageOffset;
for (int b = 0; b < HOST_PAGE_SIZE; b++) {
offset = pageOffset + b;
bool isDirtyByte = *(data + offset) != *(updated + offset);
if (isDirtyByte && !diffInProgress) {

bool isInMergeRegion =
mergeIt != mergeRegions.end() &&
offset >= mergeIt->second.offset &&
offset < (mergeIt->second.offset + mergeIt->second.length);

if (isDirtyByte && isInMergeRegion) {
SnapshotMergeRegion region = mergeIt->second;

// Set up the diff
const uint8_t* updatedValue = updated + region.offset;
const uint8_t* originalValue = data + region.offset;

SnapshotDiff diff(region.offset, updatedValue, region.length);
diff.dataType = region.dataType;
diff.operation = region.operation;

// Modify diff data for certain operations
switch (region.dataType) {
case (SnapshotDataType::Int): {
int originalInt =
*(reinterpret_cast<const int*>(originalValue));
int updatedInt =
*(reinterpret_cast<const int*>(updatedValue));

switch (region.operation) {
case (SnapshotMergeOperation::Sum): {
// Sums must send the value to be _added_, and
// not the final result
updatedInt -= originalInt;
break;
}
case (SnapshotMergeOperation::Subtract): {
// Subtractions must send the value to be
// subtracted, not the result
updatedInt = originalInt - updatedInt;
break;
}
case (SnapshotMergeOperation::Product): {
// Products must send the value to be
// multiplied, not the result
updatedInt /= originalInt;
break;
}
case (SnapshotMergeOperation::Max):
case (SnapshotMergeOperation::Min):
// Min and max don't need to change
break;
default: {
SPDLOG_ERROR(
"Unhandled integer merge operation: {}",
region.operation);
throw std::runtime_error(
"Unhandled integer merge operation");
}
}

// TODO - somehow avoid casting away the const here?
// Modify the memory in-place here
std::memcpy((uint8_t*)updatedValue,
BYTES(&updatedInt),
sizeof(int32_t));

break;
}
default: {
SPDLOG_ERROR("Merge region for unhandled data type: {}",
region.dataType);
throw std::runtime_error(
"Merge region for unhandled data type");
}
}

// Add the diff to the list
diffs.emplace_back(diff);

// Bump the loop variable to the end of this region (note that
// the loop itself will increment onto the next)
b = (region.offset - pageOffset) + (region.length - 1);

// Move onto the next merge region
++mergeIt;
} else if (isDirtyByte && !diffInProgress) {
// Diff starts here if it's different and diff not in progress
diffInProgress = true;
diffStart = offset;
Expand Down Expand Up @@ -81,12 +186,17 @@ std::vector<SnapshotDiff> SnapshotData::getChangeDiffs(const uint8_t* updated,
return diffs;
}

void SnapshotData::applyDiff(size_t diffOffset,
const uint8_t* diffData,
size_t diffLen)
void SnapshotData::addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation)
{
uint8_t* dest = data + diffOffset;
std::memcpy(dest, diffData, diffLen);
SnapshotMergeRegion region{ .offset = offset,
.length = length,
.dataType = dataType,
.operation = operation };
// Locking as this may be called in bursts by multiple threads
faabric::util::UniqueLock lock(snapMx);
mergeRegions[offset] = region;
}

}
Loading

0 comments on commit 8c2ddef

Please sign in to comment.