From f90a2abaa9f4e5d5b4450fcd23515f3a94a85528 Mon Sep 17 00:00:00 2001 From: Mark K Mueller Date: Thu, 22 Feb 2018 17:36:11 -0800 Subject: [PATCH] Make reporter buffer size and period configurable. Co-authored-by: Kevin M Granger --- zipkin/include/zipkin/tracer.h | 9 ++++++-- zipkin/src/span_buffer.h | 6 +++++ zipkin/src/zipkin_http_transporter.cc | 7 ++++-- zipkin/src/zipkin_reporter_impl.cc | 22 ++++++++++++------- zipkin/src/zipkin_reporter_impl.h | 8 ++++++- .../include/zipkin/opentracing.h | 2 ++ zipkin_opentracing/src/opentracing.cc | 5 +++-- 7 files changed, 44 insertions(+), 15 deletions(-) diff --git a/zipkin/include/zipkin/tracer.h b/zipkin/include/zipkin/tracer.h index b1a3be0..aca8530 100644 --- a/zipkin/include/zipkin/tracer.h +++ b/zipkin/include/zipkin/tracer.h @@ -6,6 +6,9 @@ #include namespace zipkin { +const SteadyClock::duration DEFAULT_REPORTING_PERIOD = + std::chrono::milliseconds{500}; +const size_t DEFAULT_SPAN_BUFFER_SIZE = 1000; /** * Abstract class that delegates to users of the Tracer class the responsibility @@ -50,8 +53,10 @@ typedef std::unique_ptr ReporterPtr; * service. * @return a Reporter object. */ -ReporterPtr makeHttpReporter(const char *collector_host, - uint32_t collector_port); +ReporterPtr makeHttpReporter( + const char *collector_host, uint32_t collector_port, + SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD, + size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE); /** * This class implements the Zipkin tracer. It has methods to create the diff --git a/zipkin/src/span_buffer.h b/zipkin/src/span_buffer.h index beb7059..dd2c569 100644 --- a/zipkin/src/span_buffer.h +++ b/zipkin/src/span_buffer.h @@ -40,6 +40,12 @@ class SpanBuffer { */ bool addSpan(const Span &span); + /** + * @return returns the number of spans that can be held in currently allocated + * storage. + */ + uint64_t spanCapacity() const { return span_buffer_.capacity(); } + /** * Empties the buffer. This method is supposed to be called when all buffered * spans have been sent to to the Zipkin service. diff --git a/zipkin/src/zipkin_http_transporter.cc b/zipkin/src/zipkin_http_transporter.cc index e961162..30d8895 100644 --- a/zipkin/src/zipkin_http_transporter.cc +++ b/zipkin/src/zipkin_http_transporter.cc @@ -52,10 +52,13 @@ void ZipkinHttpTransporter::transportSpans(SpanBuffer &spans) try { } ReporterPtr makeHttpReporter(const char *collector_host, - uint32_t collector_port) try { + uint32_t collector_port, + SteadyClock::duration reporting_period, + size_t max_buffered_spans) try { std::unique_ptr transporter{ new ZipkinHttpTransporter{collector_host, collector_port}}; - std::unique_ptr reporter{new ReporterImpl{std::move(transporter)}}; + std::unique_ptr reporter{new ReporterImpl{ + std::move(transporter), reporting_period, max_buffered_spans}}; return reporter; } catch (const CurlError &error) { std::cerr << error.what() << '\n'; diff --git a/zipkin/src/zipkin_reporter_impl.cc b/zipkin/src/zipkin_reporter_impl.cc index 9f36b0c..2c1fc6d 100644 --- a/zipkin/src/zipkin_reporter_impl.cc +++ b/zipkin/src/zipkin_reporter_impl.cc @@ -3,11 +3,11 @@ namespace zipkin { -const SteadyClock::duration reporting_period = std::chrono::milliseconds{500}; -const size_t max_buffered_spans = 1000; - -ReporterImpl::ReporterImpl(TransporterPtr &&transporter) - : transporter_{std::move(transporter)}, spans_{max_buffered_spans}, +ReporterImpl::ReporterImpl(TransporterPtr &&transporter, + std::chrono::steady_clock::duration reporting_period, + size_t max_buffered_spans) + : transporter_{std::move(transporter)}, reporting_period_(reporting_period), + max_buffered_spans_(max_buffered_spans), spans_{max_buffered_spans}, inflight_spans_{max_buffered_spans} { writer_ = std::thread(&ReporterImpl::writeReports, this); } @@ -22,7 +22,7 @@ void ReporterImpl::reportSpan(const Span &span) { { std::lock_guard lock(write_mutex_); num_spans_reported_ += spans_.addSpan(span); - is_full = spans_.pendingSpans() == max_buffered_spans; + is_full = spans_.pendingSpans() == max_buffered_spans_; } if (is_full) write_cond_.notify_one(); @@ -57,16 +57,22 @@ bool ReporterImpl::waitUntilNextReport(const SteadyTime &due_time) { } void ReporterImpl::writeReports() { - auto due_time = std::chrono::steady_clock::now() + reporting_period; + auto due_time = std::chrono::steady_clock::now() + reporting_period_; while (waitUntilNextReport(due_time)) { if (inflight_spans_.pendingSpans() > 0) { transporter_->transportSpans(inflight_spans_); num_spans_flushed_ += inflight_spans_.pendingSpans(); inflight_spans_.clear(); + + // If the buffer capacity has been changed, this is the place to resize + // it. + if (inflight_spans_.spanCapacity() != max_buffered_spans_) { + inflight_spans_.allocateBuffer(max_buffered_spans_); + } write_cond_.notify_all(); } auto now = std::chrono::steady_clock::now(); - due_time += reporting_period; + due_time += reporting_period_; if (due_time < now) due_time = now; } diff --git a/zipkin/src/zipkin_reporter_impl.h b/zipkin/src/zipkin_reporter_impl.h index 32cb572..34b2eac 100644 --- a/zipkin/src/zipkin_reporter_impl.h +++ b/zipkin/src/zipkin_reporter_impl.h @@ -28,7 +28,10 @@ class ReporterImpl : public Reporter { * * @param transporter The Transporter to be associated with the reporter. */ - explicit ReporterImpl(TransporterPtr &&transporter); + ReporterImpl( + TransporterPtr &&transporter, + SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD, + size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE); /** * Destructor. @@ -51,6 +54,9 @@ class ReporterImpl : public Reporter { std::mutex write_mutex_; std::condition_variable write_cond_; + SteadyClock::duration reporting_period_; + size_t max_buffered_spans_; + bool write_exit_ = false; std::thread writer_; int64_t num_spans_reported_ = 0; diff --git a/zipkin_opentracing/include/zipkin/opentracing.h b/zipkin_opentracing/include/zipkin/opentracing.h index 4fc83b9..8c869dd 100644 --- a/zipkin_opentracing/include/zipkin/opentracing.h +++ b/zipkin_opentracing/include/zipkin/opentracing.h @@ -6,6 +6,8 @@ namespace zipkin { struct ZipkinOtTracerOptions { std::string collector_host = "localhost"; uint32_t collector_port = 9411; + SteadyClock::duration reporting_period = DEFAULT_REPORTING_PERIOD; + size_t max_buffered_spans = DEFAULT_SPAN_BUFFER_SIZE; std::string service_name; IpAddress service_address; diff --git a/zipkin_opentracing/src/opentracing.cc b/zipkin_opentracing/src/opentracing.cc index c2ffbf9..771c331 100644 --- a/zipkin_opentracing/src/opentracing.cc +++ b/zipkin_opentracing/src/opentracing.cc @@ -10,10 +10,10 @@ #include #include -using opentracing::string_view; using opentracing::Value; using opentracing::expected; using opentracing::make_unexpected; +using opentracing::string_view; namespace ot = opentracing; @@ -374,7 +374,8 @@ makeZipkinOtTracer(const ZipkinOtTracerOptions &options, std::shared_ptr makeZipkinOtTracer(const ZipkinOtTracerOptions &options) { auto reporter = - makeHttpReporter(options.collector_host.c_str(), options.collector_port); + makeHttpReporter(options.collector_host.c_str(), options.collector_port, + options.reporting_period, options.max_buffered_spans); return makeZipkinOtTracer(options, std::move(reporter)); } } // namespace zipkin