Skip to content

Commit

Permalink
Expose an explicit Tracer.Flush() method
Browse files Browse the repository at this point in the history
Extend the OpenTracing API with an explicit jaegertracing::Tracer::flush()
method to force spans to be flushed eagerly without closing the tracer. It
returns only when the spans are flushed.

To support this a new condition variable is introduced in the reporter to allow
the main thread to wait on notification from the reporter flush thread.

Fixes jaegertracing#53

Call flush() from Close(), but not from the Tracer dtor. So we follow the spec
and ensure we flush buffers on explicit Close only.

Fixes jaegertracing#52

Signed-off-by: Craig Ringer <[email protected]>
  • Loading branch information
ringerc committed Feb 9, 2018
1 parent c36adcf commit 866782d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
13 changes: 11 additions & 2 deletions src/jaegertracing/Tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Tracer : public opentracing::Tracer,
serviceName, sampler, reporter, logger, metrics, options));
}

~Tracer() { Close(); }
~Tracer() { close(); }

std::unique_ptr<opentracing::Span>
StartSpanWithOptions(string_view operationName,
Expand Down Expand Up @@ -181,6 +181,12 @@ class Tracer : public opentracing::Tracer,
}

void Close() noexcept override
{
flush();
close();
}

void close() noexcept
{
try {
_reporter->close();
Expand All @@ -192,7 +198,10 @@ class Tracer : public opentracing::Tracer,
}
}

void close() noexcept { Close(); }
void flush()
{
_reporter->flush();
}

const std::string& serviceName() const { return _serviceName; }

Expand Down
17 changes: 15 additions & 2 deletions src/jaegertracing/reporters/RemoteReporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ RemoteReporter::RemoteReporter(const Clock::duration& bufferFlushInterval,
, _running(true)
, _lastFlush(Clock::now())
, _cv()
, _cv_flush()
, _mutex()
, _thread()
{
Expand Down Expand Up @@ -94,8 +95,10 @@ void RemoteReporter::sweepQueue()
sendSpan(span);
}
else if (bufferFlushIntervalExpired()) {
flush();
async_flush();
}
/* If anyone's waiting on flush completion, notify them */
_cv_flush.notify_one();
} catch (...) {
auto logger = logging::consoleLogger();
assert(logger);
Expand All @@ -122,12 +125,13 @@ void RemoteReporter::sendSpan(const Span& span)
}
}

void RemoteReporter::flush()
void RemoteReporter::async_flush()
{
try {
const auto flushed = _sender->flush();
if (flushed > 0) {
_metrics.reporterSuccess().inc(flushed);
_cv.notify_one();
}
} catch (const Transport::Exception& ex) {
_metrics.reporterFailure().inc(ex.numFailed());
Expand All @@ -137,5 +141,14 @@ void RemoteReporter::flush()
_lastFlush = Clock::now();
}

void RemoteReporter::flush()
{
std::unique_lock<std::mutex> lock(_mutex);
_cv_flush.wait(lock, [this]() {
async_flush();
return !_running || _queue.empty();
});
}

} // namespace reporters
} // namespace jaegertracing
5 changes: 4 additions & 1 deletion src/jaegertracing/reporters/RemoteReporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ class RemoteReporter : public Reporter {

void close() override;

void flush() override;

private:
void sweepQueue();

void sendSpan(const Span& span);

void flush();
void async_flush();

bool bufferFlushIntervalExpired() const
{
Expand All @@ -70,6 +72,7 @@ class RemoteReporter : public Reporter {
bool _running;
Clock::time_point _lastFlush;
std::condition_variable _cv;
std::condition_variable _cv_flush;
std::mutex _mutex;
std::thread _thread;
};
Expand Down
2 changes: 2 additions & 0 deletions src/jaegertracing/reporters/Reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class Reporter {
virtual void report(const Span& span) = 0;

virtual void close() = 0;

virtual void flush() {};
};

} // namespace reporters
Expand Down

0 comments on commit 866782d

Please sign in to comment.