From 866782d1622967e243e048e470b3d22f6dcd28b1 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Wed, 7 Feb 2018 15:27:27 +0800 Subject: [PATCH] Expose an explicit Tracer.Flush() method Extend the OpenTracing API with an explicit jaegertracing::Tracer::flush() method to force spans to be flushed eagerly without closing the tracer. It returns only when the spans are flushed. To support this a new condition variable is introduced in the reporter to allow the main thread to wait on notification from the reporter flush thread. Fixes #53 Call flush() from Close(), but not from the Tracer dtor. So we follow the spec and ensure we flush buffers on explicit Close only. Fixes #52 Signed-off-by: Craig Ringer --- src/jaegertracing/Tracer.h | 13 +++++++++++-- src/jaegertracing/reporters/RemoteReporter.cpp | 17 +++++++++++++++-- src/jaegertracing/reporters/RemoteReporter.h | 5 ++++- src/jaegertracing/reporters/Reporter.h | 2 ++ 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/jaegertracing/Tracer.h b/src/jaegertracing/Tracer.h index e077c817..253775f6 100644 --- a/src/jaegertracing/Tracer.h +++ b/src/jaegertracing/Tracer.h @@ -102,7 +102,7 @@ class Tracer : public opentracing::Tracer, serviceName, sampler, reporter, logger, metrics, options)); } - ~Tracer() { Close(); } + ~Tracer() { close(); } std::unique_ptr StartSpanWithOptions(string_view operationName, @@ -181,6 +181,12 @@ class Tracer : public opentracing::Tracer, } void Close() noexcept override + { + flush(); + close(); + } + + void close() noexcept { try { _reporter->close(); @@ -192,7 +198,10 @@ class Tracer : public opentracing::Tracer, } } - void close() noexcept { Close(); } + void flush() + { + _reporter->flush(); + } const std::string& serviceName() const { return _serviceName; } diff --git a/src/jaegertracing/reporters/RemoteReporter.cpp b/src/jaegertracing/reporters/RemoteReporter.cpp index 5b115070..a7141a43 100644 --- a/src/jaegertracing/reporters/RemoteReporter.cpp +++ b/src/jaegertracing/reporters/RemoteReporter.cpp @@ -39,6 +39,7 @@ RemoteReporter::RemoteReporter(const Clock::duration& bufferFlushInterval, , _running(true) , _lastFlush(Clock::now()) , _cv() + , _cv_flush() , _mutex() , _thread() { @@ -94,8 +95,10 @@ void RemoteReporter::sweepQueue() sendSpan(span); } else if (bufferFlushIntervalExpired()) { - flush(); + async_flush(); } + /* If anyone's waiting on flush completion, notify them */ + _cv_flush.notify_one(); } catch (...) { auto logger = logging::consoleLogger(); assert(logger); @@ -122,12 +125,13 @@ void RemoteReporter::sendSpan(const Span& span) } } -void RemoteReporter::flush() +void RemoteReporter::async_flush() { try { const auto flushed = _sender->flush(); if (flushed > 0) { _metrics.reporterSuccess().inc(flushed); + _cv.notify_one(); } } catch (const Transport::Exception& ex) { _metrics.reporterFailure().inc(ex.numFailed()); @@ -137,5 +141,14 @@ void RemoteReporter::flush() _lastFlush = Clock::now(); } +void RemoteReporter::flush() +{ + std::unique_lock lock(_mutex); + _cv_flush.wait(lock, [this]() { + async_flush(); + return !_running || _queue.empty(); + }); +} + } // namespace reporters } // namespace jaegertracing diff --git a/src/jaegertracing/reporters/RemoteReporter.h b/src/jaegertracing/reporters/RemoteReporter.h index d8151f3d..19ad1a2b 100644 --- a/src/jaegertracing/reporters/RemoteReporter.h +++ b/src/jaegertracing/reporters/RemoteReporter.h @@ -48,12 +48,14 @@ class RemoteReporter : public Reporter { void close() override; + void flush() override; + private: void sweepQueue(); void sendSpan(const Span& span); - void flush(); + void async_flush(); bool bufferFlushIntervalExpired() const { @@ -70,6 +72,7 @@ class RemoteReporter : public Reporter { bool _running; Clock::time_point _lastFlush; std::condition_variable _cv; + std::condition_variable _cv_flush; std::mutex _mutex; std::thread _thread; }; diff --git a/src/jaegertracing/reporters/Reporter.h b/src/jaegertracing/reporters/Reporter.h index 4e6ea978..d0432d7f 100644 --- a/src/jaegertracing/reporters/Reporter.h +++ b/src/jaegertracing/reporters/Reporter.h @@ -30,6 +30,8 @@ class Reporter { virtual void report(const Span& span) = 0; virtual void close() = 0; + + virtual void flush() {}; }; } // namespace reporters