diff --git a/src/jaegertracing/Tracer.h b/src/jaegertracing/Tracer.h index e077c817..253775f6 100644 --- a/src/jaegertracing/Tracer.h +++ b/src/jaegertracing/Tracer.h @@ -102,7 +102,7 @@ class Tracer : public opentracing::Tracer, serviceName, sampler, reporter, logger, metrics, options)); } - ~Tracer() { Close(); } + ~Tracer() { close(); } std::unique_ptr StartSpanWithOptions(string_view operationName, @@ -181,6 +181,12 @@ class Tracer : public opentracing::Tracer, } void Close() noexcept override + { + flush(); + close(); + } + + void close() noexcept { try { _reporter->close(); @@ -192,7 +198,10 @@ class Tracer : public opentracing::Tracer, } } - void close() noexcept { Close(); } + void flush() + { + _reporter->flush(); + } const std::string& serviceName() const { return _serviceName; } diff --git a/src/jaegertracing/reporters/RemoteReporter.cpp b/src/jaegertracing/reporters/RemoteReporter.cpp index 5b115070..a7141a43 100644 --- a/src/jaegertracing/reporters/RemoteReporter.cpp +++ b/src/jaegertracing/reporters/RemoteReporter.cpp @@ -39,6 +39,7 @@ RemoteReporter::RemoteReporter(const Clock::duration& bufferFlushInterval, , _running(true) , _lastFlush(Clock::now()) , _cv() + , _cv_flush() , _mutex() , _thread() { @@ -94,8 +95,10 @@ void RemoteReporter::sweepQueue() sendSpan(span); } else if (bufferFlushIntervalExpired()) { - flush(); + async_flush(); } + /* If anyone's waiting on flush completion, notify them */ + _cv_flush.notify_one(); } catch (...) { auto logger = logging::consoleLogger(); assert(logger); @@ -122,12 +125,13 @@ void RemoteReporter::sendSpan(const Span& span) } } -void RemoteReporter::flush() +void RemoteReporter::async_flush() { try { const auto flushed = _sender->flush(); if (flushed > 0) { _metrics.reporterSuccess().inc(flushed); + _cv.notify_one(); } } catch (const Transport::Exception& ex) { _metrics.reporterFailure().inc(ex.numFailed()); @@ -137,5 +141,14 @@ void RemoteReporter::flush() _lastFlush = Clock::now(); } +void RemoteReporter::flush() +{ + std::unique_lock lock(_mutex); + _cv_flush.wait(lock, [this]() { + async_flush(); + return !_running || _queue.empty(); + }); +} + } // namespace reporters } // namespace jaegertracing diff --git a/src/jaegertracing/reporters/RemoteReporter.h b/src/jaegertracing/reporters/RemoteReporter.h index d8151f3d..19ad1a2b 100644 --- a/src/jaegertracing/reporters/RemoteReporter.h +++ b/src/jaegertracing/reporters/RemoteReporter.h @@ -48,12 +48,14 @@ class RemoteReporter : public Reporter { void close() override; + void flush() override; + private: void sweepQueue(); void sendSpan(const Span& span); - void flush(); + void async_flush(); bool bufferFlushIntervalExpired() const { @@ -70,6 +72,7 @@ class RemoteReporter : public Reporter { bool _running; Clock::time_point _lastFlush; std::condition_variable _cv; + std::condition_variable _cv_flush; std::mutex _mutex; std::thread _thread; }; diff --git a/src/jaegertracing/reporters/Reporter.h b/src/jaegertracing/reporters/Reporter.h index 4e6ea978..d0432d7f 100644 --- a/src/jaegertracing/reporters/Reporter.h +++ b/src/jaegertracing/reporters/Reporter.h @@ -30,6 +30,8 @@ class Reporter { virtual void report(const Span& span) = 0; virtual void close() = 0; + + virtual void flush() {}; }; } // namespace reporters