Skip to content

Commit

Permalink
Only update real sent time to span
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Feb 8, 2025
1 parent 55b73bf commit 824d089
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 129 deletions.
10 changes: 2 additions & 8 deletions src/brpc/details/method_status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,22 +149,16 @@ void MethodStatus::SetConcurrencyLimiter(ConcurrencyLimiter* cl) {
_cl.reset(cl);
}

int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
const std::string& error_text) {
int HandleResponseWritten(bthread_id_t id, void* data, int /*error_code*/) {
auto args = static_cast<ResponseWriteInfo*>(data);
args->error_code = error_code;
args->error_text = error_text;
args->sent_us = butil::cpuwide_time_us();
CHECK_EQ(0, bthread_id_unlock_and_destroy(id));
return 0;
}

ConcurrencyRemover::~ConcurrencyRemover() {
if (_status) {
if (_sent_us < _received_us) {
_sent_us = butil::cpuwide_time_us();
}
_status->OnResponded(_c->ErrorCode(), _sent_us - _received_us);
_status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
_status = NULL;
}
ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
Expand Down
7 changes: 1 addition & 6 deletions src/brpc/details/method_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,22 @@ friend class Server;
};

struct ResponseWriteInfo {
int error_code{0};
std::string error_text;
int64_t sent_us{0};
};

int HandleResponseWritten(bthread_id_t id, void* data, int error_code,
const std::string& error_text);
int HandleResponseWritten(bthread_id_t id, void* data, int error_code);

class ConcurrencyRemover {
public:
ConcurrencyRemover(MethodStatus* status, Controller* c, int64_t received_us)
: _status(status) , _c(c) , _received_us(received_us) {}
~ConcurrencyRemover();

void set_sent_us(int64_t sent_us) { _sent_us = sent_us; }
private:
DISALLOW_COPY_AND_ASSIGN(ConcurrencyRemover);
MethodStatus* _status;
Controller* _c;
int64_t _received_us;
int64_t _sent_us{0};
};

inline bool MethodStatus::OnRequested(int* rejected_cc, Controller* cntl) {
Expand Down
58 changes: 14 additions & 44 deletions src/brpc/policy/baidu_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,15 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
const google::protobuf::Message* req = NULL == messages ? NULL : messages->Request();
const google::protobuf::Message* res = NULL == messages ? NULL : messages->Response();

ResponseWriteInfo args;

// Recycle resources at the end of this function.
BRPC_SCOPE_EXIT {
{
// Remove concurrency and record latency at first.
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
concurrency_remover.set_sent_us(args.sent_us);
}

std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);


if (NULL == messages) {
return;
}
Expand Down Expand Up @@ -305,12 +301,13 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
}
}

ResponseWriteInfo args;
bthread_id_t response_id = INVALID_BTHREAD_ID;
if (span) {
span->set_response_size(res_buf.size());
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}

bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
// Send rpc response over stream even if server side failed to create
// stream for some reason.
if (cntl->has_remote_stream()) {
Expand All @@ -328,38 +325,6 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
Stream::SetFailed(response_stream_ids, error_code,
"Fail to write into %s",
sock->description().c_str());
}
} else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}

bthread_id_join(response_id);

error_code = args.error_code;
if (cntl->has_remote_stream()) {
if (0 != error_code) {
LOG_IF(WARNING, error_code != EPIPE)
<< "Fail to write into " << *sock
<< ", error text= " << args.error_text
<< ": " << berror(error_code);
cntl->SetFailed(error_code, "Fail to write into %s: %s",
sock->description().c_str(),
args.error_text.c_str());
Stream::SetFailed(response_stream_ids, error_code,
"Fail to write into %s",
args.error_text.c_str());
return;
}

Expand All @@ -384,17 +349,22 @@ void SendRpcResponse(int64_t correlation_id, Controller* cntl,
} else{
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
if (0 != error_code) {
LOG_IF(WARNING, error_code != EPIPE) << "Fail to write into " << *sock
<< ", error text= " << args.error_text
<< ": " << strerror(error_code);
cntl->SetFailed(error_code, "Fail to write into %s: %s",
sock->description().c_str(), args.error_text.c_str());
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
if (sock->Write(&res_buf, &wopt) != 0) {
const int errcode = errno;
PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
Expand Down
17 changes: 6 additions & 11 deletions src/brpc/policy/http_rpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,10 @@ HttpResponseSender::~HttpResponseSender() {
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
bthread_id_t response_id = INVALID_BTHREAD_ID;
if (span) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
Expand Down Expand Up @@ -986,16 +988,9 @@ HttpResponseSender::~HttpResponseSender() {
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl->SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
Expand Down
17 changes: 6 additions & 11 deletions src/brpc/policy/hulu_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,10 @@ static void SendHuluResponse(int64_t correlation_id,
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
bthread_id_t response_id = INVALID_BTHREAD_ID;
if (span) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
Expand All @@ -319,16 +321,9 @@ static void SendHuluResponse(int64_t correlation_id,
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s", sock->description().c_str());
return;
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
Expand Down
34 changes: 8 additions & 26 deletions src/brpc/policy/mongo_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,15 @@ void SendMongoResponse::Run() {
}

if (res_buf.empty()) {
return;
}

// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
wopt.ignore_eovercrowded = true;
wopt.ignore_eovercrowded = true;
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *socket;
cntl.SetFailed(errcode, "Fail to write into %s", socket->description().c_str());
return;
// Have the risk of unlimited pending responses, in which case, tell
// users to set max_concurrency.
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&res_buf, &wopt) != 0) {
PLOG(WARNING) << "Fail to write into " << *socket;
return;
}
}

}

ParseResult ParseMongoMessage(butil::IOBuf* source,
Expand Down
17 changes: 7 additions & 10 deletions src/brpc/policy/nshead_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ void NsheadClosure::Run() {
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
if (span) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
Expand All @@ -128,16 +130,11 @@ void NsheadClosure::Run() {
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
_controller.SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
sent_us = args.sent_us;
}
sent_us = args.sent_us;
}
if (span) {
// TODO: this is not sent
Expand Down
16 changes: 5 additions & 11 deletions src/brpc/policy/sofa_pbrpc_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ static void SendSofaResponse(int64_t correlation_id,
// users to set max_concurrency.
ResponseWriteInfo args;
bthread_id_t response_id;
CHECK_EQ(0, bthread_id_create2(&response_id, &args, HandleResponseWritten));
if (span) {
CHECK_EQ(0, bthread_id_create(&response_id, &args, HandleResponseWritten));
}
Socket::WriteOptions wopt;
wopt.id_wait = response_id;
wopt.notify_on_success = true;
Expand All @@ -296,17 +298,9 @@ static void SendSofaResponse(int64_t correlation_id,
return;
}

bthread_id_join(response_id);
concurrency_remover.set_sent_us(args.sent_us);
const int errcode = args.error_code;
if (0 != errcode) {
LOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock;
cntl->SetFailed(errcode, "Fail to write into %s",
sock->description().c_str());
return;
}

if (span) {
bthread_id_join(response_id);
// Do not care about the result of background writing.
// TODO: this is not sent
span->set_sent_us(args.sent_us);
}
Expand Down
4 changes: 2 additions & 2 deletions src/brpc/policy/streaming_rpc_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ void SendStreamClose(Socket *sock, int64_t remote_stream_id,
int64_t source_stream_id);

int SendStreamData(Socket* sock, const butil::IOBuf* data,
int64_t remote_stream_id,
int64_t source_stream_id, bthread_id_t);
int64_t remote_stream_id, int64_t source_stream_id,
bthread_id_t);

} // namespace policy
} // namespace brpc
Expand Down

0 comments on commit 824d089

Please sign in to comment.