Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary copies in PromiseStream #2915

Merged
merged 5 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions fdbrpc/FlowTests.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,3 +1360,94 @@ TEST_CASE("/flow/DeterministicRandom/SignedOverflow") {
std::numeric_limits<int64_t>::max() - 1);
return Void();
}

struct Tracker {
int copied;
bool moved;
Tracker(int copied = 0) : moved(false), copied(copied) {}
Tracker(Tracker&& other) : Tracker(other.copied) {
ASSERT(!other.moved);
other.moved = true;
}
Tracker& operator=(Tracker&& other) {
ASSERT(!other.moved);
other.moved = true;
this->moved = false;
this->copied = other.copied;
return *this;
}
Tracker(const Tracker& other) : Tracker(other.copied + 1) { ASSERT(!other.moved); }
Tracker& operator=(const Tracker& other) {
ASSERT(!other.moved);
this->moved = false;
this->copied = other.copied + 1;
return *this;
}

ACTOR static Future<Void> listen(FutureStream<Tracker> stream) {
Tracker t = waitNext(stream);
ASSERT(!t.moved);
ASSERT(t.copied == 0);
return Void();
}
};

TEST_CASE("/flow/flow/PromiseStream/move") {
tclinken marked this conversation as resolved.
Show resolved Hide resolved
state PromiseStream<Tracker> stream;
{
// This tests the case when a callback is added before
// a movable value is sent
state Future<Void> listener = Tracker::listen(stream.getFuture());
stream.send(Tracker{});
wait(listener);
}

{
// This tests the case when a callback is added before
// a unmovable value is sent
listener = Tracker::listen(stream.getFuture());
Tracker namedTracker;
stream.send(namedTracker);
wait(listener);
}
{
// This tests the case when no callback is added until
// after a movable value is sent
stream.send(Tracker{});
stream.send(Tracker{});
{
Tracker t = waitNext(stream.getFuture());
ASSERT(!t.moved);
ASSERT(t.copied == 0);
}
choose {
when(Tracker t = waitNext(stream.getFuture())) {
ASSERT(!t.moved);
ASSERT(t.copied == 0);
}
}
}
{
// This tests the case when no callback is added until
// after an unmovable value is sent
Tracker namedTracker1;
Tracker namedTracker2;
stream.send(namedTracker1);
stream.send(namedTracker2);
{
Tracker t = waitNext(stream.getFuture());
ASSERT(!t.moved);
// must copy onto queue
ASSERT(t.copied == 1);
}
choose {
when(Tracker t = waitNext(stream.getFuture())) {
ASSERT(!t.moved);
// must copy onto queue
ASSERT(t.copied == 1);
}
}
}

return Void();
}
9 changes: 9 additions & 0 deletions fdbrpc/fdbrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,15 @@ class RequestStream {
else
queue->send(value);
}

void send(T&& value) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think that this should work out that template<class T> T&& is a forwarding reference, so you can merge the const T& value and T&& cases into only the T&& case, and then use std::forward<T>(value) instead of std::move(value).

However, it's been... a while since I've had to look this up, so I'm going to merge this PR anyway, and if it turns out that we can shave some code off later, we can do that in a second PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I think that this should work out that template<class T> T&& is a forwarding reference

I think since T is not a template type parameter of send, value is not a forwarding reference. Here's an example from forwarding reference

template<class T> struct A {
    template<class U>
    A(T&& x, U&& y, int* p); // x is not a forwarding reference: T is not a
                             // type template parameter of the constructor,
                             // but y is a forwarding reference
};

if (queue->isRemoteEndpoint()) {
FlowTransport::transport().sendUnreliable(SerializeSource<T>(std::move(value)), getEndpoint(), true);
}
else
queue->send(std::move(value));
}

/*void sendError(const Error& error) const {
ASSERT( !queue->isRemoteEndpoint() );
queue->sendError(error);
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/MasterProxyServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
}

if((batchBytes + bytes > CLIENT_KNOBS->TRANSACTION_SIZE_LIMIT || req.firstInBatch()) && batch.size()) {
out.send({ batch, batchBytes });
out.send({ std::move(batch), batchBytes });
lastBatch = now();
timeout = delayJittered(commitData->commitBatchInterval, TaskPriority::ProxyCommitBatcher);
batch = std::vector<CommitTransactionRequest>();
Expand Down
2 changes: 1 addition & 1 deletion flow/actorcompiler/ActorCompiler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ void CompileStatement(ChooseStatement stmt, Context cx)
returnType = "void",
formalParameters = new string[] {
ch.CallbackTypeInStateClass + "*",
ch.Stmt.wait.result.type + " value"
ch.Stmt.wait.result.type + " const& value"
},
endIsUnreachable = true
};
Expand Down
5 changes: 4 additions & 1 deletion flow/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ struct NotifiedQueue : private SingleCallback<T>, FastAllocated<NotifiedQueue<T>
if (error.isValid()) throw error;
throw internal_error();
}
auto copy = queue.front();
auto copy = std::move(queue.front());
queue.pop();
return copy;
}
Expand Down Expand Up @@ -908,6 +908,9 @@ class PromiseStream {
void send(const T& value) const {
queue->send(value);
}
void send(T&& value) const {
queue->send(std::move(value));
}
void sendError(const Error& error) const {
queue->sendError(error);
}
Expand Down