Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: add point for write using latency_tracer tool (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer committed Nov 13, 2020
1 parent 0bd8f36 commit 85535ee
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 42 deletions.
31 changes: 21 additions & 10 deletions src/utils/latency_tracer.h → include/dsn/utils/latency_tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@

#pragma once
#include <dsn/utility/synchronize.h>
#include <dsn/utility/flags.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace utils {

// Adds a trace point to `tracer` (accessed through `->`) whose name is built from
// __FILENAME__, __LINE__ and __FUNCTION__ at the place the macro is expanded.
#define ADD_POINT(tracer) \
(tracer)->add_point(fmt::format("{}:{}:{}", __FILENAME__, __LINE__, __FUNCTION__))
// Same as ADD_POINT, but appends `message` in square brackets to the point name so that
// several points recorded from one location can be told apart.
#define ADD_CUSTOM_POINT(tracer, message) \
(tracer)->add_point( \
fmt::format("{}:{}:{}[{}]", __FILENAME__, __LINE__, __FUNCTION__, (message)))

/**
* latency_tracer is a tool for tracking the time spent in each of the stages during request
* execution. It can help users to figure out where the latency bottleneck is located. User needs to
* use `add_point` before entering one stage, which will record the name of this stage and its start
* time. When the request is finished, you can dump the formatted result by `dump_trace_points`.
* time. When the request is finished, the formatted result is dumped automatically in the
* destructor.
*
* For example, given a request with a 4-stage pipeline (the `latency_tracer` need to
* be held by this request throughout the execution):
Expand Down Expand Up @@ -53,14 +62,19 @@ namespace utils {
* | | | |
* start---->stageA----->stageB---->end
*
* "request.tracer" will record the time duration among all tracepoints.
* "request.tracer" will record the time duration among all trace points.
**/
extern bool FLAGS_enable_latency_tracer;
DSN_DECLARE_bool(enable_latency_tracer);

class latency_tracer
{
public:
latency_tracer(const std::string &name);
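// threshold < 0: don't dump any trace points
// threshold = 0: dump all trace points
// threshold > 0: dump the trace point when time_used > threshold
// NOTE(review): `threshold` is declared as uint64_t, so a negative value cannot be
// represented and the "threshold < 0" case is unreachable as written.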
latency_tracer(const std::string &name, uint64_t threshold = 0);

~latency_tracer();

// add a trace point to the tracer
// -name: user specified name of the trace point
Expand All @@ -75,17 +89,14 @@ class latency_tracer
// stageA[rpc_message]--stageB[rpc_message]--
void set_sub_tracer(const std::shared_ptr<latency_tracer> &tracer);

// threshold < 0: don't dump any trace points
// threshold = 0: dump all trace points
// threshold > 0: dump the trace point when time_used > threshold
void dump_trace_points(int threshold);

private:
void dump_trace_points(int threshold, /*out*/ std::string &traces);
void dump_trace_points(/*out*/ std::string &traces);

utils::rw_lock_nr _lock;

const std::string _name;
const uint64_t _threshold;
bool _is_sub;
const uint64_t _start_time;
std::map<int64_t, std::string> _points;
std::shared_ptr<latency_tracer> _sub_tracer;
Expand Down
12 changes: 12 additions & 0 deletions src/replica/mutation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,17 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>

namespace dsn {
namespace replication {

// Threshold handed to each mutation's latency_tracer when the mutation is constructed.
// NOTE(review): presumably expressed in nanoseconds (the default is annotated "1s") — confirm.
DSN_DEFINE_uint64("replication",
abnormal_write_trace_latency_threshold,
1000 * 1000 * 1000, // 1s
"latency trace will be logged when exceed the write latency threshold");

std::atomic<uint64_t> mutation::s_tid(0);

mutation::mutation()
Expand All @@ -52,6 +59,8 @@ mutation::mutation()
_appro_data_bytes = sizeof(mutation_header);
_create_ts_ns = dsn_now_ns();
_tid = ++s_tid;
tracer = std::make_shared<dsn::utils::latency_tracer>(
fmt::format("{}[{}]", "mutation", _tid), FLAGS_abnormal_write_trace_latency_threshold);
}

mutation_ptr mutation::copy_no_reply(const mutation_ptr &old_mu)
Expand Down Expand Up @@ -135,6 +144,9 @@ void mutation::copy_from(mutation_ptr &old)

void mutation::add_client_request(task_code code, dsn::message_ex *request)
{
if (request != nullptr) {
ADD_CUSTOM_POINT(tracer, request->header->id);
}
data.updates.push_back(mutation_update());
mutation_update &update = data.updates.back();
_appro_data_bytes += 32; // approximate code size
Expand Down
3 changes: 3 additions & 0 deletions src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <list>
#include <atomic>
#include <dsn/utility/link.h>
#include <dsn/utils/latency_tracer.h>

#ifndef __linux__
#pragma warning(disable : 4201)
Expand Down Expand Up @@ -142,6 +143,8 @@ class mutation : public ref_counter
// used by pending mutation queue only
mutation *next;

std::shared_ptr<dsn::utils::latency_tracer> tracer;

void set_is_sync_to_child(bool sync_to_child) { _is_sync_to_child = sync_to_child; }
bool is_sync_to_child() { return _is_sync_to_child; }

Expand Down
9 changes: 9 additions & 0 deletions src/replica/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "replica.h"
#include "mutation_log_utils.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/fail_point.h>
Expand All @@ -52,6 +53,7 @@ ::dsn::task_ptr mutation_log_shared::append(mutation_ptr &mu,

_slock.lock();

ADD_POINT(mu->tracer);
// init pending buffer
if (nullptr == _pending_write) {
_pending_write = std::make_shared<log_appender>(mark_new_offset(0, true).second);
Expand Down Expand Up @@ -126,13 +128,20 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required)
void mutation_log_shared::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending)
{
for (auto &mu : pending->mutations()) {
ADD_POINT(mu->tracer);
}
lf->commit_log_blocks( // forces a new line for params
*pending,
LPC_WRITE_REPLICATION_LOG_SHARED,
&_tracker,
[this, lf, pending](error_code err, size_t sz) mutable {
dassert(_is_writing.load(std::memory_order_relaxed), "");

for (auto &mu : pending->mutations()) {
ADD_CUSTOM_POINT(mu->tracer, "commit_pending_completed");
}

for (auto &block : pending->all_blocks()) {
auto hdr = (log_block_header *)block.front().data();
dassert(hdr->magic == 0xdeadbeef, "header magic is changed: 0x%x", hdr->magic);
Expand Down
2 changes: 2 additions & 0 deletions src/replica/prepare_list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "prepare_list.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
Expand Down Expand Up @@ -74,6 +75,7 @@ error_code prepare_list::prepare(mutation_ptr &mu,
decree d = mu->data.header.decree;
dcheck_gt_replica(d, last_committed_decree());

ADD_POINT(mu->tracer);
error_code err;
switch (status) {
case partition_status::PS_PRIMARY:
Expand Down
3 changes: 3 additions & 0 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "backup/replica_backup_manager.h"
#include "bulk_load/replica_bulk_loader.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/cpp/json_helper.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>
Expand Down Expand Up @@ -223,6 +224,7 @@ void replica::execute_mutation(mutation_ptr &mu)
name(),
mu->name(),
static_cast<int>(mu->client_requests.size()));
ADD_POINT(mu->tracer);

error_code err = ERR_OK;
decree d = mu->data.header.decree;
Expand Down Expand Up @@ -313,6 +315,7 @@ void replica::execute_mutation(mutation_ptr &mu)
}

// update table level latency perf-counters for primary partition
ADD_CUSTOM_POINT(mu->tracer, "completed");
if (partition_status::PS_PRIMARY == status()) {
uint64_t now_ns = dsn_now_ns();
for (auto update : mu->data.updates) {
Expand Down
13 changes: 13 additions & 0 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "bulk_load/replica_bulk_loader.h"
#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>

Expand Down Expand Up @@ -135,6 +136,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
"invalid partition_status, status = %s",
enum_to_string(status()));

ADD_POINT(mu->tracer);

error_code err = ERR_OK;
uint8_t count = 0;
const auto request_count = mu->client_requests.size();
Expand Down Expand Up @@ -282,6 +285,7 @@ void replica::send_prepare_message(::dsn::rpc_address addr,
bool pop_all_committed_mutations,
int64_t learn_signature)
{
ADD_CUSTOM_POINT(mu->tracer, addr.to_string());
dsn::message_ex *msg = dsn::message_ex::create_request(
RPC_PREPARE, timeout_milliseconds, get_gpid().thread_hash());
replica_configuration rconfig;
Expand Down Expand Up @@ -321,6 +325,7 @@ void replica::do_possible_commit_on_primary(mutation_ptr &mu)
"invalid partition_status, status = %s",
enum_to_string(status()));

ADD_POINT(mu->tracer);
if (mu->is_ready_for_commit()) {
_prepare_list->commit(mu->data.header.decree, COMMIT_ALL_READY);
}
Expand All @@ -339,6 +344,8 @@ void replica::on_prepare(dsn::message_ex *request)
mu = mutation::read_from(reader, request);
}

ADD_POINT(mu->tracer);

decree decree = mu->data.header.decree;

dinfo("%s: mutation %s on_prepare", name(), mu->name());
Expand Down Expand Up @@ -491,6 +498,8 @@ void replica::on_append_log_completed(mutation_ptr &mu, error_code err, size_t s
size,
err.to_string());

ADD_POINT(mu->tracer);

if (err == ERR_OK) {
mu->set_logged();
} else {
Expand Down Expand Up @@ -547,6 +556,8 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p
mutation_ptr mu = pr.first;
partition_status::type target_status = pr.second;

ADD_CUSTOM_POINT(mu->tracer, request->to_address.to_string());

// skip callback for old mutations
if (partition_status::PS_PRIMARY != status() || mu->data.header.ballot < get_ballot() ||
mu->get_decree() <= last_committed_decree())
Expand Down Expand Up @@ -581,6 +592,7 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p
enum_to_string(target_status),
resp.err.to_string());
} else {
ADD_CUSTOM_POINT(mu->tracer, fmt::format("error:{}", request->to_address.to_string()));
derror("%s: mutation %s on_prepare_reply from %s, appro_data_bytes = %d, "
"target_status = %s, err = %s",
name(),
Expand Down Expand Up @@ -702,6 +714,7 @@ void replica::on_prepare_reply(std::pair<mutation_ptr, partition_status::type> p

void replica::ack_prepare_message(error_code err, mutation_ptr &mu)
{
ADD_CUSTOM_POINT(mu->tracer, name());
prepare_ack resp;
resp.pid = get_gpid();
resp.err = err;
Expand Down
3 changes: 3 additions & 0 deletions src/replica/replication_app_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "replica.h"
#include "mutation.h"
#include <dsn/utils/latency_tracer.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/factory_store.h>
#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -472,6 +473,8 @@ ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu)
(int)mu->client_requests.size());
dassert(mu->data.updates.size() > 0, "");

ADD_POINT(mu->tracer);

bool has_ingestion_request = false;
int request_count = static_cast<int>(mu->client_requests.size());
dsn::message_ex **batched_requests =
Expand Down
42 changes: 24 additions & 18 deletions src/utils/latency_tracer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "latency_tracer.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/service_api_c.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
Expand All @@ -26,7 +25,20 @@ namespace utils {

// Global switch for the latency tracer: while it is false, set_sub_tracer() and
// dump_trace_points() return immediately without doing any work. Defaults to false.
DSN_DEFINE_bool("replication", enable_latency_tracer, false, "whether enable the latency tracer");

latency_tracer::latency_tracer(const std::string &name) : _name(name), _start_time(dsn_now_ns()) {}
// Creates a tracer called `name`; `threshold` is stored and later compared against the
// total time used to decide whether the collected points are dumped.
// The tracer starts out as a top-level (non-sub) tracer and takes dsn_now_ns() as its
// start time.
latency_tracer::latency_tracer(const std::string &name, uint64_t threshold)
: _name(name), _threshold(threshold), _is_sub(false), _start_time(dsn_now_ns())
{
}

// Dumps the collected trace points when the tracer goes away.
// A tracer that has been attached to another one as a sub tracer stays silent here.
latency_tracer::~latency_tracer()
{
    if (!_is_sub) {
        std::string dumped;
        dump_trace_points(dumped);
    }
}

void latency_tracer::add_point(const std::string &stage_name)
{
Expand All @@ -41,37 +53,31 @@ void latency_tracer::add_point(const std::string &stage_name)

void latency_tracer::set_sub_tracer(const std::shared_ptr<latency_tracer> &tracer)
{
if (!FLAGS_enable_latency_tracer) {
return;
}
tracer->_is_sub = true;
_sub_tracer = tracer;
}

void latency_tracer::dump_trace_points(int threshold)
{
std::string traces;
dump_trace_points(threshold, traces);
}

void latency_tracer::dump_trace_points(int threshold, /*out*/ std::string &traces)
void latency_tracer::dump_trace_points(/*out*/ std::string &traces)
{
if (threshold < 0 || !FLAGS_enable_latency_tracer) {
return;
}

if (_points.empty()) {
if (!FLAGS_enable_latency_tracer || _threshold < 0 || _points.empty()) {
return;
}

utils::auto_read_lock read(_lock);

uint64_t time_used = _points.rbegin()->first - _start_time;

if (time_used < threshold) {
if (time_used < _threshold) {
return;
}

traces.append(fmt::format("\t***************[TRACE:{}]***************\n", _name));
uint64_t previous_time = _start_time;
for (const auto &point : _points) {
std::string trace = fmt::format("\tTRACE:name={:<50}, span={:<20}, total={:<20}, "
std::string trace = fmt::format("\tTRACE:name={:<70}, span={:>20}, total={:>20}, "
"ts={:<20}\n",
point.second,
point.first - previous_time,
Expand All @@ -86,7 +92,7 @@ void latency_tracer::dump_trace_points(int threshold, /*out*/ std::string &trace
return;
}

_sub_tracer->dump_trace_points(0, traces);
_sub_tracer->dump_trace_points(traces);
}

} // namespace utils
Expand Down
Loading

0 comments on commit 85535ee

Please sign in to comment.