From 88a894c032b15a598850dd37bcea6e6d4829b4e5 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Wed, 31 Jan 2024 17:29:29 +0100 Subject: [PATCH 01/22] New plugin: Open Telemetry Collector. --- Makefile.am | 10 + configure.ac | 6 + src/open_telemetry_collector.cc | 483 ++++++++++++++++++++++++++++++++ 3 files changed, 499 insertions(+) create mode 100644 src/open_telemetry_collector.cc diff --git a/Makefile.am b/Makefile.am index 8f39d989a9..69415dba6c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1744,6 +1744,16 @@ openvpn_la_SOURCES = src/openvpn.c openvpn_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_OPEN_TELEMETRY_COLLECTOR +pkglib_LTLIBRARIES += open_telemetry_collector.la +open_telemetry_collector_la_SOURCES = src/open_telemetry_collector.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h +open_telemetry_collector_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) +open_telemetry_collector_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) +open_telemetry_collector_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) +endif + if BUILD_PLUGIN_ORACLE pkglib_LTLIBRARIES += oracle.la oracle_la_SOURCES = \ diff --git a/configure.ac b/configure.ac index 01cd8d2715..cadd9a986c 100644 --- a/configure.ac +++ b/configure.ac @@ -6810,6 +6810,7 @@ plugin_nut="$with_libupsclient" plugin_olsrd="yes" plugin_onewire="$with_libowcapi" plugin_openldap="$with_libldap" +plugin_open_telemetry_collector="yes" plugin_openvpn="yes" plugin_oracle="$with_oracle" plugin_ovs_events="no" @@ -7145,18 +7146,22 @@ fi if test "x$GRPC_CPP_PLUGIN" = "x"; then plugin_grpc="no (grpc_cpp_plugin not found)" + plugin_open_telemetry_collector="no (grpc_cpp_plugin not found)" plugin_write_open_telemetry="no (grpc_cpp_plugin not found)" fi if test "x$have_protoc3" != "xyes"; then plugin_grpc="no (protoc3 not found)" + plugin_open_telemetry_collector="no (protoc3 not found)" plugin_write_open_telemetry="no (protoc3 not found)" fi if test "x$with_libprotobuf" != "xyes"; then plugin_grpc="no (libprotobuf not found)" + plugin_open_telemetry_collector="no (libprotobuf not found)" plugin_write_open_telemetry="no (libprotobuf not found)" fi if test "x$with_libgrpcpp" != "xyes"; then plugin_grpc="no (libgrpc++ not found)" + plugin_open_telemetry_collector="no (libgrpc++ not found)" plugin_write_open_telemetry="no (libgrpc++ not found)" fi if test "x$protoc3_optional" = "xno"; then @@ -7517,6 +7522,7 @@ AC_PLUGIN([olsrd], [$plugin_olsrd], [olsrd statistics] AC_PLUGIN([onewire], [$plugin_onewire], [OneWire sensor statistics]) AC_PLUGIN([openldap], [$plugin_openldap], [OpenLDAP statistics]) AC_PLUGIN([openvpn], [$plugin_openvpn], [OpenVPN client statistics]) +AC_PLUGIN([open_telemetry_collector], [$plugin_open_telemetry_collector], [OpenTelemetry Collector]) AC_PLUGIN([oracle], [$plugin_oracle], [Oracle plugin]) AC_PLUGIN([ovs_events], [$plugin_ovs_events], [OVS events plugin]) AC_PLUGIN([ovs_stats], [$plugin_ovs_stats], [OVS statistics plugin]) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc new file mode 100644 index 0000000000..1024609452 --- /dev/null +++ b/src/open_telemetry_collector.cc @@ -0,0 +1,483 @@ +/** + * collectd - src/grpc.cc + * Copyright (C) 2015-2016 Sebastian Harl + * Copyright (C) 2016-2024 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Sebastian Harl + * Florian octo Forster + **/ + +extern "C" { +#include "daemon/collectd.h" +#include "daemon/metric.h" +#include "daemon/utils_cache.h" +#include "utils/common/common.h" +} + +#include +#include + +#include + +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/metrics/v1/metrics.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; +using opentelemetry::proto::collector::metrics::v1:: + ExportMetricsServiceResponse; +using opentelemetry::proto::collector::metrics::v1::MetricsService; +using opentelemetry::proto::common::v1::AnyValue; +using opentelemetry::proto::common::v1::KeyValue; +using opentelemetry::proto::metrics::v1::Gauge; +using opentelemetry::proto::metrics::v1::Metric; +using opentelemetry::proto::metrics::v1::NumberDataPoint; +using opentelemetry::proto::metrics::v1::ResourceMetrics; +using opentelemetry::proto::metrics::v1::Sum; +using opentelemetry::proto::resource::v1::Resource; + +/* + * private types + */ + +struct Listener { + grpc::string addr; + grpc::string port; + + grpc::SslServerCredentialsOptions *ssl; +}; +static std::vector listeners; +static grpc::string default_addr("0.0.0.0:50051"); + +/* + * helper functions + */ +static grpc::string read_file(const char *filename) { + std::ifstream f; + grpc::string s, content; + + f.open(filename); + if (!f.is_open()) { + ERROR("open_telemetry_collector: Failed to open '%s'", filename); + return ""; + } + + while (std::getline(f, s)) { + content += s; + content.push_back('\n'); + } + f.close(); + return content; +} /* read_file */ + +/* + * proto conversion + */ +static grpc::Status wrap_error(int err) { + if (!err) { + return grpc::Status::OK; + } + return grpc::Status(grpc::StatusCode::INTERNAL, "wrapped internal error"); +} + +static grpc::Status unmarshal_label_pair(KeyValue kv, label_set_t *labels) { + switch (kv.value().value_case()) { + case AnyValue::kStringValue: + return wrap_error(label_set_add(labels, kv.key().c_str(), + kv.value().string_value().c_str())); + case AnyValue::kBoolValue: + return wrap_error(label_set_add( + labels, kv.key().c_str(), kv.value().bool_value() ? "true" : "false")); + case AnyValue::kIntValue: { + char buf[64] = {0}; + snprintf(buf, sizeof(buf), "%" PRId64, kv.value().int_value()); + return wrap_error(label_set_add(labels, kv.key().c_str(), buf)); + } + case AnyValue::kDoubleValue: { + char buf[64] = {0}; + snprintf(buf, sizeof(buf), GAUGE_FORMAT, kv.value().double_value()); + return wrap_error(label_set_add(labels, kv.key().c_str(), buf)); + } + case AnyValue::kArrayValue: + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "array labels are not supported"); + case AnyValue::kKvlistValue: + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "key/value list labels are not supported"); + case AnyValue::kBytesValue: + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "byte labels are not supported"); + default: + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "unexpected label value type"); + } +} + +static grpc::Status unmarshal_data_point(NumberDataPoint dp, + metric_family_t *fam, + bool is_cumulative) { + metric_t m = { + .time = NS_TO_CDTIME_T(dp.time_unix_nano()), + }; + + switch (dp.value_case()) { + case NumberDataPoint::kAsDouble: +// TODO(octo): enable once floating point counters have been merged (#4266) +#if 0 + if (is_cumulative) { + fam->type = METRIC_TYPE_FPCOUNTER; + m.value.fpcounter = dp.as_double(); + break; + } +#endif + m.value.gauge = dp.as_double(); + break; + case NumberDataPoint::kAsInt: + if (is_cumulative) { + fam->type = METRIC_TYPE_COUNTER; + m.value.counter = (counter_t)dp.as_int(); + break; + } + m.value.gauge = (gauge_t)dp.as_int(); + break; + default: + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "unexpected data point value type"); + } + + for (auto kv : dp.attributes()) { + grpc::Status s = unmarshal_label_pair(kv, &m.label); + if (!s.ok()) { + metric_reset(&m); + return s; + } + } + + // TODO(octo): get the first metric time from the cache and detect counter + // resets. + // TODO(octo): when aggregation temporality == AGGREGATION_TEMPORALITY_DELTA, + // get the previous value from the cache and add to it. + + int err = metric_family_metric_append(fam, m); + + metric_reset(&m); + return wrap_error(err); +} + +static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { + for (auto db : g.data_points()) { + grpc::Status s = unmarshal_data_point(db, fam, false); + if (!s.ok()) { + return s; + } + } + + return grpc::Status::OK; +} + +static grpc::Status unmarshal_sum_metric(Sum s, metric_family_t *fam) { + // TODO(octo): check is_monotonic and aggregation temporality + for (auto db : s.data_points()) { + grpc::Status s = unmarshal_data_point(db, fam, true); + if (!s.ok()) { + return s; + } + } + + return grpc::Status::OK; +} + +static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) { + metric_family_t fam = { + .name = (char *)mpb.name().c_str(), + .help = (char *)mpb.description().c_str(), + .unit = (char *)mpb.unit().c_str(), + .resource = resource, + }; + + switch (mpb.data_case()) { + case Metric::kGauge: { + fam.type = METRIC_TYPE_GAUGE; + grpc::Status s = unmarshal_gauge_metric(mpb.gauge(), &fam); + if (!s.ok()) { + metric_family_metric_reset(&fam); + return s; + } + break; + } + case Metric::kSum: { + grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam); + if (!s.ok()) { + metric_family_metric_reset(&fam); + return s; + } + break; + } + case Metric::kHistogram: + case Metric::kExponentialHistogram: + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "histogram metrics are not supported"); + case Metric::kSummary: + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "summary metrics are not supported"); + default: + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "unexpected data type"); + } + + int err = plugin_dispatch_metric_family(&fam); + + metric_family_metric_reset(&fam); + return wrap_error(err); +} + +static grpc::Status unmarshal_resource(Resource rpb, label_set_t *resource) { + for (auto kv : rpb.attributes()) { + grpc::Status s = unmarshal_label_pair(kv, resource); + if (!s.ok()) { + return s; + } + } + + return grpc::Status::OK; +} + +static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) { + label_set_t resource = {0}; + + grpc::Status s = unmarshal_resource(rm.resource(), &resource); + if (!s.ok()) { + return s; + } + + for (auto sm : rm.scope_metrics()) { + for (auto m : sm.metrics()) { + grpc::Status s = dispatch_metric(m, resource); + if (!s.ok()) { + return s; + } + } + } + + label_set_reset(&resource); + return grpc::Status::OK; +} + +/* + * OpenTelemetry MetricsService + */ +class OTMetricsService : public MetricsService::Service { +public: + grpc::Status Export(grpc::ClientContext *context, + const ExportMetricsServiceRequest &req, + ExportMetricsServiceResponse *resp) { + for (auto rm : req.resource_metrics()) { + grpc::Status s = dispatch_resource_metrics(rm); + if (!s.ok()) { + return s; + } + } + + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "Export is not implemented yet"); + } +}; + +/* + * gRPC server implementation + */ +class CollectorServer final { +public: + void Start() { + auto auth = grpc::InsecureServerCredentials(); + + grpc::ServerBuilder builder; + + if (listeners.empty()) { + builder.AddListeningPort(default_addr, auth); + INFO("open_telemetry_collector: Listening on %s", default_addr.c_str()); + } else { + for (auto l : listeners) { + grpc::string addr = l.addr + ":" + l.port; + + auto use_ssl = grpc::string(""); + auto a = auth; + if (l.ssl != nullptr) { + use_ssl = grpc::string(" (SSL enabled)"); + a = grpc::SslServerCredentials(*l.ssl); + } + + builder.AddListeningPort(addr, a); + INFO("open_telemetry_collector: Listening on %s%s", addr.c_str(), + use_ssl.c_str()); + } + } + + builder.RegisterService(&metrics_service); + + server = builder.BuildAndStart(); + } + + void Shutdown() { server->Shutdown(); } + +private: + OTMetricsService metrics_service; + + std::unique_ptr server; +}; + +static CollectorServer *server = nullptr; + +/* + * collectd plugin interface + */ +extern "C" { +static int otelcol_config_listen(oconfig_item_t *ci) { + if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) || + (ci->values[1].type != OCONFIG_TYPE_STRING)) { + ERROR("open_telemetry_collector: The `%s` config option needs exactly " + "two string argument (address and port).", + ci->key); + return -1; + } + + auto listener = Listener(); + listener.addr = grpc::string(ci->values[0].value.string); + listener.port = grpc::string(ci->values[1].value.string); + listener.ssl = nullptr; + + auto ssl_opts = new grpc::SslServerCredentialsOptions( + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); + grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {}; + bool use_ssl = false; + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (!strcasecmp("EnableSSL", child->key)) { + if (cf_util_get_boolean(child, &use_ssl)) { + ERROR("open_telemetry_collector: Option `%s` expects a boolean value", + child->key); + return -1; + } + } else if (!strcasecmp("SSLCACertificateFile", child->key)) { + char *certs = NULL; + if (cf_util_get_string(child, &certs)) { + ERROR("open_telemetry_collector: Option `%s` expects a string value", + child->key); + return -1; + } + ssl_opts->pem_root_certs = read_file(certs); + } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { + char *key = NULL; + if (cf_util_get_string(child, &key)) { + ERROR("open_telemetry_collector: Option `%s` expects a string value", + child->key); + return -1; + } + pkcp.private_key = read_file(key); + } else if (!strcasecmp("SSLCertificateFile", child->key)) { + char *cert = NULL; + if (cf_util_get_string(child, &cert)) { + ERROR("open_telemetry_collector: Option `%s` expects a string value", + child->key); + return -1; + } + pkcp.cert_chain = read_file(cert); + } else if (!strcasecmp("VerifyPeer", child->key)) { + bool verify = false; + if (cf_util_get_boolean(child, &verify)) { + return -1; + } + ssl_opts->client_certificate_request = + verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; + } else { + WARNING( + "open_telemetry_collector: Option `%s` not allowed in <%s> block.", + child->key, ci->key); + } + } + + ssl_opts->pem_key_cert_pairs.push_back(pkcp); + if (use_ssl) + listener.ssl = ssl_opts; + else + delete (ssl_opts); + + listeners.push_back(listener); + return 0; +} /* otelcol_config_listen() */ + +static int otelcol_config(oconfig_item_t *ci) { + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (!strcasecmp("Listen", child->key)) { + if (otelcol_config_listen(child)) { + return -1; + } + } + + else { + WARNING("open_telemetry_collector: Option `%s` not allowed here.", + child->key); + } + } + + return 0; +} /* otelcol_config() */ + +static int otelcol_init(void) { + if (server) { + return 0; + } + + server = new CollectorServer(); + if (!server) { + ERROR("open_telemetry_collector: Failed to create server"); + return -1; + } + + server->Start(); + return 0; +} /* otelcol_init() */ + +static int otelcol_shutdown(void) { + if (!server) + return 0; + + server->Shutdown(); + + delete server; + server = nullptr; + + return 0; +} /* otelcol_shutdown() */ + +void module_register(void) { + plugin_register_complex_config("open_telemetry_collector", otelcol_config); + plugin_register_init("open_telemetry_collector", otelcol_init); + plugin_register_shutdown("open_telemetry_collector", otelcol_shutdown); +} /* module_register() */ +} /* extern "C" */ From 9fb91a5a3d9e766dc8d9a6b83c463ff951a3fa4f Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 07:57:10 +0100 Subject: [PATCH 02/22] src/daemon/utils_cache.c: Consistently return `ENOENT` when metrics are not in the cache. --- src/daemon/utils_cache.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index 780591c7e2..ea0a1a7dbd 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -523,13 +523,13 @@ int uc_get_value_by_name(const char *name, value_t *ret_values) { /* remove missing values from getval */ if (ce->state == STATE_MISSING) { - status = -1; + status = EAGAIN; } else { *ret_values = ce->last_value; } } else { DEBUG("utils_cache: uc_get_value_by_name: No such value: %s", name); - status = -1; + status = ENOENT; } pthread_mutex_unlock(&cache_lock); @@ -739,7 +739,7 @@ int uc_get_history_by_name(const char *name, gauge_t *ret_history, status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) { pthread_mutex_unlock(&cache_lock); - return -ENOENT; + return ENOENT; } /* Check if there are enough values available. If not, increase the buffer * size. */ @@ -749,7 +749,7 @@ int uc_get_history_by_name(const char *name, gauge_t *ret_history, tmp = realloc(ce->history, sizeof(*ce->history) * num_steps); if (tmp == NULL) { pthread_mutex_unlock(&cache_lock); - return -ENOMEM; + return ENOMEM; } for (size_t i = ce->history_length; i < num_steps; i++) From e4065ea0ad02be26523c8aef48c1d184acfdebcf Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 07:58:29 +0100 Subject: [PATCH 03/22] open_telemetry_collector: Handle `AGGREGATION_TEMPORALITY_DELTA` correctly. --- src/open_telemetry_collector.cc | 51 ++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index 1024609452..e4af1fe037 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -49,6 +49,10 @@ using opentelemetry::proto::collector::metrics::v1:: using opentelemetry::proto::collector::metrics::v1::MetricsService; using opentelemetry::proto::common::v1::AnyValue; using opentelemetry::proto::common::v1::KeyValue; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_CUMULATIVE; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_DELTA; +using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_UNSPECIFIED; +using opentelemetry::proto::metrics::v1::AggregationTemporality; using opentelemetry::proto::metrics::v1::Gauge; using opentelemetry::proto::metrics::v1::Metric; using opentelemetry::proto::metrics::v1::NumberDataPoint; @@ -135,27 +139,48 @@ static grpc::Status unmarshal_label_pair(KeyValue kv, label_set_t *labels) { static grpc::Status unmarshal_data_point(NumberDataPoint dp, metric_family_t *fam, - bool is_cumulative) { + AggregationTemporality agg) { metric_t m = { + .family = fam, // family needs to be populated for uc_get_value(). .time = NS_TO_CDTIME_T(dp.time_unix_nano()), }; + bool is_cumulative = (agg == AGGREGATION_TEMPORALITY_DELTA || + agg == AGGREGATION_TEMPORALITY_CUMULATIVE); + + value_t offset = {0}; + if (agg == AGGREGATION_TEMPORALITY_DELTA) { + int err = uc_get_value(&m, &offset); + switch (err) { + case ENOENT: + case EAGAIN: + offset = (value_t){0}; + break; + case 0: + // no-op + break; + default: + return wrap_error(err); + } + } + switch (dp.value_case()) { case NumberDataPoint::kAsDouble: -// TODO(octo): enable once floating point counters have been merged (#4266) -#if 0 if (is_cumulative) { - fam->type = METRIC_TYPE_FPCOUNTER; - m.value.fpcounter = dp.as_double(); + // TODO(octo): enable once floating point counters have been merged + // (#4266) + // fam->type = METRIC_TYPE_FPCOUNTER; + // m.value.fpcounter = dp.as_double(); + fam->type = METRIC_TYPE_COUNTER; + m.value.counter = offset.counter + (counter_t)dp.as_double(); break; } -#endif m.value.gauge = dp.as_double(); break; case NumberDataPoint::kAsInt: if (is_cumulative) { fam->type = METRIC_TYPE_COUNTER; - m.value.counter = (counter_t)dp.as_int(); + m.value.counter = offset.counter + (counter_t)dp.as_int(); break; } m.value.gauge = (gauge_t)dp.as_int(); @@ -175,8 +200,6 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, // TODO(octo): get the first metric time from the cache and detect counter // resets. - // TODO(octo): when aggregation temporality == AGGREGATION_TEMPORALITY_DELTA, - // get the previous value from the cache and add to it. int err = metric_family_metric_append(fam, m); @@ -186,7 +209,8 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { for (auto db : g.data_points()) { - grpc::Status s = unmarshal_data_point(db, fam, false); + grpc::Status s = + unmarshal_data_point(db, fam, AGGREGATION_TEMPORALITY_UNSPECIFIED); if (!s.ok()) { return s; } @@ -195,10 +219,11 @@ static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { return grpc::Status::OK; } -static grpc::Status unmarshal_sum_metric(Sum s, metric_family_t *fam) { +static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) { // TODO(octo): check is_monotonic and aggregation temporality - for (auto db : s.data_points()) { - grpc::Status s = unmarshal_data_point(db, fam, true); + for (auto db : sum.data_points()) { + grpc::Status s = + unmarshal_data_point(db, fam, sum.aggregation_temporality()); if (!s.ok()) { return s; } From 1f17a42cad75e61b53d4e35424cbfabdd4cda08f Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:25:46 +0100 Subject: [PATCH 04/22] open_telemetry_collector plugin: Use the right kind of context. --- src/open_telemetry_collector.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index e4af1fe037..8efd1020f9 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -313,8 +313,8 @@ static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) { */ class OTMetricsService : public MetricsService::Service { public: - grpc::Status Export(grpc::ClientContext *context, - const ExportMetricsServiceRequest &req, + grpc::Status Export(grpc::ServerContext *context, + const ExportMetricsServiceRequest *req, ExportMetricsServiceResponse *resp) { for (auto rm : req.resource_metrics()) { grpc::Status s = dispatch_resource_metrics(rm); From 8b4eb62e3eee827e7ae977b411ae92f06d25149b Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:28:02 +0100 Subject: [PATCH 05/22] open_telemetry_collector plugin: Report failing metrics as "partial success". --- src/open_telemetry_collector.cc | 57 +++++++++++++++++++++++++-------- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index 8efd1020f9..b4326747a9 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -39,10 +39,12 @@ extern "C" { #include #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" +#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" #include "opentelemetry/proto/common/v1/common.pb.h" #include "opentelemetry/proto/metrics/v1/metrics.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" +using opentelemetry::proto::collector::metrics::v1::ExportMetricsPartialSuccess; using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; using opentelemetry::proto::collector::metrics::v1:: ExportMetricsServiceResponse; @@ -232,7 +234,23 @@ static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) { return grpc::Status::OK; } -static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) { +static grpc::Status reject_data_points(std::string msg, int num, + ExportMetricsPartialSuccess *ps) { + int64_t rejected = ps->rejected_data_points(); + rejected += (int64_t)num; + ps->set_rejected_data_points(rejected); + + std::string *error_message = ps->mutable_error_message(); + if (!error_message->empty()) { + error_message->append(", "); + } + error_message->append(msg); + + return grpc::Status::OK; +} + +static grpc::Status dispatch_metric(Metric mpb, label_set_t resource, + ExportMetricsPartialSuccess *ps) { metric_family_t fam = { .name = (char *)mpb.name().c_str(), .help = (char *)mpb.description().c_str(), @@ -246,7 +264,8 @@ static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) { grpc::Status s = unmarshal_gauge_metric(mpb.gauge(), &fam); if (!s.ok()) { metric_family_metric_reset(&fam); - return s; + return reject_data_points(s.error_message(), + mpb.gauge().data_points().size(), ps); } break; } @@ -254,17 +273,23 @@ static grpc::Status dispatch_metric(Metric mpb, label_set_t resource) { grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam); if (!s.ok()) { metric_family_metric_reset(&fam); - return s; + return reject_data_points(s.error_message(), + mpb.sum().data_points().size(), ps); } break; } case Metric::kHistogram: + return reject_data_points( + std::string("histogram metrics are not supported"), + mpb.histogram().data_points().size(), ps); case Metric::kExponentialHistogram: - return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, - "histogram metrics are not supported"); - case Metric::kSummary: - return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, - "summary metrics are not supported"); + return reject_data_points( + std::string("exponential histogram metrics are not supported"), + mpb.exponential_histogram().data_points().size(), ps); + case Metric::kSummary: { + return reject_data_points(std::string("summary metrics are not supported"), + mpb.summary().data_points().size(), ps); + } default: return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "unexpected data type"); @@ -287,7 +312,8 @@ static grpc::Status unmarshal_resource(Resource rpb, label_set_t *resource) { return grpc::Status::OK; } -static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) { +static grpc::Status dispatch_resource_metrics(ResourceMetrics rm, + ExportMetricsPartialSuccess *ps) { label_set_t resource = {0}; grpc::Status s = unmarshal_resource(rm.resource(), &resource); @@ -297,7 +323,7 @@ static grpc::Status dispatch_resource_metrics(ResourceMetrics rm) { for (auto sm : rm.scope_metrics()) { for (auto m : sm.metrics()) { - grpc::Status s = dispatch_metric(m, resource); + grpc::Status s = dispatch_metric(m, resource, ps); if (!s.ok()) { return s; } @@ -316,15 +342,18 @@ class OTMetricsService : public MetricsService::Service { grpc::Status Export(grpc::ServerContext *context, const ExportMetricsServiceRequest *req, ExportMetricsServiceResponse *resp) { - for (auto rm : req.resource_metrics()) { - grpc::Status s = dispatch_resource_metrics(rm); + ExportMetricsPartialSuccess *ps = resp->mutable_partial_success(); + + for (auto rm : req->resource_metrics()) { + grpc::Status s = dispatch_resource_metrics(rm, ps); if (!s.ok()) { + ERROR("open_telemetry_collector: dispatch_resource_metrics failed: %s", + s.error_message().c_str()); return s; } } - return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, - "Export is not implemented yet"); + return grpc::Status::OK; } }; From 0eb74c18acef00a342a897430effe48bd227b4c6 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:29:19 +0100 Subject: [PATCH 06/22] open_telemetry_collector plugin: Default to port 4317. --- src/open_telemetry_collector.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index b4326747a9..7b9f7932a4 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -73,7 +73,7 @@ struct Listener { grpc::SslServerCredentialsOptions *ssl; }; static std::vector listeners; -static grpc::string default_addr("0.0.0.0:50051"); +static grpc::string default_addr("0.0.0.0:4317"); /* * helper functions From 15a996f54989ca8bd12e4ef365341340dac051ad Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:30:07 +0100 Subject: [PATCH 07/22] open_telemetry_collector plugin: Add documentation. --- src/collectd.conf.in | 10 ++++++++++ src/collectd.conf.pod | 44 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 36db15b3b3..10ce892dcc 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -1331,6 +1331,16 @@ # CollectUserCount false # +# +# +# EnableSSL false +# SSLCACertificateFile "/path/to/root.pem" +# SSLCertificateFile "/path/to/client.pem" +# SSLCertificateKeyFile "/path/to/client.key" +# VerifyPeer false +# +# + # # # Statement "SELECT category, COUNT(*) AS value FROM products WHERE in_stock = 0 GROUP BY category" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 631c3ee927..8aa027d2d4 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7153,6 +7153,50 @@ can be configured independently from that option. Defaults to B. =back +=head2 Plugin C + +The I implements an OpenTelemetry Collector. +Specifically, it implements a gRPC C. + +=over 4 + +=item B I I + +The B statement sets the network address and port to which to bind. +Multiple B blocks can be specified to listen on multiple +interfaces/ports. When no B blocks are specified, it defaults to +B<0.0.0.0:4317>. + +The argument I may be a hostname, an IPv4 address, or an IPv6 address. + +Optionally, B blocks support the following options: + +=over 4 + +=item B I|I + +Whether to enable SSL for incoming connections. Default: false. + +=item B I + +=item B I + +=item B I + +Filenames specifying SSL certificate and key material to be used with SSL +connections. + +=item B B|B + +When enabled, a valid client certificate is required to connect to the server. +When disabled, a client certifiacte is not requested and any unsolicited client +certificate is accepted. +Enabled by default. + +=back + +=back + =head2 Plugin C The "oracle" plugin uses the Oracle® Call Interface I<(OCI)> to connect to an From 0240e08ad5dd75fecfaf9e8544ffa7f87cb15d8e Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:30:42 +0100 Subject: [PATCH 08/22] open_telemetry_collector plugin: Unify the setting of `family->type`. --- src/open_telemetry_collector.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index 7b9f7932a4..6aad492a85 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -173,7 +173,6 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, // (#4266) // fam->type = METRIC_TYPE_FPCOUNTER; // m.value.fpcounter = dp.as_double(); - fam->type = METRIC_TYPE_COUNTER; m.value.counter = offset.counter + (counter_t)dp.as_double(); break; } @@ -181,7 +180,6 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, break; case NumberDataPoint::kAsInt: if (is_cumulative) { - fam->type = METRIC_TYPE_COUNTER; m.value.counter = offset.counter + (counter_t)dp.as_int(); break; } @@ -270,6 +268,7 @@ static grpc::Status dispatch_metric(Metric mpb, label_set_t resource, break; } case Metric::kSum: { + fam.type = METRIC_TYPE_COUNTER; grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam); if (!s.ok()) { metric_family_metric_reset(&fam); From 808888179fcf2239a55edcddbcfcbbb1439188c3 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:31:55 +0100 Subject: [PATCH 09/22] open_telemetry_collector plugin: Reject non-monotonic Sums. --- src/open_telemetry_collector.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index 6aad492a85..acc8b37c2e 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -220,7 +220,15 @@ static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { } static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) { - // TODO(octo): check is_monotonic and aggregation temporality + if (!sum.is_monotonic()) { + // TODO(octo): convert to gauge instead? + DEBUG("open_telemetry_collector: non-monotonic sums (aka. UpDownCounters) " + "are unsupported"); + return grpc::Status( + grpc::StatusCode::UNIMPLEMENTED, + "non-monotonic sums (aka. UpDownCounters) are unsupported"); + } + for (auto db : sum.data_points()) { grpc::Status s = unmarshal_data_point(db, fam, sum.aggregation_temporality()); From 6f733fb50209f70806a7788c67010f2cd4fd34f2 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:34:15 +0100 Subject: [PATCH 10/22] open_telemetry_collector plugin: Link with protobuf library. --- Makefile.am | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Makefile.am b/Makefile.am index 69415dba6c..82d476aa2b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1747,11 +1747,19 @@ endif if BUILD_PLUGIN_OPEN_TELEMETRY_COLLECTOR pkglib_LTLIBRARIES += open_telemetry_collector.la open_telemetry_collector_la_SOURCES = src/open_telemetry_collector.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h -open_telemetry_collector_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) -open_telemetry_collector_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) -open_telemetry_collector_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h \ + opentelemetry/proto/collector/metrics/v1/metrics_service.pb.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h \ + opentelemetry/proto/common/v1/common.pb.cc \ + opentelemetry/proto/common/v1/common.pb.h \ + opentelemetry/proto/metrics/v1/metrics.pb.cc \ + opentelemetry/proto/metrics/v1/metrics.pb.h \ + opentelemetry/proto/resource/v1/resource.pb.cc \ + opentelemetry/proto/resource/v1/resource.pb.h +open_telemetry_collector_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_CPPFLAGS) +open_telemetry_collector_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_LDFLAGS) +open_telemetry_collector_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) $(BUILD_WITH_LIBPROTOBUF_LIBS) endif if BUILD_PLUGIN_ORACLE From 4b64a0337e1cd6f1b199ff4307305aacae3d6559 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:36:01 +0100 Subject: [PATCH 11/22] open_telemetry_collector plugin: Populate `ssl_opts` only when needed. --- src/open_telemetry_collector.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index acc8b37c2e..daeee9efd5 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -480,11 +480,12 @@ static int otelcol_config_listen(oconfig_item_t *ci) { } } - ssl_opts->pem_key_cert_pairs.push_back(pkcp); - if (use_ssl) + if (use_ssl) { + ssl_opts->pem_key_cert_pairs.push_back(pkcp); listener.ssl = ssl_opts; - else + } else { delete (ssl_opts); + } listeners.push_back(listener); return 0; From a07173ef9ef0fb78d01a50e8732817841b4c43e9 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 1 Feb 2024 12:53:31 +0100 Subject: [PATCH 12/22] open_telemetry_collector plugin: Enable gRPC reflection if available. --- configure.ac | 31 +++++++++++++++++++++++++++++++ src/open_telemetry_collector.cc | 6 ++++++ 2 files changed, 37 insertions(+) diff --git a/configure.ac b/configure.ac index cadd9a986c..4105800691 100644 --- a/configure.ac +++ b/configure.ac @@ -2880,6 +2880,37 @@ if test "x$with_libgrpcpp" = "xyes"; then AC_LANG_POP(C++) fi +if test "x$with_libgrpcpp" = "xyes"; then + AC_MSG_CHECKING([for grpc++_reflection]) + AC_LANG_PUSH(C++) + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + SAVE_LIBS="$LIBS" + CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS $CPPFLAGS" + LDFLAGS="$with_libgrpcpp_ldflags" + LIBS="-lgrpc++_reflection $GRPCPP_LIBS" + AC_LINK_IFELSE( + [ + AC_LANG_PROGRAM( + [[#include ]], + [[grpc::reflection::InitProtoReflectionServerBuilderPlugin();]] + ) + ], + [ + AC_MSG_RESULT([yes]) + GRPCPP_LIBS="$LIBS" + AC_DEFINE([HAVE_GRPCPP_REFLECTION], [1], [Define if the grpc++_reflection library is available.]) + ], + [ + AC_MSG_RESULT([no]) + ] + ) + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" + LIBS="$SAVE_LIBS" + AC_LANG_POP(C++) +fi + BUILD_WITH_LIBGRPCPP_CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS" BUILD_WITH_LIBGRPCPP_LDFLAGS="$with_libgrpcpp_ldflags" BUILD_WITH_LIBGRPCPP_LIBS="$GRPCPP_LIBS" diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_collector.cc index daeee9efd5..f70926e615 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_collector.cc @@ -37,6 +37,9 @@ extern "C" { #include #include +#if HAVE_GRPCPP_REFLECTION +#include +#endif #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" #include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" @@ -370,6 +373,9 @@ class OTMetricsService : public MetricsService::Service { class CollectorServer final { public: void Start() { +#if HAVE_GRPCPP_REFLECTION + grpc::reflection::InitProtoReflectionServerBuilderPlugin(); +#endif auto auth = grpc::InsecureServerCredentials(); grpc::ServerBuilder builder; From 3d81d6c01c06f033bdda4ed9812062627cb79897 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 13:21:17 +0100 Subject: [PATCH 13/22] open_telemetry plugin: Combine the *open_telemetry_collector* and *write_open_telemetry* plugins. --- Makefile.am | 41 +++---- configure.ac | 22 ++-- src/collectd.conf.in | 19 ++-- src/collectd.conf.pod | 80 +++++++------- src/open_telemetry.cc | 70 ++++++++++++ ...elemetry.cc => open_telemetry_exporter.cc} | 28 +---- ...ollector.cc => open_telemetry_receiver.cc} | 101 ++++++++---------- 7 files changed, 190 insertions(+), 171 deletions(-) create mode 100644 src/open_telemetry.cc rename src/{write_open_telemetry.cc => open_telemetry_exporter.cc} (91%) rename src/{open_telemetry_collector.cc => open_telemetry_receiver.cc} (94%) diff --git a/Makefile.am b/Makefile.am index 82d476aa2b..dbc9b1c0a7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -484,7 +484,7 @@ libformat_influxdb_la_SOURCES = \ src/utils/format_influxdb/format_influxdb.c \ src/utils/format_influxdb/format_influxdb.h -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY +if BUILD_PLUGIN_OPEN_TELEMETRY noinst_LTLIBRARIES += libformat_open_telemetry.la libformat_open_telemetry_la_SOURCES = \ src/utils/format_open_telemetry/format_open_telemetry.cc \ @@ -1744,22 +1744,17 @@ openvpn_la_SOURCES = src/openvpn.c openvpn_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif -if BUILD_PLUGIN_OPEN_TELEMETRY_COLLECTOR -pkglib_LTLIBRARIES += open_telemetry_collector.la -open_telemetry_collector_la_SOURCES = src/open_telemetry_collector.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h \ - opentelemetry/proto/collector/metrics/v1/metrics_service.pb.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h \ - opentelemetry/proto/common/v1/common.pb.cc \ - opentelemetry/proto/common/v1/common.pb.h \ - opentelemetry/proto/metrics/v1/metrics.pb.cc \ - opentelemetry/proto/metrics/v1/metrics.pb.h \ - opentelemetry/proto/resource/v1/resource.pb.cc \ - opentelemetry/proto/resource/v1/resource.pb.h -open_telemetry_collector_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_CPPFLAGS) -open_telemetry_collector_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_LDFLAGS) -open_telemetry_collector_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) $(BUILD_WITH_LIBPROTOBUF_LIBS) +if BUILD_PLUGIN_OPEN_TELEMETRY +pkglib_LTLIBRARIES += open_telemetry.la +open_telemetry_la_SOURCES = \ + src/open_telemetry.cc \ + src/open_telemetry_exporter.cc \ + src/open_telemetry_receiver.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h +open_telemetry_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_CPPFLAGS) +open_telemetry_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_LDFLAGS) +open_telemetry_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) $(BUILD_WITH_LIBPROTOBUF_LIBS) libformat_open_telemetry.la endif if BUILD_PLUGIN_ORACLE @@ -2377,16 +2372,6 @@ write_mongodb_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBMONGOC_LDFLAGS) write_mongodb_la_LIBADD = $(BUILD_WITH_LIBMONGOC_LIBS) endif -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY -pkglib_LTLIBRARIES += write_open_telemetry.la -write_open_telemetry_la_SOURCES = src/write_open_telemetry.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h -write_open_telemetry_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) -write_open_telemetry_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) -write_open_telemetry_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) libformat_open_telemetry.la -endif - if BUILD_PLUGIN_WRITE_PROMETHEUS pkglib_LTLIBRARIES += write_prometheus.la write_prometheus_la_SOURCES = src/write_prometheus.c @@ -2535,7 +2520,7 @@ types.pb.cc: $(srcdir)/proto/types.proto endif endif -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY +if BUILD_PLUGIN_OPEN_TELEMETRY BUILT_SOURCES += \ opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h \ diff --git a/configure.ac b/configure.ac index 4105800691..9e60e173a2 100644 --- a/configure.ac +++ b/configure.ac @@ -6841,7 +6841,7 @@ plugin_nut="$with_libupsclient" plugin_olsrd="yes" plugin_onewire="$with_libowcapi" plugin_openldap="$with_libldap" -plugin_open_telemetry_collector="yes" +plugin_open_telemetry="yes" plugin_openvpn="yes" plugin_oracle="$with_oracle" plugin_ovs_events="no" @@ -6906,7 +6906,6 @@ plugin_write_influxdb_udp="yes" plugin_write_kafka="$with_librdkafka" plugin_write_log="no" plugin_write_mongodb="$with_libmongoc" -plugin_write_open_telemetry="yes" plugin_write_prometheus="no" plugin_write_redis="yes" plugin_write_riemann="$with_libriemann_client" @@ -7177,26 +7176,22 @@ fi if test "x$GRPC_CPP_PLUGIN" = "x"; then plugin_grpc="no (grpc_cpp_plugin not found)" - plugin_open_telemetry_collector="no (grpc_cpp_plugin not found)" - plugin_write_open_telemetry="no (grpc_cpp_plugin not found)" + plugin_open_telemetry="no (grpc_cpp_plugin not found)" fi if test "x$have_protoc3" != "xyes"; then plugin_grpc="no (protoc3 not found)" - plugin_open_telemetry_collector="no (protoc3 not found)" - plugin_write_open_telemetry="no (protoc3 not found)" + plugin_open_telemetry="no (protoc3 not found)" fi if test "x$with_libprotobuf" != "xyes"; then plugin_grpc="no (libprotobuf not found)" - plugin_open_telemetry_collector="no (libprotobuf not found)" - plugin_write_open_telemetry="no (libprotobuf not found)" + plugin_open_telemetry="no (libprotobuf not found)" fi if test "x$with_libgrpcpp" != "xyes"; then plugin_grpc="no (libgrpc++ not found)" - plugin_open_telemetry_collector="no (libgrpc++ not found)" - plugin_write_open_telemetry="no (libgrpc++ not found)" + plugin_open_telemetry="no (libgrpc++ not found)" fi if test "x$protoc3_optional" = "xno"; then - plugin_write_open_telemetry="no (protoc does not support optional fields)" + plugin_open_telemetry="no (protoc does not support optional fields)" fi if test "x$have_getifaddrs" = "xyes"; then @@ -7553,7 +7548,7 @@ AC_PLUGIN([olsrd], [$plugin_olsrd], [olsrd statistics] AC_PLUGIN([onewire], [$plugin_onewire], [OneWire sensor statistics]) AC_PLUGIN([openldap], [$plugin_openldap], [OpenLDAP statistics]) AC_PLUGIN([openvpn], [$plugin_openvpn], [OpenVPN client statistics]) -AC_PLUGIN([open_telemetry_collector], [$plugin_open_telemetry_collector], [OpenTelemetry Collector]) +AC_PLUGIN([open_telemetry], [$plugin_open_telemetry], [OpenTelemetry exporter/receiver]) AC_PLUGIN([oracle], [$plugin_oracle], [Oracle plugin]) AC_PLUGIN([ovs_events], [$plugin_ovs_events], [OVS events plugin]) AC_PLUGIN([ovs_stats], [$plugin_ovs_stats], [OVS statistics plugin]) @@ -7618,7 +7613,6 @@ AC_PLUGIN([write_influxdb_udp], [$plugin_write_influxdb_udp],[Influxdb udp outp AC_PLUGIN([write_kafka], [$plugin_write_kafka], [Kafka output plugin]) AC_PLUGIN([write_log], [$plugin_write_log], [Log output plugin]) AC_PLUGIN([write_mongodb], [$plugin_write_mongodb], [MongoDB output plugin]) -AC_PLUGIN([write_open_telemetry],[$plugin_write_open_telemetry],[Write OpenTelemetry plugin]) AC_PLUGIN([write_prometheus], [$plugin_write_prometheus], [Prometheus write plugin]) AC_PLUGIN([write_redis], [$plugin_write_redis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$plugin_write_riemann], [Riemann output plugin]) @@ -8001,6 +7995,7 @@ AC_MSG_RESULT([ nut . . . . . . . . . $enable_nut]) AC_MSG_RESULT([ olsrd . . . . . . . . $enable_olsrd]) AC_MSG_RESULT([ onewire . . . . . . . $enable_onewire]) AC_MSG_RESULT([ openldap . . . . . . $enable_openldap]) +AC_MSG_RESULT([ open_telemetry . . . $enable_open_telemetry]) AC_MSG_RESULT([ openvpn . . . . . . . $enable_openvpn]) AC_MSG_RESULT([ oracle . . . . . . . $enable_oracle]) AC_MSG_RESULT([ ovs_events . . . . . $enable_ovs_events]) @@ -8065,7 +8060,6 @@ AC_MSG_RESULT([ write_influxdb_udp. . $enable_write_influxdb_udp]) AC_MSG_RESULT([ write_kafka . . . . . $enable_write_kafka]) AC_MSG_RESULT([ write_log . . . . . . $enable_write_log]) AC_MSG_RESULT([ write_mongodb . . . . $enable_write_mongodb]) -AC_MSG_RESULT([ write_open_telemetry $enable_write_open_telemetry]) AC_MSG_RESULT([ write_prometheus. . . $enable_write_prometheus]) AC_MSG_RESULT([ write_redis . . . . . $enable_write_redis]) AC_MSG_RESULT([ write_riemann . . . . $enable_write_riemann]) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 10ce892dcc..89a2212f41 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -182,6 +182,7 @@ #@BUILD_PLUGIN_OLSRD_TRUE@LoadPlugin olsrd #@BUILD_PLUGIN_ONEWIRE_TRUE@LoadPlugin onewire #@BUILD_PLUGIN_OPENLDAP_TRUE@LoadPlugin openldap +#@BUILD_PLUGIN_OPEN_TELEMETRY_TRUE@LoadPlugin open_telemetry #@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn #@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle #@BUILD_PLUGIN_OVS_EVENTS_TRUE@LoadPlugin ovs_events @@ -236,7 +237,6 @@ #@BUILD_PLUGIN_WRITE_KAFKA_TRUE@LoadPlugin write_kafka #@BUILD_PLUGIN_WRITE_LOG_TRUE@LoadPlugin write_log #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb -#@BUILD_PLUGIN_WRITE_OPEN_TELEMETRY_TRUE@LoadPlugin write_open_telemetry #@BUILD_PLUGIN_WRITE_PROMETHEUS_TRUE@LoadPlugin write_prometheus #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann @@ -1331,14 +1331,18 @@ # CollectUserCount false # -# -# +# +# # EnableSSL false # SSLCACertificateFile "/path/to/root.pem" # SSLCertificateFile "/path/to/client.pem" # SSLCertificateKeyFile "/path/to/client.key" # VerifyPeer false -# +# +# +# Host "localhost" +# Port "4317" +# # # @@ -2080,13 +2084,6 @@ # # -# -# -# Host "localhost" -# Port "4317" -# -# - # # Port "9103" # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 8aa027d2d4..12b39ebd94 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7155,21 +7155,38 @@ can be configured independently from that option. Defaults to B. =head2 Plugin C -The I implements an OpenTelemetry Collector. -Specifically, it implements a gRPC C. +The I implements an OpenTelemetry exporter and receiver +using OTLP. Specifically, it implements a gRPC C client and +server. + +B + + + + EnableSSL false + SSLCACertificateFile "/path/to/root.pem" + SSLCertificateFile "/path/to/client.pem" + SSLCertificateKeyFile "/path/to/client.key" + VerifyPeer false + + + Host "localhost" + Port "4317" + + =over 4 -=item B I I +=item B I I -The B statement sets the network address and port to which to bind. -Multiple B blocks can be specified to listen on multiple -interfaces/ports. When no B blocks are specified, it defaults to -B<0.0.0.0:4317>. +The B statement sets the network address and port to which to bind. +Multiple B blocks can be specified to listen on multiple +interfaces/ports. When no B blocks are specified, the plugin will not +receive any metrics. The argument I may be a hostname, an IPv4 address, or an IPv6 address. -Optionally, B blocks support the following options: +Optionally, B blocks support the following options: =over 4 @@ -7195,6 +7212,24 @@ Enabled by default. =back +=item B I + +The plugin can export metrics to multiple collectors by specifying multiple +B blocks. Within the B blocks, the following options are +available: + +=over 4 + +=item B I
+ +Hostname or address to connect to. Defaults to C. + +=item B I + +Service name or port number to connect to. Defaults to C<4317>. + +=back + =back =head2 Plugin C @@ -11195,35 +11230,6 @@ want to use authentication all three fields must be set. =back -=head2 Plugin C - -The I will export metrics to an I using the gRPC based OTLP protocol. - -B - - - - Host "localhost" - Port "4317" - - - -The plugin can export metrics to multiple collectors by specifying multiple -B blocks. Within the B blocks, the following options are available: - -=over 4 - -=item B I
- -Hostname or address to connect to. Defaults to C. - -=item B I - -Service name or port number to connect to. Defaults to C<4317>. - -=back - =head2 Plugin C The I implements a tiny webserver that can be scraped diff --git a/src/open_telemetry.cc b/src/open_telemetry.cc new file mode 100644 index 0000000000..f87ea92030 --- /dev/null +++ b/src/open_telemetry.cc @@ -0,0 +1,70 @@ +/** + * collectd - src/open_telemetry.cc + * Copyright (C) 2024 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Florian octo Forster + **/ + +extern "C" { +#include "collectd.h" + +#include "daemon/configfile.h" +#include "daemon/plugin.h" +} + +int exporter_config(oconfig_item_t *ci); +int receiver_config(oconfig_item_t *ci); + +static int ot_config(oconfig_item_t *ci) { + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Exporter", child->key) == 0) { + int err = exporter_config(child); + if (err) { + ERROR("open_telemetry plugin: Configuring exporter failed " + "with status %d", + err); + return err; + } + } else if (strcasecmp("Receiver", child->key) == 0) { + int err = receiver_config(child); + if (err) { + ERROR("open_telemetry plugin: Configuring receiver failed " + "with status %d", + err); + return err; + } + } else { + ERROR("open_telemetry plugin: invalid config option: \"%s\"", child->key); + return EINVAL; + } + } + + return 0; +} + +extern "C" { +void module_register(void) { + plugin_register_complex_config("open_telemetry", ot_config); +} +} diff --git a/src/write_open_telemetry.cc b/src/open_telemetry_exporter.cc similarity index 91% rename from src/write_open_telemetry.cc rename to src/open_telemetry_exporter.cc index 8491724c09..e382b8f10a 100644 --- a/src/write_open_telemetry.cc +++ b/src/open_telemetry_exporter.cc @@ -1,6 +1,6 @@ /** - * collectd - src/write_open_telemetry.cc - * Copyright (C) 2023 Florian octo Forster + * collectd - src/open_telemetry_exporter.cc + * Copyright (C) 2023-2024 Florian octo Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -38,7 +38,7 @@ extern "C" { #include "collectd.h" -#include "plugin.h" +#include "daemon/plugin.h" #include "utils/common/common.h" #include "utils/resource_metrics/resource_metrics.h" @@ -196,7 +196,7 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { return 0; } -static int ot_config_node(oconfig_item_t *ci) { +int exporter_config(oconfig_item_t *ci) { ot_callback_t *cb = (ot_callback_t *)calloc(1, sizeof(*cb)); if (cb == NULL) { ERROR("write_open_telemetry plugin: calloc failed."); @@ -254,23 +254,3 @@ static int ot_config_node(oconfig_item_t *ci) { STRBUF_DESTROY(callback_name); return 0; } - -static int ot_config(oconfig_item_t *ci) { - for (int i = 0; i < ci->children_num; i++) { - oconfig_item_t *child = ci->children + i; - - if (strcasecmp("Node", child->key) == 0) - ot_config_node(child); - else { - ERROR("write_open_telemetry plugin: Invalid configuration " - "option: %s.", - child->key); - } - } - - return 0; -} - -void module_register(void) { - plugin_register_complex_config("write_open_telemetry", ot_config); -} diff --git a/src/open_telemetry_collector.cc b/src/open_telemetry_receiver.cc similarity index 94% rename from src/open_telemetry_collector.cc rename to src/open_telemetry_receiver.cc index f70926e615..07d6dd498b 100644 --- a/src/open_telemetry_collector.cc +++ b/src/open_telemetry_receiver.cc @@ -1,5 +1,5 @@ /** - * collectd - src/grpc.cc + * collectd - src/open_telemetry_receiver.cc * Copyright (C) 2015-2016 Sebastian Harl * Copyright (C) 2016-2024 Florian octo Forster * @@ -415,11 +415,50 @@ class CollectorServer final { static CollectorServer *server = nullptr; +static int receiver_init(void) { + if (server) { + return 0; + } + + server = new CollectorServer(); + if (!server) { + ERROR("open_telemetry_collector: Failed to create server"); + return -1; + } + + server->Start(); + return 0; +} /* receiver_init() */ + +static int receiver_shutdown(void) { + if (!server) + return 0; + + server->Shutdown(); + + delete server; + server = nullptr; + + return 0; +} /* receiver_shutdown() */ + +static void receiver_install_callbacks(void) { + static bool done; + + if (done) { + return; + } + + plugin_register_init("open_telemetry_collector", receiver_init); + plugin_register_shutdown("open_telemetry_collector", receiver_shutdown); + + done = true; +} + /* * collectd plugin interface */ -extern "C" { -static int otelcol_config_listen(oconfig_item_t *ci) { +int receiver_config(oconfig_item_t *ci) { if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) || (ci->values[1].type != OCONFIG_TYPE_STRING)) { ERROR("open_telemetry_collector: The `%s` config option needs exactly " @@ -494,58 +533,6 @@ static int otelcol_config_listen(oconfig_item_t *ci) { } listeners.push_back(listener); + receiver_install_callbacks(); return 0; -} /* otelcol_config_listen() */ - -static int otelcol_config(oconfig_item_t *ci) { - for (int i = 0; i < ci->children_num; i++) { - oconfig_item_t *child = ci->children + i; - - if (!strcasecmp("Listen", child->key)) { - if (otelcol_config_listen(child)) { - return -1; - } - } - - else { - WARNING("open_telemetry_collector: Option `%s` not allowed here.", - child->key); - } - } - - return 0; -} /* otelcol_config() */ - -static int otelcol_init(void) { - if (server) { - return 0; - } - - server = new CollectorServer(); - if (!server) { - ERROR("open_telemetry_collector: Failed to create server"); - return -1; - } - - server->Start(); - return 0; -} /* otelcol_init() */ - -static int otelcol_shutdown(void) { - if (!server) - return 0; - - server->Shutdown(); - - delete server; - server = nullptr; - - return 0; -} /* otelcol_shutdown() */ - -void module_register(void) { - plugin_register_complex_config("open_telemetry_collector", otelcol_config); - plugin_register_init("open_telemetry_collector", otelcol_init); - plugin_register_shutdown("open_telemetry_collector", otelcol_shutdown); -} /* module_register() */ -} /* extern "C" */ +} /* receiver_config() */ From 5eb936b1fcf1778330f904b9c778e7a3ed5556ac Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 13:31:37 +0100 Subject: [PATCH 14/22] open_telemetry plugin: Unify the prefix used in log messages. --- src/open_telemetry_exporter.cc | 24 +++++------------------ src/open_telemetry_receiver.cc | 35 ++++++++++++++++------------------ 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index e382b8f10a..39886b0d64 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -24,17 +24,6 @@ * Florian octo Forster **/ -/* write_open_telemetry plugin configuration example - * - * - * - * Host "localhost" - * Port "8080" - * Path "/v1/metrics" - * - * - */ - extern "C" { #include "collectd.h" @@ -103,7 +92,7 @@ static int export_metrics(ot_callback_t *cb) { grpc::Status status = cb->stub->Export(&context, *req, &resp); if (!status.ok()) { - ERROR("write_open_telemetry plugin: Exporting metrics failed: %s", + ERROR("open_telemetry plugin: Exporting metrics failed: %s", status.error_message().c_str()); return -1; } @@ -111,8 +100,7 @@ static int export_metrics(ot_callback_t *cb) { if (resp.has_partial_success() && resp.partial_success().rejected_data_points() > 0) { auto ps = resp.partial_success(); - NOTICE("write_open_telemetry plugin: %" PRId64 - " data points were rejected: %s", + NOTICE("open_telemetry plugin: %" PRId64 " data points were rejected: %s", ps.rejected_data_points(), ps.error_message().c_str()); } @@ -199,7 +187,7 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { int exporter_config(oconfig_item_t *ci) { ot_callback_t *cb = (ot_callback_t *)calloc(1, sizeof(*cb)); if (cb == NULL) { - ERROR("write_open_telemetry plugin: calloc failed."); + ERROR("open_telemetry plugin: calloc failed."); return -1; } @@ -219,9 +207,7 @@ int exporter_config(oconfig_item_t *ci) { else if (strcasecmp("Port", child->key) == 0) status = cf_util_get_service(child, &cb->port); else { - ERROR("write_open_telemetry plugin: Invalid configuration " - "option: %s.", - child->key); + ERROR("open_telemetry plugin: invalid config option: %s.", child->key); status = -1; } @@ -232,7 +218,7 @@ int exporter_config(oconfig_item_t *ci) { } strbuf_t callback_name = STRBUF_CREATE; - strbuf_printf(&callback_name, "write_open_telemetry/%s", cb->name); + strbuf_printf(&callback_name, "open_telemetry/%s", cb->name); user_data_t user_data = { .data = cb, diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index 07d6dd498b..655caa6276 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -87,7 +87,7 @@ static grpc::string read_file(const char *filename) { f.open(filename); if (!f.is_open()) { - ERROR("open_telemetry_collector: Failed to open '%s'", filename); + ERROR("open_telemetry plugin: Failed to open '%s'", filename); return ""; } @@ -225,7 +225,7 @@ static grpc::Status unmarshal_gauge_metric(Gauge g, metric_family_t *fam) { static grpc::Status unmarshal_sum_metric(Sum sum, metric_family_t *fam) { if (!sum.is_monotonic()) { // TODO(octo): convert to gauge instead? - DEBUG("open_telemetry_collector: non-monotonic sums (aka. UpDownCounters) " + DEBUG("open_telemetry plugin: non-monotonic sums (aka. UpDownCounters) " "are unsupported"); return grpc::Status( grpc::StatusCode::UNIMPLEMENTED, @@ -357,7 +357,7 @@ class OTMetricsService : public MetricsService::Service { for (auto rm : req->resource_metrics()) { grpc::Status s = dispatch_resource_metrics(rm, ps); if (!s.ok()) { - ERROR("open_telemetry_collector: dispatch_resource_metrics failed: %s", + ERROR("open_telemetry plugin: dispatch_resource_metrics failed: %s", s.error_message().c_str()); return s; } @@ -382,7 +382,7 @@ class CollectorServer final { if (listeners.empty()) { builder.AddListeningPort(default_addr, auth); - INFO("open_telemetry_collector: Listening on %s", default_addr.c_str()); + INFO("open_telemetry plugin: Listening on %s", default_addr.c_str()); } else { for (auto l : listeners) { grpc::string addr = l.addr + ":" + l.port; @@ -395,7 +395,7 @@ class CollectorServer final { } builder.AddListeningPort(addr, a); - INFO("open_telemetry_collector: Listening on %s%s", addr.c_str(), + INFO("open_telemetry plugin: Listening on %s%s", addr.c_str(), use_ssl.c_str()); } } @@ -422,7 +422,7 @@ static int receiver_init(void) { server = new CollectorServer(); if (!server) { - ERROR("open_telemetry_collector: Failed to create server"); + ERROR("open_telemetry plugin: Failed to create server"); return -1; } @@ -445,13 +445,11 @@ static int receiver_shutdown(void) { static void receiver_install_callbacks(void) { static bool done; - if (done) { - return; + if (!done) { + plugin_register_init("open_telemetry_receiver", receiver_init); + plugin_register_shutdown("open_telemetry_receiver", receiver_shutdown); } - plugin_register_init("open_telemetry_collector", receiver_init); - plugin_register_shutdown("open_telemetry_collector", receiver_shutdown); - done = true; } @@ -461,7 +459,7 @@ static void receiver_install_callbacks(void) { int receiver_config(oconfig_item_t *ci) { if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) || (ci->values[1].type != OCONFIG_TYPE_STRING)) { - ERROR("open_telemetry_collector: The `%s` config option needs exactly " + ERROR("open_telemetry plugin: The `%s` config option needs exactly " "two string argument (address and port).", ci->key); return -1; @@ -482,14 +480,14 @@ int receiver_config(oconfig_item_t *ci) { if (!strcasecmp("EnableSSL", child->key)) { if (cf_util_get_boolean(child, &use_ssl)) { - ERROR("open_telemetry_collector: Option `%s` expects a boolean value", + ERROR("open_telemetry plugin: Option `%s` expects a boolean value", child->key); return -1; } } else if (!strcasecmp("SSLCACertificateFile", child->key)) { char *certs = NULL; if (cf_util_get_string(child, &certs)) { - ERROR("open_telemetry_collector: Option `%s` expects a string value", + ERROR("open_telemetry plugin: Option `%s` expects a string value", child->key); return -1; } @@ -497,7 +495,7 @@ int receiver_config(oconfig_item_t *ci) { } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { char *key = NULL; if (cf_util_get_string(child, &key)) { - ERROR("open_telemetry_collector: Option `%s` expects a string value", + ERROR("open_telemetry plugin: Option `%s` expects a string value", child->key); return -1; } @@ -505,7 +503,7 @@ int receiver_config(oconfig_item_t *ci) { } else if (!strcasecmp("SSLCertificateFile", child->key)) { char *cert = NULL; if (cf_util_get_string(child, &cert)) { - ERROR("open_telemetry_collector: Option `%s` expects a string value", + ERROR("open_telemetry plugin: Option `%s` expects a string value", child->key); return -1; } @@ -519,9 +517,8 @@ int receiver_config(oconfig_item_t *ci) { verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; } else { - WARNING( - "open_telemetry_collector: Option `%s` not allowed in <%s> block.", - child->key, ci->key); + WARNING("open_telemetry plugin: Option `%s` not allowed in <%s> block.", + child->key, ci->key); } } From 2174526e2ef1af2fb0349358293a2964d7095fb4 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 13:51:15 +0100 Subject: [PATCH 15/22] open_telemetry plugin: Unify the configuration between exporter and receiver. --- src/collectd.conf.in | 5 +--- src/collectd.conf.pod | 32 ++++++++------------- src/open_telemetry_exporter.cc | 50 +++++++++++++------------------- src/open_telemetry_receiver.cc | 52 ++++++++++++++++++---------------- 4 files changed, 60 insertions(+), 79 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 89a2212f41..86dcbb5a1b 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -1339,10 +1339,7 @@ # SSLCertificateKeyFile "/path/to/client.key" # VerifyPeer false # -# -# Host "localhost" -# Port "4317" -# +# Exporter "localhost" "4317" # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 12b39ebd94..285a0ca1eb 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7153,7 +7153,7 @@ can be configured independently from that option. Defaults to B. =back -=head2 Plugin C +=head2 Plugin C The I implements an OpenTelemetry exporter and receiver using OTLP. Specifically, it implements a gRPC C client and @@ -7169,15 +7169,12 @@ B SSLCertificateKeyFile "/path/to/client.key" VerifyPeer false - - Host "localhost" - Port "4317" - + Exporter "localhost" "4317" =over 4 -=item B I I +=item B I [I] The B statement sets the network address and port to which to bind. Multiple B blocks can be specified to listen on multiple @@ -7186,6 +7183,9 @@ receive any metrics. The argument I may be a hostname, an IPv4 address, or an IPv6 address. +The I argument may be omitted. In that case the default C<"4317"> is +used. + Optionally, B blocks support the following options: =over 4 @@ -7212,23 +7212,15 @@ Enabled by default. =back -=item B I - -The plugin can export metrics to multiple collectors by specifying multiple -B blocks. Within the B blocks, the following options are -available: - -=over 4 - -=item B I
- -Hostname or address to connect to. Defaults to C. +=item B I [I] -=item B I +The B option configures an OTLP export to the given collector +address. Multiple collectors can be specified using multiple B lines. -Service name or port number to connect to. Defaults to C<4317>. +The argument I may be a hostname, an IPv4 address, or an IPv6 address. -=back +The I argument may be omitted. In that case the default C<"4317"> is +used. =back diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index 39886b0d64..3b736c3ae9 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -42,10 +42,6 @@ extern "C" { #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" #include "utils/format_open_telemetry/format_open_telemetry.h" -#ifndef OT_DEFAULT_HOST -#define OT_DEFAULT_HOST "localhost" -#endif - #ifndef OT_DEFAULT_PORT #define OT_DEFAULT_PORT "4317" #endif @@ -58,7 +54,6 @@ using opentelemetry::proto::collector::metrics::v1::MetricsService; * Private variables */ typedef struct { - char *name; int reference_count; char *host; @@ -144,7 +139,6 @@ static void ot_callback_decref(void *data) { cb->stub.reset(); - sfree(cb->name); sfree(cb->host); sfree(cb->port); @@ -185,40 +179,34 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { } int exporter_config(oconfig_item_t *ci) { + if (ci->values_num < 1 || ci->values_num > 2 || + ci->values[0].type != OCONFIG_TYPE_STRING || + (ci->values_num > 1 && ci->values[1].type != OCONFIG_TYPE_STRING)) { + ERROR("open_telemetry plugin: The \"%s\" config option needs " + "one or two string arguments (address and port).", + ci->key); + return EINVAL; + } + ot_callback_t *cb = (ot_callback_t *)calloc(1, sizeof(*cb)); if (cb == NULL) { ERROR("open_telemetry plugin: calloc failed."); return -1; } - cb->reference_count = 1; - cf_util_get_string(ci, &cb->name); - cb->host = strdup(OT_DEFAULT_HOST); - cb->port = strdup(OT_DEFAULT_PORT); - - pthread_mutex_init(&cb->mu, /* attr = */ NULL); - - for (int i = 0; i < ci->children_num; i++) { - oconfig_item_t *child = ci->children + i; - - int status = 0; - if (strcasecmp("Host", child->key) == 0) - status = cf_util_get_string(child, &cb->host); - else if (strcasecmp("Port", child->key) == 0) - status = cf_util_get_service(child, &cb->port); - else { - ERROR("open_telemetry plugin: invalid config option: %s.", child->key); - status = -1; - } - - if (status != 0) { - ot_callback_decref(cb); - return status; - } + *cb = (ot_callback_t){ + .reference_count = 1, + .host = strdup(ci->values[0].value.string), + }; + if (ci->values_num >= 2) { + cb->port = strdup(ci->values[1].value.string); + } else { + cb->port = strdup(OT_DEFAULT_PORT); } + pthread_mutex_init(&cb->mu, /* attr = */ NULL); strbuf_t callback_name = STRBUF_CREATE; - strbuf_printf(&callback_name, "open_telemetry/%s", cb->name); + strbuf_printf(&callback_name, "open_telemetry/[%s]:%s", cb->host, cb->port); user_data_t user_data = { .data = cb, diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index 655caa6276..6f3aead092 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -47,6 +47,10 @@ extern "C" { #include "opentelemetry/proto/metrics/v1/metrics.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" +#ifndef OT_DEFAULT_PORT +#define OT_DEFAULT_PORT "4317" +#endif + using opentelemetry::proto::collector::metrics::v1::ExportMetricsPartialSuccess; using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; using opentelemetry::proto::collector::metrics::v1:: @@ -380,24 +384,19 @@ class CollectorServer final { grpc::ServerBuilder builder; - if (listeners.empty()) { - builder.AddListeningPort(default_addr, auth); - INFO("open_telemetry plugin: Listening on %s", default_addr.c_str()); - } else { - for (auto l : listeners) { - grpc::string addr = l.addr + ":" + l.port; - - auto use_ssl = grpc::string(""); - auto a = auth; - if (l.ssl != nullptr) { - use_ssl = grpc::string(" (SSL enabled)"); - a = grpc::SslServerCredentials(*l.ssl); - } - - builder.AddListeningPort(addr, a); - INFO("open_telemetry plugin: Listening on %s%s", addr.c_str(), - use_ssl.c_str()); + for (auto l : listeners) { + grpc::string addr = l.addr + ":" + l.port; + + auto use_ssl = grpc::string(""); + auto a = auth; + if (l.ssl != nullptr) { + use_ssl = grpc::string(" (SSL enabled)"); + a = grpc::SslServerCredentials(*l.ssl); } + + builder.AddListeningPort(addr, a); + INFO("open_telemetry plugin: Listening on %s%s", addr.c_str(), + use_ssl.c_str()); } builder.RegisterService(&metrics_service); @@ -457,17 +456,22 @@ static void receiver_install_callbacks(void) { * collectd plugin interface */ int receiver_config(oconfig_item_t *ci) { - if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) || - (ci->values[1].type != OCONFIG_TYPE_STRING)) { - ERROR("open_telemetry plugin: The `%s` config option needs exactly " - "two string argument (address and port).", + if (ci->values_num < 1 || ci->values_num > 2 || + ci->values[0].type != OCONFIG_TYPE_STRING || + (ci->values_num > 1 && ci->values[1].type != OCONFIG_TYPE_STRING)) { + ERROR("open_telemetry plugin: The \"%s\" config option needs " + "one or two string arguments (address and port).", ci->key); - return -1; + return EINVAL; } auto listener = Listener(); listener.addr = grpc::string(ci->values[0].value.string); - listener.port = grpc::string(ci->values[1].value.string); + if (ci->values_num >= 2) { + listener.port = grpc::string(ci->values[1].value.string); + } else { + listener.port = grpc::string(OT_DEFAULT_PORT); + } listener.ssl = nullptr; auto ssl_opts = new grpc::SslServerCredentialsOptions( @@ -526,7 +530,7 @@ int receiver_config(oconfig_item_t *ci) { ssl_opts->pem_key_cert_pairs.push_back(pkcp); listener.ssl = ssl_opts; } else { - delete (ssl_opts); + delete ssl_opts; } listeners.push_back(listener); From 2a363719fce9bee71f8b14c53d7c0ed21a8a0cd1 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 16:15:51 +0100 Subject: [PATCH 16/22] open_telemetry plugin: Add SSL options for the exporter. --- src/collectd.conf.in | 9 +++- src/collectd.conf.pod | 21 +++++++++- src/open_telemetry_exporter.cc | 76 +++++++++++++++++++++++++++++----- 3 files changed, 92 insertions(+), 14 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 86dcbb5a1b..33e9bc4e1f 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -1337,9 +1337,14 @@ # SSLCACertificateFile "/path/to/root.pem" # SSLCertificateFile "/path/to/client.pem" # SSLCertificateKeyFile "/path/to/client.key" -# VerifyPeer false +# VerifyPeer true # -# Exporter "localhost" "4317" +# +# EnableSSL false +# SSLCACertificateFile "/path/to/root.pem" +# SSLCertificateFile "/path/to/client.pem" +# SSLCertificateKeyFile "/path/to/client.key" +# # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 285a0ca1eb..0aadd0e74c 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7192,7 +7192,7 @@ Optionally, B blocks support the following options: =item B I|I -Whether to enable SSL for incoming connections. Default: false. +Whether to enable SSL for incoming connections. Default: I. =item B I @@ -7222,6 +7222,25 @@ The argument I may be a hostname, an IPv4 address, or an IPv6 address. The I argument may be omitted. In that case the default C<"4317"> is used. +Optionally, B blocks support the following options: + +=over 4 + +=item B I|I + +Whether to require SSL when connecting. Default: I. + +=item B I + +=item B I + +=item B I + +Filenames specifying SSL certificate and key material to be used with SSL +connections. + +=back + =back =head2 Plugin C diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index 3b736c3ae9..de62e74b8d 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -37,6 +37,8 @@ extern "C" { #include } +#include + #include #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" @@ -51,7 +53,7 @@ using opentelemetry::proto::collector::metrics::v1:: using opentelemetry::proto::collector::metrics::v1::MetricsService; /* - * Private variables + * Private types */ typedef struct { int reference_count; @@ -59,6 +61,9 @@ typedef struct { char *host; char *port; + bool use_ssl; + grpc::SslCredentialsOptions *ssl_opts; + resource_metrics_set_t resource_metrics; cdtime_t staged_time; @@ -69,14 +74,15 @@ typedef struct { static int export_metrics(ot_callback_t *cb) { if (cb->stub == NULL) { - strbuf_t buf = STRBUF_CREATE; - strbuf_printf(&buf, "%s:%s", cb->host, cb->port); + grpc::string addr = grpc::string(cb->host) + ":" + grpc::string(cb->port); - auto chan = - grpc::CreateChannel(buf.ptr, grpc::InsecureChannelCredentials()); - cb->stub = MetricsService::NewStub(chan); + auto creds = grpc::InsecureChannelCredentials(); + if (cb->use_ssl) { + creds = grpc::SslCredentials(*cb->ssl_opts); + } - STRBUF_DESTROY(buf); + auto chan = grpc::CreateChannel(addr, creds); + cb->stub = MetricsService::NewStub(chan); } auto req = format_open_telemetry_export_metrics_service_request( @@ -142,6 +148,8 @@ static void ot_callback_decref(void *data) { sfree(cb->host); sfree(cb->port); + delete cb->ssl_opts; + pthread_mutex_unlock(&cb->mu); pthread_mutex_destroy(&cb->mu); @@ -178,6 +186,29 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { return 0; } +static int config_get_file(oconfig_item_t const *ci, grpc::string *out) { + char *path = NULL; + int err = cf_util_get_string(ci, &path); + if (err) { + return err; + } + + std::ifstream f; + f.open(path); + if (!f.is_open()) { + ERROR("open_telemetry plugin: Failed to open \"%s\"", path); + return EPERM; + } + + grpc::string line; + while (std::getline(f, line)) { + out->append(line); + out->push_back('\n'); + } + f.close(); + return 0; +} /* config_get_file */ + int exporter_config(oconfig_item_t *ci) { if (ci->values_num < 1 || ci->values_num > 2 || ci->values[0].type != OCONFIG_TYPE_STRING || @@ -194,17 +225,40 @@ int exporter_config(oconfig_item_t *ci) { return -1; } - *cb = (ot_callback_t){ - .reference_count = 1, - .host = strdup(ci->values[0].value.string), - }; + cb->reference_count = 1; + cb->host = strdup(ci->values[0].value.string); if (ci->values_num >= 2) { cb->port = strdup(ci->values[1].value.string); } else { cb->port = strdup(OT_DEFAULT_PORT); } + cb->ssl_opts = new grpc::SslCredentialsOptions; pthread_mutex_init(&cb->mu, /* attr = */ NULL); + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + int err = 0; + if (!strcasecmp("EnableSSL", child->key)) { + err = cf_util_get_boolean(child, &cb->use_ssl); + } else if (!strcasecmp("SSLCACertificateFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_root_certs); + } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_private_key); + } else if (!strcasecmp("SSLCertificateFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_cert_chain); + } else { + ERROR("open_telemetry plugin: Option \"%s\" is not allowed inside a " + "\"%s\" block.", + child->key, ci->key); + err = EINVAL; + } + + if (err) { + return err; + } + } + strbuf_t callback_name = STRBUF_CREATE; strbuf_printf(&callback_name, "open_telemetry/[%s]:%s", cb->host, cb->port); From 5fc370777edbba652fb56ae26131de42b171dd78 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 16:18:19 +0100 Subject: [PATCH 17/22] open_telemetry plugin: Update the synopsis in the manpage. --- src/collectd.conf.pod | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 0aadd0e74c..a9881b02ff 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7167,9 +7167,14 @@ B SSLCACertificateFile "/path/to/root.pem" SSLCertificateFile "/path/to/client.pem" SSLCertificateKeyFile "/path/to/client.key" - VerifyPeer false + VerifyPeer true - Exporter "localhost" "4317" + + EnableSSL false + SSLCACertificateFile "/path/to/root.pem" + SSLCertificateFile "/path/to/client.pem" + SSLCertificateKeyFile "/path/to/client.key" + =over 4 From 61a33bcfa4b99300293e3cabea4f202975f664ce Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Fri, 2 Feb 2024 17:05:08 +0100 Subject: [PATCH 18/22] open_telemetry plugin: Add support for floating point counters. --- src/open_telemetry_receiver.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index 6f3aead092..e975a048d3 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -176,11 +176,8 @@ static grpc::Status unmarshal_data_point(NumberDataPoint dp, switch (dp.value_case()) { case NumberDataPoint::kAsDouble: if (is_cumulative) { - // TODO(octo): enable once floating point counters have been merged - // (#4266) - // fam->type = METRIC_TYPE_FPCOUNTER; - // m.value.fpcounter = dp.as_double(); - m.value.counter = offset.counter + (counter_t)dp.as_double(); + fam->type = METRIC_TYPE_FPCOUNTER; + m.value.fpcounter = dp.as_double(); break; } m.value.gauge = dp.as_double(); From 4c83fc0d54a8356e52807cda77cec3e39ef743e2 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sun, 4 Feb 2024 08:43:33 +0100 Subject: [PATCH 19/22] open_telemetry plugin: Fix spelling in the manpage. --- src/collectd.conf.pod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index a9881b02ff..09036da19b 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7211,7 +7211,7 @@ connections. =item B B|B When enabled, a valid client certificate is required to connect to the server. -When disabled, a client certifiacte is not requested and any unsolicited client +When disabled, a client certificate is not requested and any unsolicited client certificate is accepted. Enabled by default. From 7061ad2d1d90c7cda8b7d77cc5797dcff352f8bf Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sun, 4 Feb 2024 08:45:11 +0100 Subject: [PATCH 20/22] open_telemetry plugin: Minor cleanups. * Set field to `NULL` after freeing. * Remove unused global variable. --- src/open_telemetry_exporter.cc | 1 + src/open_telemetry_receiver.cc | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index de62e74b8d..564f9949c3 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -149,6 +149,7 @@ static void ot_callback_decref(void *data) { sfree(cb->port); delete cb->ssl_opts; + cb->ssl_opts = NULL; pthread_mutex_unlock(&cb->mu); pthread_mutex_destroy(&cb->mu); diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index e975a048d3..56c644d56e 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -80,7 +80,6 @@ struct Listener { grpc::SslServerCredentialsOptions *ssl; }; static std::vector listeners; -static grpc::string default_addr("0.0.0.0:4317"); /* * helper functions From c56c79cc5294b2c6431b2d8e2730c2e57363446c Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sun, 4 Feb 2024 08:46:23 +0100 Subject: [PATCH 21/22] open_telemetry: Unify the configuration handling between exporter and receiver. --- src/open_telemetry_exporter.cc | 4 +- src/open_telemetry_receiver.cc | 76 ++++++++++------------------------ 2 files changed, 24 insertions(+), 56 deletions(-) diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index 564f9949c3..795bbd7ca5 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -187,7 +187,7 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { return 0; } -static int config_get_file(oconfig_item_t const *ci, grpc::string *out) { +int config_get_file(oconfig_item_t const *ci, grpc::string *out) { char *path = NULL; int err = cf_util_get_string(ci, &path); if (err) { @@ -223,7 +223,7 @@ int exporter_config(oconfig_item_t *ci) { ot_callback_t *cb = (ot_callback_t *)calloc(1, sizeof(*cb)); if (cb == NULL) { ERROR("open_telemetry plugin: calloc failed."); - return -1; + return ENOMEM; } cb->reference_count = 1; diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index 56c644d56e..921f26e0ac 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -81,27 +81,6 @@ struct Listener { }; static std::vector listeners; -/* - * helper functions - */ -static grpc::string read_file(const char *filename) { - std::ifstream f; - grpc::string s, content; - - f.open(filename); - if (!f.is_open()) { - ERROR("open_telemetry plugin: Failed to open '%s'", filename); - return ""; - } - - while (std::getline(f, s)) { - content += s; - content.push_back('\n'); - } - f.close(); - return content; -} /* read_file */ - /* * proto conversion */ @@ -448,6 +427,9 @@ static void receiver_install_callbacks(void) { done = true; } +// config_get_file is implemented in src/open_telemetry_exporter.cc +int config_get_file(oconfig_item_t const *ci, grpc::string *out); + /* * collectd plugin interface */ @@ -478,47 +460,33 @@ int receiver_config(oconfig_item_t *ci) { for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; + int err = 0; if (!strcasecmp("EnableSSL", child->key)) { - if (cf_util_get_boolean(child, &use_ssl)) { - ERROR("open_telemetry plugin: Option `%s` expects a boolean value", - child->key); - return -1; - } + err = cf_util_get_boolean(child, &use_ssl); } else if (!strcasecmp("SSLCACertificateFile", child->key)) { - char *certs = NULL; - if (cf_util_get_string(child, &certs)) { - ERROR("open_telemetry plugin: Option `%s` expects a string value", - child->key); - return -1; - } - ssl_opts->pem_root_certs = read_file(certs); + err = config_get_file(child, &ssl_opts->pem_root_certs); } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { - char *key = NULL; - if (cf_util_get_string(child, &key)) { - ERROR("open_telemetry plugin: Option `%s` expects a string value", - child->key); - return -1; - } - pkcp.private_key = read_file(key); + err = config_get_file(child, &pkcp.private_key); } else if (!strcasecmp("SSLCertificateFile", child->key)) { - char *cert = NULL; - if (cf_util_get_string(child, &cert)) { - ERROR("open_telemetry plugin: Option `%s` expects a string value", - child->key); - return -1; - } - pkcp.cert_chain = read_file(cert); + err = config_get_file(child, &pkcp.cert_chain); } else if (!strcasecmp("VerifyPeer", child->key)) { bool verify = false; - if (cf_util_get_boolean(child, &verify)) { - return -1; + err = cf_util_get_boolean(child, &verify); + if (!err) { + ssl_opts->client_certificate_request = + verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; } - ssl_opts->client_certificate_request = - verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY - : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; } else { - WARNING("open_telemetry plugin: Option `%s` not allowed in <%s> block.", - child->key, ci->key); + ERROR("open_telemetry plugin: Option \"%s\" is not allowed inside a " + "\"%s\" block.", + child->key, ci->key); + err = EINVAL; + } + + if (err) { + delete ssl_opts; + return err; } } From 3f17d3300510bd2791b655818bfea3b73fc9e2e1 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Tue, 20 Feb 2024 15:17:53 +0100 Subject: [PATCH 22/22] open_telemetry plugin: Move prototypes to a header file. --- Makefile.am | 2 +- src/open_telemetry.cc | 3 +-- src/open_telemetry.h | 42 ++++++++++++++++++++++++++++++++++ src/open_telemetry_exporter.cc | 3 ++- src/open_telemetry_receiver.cc | 5 ++-- 5 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 src/open_telemetry.h diff --git a/Makefile.am b/Makefile.am index dbc9b1c0a7..3a13c3042d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1747,7 +1747,7 @@ endif if BUILD_PLUGIN_OPEN_TELEMETRY pkglib_LTLIBRARIES += open_telemetry.la open_telemetry_la_SOURCES = \ - src/open_telemetry.cc \ + src/open_telemetry.cc src/open_telemetry.h \ src/open_telemetry_exporter.cc \ src/open_telemetry_receiver.cc \ opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ diff --git a/src/open_telemetry.cc b/src/open_telemetry.cc index f87ea92030..10aa2510f1 100644 --- a/src/open_telemetry.cc +++ b/src/open_telemetry.cc @@ -31,8 +31,7 @@ extern "C" { #include "daemon/plugin.h" } -int exporter_config(oconfig_item_t *ci); -int receiver_config(oconfig_item_t *ci); +#include "open_telemetry.h" static int ot_config(oconfig_item_t *ci) { for (int i = 0; i < ci->children_num; i++) { diff --git a/src/open_telemetry.h b/src/open_telemetry.h new file mode 100644 index 0000000000..2b332d9d7b --- /dev/null +++ b/src/open_telemetry.h @@ -0,0 +1,42 @@ +/** + * collectd - src/open_telemetry.h + * Copyright (C) 2024 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Florian octo Forster + **/ + +#ifndef OPEN_TELEMETRY_H +#define OPEN_TELEMETRY_H 1 + +extern "C" { +#include "daemon/collectd.h" +#include "daemon/configfile.h" +} + +#include + +int exporter_config(oconfig_item_t *ci); +int receiver_config(oconfig_item_t *ci); + +int config_get_file(oconfig_item_t const *ci, grpc::string *out); + +#endif diff --git a/src/open_telemetry_exporter.cc b/src/open_telemetry_exporter.cc index 795bbd7ca5..443a65f1fb 100644 --- a/src/open_telemetry_exporter.cc +++ b/src/open_telemetry_exporter.cc @@ -38,12 +38,13 @@ extern "C" { } #include - #include #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" #include "utils/format_open_telemetry/format_open_telemetry.h" +#include "open_telemetry.h" + #ifndef OT_DEFAULT_PORT #define OT_DEFAULT_PORT "4317" #endif diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc index 921f26e0ac..c0dd114fdf 100644 --- a/src/open_telemetry_receiver.cc +++ b/src/open_telemetry_receiver.cc @@ -47,6 +47,8 @@ extern "C" { #include "opentelemetry/proto/metrics/v1/metrics.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" +#include "open_telemetry.h" + #ifndef OT_DEFAULT_PORT #define OT_DEFAULT_PORT "4317" #endif @@ -427,9 +429,6 @@ static void receiver_install_callbacks(void) { done = true; } -// config_get_file is implemented in src/open_telemetry_exporter.cc -int config_get_file(oconfig_item_t const *ci, grpc::string *out); - /* * collectd plugin interface */