Skip to content

Commit

Permalink
Merge pull request #2913 from alexmiller-apple/reduce-txn-state-store…
Browse files Browse the repository at this point in the history
…-size

Replace UID with Tag in keyServers to reduce transaction state store size
  • Loading branch information
xumengpanda authored Apr 14, 2020
2 parents c851ee4 + c8ab69c commit de9ab9d
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 35 deletions.
4 changes: 2 additions & 2 deletions cmake/AddFdbTest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ function(create_test_package)
set(tar_file ${CMAKE_BINARY_DIR}/packages/correctness-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${out_files}
DEPENDS ${out_files} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTest.sh ${CMAKE_BINARY_DIR}/packages/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/correctnessTimeout.sh ${CMAKE_BINARY_DIR}/packages/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
Expand All @@ -209,7 +209,7 @@ function(create_test_package)
set(tar_file ${CMAKE_BINARY_DIR}/packages/valgrind-${CMAKE_PROJECT_VERSION}.tar.gz)
add_custom_command(
OUTPUT ${tar_file}
DEPENDS ${out_files}
DEPENDS ${out_files} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/valgrindTest.sh ${CMAKE_BINARY_DIR}/packages/joshua_test
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_SOURCE_DIR}/contrib/Joshua/scripts/valgrindTimeout.sh ${CMAKE_BINARY_DIR}/packages/joshua_timeout
COMMAND ${CMAKE_COMMAND} -E tar cfz ${tar_file} ${CMAKE_BINARY_DIR}/packages/bin/fdbserver
Expand Down
4 changes: 3 additions & 1 deletion fdbclient/NativeAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2233,14 +2233,16 @@ ACTOR Future<Standalone<VectorRef<const char*>>> getAddressesForKeyActor(Key key
// If key >= allKeys.end, then getRange will return a kv-pair with an empty value. This will result in our serverInterfaces vector being empty, which will cause us to return an empty addresses list.

state Key ksKey = keyServersKey(key);
state Standalone<RangeResultRef> serverTagResult = wait( getRange(cx, ver, lastLessOrEqual(serverTagKeys.begin), firstGreaterThan(serverTagKeys.end), GetRangeLimits(CLIENT_KNOBS->TOO_MANY), false, info ) );
ASSERT( !serverTagResult.more && serverTagResult.size() < CLIENT_KNOBS->TOO_MANY );
Future<Standalone<RangeResultRef>> futureServerUids = getRange(cx, ver, lastLessOrEqual(ksKey), firstGreaterThan(ksKey), GetRangeLimits(1), false, info);
Standalone<RangeResultRef> serverUids = wait( futureServerUids );

ASSERT( serverUids.size() ); // every shard needs to have a team

vector<UID> src;
vector<UID> ignore; // 'ignore' is so named because it is the vector into which we decode the 'dest' servers in the case where this key is being relocated. But 'src' is the canonical location until the move is finished, because it could be cancelled at any time.
decodeKeyServersValue(serverUids[0].value, src, ignore);
decodeKeyServersValue(serverTagResult, serverUids[0].value, src, ignore);
Optional<vector<StorageServerInterface>> serverInterfaces = wait( transactionalGetServerInterfaces(ver, cx, info, src) );

ASSERT( serverInterfaces.present() ); // since this is happening transactionally, /FF/keyServers and /FF/serverList need to be consistent with one another
Expand Down
65 changes: 57 additions & 8 deletions fdbclient/SystemData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "fdbclient/SystemData.h"
#include "fdbclient/StorageServerInterface.h"
#include "flow/TDMetric.actor.h"
#include "fdbclient/NativeAPI.actor.h"


const KeyRef systemKeysPrefix = LiteralStringRef("\xff");
const KeyRangeRef normalKeys(KeyRef(), systemKeysPrefix);
Expand All @@ -43,20 +45,67 @@ const Key keyServersKey( const KeyRef& k ) {
// Returns the keyServers key for `k`: `k` with keyServersPrefix put in front of it.
// `arena` is forwarded to withPrefix (presumably it backs the returned bytes — confirm).
const KeyRef keyServersKey( const KeyRef& k, Arena& arena ) {
    const KeyRef prefixedKey = k.withPrefix( keyServersPrefix, arena );
    return prefixedKey;
}
const Value keyServersValue( const vector<UID>& src, const vector<UID>& dest ) {
// Builds a keyServers value from server UIDs by translating them to Tags.
// `result` supplies the serverTag entries: each key decodes to a UID and each
// value to that server's Tag. Tags are collected in the order the entries
// appear in `result`; UIDs in `src`/`dest` with no entry in `result` contribute
// nothing. The collected tags are handed to the Tag-based overload.
const Value keyServersValue( Standalone<RangeResultRef> result, const std::vector<UID>& src, const std::vector<UID>& dest ) {
    // Membership test over a (small) UID list.
    const auto listed = []( const std::vector<UID>& ids, const UID& id ) {
        return std::find(ids.begin(), ids.end(), id) != ids.end();
    };

    std::vector<Tag> sourceTags;
    std::vector<Tag> destinationTags;

    for (const KeyValueRef& entry : result) {
        const UID serverId = decodeServerTagKey(entry.key);
        // A server may be listed in both vectors, so the two checks are independent.
        if (listed(src, serverId)) {
            sourceTags.push_back( decodeServerTagValue(entry.value) );
        }
        if (listed(dest, serverId)) {
            destinationTags.push_back( decodeServerTagValue(entry.value) );
        }
    }

    return keyServersValue(sourceTags, destinationTags);
}
// Encodes a keyServers value directly from source and destination Tag vectors:
// both vectors are written through a BinaryWriter constructed with IncludeVersion,
// and the writer's contents are returned as the Value.
// NOTE(review): this span was captured from a rendered diff, so the statements that
// name `src`/`dest` appear to be the pre-change lines shown beside their replacement
// (`srcTag`/`destTag`) — confirm against the real file before relying on them.
const Value keyServersValue( const std::vector<Tag>& srcTag, const std::vector<Tag>& destTag ) {
// src and dest are expected to be sorted
ASSERT( std::is_sorted(src.begin(), src.end()) && std::is_sorted(dest.begin(), dest.end()) );
BinaryWriter wr((IncludeVersion())); wr << src << dest;
BinaryWriter wr(IncludeVersion()); wr << srcTag << destTag;
return wr.toValue();
}
void decodeKeyServersValue( const ValueRef& value, vector<UID>& src, vector<UID>& dest ) {
if (value.size()) {
BinaryReader rd(value, IncludeVersion());
rd >> src >> dest;
} else {

// Decodes a keyServers value into the `src` and `dest` server UID vectors.
// The stored value may hold either two UID vectors or two Tag vectors; this
// function probes the byte layout to decide which. Tag-encoded values are mapped
// back to UIDs using `result`, whose keys decode to UIDs and whose values decode
// to Tags (callers are expected to pass the full serverTagKeys range).
// An empty `value` yields empty `src` and `dest`.
void decodeKeyServersValue( Standalone<RangeResultRef> result, const ValueRef& value,
std::vector<UID>& src, std::vector<UID>& dest ) {
// An empty value means no servers at all.
if (value.size() == 0) {
src.clear();
dest.clear();
return;
}

// Probe the layout as if it were Tag-encoded: read the first length, skip that
// many Tag-sized elements, read the second length, then rewind (presumably back
// to the checkpoint taken here — confirm BinaryReader semantics).
BinaryReader rd(value, IncludeVersion());
rd.checkpoint();
int srcLen, destLen;
rd >> srcLen;
rd.readBytes(srcLen * sizeof(Tag));
rd >> destLen;
rd.rewind();

// If the total size is not exactly what two Tag vectors of those lengths would
// occupy, treat the value as UID vectors and deserialize straight into the outputs.
if (value.size() != sizeof(ProtocolVersion) + sizeof(int) + srcLen * sizeof(Tag) + sizeof(int) + destLen * sizeof(Tag)) {
rd >> src >> dest;
rd.assertEnd();
return;
}

// Tag-encoded path: read the tags, then translate each back to its server UID.
std::vector<Tag> srcTag, destTag;
rd >> srcTag >> destTag;

src.clear();
dest.clear();

// Scan every serverTag entry; an entry whose Tag is listed contributes its UID.
// Tags with no matching entry in `result` produce no UID.
for (const KeyValueRef kv : result) {
Tag tag = decodeServerTagValue(kv.value);
if (std::find(srcTag.begin(), srcTag.end(), tag) != srcTag.end()) {
src.push_back( decodeServerTagKey(kv.key) );
}
if (std::find(destTag.begin(), destTag.end(), tag) != destTag.end()) {
dest.push_back( decodeServerTagKey(kv.key) );
}
}
// On this path the outputs are returned in sorted UID order.
std::sort(src.begin(), src.end());
std::sort(dest.begin(), dest.end());
}

const KeyRef conflictingKeysPrefix = LiteralStringRef("/transaction/conflicting_keys/");
Expand Down
26 changes: 19 additions & 7 deletions fdbclient/SystemData.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include "fdbclient/StorageServerInterface.h"
#include "fdbclient/RestoreWorkerInterface.actor.h"

// Don't warn on constants being defined in this file.
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-variable"

struct RestoreLoaderInterface;
struct RestoreApplierInterface;
struct RestoreMasterInterface;
Expand All @@ -39,23 +43,28 @@ extern const KeyRangeRef allKeys; // '' to systemKeys.end
extern const KeyRangeRef specialKeys; // [FF][FF] to [FF][FF][FF][FF]
extern const KeyRef afterAllKeys;

// "\xff/keyServers/[[begin]]" := "[[vector<serverID>, vector<serverID>]]"
// "\xff/keyServers/[[begin]]" := "[[vector<serverID>, vector<serverID>]|[vector<Tag>, vector<Tag>]]"
extern const KeyRangeRef keyServersKeys, keyServersKeyServersKeys;
extern const KeyRef keyServersPrefix, keyServersEnd, keyServersKeyServersKey;
const Key keyServersKey( const KeyRef& k );
const KeyRef keyServersKey( const KeyRef& k, Arena& arena );
const Value keyServersValue(
const vector<UID>& src,
const vector<UID>& dest = vector<UID>() );
void decodeKeyServersValue( const ValueRef& value,
vector<UID>& src, vector<UID>& dest );
Standalone<RangeResultRef> result,
const std::vector<UID>& src,
const std::vector<UID>& dest = std::vector<UID>() );
const Value keyServersValue(
const std::vector<Tag>& srcTag,
const std::vector<Tag>& destTag = std::vector<Tag>());
// `result` must be the full result of getting serverTagKeys
void decodeKeyServersValue( Standalone<RangeResultRef> result, const ValueRef& value,
std::vector<UID>& src, std::vector<UID>& dest );

// "\xff/storageCache/[[begin]]" := "[[vector<uint16_t>]]"
extern const KeyRangeRef storageCacheKeys;
extern const KeyRef storageCachePrefix;
const Key storageCacheKey( const KeyRef& k );
const Value storageCacheValue( const vector<uint16_t>& serverIndices );
void decodeStorageCacheValue( const ValueRef& value, vector<uint16_t>& serverIndices );
const Value storageCacheValue( const std::vector<uint16_t>& serverIndices );
void decodeStorageCacheValue( const ValueRef& value, std::vector<uint16_t>& serverIndices );

// "\xff/serverKeys/[[serverID]]/[[begin]]" := "" | "1" | "2"
extern const KeyRef serverKeysPrefix;
Expand All @@ -82,6 +91,7 @@ extern const KeyRef cacheChangePrefix;
const Key cacheChangeKeyFor( uint16_t idx );
uint16_t cacheChangeKeyDecodeIndex( const KeyRef& key );

// "\xff/serverTag/[[serverID]]" = "[[Tag]]"
extern const KeyRangeRef serverTagKeys;
extern const KeyRef serverTagPrefix;
extern const KeyRangeRef serverTagMaxKeys;
Expand Down Expand Up @@ -366,4 +376,6 @@ std::pair<Key,Version> decodeHealthyZoneValue( ValueRef const& );
// Used to create artificially large txnStateStore instances in testing.
extern const KeyRangeRef testOnlyTxnStateStorePrefixRange;

#pragma clang diagnostic pop

#endif
5 changes: 4 additions & 1 deletion fdbserver/ApplyMetadataMutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
KeyRef end = keyInfo->rangeContaining(k).end();
KeyRangeRef insertRange(k,end);
vector<UID> src, dest;
decodeKeyServersValue(m.param2, src, dest);
// txnStateStore is always an in-memory KVS, and must always be recovered before
// applyMetadataMutations is called, so a wait here should never be needed.
Future<Standalone<RangeResultRef>> fResult = txnStateStore->readRange(serverTagKeys);
decodeKeyServersValue(fResult.get(), m.param2, src, dest);

ASSERT(storageCache);
ServerCacheInfo info;
Expand Down
4 changes: 3 additions & 1 deletion fdbserver/DataDistribution.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
try {
tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
wait(checkMoveKeysLockReadOnly(&tr, moveKeysLock));
state Standalone<RangeResultRef> UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY));
ASSERT( !UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY );
Standalone<RangeResultRef> keyServers = wait(krmGetRanges(&tr, keyServersPrefix, KeyRangeRef(beginKey, allKeys.end), SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT, SERVER_KNOBS->MOVE_KEYS_KRM_LIMIT_BYTES));
succeeded = true;

Expand All @@ -482,7 +484,7 @@ ACTOR Future<Reference<InitialDataDistribution>> getInitialDataDistribution( Dat
// for each range
for(int i = 0; i < keyServers.size() - 1; i++) {
DDShardInfo info( keyServers[i].key );
decodeKeyServersValue( keyServers[i].value, src, dest );
decodeKeyServersValue( UIDtoTagMap, keyServers[i].value, src, dest );
if(remoteDcIds.size()) {
auto srcIter = team_cache.find(src);
if(srcIter == team_cache.end()) {
Expand Down
4 changes: 3 additions & 1 deletion fdbserver/DataDistributionQueue.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,16 @@ struct DDQueueData {
servers.clear();
tr.setOption( FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE );
try {
state Standalone<RangeResultRef> UIDtoTagMap = wait( tr.getRange( serverTagKeys, CLIENT_KNOBS->TOO_MANY ) );
ASSERT( !UIDtoTagMap.more && UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY );
Standalone<RangeResultRef> keyServersEntries = wait(
tr.getRange( lastLessOrEqual( keyServersKey( input.keys.begin ) ),
firstGreaterOrEqual( keyServersKey( input.keys.end ) ), SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS ) );

if(keyServersEntries.size() < SERVER_KNOBS->DD_QUEUE_MAX_KEY_SERVERS) {
for( int shard = 0; shard < keyServersEntries.size(); shard++ ) {
vector<UID> src, dest;
decodeKeyServersValue( keyServersEntries[shard].value, src, dest );
decodeKeyServersValue( UIDtoTagMap, keyServersEntries[shard].value, src, dest );
ASSERT( src.size() );
for( int i = 0; i < src.size(); i++ ) {
servers.insert( src[i] );
Expand Down
3 changes: 2 additions & 1 deletion fdbserver/MasterProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,7 @@ ACTOR Future<Void> masterProxyServerCore(
state KeyRange txnKeys = allKeys;
loop {
wait(yield());
Standalone<RangeResultRef> UIDtoTagMap = commitData.txnStateStore->readRange( serverTagKeys ).get();
Standalone<RangeResultRef> data = commitData.txnStateStore->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES).get();
if(!data.size()) break;
((KeyRangeRef&)txnKeys) = KeyRangeRef( keyAfter(data.back().key, txnKeys.arena()), txnKeys.end );
Expand All @@ -1924,7 +1925,7 @@ ACTOR Future<Void> masterProxyServerCore(
if( kv.key.startsWith(keyServersPrefix) ) {
KeyRef k = kv.key.removePrefix(keyServersPrefix);
if(k != allKeys.end) {
decodeKeyServersValue(kv.value, src, dest);
decodeKeyServersValue(UIDtoTagMap, kv.value, src, dest);
info.tags.clear();
info.src_info.clear();
info.dest_info.clear();
Expand Down
Loading

0 comments on commit de9ab9d

Please sign in to comment.