Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a topK table and do sampling based on the number of requests. #14

Merged
merged 31 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
836680d
add stream_summary with space saving algorithm
samohte Dec 12, 2023
ccc5812
add simple getSamplingKey() method
samohte Dec 12, 2023
3e852c1
fix initialization: remove (unintentionally) duplicated line
samohte Dec 13, 2023
f817c13
add timer to query topK()
samohte Dec 13, 2023
a1d5a77
add initial unit test
samohte Dec 13, 2023
927e026
extend checks in validate(), extend unit test
samohte Dec 14, 2023
507c8ed
Add Sampling controller (currently not funcitonal)
samohte Dec 15, 2023
c489c0a
add initial unit test, minor cleanup
samohte Dec 18, 2023
41e283c
fix calling shouldSample().
samohte Dec 19, 2023
92c8bc7
Use calculated sampling exponent for sampling
samohte Dec 19, 2023
b956bff
set pathInfo in tracestate (only root path random set)
samohte Dec 20, 2023
612835b
FW4Tag handling: change parsing, rename variable, add unit tests
samohte Dec 20, 2023
e3abd62
add required synchronization (via mutex)
samohte Dec 20, 2023
3533704
extend test
samohte Dec 21, 2023
6f79690
move creation of SamplerConfigFetcher out of DynatraceSampler to allow
samohte Dec 22, 2023
3793a67
SamplerConfig: reset to default value as fallback if no value was
samohte Dec 22, 2023
eb3b1e4
extend test
samohte Dec 22, 2023
4c736e8
add test cases
samohte Jan 15, 2024
0af49f4
change Locking, add unit test, introduce constant for streamSummary size
samohte Jan 18, 2024
875e35c
introduce MAX_SAMPLING_EXPONENT
samohte Jan 25, 2024
815f363
add debug log
samohte Jan 25, 2024
3585ce1
move getSamplingKey() to SamplingController, add unit test
samohte Jan 26, 2024
0b7c239
split sampling_controller into .h and .cc file
samohte Jan 26, 2024
2298fee
move stream_summary from DynaTraceSampler to SamplingController
samohte Jan 29, 2024
e9c9841
calculate exponent as fallback to handle warmup phase
samohte Jan 31, 2024
996aa44
cleanup unit test
samohte Feb 2, 2024
ecd8336
remove tenant_id_ and cluster_id_ (unintentionally re-added during re…
samohte Feb 5, 2024
9f4fd46
add calculateTenantId() to allow to set tenant instead of tenant_id in
samohte Feb 5, 2024
ca06322
rename tenant_id -> tenant
samohte Feb 6, 2024
1c33d1f
extract tenant id calculation to allow unit testing
samohte Feb 6, 2024
2b883a0
implement review feedback, move class DynatraceTag from header to cc …
samohte Feb 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ SamplingResult AlwaysOnSampler::shouldSample(const absl::optional<SpanContext> p
OptRef<const Tracing::TraceContext> /*trace_context*/,
const std::vector<SpanContext>& /*links*/) {
SamplingResult result;
result.decision = Decision::RECORD_AND_SAMPLE;
result.decision = Decision::RecordAndSample;
samohte marked this conversation as resolved.
Show resolved Hide resolved
if (parent_context.has_value()) {
result.tracestate = parent_context.value().tracestate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ envoy_cc_library(
name = "dynatrace_sampler_lib",
srcs = [
"dynatrace_sampler.cc",
"sampler_config.cc",
"sampler_config_fetcher.cc",
"sampling_controller.cc",
],
hdrs = [
"dynatrace_sampler.h",
"sampler_config.h",
"sampler_config_fetcher.h",
"sampling_controller.h",
"stream_summary.h",
],
deps = [
"//source/common/config:datasource_lib",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "config.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h"

#include <memory>

#include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.validate.h"

Expand All @@ -16,11 +18,14 @@ DynatraceSamplerFactory::createSampler(const Protobuf::Message& config,
Server::Configuration::TracerFactoryContext& context) {
auto mptr = Envoy::Config::Utility::translateAnyToFactoryConfig(
dynamic_cast<const ProtobufWkt::Any&>(config), context.messageValidationVisitor(), *this);
return std::make_shared<DynatraceSampler>(
MessageUtil::downcastAndValidate<
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>(
*mptr, context.messageValidationVisitor()),
context);

const auto& proto_config = MessageUtil::downcastAndValidate<
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>(
*mptr, context.messageValidationVisitor());

SamplerConfigFetcherPtr cf = std::make_unique<SamplerConfigFetcherImpl>(
context, proto_config.http_uri(), proto_config.token());
return std::make_shared<DynatraceSampler>(proto_config, context, std::move(cf));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace OpenTelemetry {
class DynatraceSamplerFactory : public SamplerFactory {
public:
/**
* @brief Create a Sampler which samples every span
* @brief Create a Dynatrace sampler
*
* @param context
* @return SamplerSharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,59 @@
#include <sstream>
#include <string>

#include "source/common/common/hash.h"
#include "source/common/config/datasource.h"
#include "source/extensions/tracers/opentelemetry/samplers/sampler.h"
#include "source/extensions/tracers/opentelemetry/span_context.h"
#include "source/extensions/tracers/opentelemetry/trace_state.h"

#include "absl/strings/str_cat.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

static const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME =
"sampling_extrapolation_set_in_sampler";
namespace {

constexpr std::chrono::minutes SAMPLING_UPDATE_TIMER_DURATION{1};
const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME = "sampling_extrapolation_set_in_sampler";

} // namespace

DynatraceSampler::DynatraceSampler(
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config,
Server::Configuration::TracerFactoryContext& context)
Server::Configuration::TracerFactoryContext& context,
SamplerConfigFetcherPtr sampler_config_fetcher)
: tenant_id_(config.tenant_id()), cluster_id_(config.cluster_id()),
dt_tracestate_key_(absl::StrCat(absl::string_view(config.tenant_id()), "-",
absl::string_view(config.cluster_id()), "@dt")),
sampler_config_fetcher_(context, config.http_uri(), config.token()), counter_(0) {}
sampling_controller_(std::move(sampler_config_fetcher)) {

timer_ = context.serverFactoryContext().mainThreadDispatcher().createTimer([this]() -> void {
sampling_controller_.update();
timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION);
});
timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION);
}

SamplingResult DynatraceSampler::shouldSample(const absl::optional<SpanContext> parent_context,
const std::string& /*trace_id*/,
const std::string& trace_id,
const std::string& /*name*/, OTelSpanKind /*kind*/,
OptRef<const Tracing::TraceContext> /*trace_context*/,
OptRef<const Tracing::TraceContext> trace_context,
const std::vector<SpanContext>& /*links*/) {

SamplingResult result;
std::map<std::string, std::string> att;

// trace_context->path() returns path and query. query part is removed in getSamplingKey()
const std::string sampling_key =
trace_context.has_value()
? sampling_controller_.getSamplingKey(trace_context->path(), trace_context->method())
: "";

sampling_controller_.offer(sampling_key);
samohte marked this conversation as resolved.
Show resolved Hide resolved

auto trace_state =
TraceState::fromHeader(parent_context.has_value() ? parent_context->tracestate() : "");

Expand All @@ -47,24 +70,19 @@ SamplingResult DynatraceSampler::shouldSample(const absl::optional<SpanContext>
std::to_string(fw4_tag.getSamplingExponent());
result.tracestate = parent_context->tracestate();
}
} else { // make a sampling decision
// this is just a demo, we sample every second request here
uint32_t current_counter = ++counter_;
bool sample;
int sampling_exponent;
if (current_counter % 2) {
sample = true;
sampling_exponent = 1;
} else {
sample = false;
sampling_exponent = 0;
}
} else {
// do a decision based on the calculated exponent
// at the moment we use a hash of the trace_id as random number
const auto hash = MurmurHash::murmurHash2(trace_id);
const auto sampling_state = sampling_controller_.getSamplingState(sampling_key);
const bool sample = sampling_state.shouldSample(hash);
const auto sampling_exponent = sampling_state.getExponent();

att[SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME] = std::to_string(sampling_exponent);
samohte marked this conversation as resolved.
Show resolved Hide resolved

result.decision = sample ? Decision::RecordAndSample : Decision::Drop;
// create new forward tag and add it to tracestate
FW4Tag new_tag = FW4Tag::create(!sample, sampling_exponent);
FW4Tag new_tag = FW4Tag::create(!sample, sampling_exponent, static_cast<uint8_t>(hash));
trace_state = trace_state->set(dt_tracestate_key_, new_tag.asString());
result.tracestate = trace_state->toHeader();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,83 @@
#pragma once

#include <memory>

#include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.h"
#include "envoy/server/factory_context.h"

#include "source/common/common/logger.h"
#include "source/common/config/datasource.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h"
#include "source/extensions/tracers/opentelemetry/samplers/sampler.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

class FW4Tag {
public:
static FW4Tag createInvalid() { return {false, false, 0}; }
static FW4Tag createInvalid() { return {false, false, 0, 0}; }
samohte marked this conversation as resolved.
Show resolved Hide resolved

static FW4Tag create(bool ignored, int sampling_exponent) {
return {true, ignored, sampling_exponent};
static FW4Tag create(bool ignored, uint32_t sampling_exponent, uint32_t path_info) {
return {true, ignored, sampling_exponent, path_info};
}

static FW4Tag create(const std::string& value) {
std::vector<absl::string_view> tracestate_components =
absl::StrSplit(value, ';', absl::AllowEmpty());
if (tracestate_components.size() < 7) {
if (tracestate_components.size() < 8) {
return createInvalid();
}

if (tracestate_components[0] != "fw4") {
return createInvalid();
}
bool ignored = tracestate_components[5] == "1";
int sampling_exponent = std::stoi(std::string(tracestate_components[6]));
return {true, ignored, sampling_exponent};
uint32_t sampling_exponent;
uint32_t path_info;
if (absl::SimpleAtoi(tracestate_components[6], &sampling_exponent) &&
absl::SimpleHexAtoi(tracestate_components[7], &path_info)) {
return {true, ignored, sampling_exponent, path_info};
}
return createInvalid();
}

std::string asString() const {
return absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_, ";0");
std::string ret = absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_,
";", absl::Hex(path_info_));
return ret;
}

bool isValid() const { return valid_; };
bool isIgnored() const { return ignored_; };
int getSamplingExponent() const { return sampling_exponent_; };
uint32_t getPathInfo() const { return path_info_; };

private:
FW4Tag(bool valid, bool ignored, int sampling_exponent)
: valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent) {}
FW4Tag(bool valid, bool ignored, uint32_t sampling_exponent, uint32_t path_info)
: valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent),
path_info_(path_info) {}

bool valid_;
bool ignored_;
int sampling_exponent_;
uint32_t sampling_exponent_;
uint32_t path_info_;
};

/**
* @brief A Dynatrace specific sampler *
* @brief A Dynatrace specific sampler
*/
class DynatraceSampler : public Sampler, Logger::Loggable<Logger::Id::tracing> {
public:
DynatraceSampler(
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config,
Server::Configuration::TracerFactoryContext& context);
Server::Configuration::TracerFactoryContext& context,
SamplerConfigFetcherPtr sampler_config_fetcher);

SamplingResult shouldSample(const absl::optional<SpanContext> parent_context,
const std::string& trace_id, const std::string& name,
Expand All @@ -74,8 +91,8 @@ class DynatraceSampler : public Sampler, Logger::Loggable<Logger::Id::tracing> {
std::string tenant_id_;
std::string cluster_id_;
std::string dt_tracestate_key_;
SamplerConfigFetcher sampler_config_fetcher_;
std::atomic<uint32_t> counter_; // request counter for dummy sampling
Event::TimerPtr timer_;
SamplingController sampling_controller_;
};

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include <utility>

#include "source/common/json/json_loader.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

void SamplerConfig::parse(const std::string& json) {
const auto result = Envoy::Json::Factory::loadFromStringNoThrow(json);
if (result.ok()) {
const auto obj = result.value();
samohte marked this conversation as resolved.
Show resolved Hide resolved
if (obj->hasObject("rootSpansPerMinute")) {
const auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT);
root_spans_per_minute_.store(value);
samohte marked this conversation as resolved.
Show resolved Hide resolved
return;
}
(void)obj;
}
// didn't get a value, reset to default
root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT);
}

} // namespace OpenTelemetry
} // namespace Tracers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
#include <atomic>
#include <cstdint>
#include <string>
#include <utility>

#include "source/common/json/json_loader.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -14,25 +11,14 @@ namespace OpenTelemetry {

class SamplerConfig {
public:
static constexpr uint64_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000;
static constexpr uint32_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000;

void parse(const std::string& json) {
root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT); // reset to default
auto result = Envoy::Json::Factory::loadFromStringNoThrow(json);
if (result.ok()) {
auto obj = result.value();
if (obj->hasObject("rootSpansPerMinute")) {
auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT);
root_spans_per_minute_.store(value);
}
(void)obj;
}
}
void parse(const std::string& json);

uint64_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); }
uint32_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); }

private:
std::atomic<uint64_t> root_spans_per_minute_ = ROOT_SPANS_PER_MINUTE_DEFAULT;
std::atomic<uint32_t> root_spans_per_minute_ = ROOT_SPANS_PER_MINUTE_DEFAULT;
};

} // namespace OpenTelemetry
Expand Down
Loading
Loading