From 521f442fdd3927e9c2521bb0d4acc5a40d47b014 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Wed, 7 Feb 2018 15:27:27 +0800 Subject: [PATCH] Expose an explicit Tracer.Flush() method Extend the OpenTracing API with an explicit jaegertracing::Tracer::flush() method to force spans to be flushed eagerly without closing the tracer. It returns only when the spans are flushed. Fixes #53 Call flush() from Close(), but not from the Tracer dtor. So we follow the spec and ensure we flush buffers on explicit Close only. Fixes #52 WIP. For as-yet-undiagnosed reasons `flush()` sometimes waits a full bufferFlushInterval before returning. Signed-off-by: Craig Ringer --- src/jaegertracing/Tracer.h | 13 +++++++++++-- src/jaegertracing/reporters/RemoteReporter.cpp | 14 ++++++++++++-- src/jaegertracing/reporters/RemoteReporter.h | 4 +++- src/jaegertracing/reporters/Reporter.h | 2 ++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/jaegertracing/Tracer.h b/src/jaegertracing/Tracer.h index e077c817..253775f6 100644 --- a/src/jaegertracing/Tracer.h +++ b/src/jaegertracing/Tracer.h @@ -102,7 +102,7 @@ class Tracer : public opentracing::Tracer, serviceName, sampler, reporter, logger, metrics, options)); } - ~Tracer() { Close(); } + ~Tracer() { close(); } std::unique_ptr StartSpanWithOptions(string_view operationName, @@ -181,6 +181,12 @@ class Tracer : public opentracing::Tracer, } void Close() noexcept override + { + flush(); + close(); + } + + void close() noexcept { try { _reporter->close(); @@ -192,7 +198,10 @@ class Tracer : public opentracing::Tracer, } } - void close() noexcept { Close(); } + void flush() + { + _reporter->flush(); + } const std::string& serviceName() const { return _serviceName; } diff --git a/src/jaegertracing/reporters/RemoteReporter.cpp b/src/jaegertracing/reporters/RemoteReporter.cpp index 5b115070..69256df9 100644 --- a/src/jaegertracing/reporters/RemoteReporter.cpp +++ b/src/jaegertracing/reporters/RemoteReporter.cpp @@ -94,7 +94,7 @@ void RemoteReporter::sweepQueue() sendSpan(span); } else if (bufferFlushIntervalExpired()) { - flush(); + async_flush(); } } catch (...) { auto logger = logging::consoleLogger(); @@ -122,12 +122,13 @@ void RemoteReporter::sendSpan(const Span& span) } } -void RemoteReporter::flush() +void RemoteReporter::async_flush() { try { const auto flushed = _sender->flush(); if (flushed > 0) { _metrics.reporterSuccess().inc(flushed); + _cv.notify_one(); } } catch (const Transport::Exception& ex) { _metrics.reporterFailure().inc(ex.numFailed()); @@ -137,5 +138,14 @@ void RemoteReporter::flush() _lastFlush = Clock::now(); } +void RemoteReporter::flush() +{ + std::unique_lock lock(_mutex); + _cv.wait(lock, [this]() { + async_flush(); + return !_running || _queue.empty(); + }); +} + } // namespace reporters } // namespace jaegertracing diff --git a/src/jaegertracing/reporters/RemoteReporter.h b/src/jaegertracing/reporters/RemoteReporter.h index d8151f3d..a6208548 100644 --- a/src/jaegertracing/reporters/RemoteReporter.h +++ b/src/jaegertracing/reporters/RemoteReporter.h @@ -48,12 +48,14 @@ class RemoteReporter : public Reporter { void close() override; + void flush() override; + private: void sweepQueue(); void sendSpan(const Span& span); - void flush(); + void async_flush(); bool bufferFlushIntervalExpired() const { diff --git a/src/jaegertracing/reporters/Reporter.h b/src/jaegertracing/reporters/Reporter.h index 4e6ea978..d0432d7f 100644 --- a/src/jaegertracing/reporters/Reporter.h +++ b/src/jaegertracing/reporters/Reporter.h @@ -30,6 +30,8 @@ class Reporter { virtual void report(const Span& span) = 0; virtual void close() = 0; + + virtual void flush() {}; }; } // namespace reporters