Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track dropped spans #11

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions zipkin/include/zipkin/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,41 @@ class Reporter {
virtual bool flushWithTimeout(std::chrono::system_clock::duration timeout) {
return true;
}

/**
* @return the number of spans that have been dropped since this value was
* last cleared.
*/
virtual uint64_t droppedSpanCount() const { return 0; }

/**
* In addition to getting the drop count, resets the drop count.
* @return the number of spans that have been dropped since this value was
* last reset
*/
virtual uint64_t getAndResetDroppedSpanCount() { return 0; }

/**
* @return the period of time to wait between auto flushes
*/
virtual SteadyClock::duration reportPeriod() const {
return SteadyClock::duration::zero();
}

/**
* @param report_period the period of time to wait between auto flushes
*/
virtual void setReportPeriod(SteadyClock::duration report_period){};

/**
* @return the number of spans that can be stored in the buffer
*/
virtual size_t bufferSpanCount() const { return 0; }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename bufferSpanCount to maxBufferedSpans or spanBufferSize? -- the name doesn't seem quite right.


/**
* @param span_count The new size of the span buffer
*/
virtual void setBufferSpanCount(size_t span_count){};
};

typedef std::unique_ptr<Reporter> ReporterPtr;
Expand Down Expand Up @@ -80,8 +115,10 @@ class Tracer : public TracerInterface {
* @param address Pointer to a network-address object. The IP address and port
* are used in all annotations' endpoints of the spans created by the Tracer.
*/
Tracer(const std::string &service_name, const IpAddress &address)
: service_name_(service_name), address_(address), reporter_(nullptr) {}
Tracer(const std::string &service_name, const IpAddress &address,
ReporterPtr &&reporter)
: service_name_(service_name), address_(address),
reporter_(std::move(reporter)) {}

/**
* Creates a "root" Zipkin span.
Expand Down Expand Up @@ -119,9 +156,37 @@ class Tracer : public TracerInterface {
const IpAddress &address() const { return address_; }

/**
* Associates a Reporter object with this Tracer.
* @return the number of spans that have been dropped since this value was
* last cleared.
*/
uint64_t droppedSpanCount() const;

/**
* In addition to getting the drop count, resets the drop count.
* @return the number of spans that have been dropped since this value was
* last reset
*/
uint64_t getAndResetDroppedSpanCount();

/**
* @return the period of time to wait between auto flushes
*/
SteadyClock::duration reportPeriod() const;

/**
* @param report_period the period of time to wait between auto flushes
*/
void setReportPeriod(SteadyClock::duration report_period);

/**
* @return the number of spans that can be stored in the buffer
*/
size_t bufferSpanCount() const;

/**
* @param span_count The new size of the span buffer
*/
void setReporter(ReporterPtr reporter);
void setBufferSpanCount(size_t span_count);

/**
* Optional method that a concrete Reporter class can implement to flush
Expand Down
24 changes: 22 additions & 2 deletions zipkin/src/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>

#include "zipkin_core_constants.h"
#include "zipkin_reporter_impl.h"
#include <zipkin/utility.h>

namespace zipkin {
Expand Down Expand Up @@ -108,7 +109,26 @@ void Tracer::reportSpan(Span &&span) {
}
}

void Tracer::setReporter(ReporterPtr reporter) {
reporter_ = std::move(reporter);
uint64_t Tracer::droppedSpanCount() const {
return reporter_->droppedSpanCount();
}

uint64_t Tracer::getAndResetDroppedSpanCount() {
return reporter_->getAndResetDroppedSpanCount();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you think of adding a reporter accessor method to the Tracer class? Then you can get rid of all of these forwarding methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense.

}

SteadyClock::duration Tracer::reportPeriod() const {
return reporter_->reportPeriod();
}

void Tracer::setReportPeriod(SteadyClock::duration report_period) {
reporter_->setReportPeriod(report_period);
}

size_t Tracer::bufferSpanCount() const { return reporter_->bufferSpanCount(); }

void Tracer::setBufferSpanCount(size_t span_count) {
reporter_->setBufferSpanCount(span_count);
}

} // namespace zipkin
13 changes: 12 additions & 1 deletion zipkin/src/zipkin_reporter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ void ReporterImpl::reportSpan(const Span &span) {
bool is_full;
{
std::lock_guard<std::mutex> lock(write_mutex_);
num_spans_reported_ += spans_.addSpan(span);

if (spans_.addSpan(span)) {
num_spans_reported_++;
} else {
dropped_span_count_++;
Copy link
Contributor Author

@KevinMGranger KevinMGranger Feb 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be able to have loser memory ordering because of the mutex, but that might be a premature optimization.

}
is_full = spans_.pendingSpans() == max_buffered_spans_;
}
if (is_full)
Expand All @@ -41,6 +46,12 @@ bool ReporterImpl::flushWithTimeout(
});
}

uint64_t ReporterImpl::droppedSpanCount() const { return dropped_span_count_; }

uint64_t ReporterImpl::getAndResetDroppedSpanCount() {
return dropped_span_count_.exchange(0);
}

void ReporterImpl::makeWriterExit() {
std::lock_guard<std::mutex> lock(write_mutex_);
write_exit_ = true;
Expand Down
37 changes: 37 additions & 0 deletions zipkin/src/zipkin_reporter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,46 @@ class ReporterImpl : public Reporter {

bool flushWithTimeout(std::chrono::system_clock::duration timeout) override;

uint64_t droppedSpanCount() const override;

uint64_t getAndResetDroppedSpanCount() override;

/**
* @brief getReportPeriod Returns the period of time to wait between auto
* flushes
* @return Setting for the duration between auto flushes
*/
SteadyClock::duration reportPeriod() const override {
return reporting_period_;
}

/**
* @brief setReportPeriod Adjust the period of time to wait between auto
* flushes
* @param report_period The new size of the span buffer
*/
void setReportPeriod(SteadyClock::duration reporting_period) override {
reporting_period_ = reporting_period;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has a race condition, no? Since reporting_period_ is read from another thread.

}

/**
* @brief getBufferSpanCount Returns the number of spans that can be stored
* @return
*/
size_t bufferSpanCount() const override { return max_buffered_spans_; }

/**
* @brief setBufferSpanCount Adjust the number of spans that can be stored
* @param span_count The new size of the span buffer
*/
void setBufferSpanCount(size_t span_count) override {
max_buffered_spans_ = span_count;
}

private:
TransporterPtr transporter_;

std::atomic<uint64_t> dropped_span_count_{0};
std::mutex write_mutex_;
std::condition_variable write_cond_;
SteadyClock::duration reporting_period_;
Expand Down
44 changes: 44 additions & 0 deletions zipkin_opentracing/include/zipkin/opentracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,50 @@
#include <zipkin/tracer.h>

namespace zipkin {
class OtTracer : public opentracing::Tracer,
public std::enable_shared_from_this<OtTracer> {
public:
explicit OtTracer(TracerPtr &&tracer);

std::unique_ptr<opentracing::Span>
StartSpanWithOptions(opentracing::string_view operation_name,
const opentracing::StartSpanOptions &options) const
noexcept override;

opentracing::expected<void> Inject(const opentracing::SpanContext &sc,
std::ostream &writer) const override;

opentracing::expected<void>
Inject(const opentracing::SpanContext &sc,
const opentracing::TextMapWriter &writer) const override;

opentracing::expected<void>
Inject(const opentracing::SpanContext &sc,
const opentracing::HTTPHeadersWriter &writer) const override;

opentracing::expected<std::unique_ptr<opentracing::SpanContext>>
Extract(std::istream &reader) const override;

opentracing::expected<std::unique_ptr<opentracing::SpanContext>>
Extract(const opentracing::TextMapReader &reader) const override;

opentracing::expected<std::unique_ptr<opentracing::SpanContext>>
Extract(const opentracing::HTTPHeadersReader &reader) const override;

void Close() noexcept override;

private:
TracerPtr tracer_;

template <class Carrier>
opentracing::expected<void> InjectImpl(const opentracing::SpanContext &sc,
Carrier &writer) const;

template <class Carrier>
opentracing::expected<std::unique_ptr<opentracing::SpanContext>>
ExtractImpl(Carrier &reader) const;
};

struct ZipkinOtTracerOptions {
std::string collector_host = "localhost";
uint32_t collector_port = 9411;
Expand Down
Loading