Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metrics SDK] Metric aggregation temporality controls #1541

Merged
merged 9 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@ class OStreamMetricExporter final : public opentelemetry::sdk::metrics::MetricEx
* export() function will send metrics data into.
* The default ostream is set to stdout
*/
explicit OStreamMetricExporter(std::ostream &sout = std::cout) noexcept;
explicit OStreamMetricExporter(std::ostream &sout = std::cout,
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative) noexcept;

/**
* Export
* @param data metrics data
*/
sdk::common::ExportResult Export(const sdk::metrics::ResourceMetrics &data) noexcept override;

/**
* Get the AggregationTemporality for ostream exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

/**
* Force flush the exporter.
*/
Expand All @@ -55,6 +65,7 @@ class OStreamMetricExporter final : public opentelemetry::sdk::metrics::MetricEx
std::ostream &sout_;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
sdk::metrics::AggregationTemporality aggregation_temporality_;
bool isShutdown() const noexcept;
void printInstrumentationInfoMetricData(const sdk::metrics::ScopeMetrics &info_metrics);
void printPointData(const opentelemetry::sdk::metrics::PointType &point_data);
Expand Down
12 changes: 11 additions & 1 deletion exporters/ostream/src/metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,17 @@ inline void printVec(std::ostream &os, Container &vec)
os << ']';
}

OStreamMetricExporter::OStreamMetricExporter(std::ostream &sout) noexcept : sout_(sout) {}
OStreamMetricExporter::OStreamMetricExporter(
std::ostream &sout,
sdk::metrics::AggregationTemporality aggregation_temporality) noexcept
: sout_(sout), aggregation_temporality_(aggregation_temporality)
{}

sdk::metrics::AggregationTemporality OStreamMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
return aggregation_temporality_;
}

sdk::common::ExportResult OStreamMetricExporter::Export(
const sdk::metrics::ResourceMetrics &data) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::MetricExporte
*/
explicit OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options);

/**
* Get the AggregationTemporality for exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept override;

Expand All @@ -52,6 +60,9 @@ class OtlpGrpcMetricExporter : public opentelemetry::sdk::metrics::MetricExporte
// The configuration options associated with this exporter.
const OtlpGrpcMetricExporterOptions options_;

// Aggregation Temporality selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

// For testing
friend class OtlpGrpcExporterTestPeer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ namespace otlp
*/
struct OtlpGrpcMetricExporterOptions : public OtlpGrpcExporterOptions
{
opentelemetry::sdk::metrics::AggregationTemporality aggregation_temporality =
opentelemetry::sdk::metrics::AggregationTemporality::kDelta;

// Preferred Aggregation Temporality
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative;

// opentelemetry::sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector =
lalitb marked this conversation as resolved.
Show resolved Hide resolved
// OtlpMetricUtils::ChooseTemporalitySelector();
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# include "opentelemetry/exporters/otlp/otlp_http_client.h"

# include "opentelemetry/exporters/otlp/otlp_environment.h"
# include "opentelemetry/exporters/otlp/otlp_metric_utils.h"

# include <chrono>
# include <cstddef>
Expand Down Expand Up @@ -52,6 +53,10 @@ struct OtlpHttpMetricExporterOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultMetricHeaders();

// Preferred Aggregation Temporality
sdk::metrics::AggregationTemporality aggregation_temporality =
sdk::metrics::AggregationTemporality::kCumulative;

# ifdef ENABLE_ASYNC_EXPORT
// Concurrent requests
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-concurrent-requests
Expand Down Expand Up @@ -79,6 +84,14 @@ class OtlpHttpMetricExporter final : public opentelemetry::sdk::metrics::MetricE
*/
OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptions &options);

/**
* Get the AggregationTemporality for exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept override;

Expand All @@ -95,6 +108,9 @@ class OtlpHttpMetricExporter final : public opentelemetry::sdk::metrics::MetricE
// Configuration options for the exporter
const OtlpHttpMetricExporterOptions options_;

// Aggregation Temporality Selector
const sdk::metrics::AggregationTemporalitySelector aggregation_temporality_selector_;

// Object that stores the HTTP sessions that have been created
std::unique_ptr<OtlpHttpClient> http_client_;
// For testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class OtlpMetricUtils
static void PopulateRequest(
const opentelemetry::sdk::metrics::ResourceMetrics &data,
proto::collector::metrics::v1::ExportMetricsServiceRequest *request) noexcept;

static sdk::metrics::AggregationTemporalitySelector ChooseTemporalitySelector(
sdk::metrics::AggregationTemporality preferred_aggregation_temporality) noexcept;
static sdk::metrics::AggregationTemporality DeltaTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept;
static sdk::metrics::AggregationTemporality CumulativeTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept;
};

} // namespace otlp
Expand Down
16 changes: 14 additions & 2 deletions exporters/otlp/src/otlp_grpc_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,28 @@ OtlpGrpcMetricExporter::OtlpGrpcMetricExporter()
{}

OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(const OtlpGrpcMetricExporterOptions &options)
: options_(options), metrics_service_stub_(MakeMetricsServiceStub(options))
: options_(options),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
metrics_service_stub_(MakeMetricsServiceStub(options))
{}

OtlpGrpcMetricExporter::OtlpGrpcMetricExporter(
std::unique_ptr<proto::collector::metrics::v1::MetricsService::StubInterface> stub)
: options_(OtlpGrpcMetricExporterOptions()), metrics_service_stub_(std::move(stub))
: options_(OtlpGrpcMetricExporterOptions()),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
metrics_service_stub_(std::move(stub))
{}

// ----------------------------- Exporter methods ------------------------------

sdk::metrics::AggregationTemporality OtlpGrpcMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
return aggregation_temporality_selector_(instrument_type);
}

opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept
{
Expand Down
14 changes: 13 additions & 1 deletion exporters/otlp/src/otlp_http_metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter()

OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptions &options)
: options_(options),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
http_client_(new OtlpHttpClient(OtlpHttpClientOptions(options.url,
options.content_type,
options.json_bytes_mapping,
Expand All @@ -44,7 +46,10 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(const OtlpHttpMetricExporterOptio
{}

OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr<OtlpHttpClient> http_client)
: options_(OtlpHttpMetricExporterOptions()), http_client_(std::move(http_client))
: options_(OtlpHttpMetricExporterOptions()),
aggregation_temporality_selector_{
OtlpMetricUtils::ChooseTemporalitySelector(options_.aggregation_temporality)},
http_client_(std::move(http_client))
{
OtlpHttpMetricExporterOptions &options = const_cast<OtlpHttpMetricExporterOptions &>(options_);
options.url = http_client_->GetOptions().url;
Expand All @@ -61,6 +66,13 @@ OtlpHttpMetricExporter::OtlpHttpMetricExporter(std::unique_ptr<OtlpHttpClient> h
}
// ----------------------------- Exporter methods ------------------------------

sdk::metrics::AggregationTemporality OtlpHttpMetricExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{

return aggregation_temporality_selector_(instrument_type);
}

opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export(
const opentelemetry::sdk::metrics::ResourceMetrics &data) noexcept
{
Expand Down
35 changes: 35 additions & 0 deletions exporters/otlp/src/otlp_metric_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,41 @@ void OtlpMetricUtils::PopulateRequest(
auto resource_metrics = request->add_resource_metrics();
PopulateResourceMetrics(data, resource_metrics);
}

sdk::metrics::AggregationTemporalitySelector OtlpMetricUtils::ChooseTemporalitySelector(
sdk::metrics::AggregationTemporality preferred_aggregation_temporality) noexcept
{
if (preferred_aggregation_temporality == sdk::metrics::AggregationTemporality::kDelta)
{
return DeltaTemporalitySelector;
}
return CumulativeTemporalitySelector;
}

sdk::metrics::AggregationTemporality OtlpMetricUtils::DeltaTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept
{
switch (instrument_type)
{
case sdk::metrics::InstrumentType::kCounter:
case sdk::metrics::InstrumentType::kObservableCounter:
case sdk::metrics::InstrumentType::kHistogram:
case sdk::metrics::InstrumentType::kObservableGauge:
return sdk::metrics::AggregationTemporality::kDelta;
break;
lalitb marked this conversation as resolved.
Show resolved Hide resolved
case sdk::metrics::InstrumentType::kUpDownCounter:
case sdk::metrics::InstrumentType::kObservableUpDownCounter:
return sdk::metrics::AggregationTemporality::kCumulative;
}
return sdk::metrics::AggregationTemporality::kUnspecified;
}

sdk::metrics::AggregationTemporality OtlpMetricUtils::CumulativeTemporalitySelector(
sdk::metrics::InstrumentType instrument_type) noexcept
{
return sdk::metrics::AggregationTemporality::kCumulative;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ class PrometheusExporter : public sdk::metrics::MetricExporter
*/
PrometheusExporter(const PrometheusExporterOptions &options);

/**
* Get the AggregationTemporality for Prometheus exporter
*
* @return AggregationTemporality
*/
sdk::metrics::AggregationTemporality GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept override;

/**
* Exports a batch of Metric Records.
* @param records: a collection of records to export
Expand Down
7 changes: 7 additions & 0 deletions exporters/prometheus/src/exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ PrometheusExporter::PrometheusExporter() : is_shutdown_(false)
collector_ = std::unique_ptr<PrometheusCollector>(new PrometheusCollector(3));
}

sdk::metrics::AggregationTemporality PrometheusExporter::GetAggregationTemporality(
sdk::metrics::InstrumentType instrument_type) const noexcept
{
// Prometheus exporter only support Cumulative
return sdk::metrics::AggregationTemporality::kCumulative;
}

/**
* Exports a batch of Metric Records.
* @param records: a collection of records to export
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);
PeriodicExportingMetricReader(std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option);

AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept override;

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;
Expand Down
3 changes: 2 additions & 1 deletion sdk/include/opentelemetry/sdk/metrics/instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ struct InstrumentDescriptor
InstrumentValueType value_type_;
};

using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap;
using MetricAttributes = opentelemetry::sdk::common::OrderedAttributeMap;
using AggregationTemporalitySelector = std::function<AggregationTemporality(InstrumentType)>;

/*class InstrumentSelector {
public:
Expand Down
8 changes: 8 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class MetricExporter
*/
virtual opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &data) noexcept = 0;

/**
* Get the AggregationTemporality for given Instrument Type for this exporter.
*
* @return AggregationTemporality
*/
virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept = 0;

/**
* Force flush the exporter.
*/
Expand Down
13 changes: 9 additions & 4 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ namespace metrics
class MetricReader
{
public:
MetricReader(
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);
MetricReader();

void SetMetricProducer(MetricProducer *metric_producer);

Expand All @@ -36,7 +35,14 @@ class MetricReader
*/
bool Collect(nostd::function_ref<bool(ResourceMetrics &metric_data)> callback) noexcept;

AggregationTemporality GetAggregationTemporality() const noexcept;
/**
* Get the AggregationTemporality for given Instrument Type for this reader.
*
* @return AggregationTemporality
*/

virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) const noexcept = 0;

/**
* Shutdown the meter reader.
Expand All @@ -62,7 +68,6 @@ class MetricReader

private:
MetricProducer *metric_producer_;
AggregationTemporality aggregation_temporality_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool shutdown_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ class MeterContext;
class CollectorHandle
{
public:
virtual AggregationTemporality GetAggregationTemporality() noexcept = 0;
virtual AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept = 0;
};

/**
Expand All @@ -33,7 +34,8 @@ class MetricCollector : public MetricProducer, public CollectorHandle
MetricCollector(std::shared_ptr<MeterContext> &&context,
std::unique_ptr<MetricReader> metric_reader);

AggregationTemporality GetAggregationTemporality() noexcept override;
AggregationTemporality GetAggregationTemporality(
InstrumentType instrument_type) noexcept override;

/**
* The callback to be called for each metric exporter. This will only be those
Expand Down
Loading