Skip to content

Commit

Permalink
Make reporter buffer size and period configurable.
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin M Granger <[email protected]>
  • Loading branch information
MarkKMueller and KevinMGranger committed Feb 23, 2018
1 parent cbf3e54 commit f90a2ab
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 15 deletions.
9 changes: 7 additions & 2 deletions zipkin/include/zipkin/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <zipkin/zipkin_core_types.h>

namespace zipkin {
const SteadyClock::duration DEFAULT_REPORTING_PERIOD =
std::chrono::milliseconds{500};
const size_t DEFAULT_SPAN_BUFFER_SIZE = 1000;

/**
* Abstract class that delegates to users of the Tracer class the responsibility
Expand Down Expand Up @@ -50,8 +53,10 @@ typedef std::unique_ptr<Reporter> ReporterPtr;
* service.
* @return a Reporter object.
*/
ReporterPtr makeHttpReporter(const char *collector_host,
uint32_t collector_port);
ReporterPtr makeHttpReporter(
const char *collector_host, uint32_t collector_port,
SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD,
size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE);

/**
* This class implements the Zipkin tracer. It has methods to create the
Expand Down
6 changes: 6 additions & 0 deletions zipkin/src/span_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class SpanBuffer {
*/
bool addSpan(const Span &span);

/**
* @return returns the number of spans that can be held in currently allocated
* storage.
*/
uint64_t spanCapacity() const { return span_buffer_.capacity(); }

/**
* Empties the buffer. This method is supposed to be called when all buffered
* spans have been sent to to the Zipkin service.
Expand Down
7 changes: 5 additions & 2 deletions zipkin/src/zipkin_http_transporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ void ZipkinHttpTransporter::transportSpans(SpanBuffer &spans) try {
}

ReporterPtr makeHttpReporter(const char *collector_host,
uint32_t collector_port) try {
uint32_t collector_port,
SteadyClock::duration reporting_period,
size_t max_buffered_spans) try {
std::unique_ptr<Transporter> transporter{
new ZipkinHttpTransporter{collector_host, collector_port}};
std::unique_ptr<Reporter> reporter{new ReporterImpl{std::move(transporter)}};
std::unique_ptr<Reporter> reporter{new ReporterImpl{
std::move(transporter), reporting_period, max_buffered_spans}};
return reporter;
} catch (const CurlError &error) {
std::cerr << error.what() << '\n';
Expand Down
22 changes: 14 additions & 8 deletions zipkin/src/zipkin_reporter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

namespace zipkin {

const SteadyClock::duration reporting_period = std::chrono::milliseconds{500};
const size_t max_buffered_spans = 1000;

ReporterImpl::ReporterImpl(TransporterPtr &&transporter)
: transporter_{std::move(transporter)}, spans_{max_buffered_spans},
ReporterImpl::ReporterImpl(TransporterPtr &&transporter,
std::chrono::steady_clock::duration reporting_period,
size_t max_buffered_spans)
: transporter_{std::move(transporter)}, reporting_period_(reporting_period),
max_buffered_spans_(max_buffered_spans), spans_{max_buffered_spans},
inflight_spans_{max_buffered_spans} {
writer_ = std::thread(&ReporterImpl::writeReports, this);
}
Expand All @@ -22,7 +22,7 @@ void ReporterImpl::reportSpan(const Span &span) {
{
std::lock_guard<std::mutex> lock(write_mutex_);
num_spans_reported_ += spans_.addSpan(span);
is_full = spans_.pendingSpans() == max_buffered_spans;
is_full = spans_.pendingSpans() == max_buffered_spans_;
}
if (is_full)
write_cond_.notify_one();
Expand Down Expand Up @@ -57,16 +57,22 @@ bool ReporterImpl::waitUntilNextReport(const SteadyTime &due_time) {
}

void ReporterImpl::writeReports() {
auto due_time = std::chrono::steady_clock::now() + reporting_period;
auto due_time = std::chrono::steady_clock::now() + reporting_period_;
while (waitUntilNextReport(due_time)) {
if (inflight_spans_.pendingSpans() > 0) {
transporter_->transportSpans(inflight_spans_);
num_spans_flushed_ += inflight_spans_.pendingSpans();
inflight_spans_.clear();

// If the buffer capacity has been changed, this is the place to resize
// it.
if (inflight_spans_.spanCapacity() != max_buffered_spans_) {
inflight_spans_.allocateBuffer(max_buffered_spans_);
}
write_cond_.notify_all();
}
auto now = std::chrono::steady_clock::now();
due_time += reporting_period;
due_time += reporting_period_;
if (due_time < now)
due_time = now;
}
Expand Down
8 changes: 7 additions & 1 deletion zipkin/src/zipkin_reporter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class ReporterImpl : public Reporter {
*
* @param transporter The Transporter to be associated with the reporter.
*/
explicit ReporterImpl(TransporterPtr &&transporter);
ReporterImpl(
TransporterPtr &&transporter,
SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD,
size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE);

/**
* Destructor.
Expand All @@ -51,6 +54,9 @@ class ReporterImpl : public Reporter {

std::mutex write_mutex_;
std::condition_variable write_cond_;
SteadyClock::duration reporting_period_;
size_t max_buffered_spans_;

bool write_exit_ = false;
std::thread writer_;
int64_t num_spans_reported_ = 0;
Expand Down
2 changes: 2 additions & 0 deletions zipkin_opentracing/include/zipkin/opentracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ namespace zipkin {
struct ZipkinOtTracerOptions {
std::string collector_host = "localhost";
uint32_t collector_port = 9411;
SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD;
size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE;

std::string service_name;
IpAddress service_address;
Expand Down
5 changes: 3 additions & 2 deletions zipkin_opentracing/src/opentracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
#include <zipkin/utility.h>
#include <zipkin/zipkin_core_types.h>

using opentracing::string_view;
using opentracing::Value;
using opentracing::expected;
using opentracing::make_unexpected;
using opentracing::string_view;

namespace ot = opentracing;

Expand Down Expand Up @@ -374,7 +374,8 @@ makeZipkinOtTracer(const ZipkinOtTracerOptions &options,
std::shared_ptr<ot::Tracer>
makeZipkinOtTracer(const ZipkinOtTracerOptions &options) {
auto reporter =
makeHttpReporter(options.collector_host.c_str(), options.collector_port);
makeHttpReporter(options.collector_host.c_str(), options.collector_port,
options.reporting_period, options.max_buffered_spans);
return makeZipkinOtTracer(options, std::move(reporter));
}
} // namespace zipkin

0 comments on commit f90a2ab

Please sign in to comment.