Skip to content

Commit

Permalink
Merge pull request #2915 from tclinken/move-optimizations
Browse files Browse the repository at this point in the history
Avoid unnecessary copies in PromiseStream
  • Loading branch information
alexmiller-apple authored Apr 14, 2020
2 parents de9ab9d + 3a01d24 commit 2e53c8c
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 3 deletions.
91 changes: 91 additions & 0 deletions fdbrpc/FlowTests.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,3 +1360,94 @@ TEST_CASE("/flow/DeterministicRandom/SignedOverflow") {
std::numeric_limits<int64_t>::max() - 1);
return Void();
}

struct Tracker {
int copied;
bool moved;
Tracker(int copied = 0) : moved(false), copied(copied) {}
Tracker(Tracker&& other) : Tracker(other.copied) {
ASSERT(!other.moved);
other.moved = true;
}
Tracker& operator=(Tracker&& other) {
ASSERT(!other.moved);
other.moved = true;
this->moved = false;
this->copied = other.copied;
return *this;
}
Tracker(const Tracker& other) : Tracker(other.copied + 1) { ASSERT(!other.moved); }
Tracker& operator=(const Tracker& other) {
ASSERT(!other.moved);
this->moved = false;
this->copied = other.copied + 1;
return *this;
}

ACTOR static Future<Void> listen(FutureStream<Tracker> stream) {
Tracker t = waitNext(stream);
ASSERT(!t.moved);
ASSERT(t.copied == 0);
return Void();
}
};

TEST_CASE("/flow/flow/PromiseStream/move") {
state PromiseStream<Tracker> stream;
{
// This tests the case when a callback is added before
// a movable value is sent
state Future<Void> listener = Tracker::listen(stream.getFuture());
stream.send(Tracker{});
wait(listener);
}

{
// This tests the case when a callback is added before
// a unmovable value is sent
listener = Tracker::listen(stream.getFuture());
Tracker namedTracker;
stream.send(namedTracker);
wait(listener);
}
{
// This tests the case when no callback is added until
// after a movable value is sent
stream.send(Tracker{});
stream.send(Tracker{});
{
Tracker t = waitNext(stream.getFuture());
ASSERT(!t.moved);
ASSERT(t.copied == 0);
}
choose {
when(Tracker t = waitNext(stream.getFuture())) {
ASSERT(!t.moved);
ASSERT(t.copied == 0);
}
}
}
{
// This tests the case when no callback is added until
// after an unmovable value is sent
Tracker namedTracker1;
Tracker namedTracker2;
stream.send(namedTracker1);
stream.send(namedTracker2);
{
Tracker t = waitNext(stream.getFuture());
ASSERT(!t.moved);
// must copy onto queue
ASSERT(t.copied == 1);
}
choose {
when(Tracker t = waitNext(stream.getFuture())) {
ASSERT(!t.moved);
// must copy onto queue
ASSERT(t.copied == 1);
}
}
}

return Void();
}
9 changes: 9 additions & 0 deletions fdbrpc/fdbrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ class RequestStream {
else
queue->send(value);
}

void send(T&& value) const {
if (queue->isRemoteEndpoint()) {
FlowTransport::transport().sendUnreliable(SerializeSource<T>(std::move(value)), getEndpoint(), true);
}
else
queue->send(std::move(value));
}

/*void sendError(const Error& error) const {
ASSERT( !queue->isRemoteEndpoint() );
queue->sendError(error);
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/MasterProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
}

if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
out.send({ batch, batchBytes });
out.send({ std::move(batch), batchBytes });
lastBatch = now();
timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher);
batch = std::vector<CommitTransactionRequest>();
Expand Down
2 changes: 1 addition & 1 deletion flow/actorcompiler/ActorCompiler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ void CompileStatement(ChooseStatement stmt, Context cx)
returnType = "void",
formalParameters = new string[] {
ch.CallbackTypeInStateClass + "*",
ch.Stmt.wait.result.type + " value"
ch.Stmt.wait.result.type + " const& value"
},
endIsUnreachable = true
};
Expand Down
5 changes: 4 additions & 1 deletion flow/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
if (error.isValid()) throw error;
throw internal_error();
}
auto copy = queue.front();
auto copy = std::move(queue.front());
queue.pop();
return copy;
}
Expand Down Expand Up @@ -908,6 +908,9 @@ class PromiseStream {
void send(const T& value) const {
queue->send(value);
}
void send(T&& value) const {
queue->send(std::move(value));
}
void sendError(const Error& error) const {
queue->sendError(error);
}
Expand Down

0 comments on commit 2e53c8c

Please sign in to comment.