diff --git a/Makefile.am b/Makefile.am index 8f39d989a9..3a13c3042d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -484,7 +484,7 @@ libformat_influxdb_la_SOURCES = \ src/utils/format_influxdb/format_influxdb.c \ src/utils/format_influxdb/format_influxdb.h -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY +if BUILD_PLUGIN_OPEN_TELEMETRY noinst_LTLIBRARIES += libformat_open_telemetry.la libformat_open_telemetry_la_SOURCES = \ src/utils/format_open_telemetry/format_open_telemetry.cc \ @@ -1744,6 +1744,19 @@ openvpn_la_SOURCES = src/openvpn.c openvpn_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_OPEN_TELEMETRY +pkglib_LTLIBRARIES += open_telemetry.la +open_telemetry_la_SOURCES = \ + src/open_telemetry.cc src/open_telemetry.h \ + src/open_telemetry_exporter.cc \ + src/open_telemetry_receiver.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ + opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h +open_telemetry_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_CPPFLAGS) +open_telemetry_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_LDFLAGS) +open_telemetry_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) $(BUILD_WITH_LIBPROTOBUF_LIBS) libformat_open_telemetry.la +endif + if BUILD_PLUGIN_ORACLE pkglib_LTLIBRARIES += oracle.la oracle_la_SOURCES = \ @@ -2359,16 +2372,6 @@ write_mongodb_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBMONGOC_LDFLAGS) write_mongodb_la_LIBADD = $(BUILD_WITH_LIBMONGOC_LIBS) endif -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY -pkglib_LTLIBRARIES += write_open_telemetry.la -write_open_telemetry_la_SOURCES = src/write_open_telemetry.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ - opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h -write_open_telemetry_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBGRPCPP_CPPFLAGS) -write_open_telemetry_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBGRPCPP_LDFLAGS) 
-write_open_telemetry_la_LIBADD = $(BUILD_WITH_LIBGRPCPP_LIBS) libformat_open_telemetry.la -endif - if BUILD_PLUGIN_WRITE_PROMETHEUS pkglib_LTLIBRARIES += write_prometheus.la write_prometheus_la_SOURCES = src/write_prometheus.c @@ -2517,7 +2520,7 @@ types.pb.cc: $(srcdir)/proto/types.proto endif endif -if BUILD_PLUGIN_WRITE_OPEN_TELEMETRY +if BUILD_PLUGIN_OPEN_TELEMETRY BUILT_SOURCES += \ opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc \ opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h \ diff --git a/configure.ac b/configure.ac index 01cd8d2715..9e60e173a2 100644 --- a/configure.ac +++ b/configure.ac @@ -2880,6 +2880,37 @@ if test "x$with_libgrpcpp" = "xyes"; then AC_LANG_POP(C++) fi +if test "x$with_libgrpcpp" = "xyes"; then + AC_MSG_CHECKING([for grpc++_reflection]) + AC_LANG_PUSH(C++) + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + SAVE_LIBS="$LIBS" + CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS $CPPFLAGS" + LDFLAGS="$with_libgrpcpp_ldflags" + LIBS="-lgrpc++_reflection $GRPCPP_LIBS" + AC_LINK_IFELSE( + [ + AC_LANG_PROGRAM( + [[#include ]], + [[grpc::reflection::InitProtoReflectionServerBuilderPlugin();]] + ) + ], + [ + AC_MSG_RESULT([yes]) + GRPCPP_LIBS="$LIBS" + AC_DEFINE([HAVE_GRPCPP_REFLECTION], [1], [Define if the grpc++_reflection library is available.]) + ], + [ + AC_MSG_RESULT([no]) + ] + ) + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" + LIBS="$SAVE_LIBS" + AC_LANG_POP(C++) +fi + BUILD_WITH_LIBGRPCPP_CPPFLAGS="-std=c++11 $with_libgrpcpp_cppflags $GRPCPP_CFLAGS" BUILD_WITH_LIBGRPCPP_LDFLAGS="$with_libgrpcpp_ldflags" BUILD_WITH_LIBGRPCPP_LIBS="$GRPCPP_LIBS" @@ -6810,6 +6841,7 @@ plugin_nut="$with_libupsclient" plugin_olsrd="yes" plugin_onewire="$with_libowcapi" plugin_openldap="$with_libldap" +plugin_open_telemetry="yes" plugin_openvpn="yes" plugin_oracle="$with_oracle" plugin_ovs_events="no" @@ -6874,7 +6906,6 @@ plugin_write_influxdb_udp="yes" plugin_write_kafka="$with_librdkafka" 
plugin_write_log="no" plugin_write_mongodb="$with_libmongoc" -plugin_write_open_telemetry="yes" plugin_write_prometheus="no" plugin_write_redis="yes" plugin_write_riemann="$with_libriemann_client" @@ -7145,22 +7176,22 @@ fi if test "x$GRPC_CPP_PLUGIN" = "x"; then plugin_grpc="no (grpc_cpp_plugin not found)" - plugin_write_open_telemetry="no (grpc_cpp_plugin not found)" + plugin_open_telemetry="no (grpc_cpp_plugin not found)" fi if test "x$have_protoc3" != "xyes"; then plugin_grpc="no (protoc3 not found)" - plugin_write_open_telemetry="no (protoc3 not found)" + plugin_open_telemetry="no (protoc3 not found)" fi if test "x$with_libprotobuf" != "xyes"; then plugin_grpc="no (libprotobuf not found)" - plugin_write_open_telemetry="no (libprotobuf not found)" + plugin_open_telemetry="no (libprotobuf not found)" fi if test "x$with_libgrpcpp" != "xyes"; then plugin_grpc="no (libgrpc++ not found)" - plugin_write_open_telemetry="no (libgrpc++ not found)" + plugin_open_telemetry="no (libgrpc++ not found)" fi if test "x$protoc3_optional" = "xno"; then - plugin_write_open_telemetry="no (protoc does not support optional fields)" + plugin_open_telemetry="no (protoc does not support optional fields)" fi if test "x$have_getifaddrs" = "xyes"; then @@ -7517,6 +7548,7 @@ AC_PLUGIN([olsrd], [$plugin_olsrd], [olsrd statistics] AC_PLUGIN([onewire], [$plugin_onewire], [OneWire sensor statistics]) AC_PLUGIN([openldap], [$plugin_openldap], [OpenLDAP statistics]) AC_PLUGIN([openvpn], [$plugin_openvpn], [OpenVPN client statistics]) +AC_PLUGIN([open_telemetry], [$plugin_open_telemetry], [OpenTelemetry exporter/receiver]) AC_PLUGIN([oracle], [$plugin_oracle], [Oracle plugin]) AC_PLUGIN([ovs_events], [$plugin_ovs_events], [OVS events plugin]) AC_PLUGIN([ovs_stats], [$plugin_ovs_stats], [OVS statistics plugin]) @@ -7581,7 +7613,6 @@ AC_PLUGIN([write_influxdb_udp], [$plugin_write_influxdb_udp],[Influxdb udp outp AC_PLUGIN([write_kafka], [$plugin_write_kafka], [Kafka output plugin]) 
AC_PLUGIN([write_log], [$plugin_write_log], [Log output plugin]) AC_PLUGIN([write_mongodb], [$plugin_write_mongodb], [MongoDB output plugin]) -AC_PLUGIN([write_open_telemetry],[$plugin_write_open_telemetry],[Write OpenTelemetry plugin]) AC_PLUGIN([write_prometheus], [$plugin_write_prometheus], [Prometheus write plugin]) AC_PLUGIN([write_redis], [$plugin_write_redis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$plugin_write_riemann], [Riemann output plugin]) @@ -7964,6 +7995,7 @@ AC_MSG_RESULT([ nut . . . . . . . . . $enable_nut]) AC_MSG_RESULT([ olsrd . . . . . . . . $enable_olsrd]) AC_MSG_RESULT([ onewire . . . . . . . $enable_onewire]) AC_MSG_RESULT([ openldap . . . . . . $enable_openldap]) +AC_MSG_RESULT([ open_telemetry . . . $enable_open_telemetry]) AC_MSG_RESULT([ openvpn . . . . . . . $enable_openvpn]) AC_MSG_RESULT([ oracle . . . . . . . $enable_oracle]) AC_MSG_RESULT([ ovs_events . . . . . $enable_ovs_events]) @@ -8028,7 +8060,6 @@ AC_MSG_RESULT([ write_influxdb_udp. . $enable_write_influxdb_udp]) AC_MSG_RESULT([ write_kafka . . . . . $enable_write_kafka]) AC_MSG_RESULT([ write_log . . . . . . $enable_write_log]) AC_MSG_RESULT([ write_mongodb . . . . $enable_write_mongodb]) -AC_MSG_RESULT([ write_open_telemetry $enable_write_open_telemetry]) AC_MSG_RESULT([ write_prometheus. . . $enable_write_prometheus]) AC_MSG_RESULT([ write_redis . . . . . $enable_write_redis]) AC_MSG_RESULT([ write_riemann . . . . 
$enable_write_riemann]) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 36db15b3b3..33e9bc4e1f 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -182,6 +182,7 @@ #@BUILD_PLUGIN_OLSRD_TRUE@LoadPlugin olsrd #@BUILD_PLUGIN_ONEWIRE_TRUE@LoadPlugin onewire #@BUILD_PLUGIN_OPENLDAP_TRUE@LoadPlugin openldap +#@BUILD_PLUGIN_OPEN_TELEMETRY_TRUE@LoadPlugin open_telemetry #@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn #@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle #@BUILD_PLUGIN_OVS_EVENTS_TRUE@LoadPlugin ovs_events @@ -236,7 +237,6 @@ #@BUILD_PLUGIN_WRITE_KAFKA_TRUE@LoadPlugin write_kafka #@BUILD_PLUGIN_WRITE_LOG_TRUE@LoadPlugin write_log #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb -#@BUILD_PLUGIN_WRITE_OPEN_TELEMETRY_TRUE@LoadPlugin write_open_telemetry #@BUILD_PLUGIN_WRITE_PROMETHEUS_TRUE@LoadPlugin write_prometheus #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann @@ -1331,6 +1331,22 @@ # CollectUserCount false # +# +# +# EnableSSL false +# SSLCACertificateFile "/path/to/root.pem" +# SSLCertificateFile "/path/to/client.pem" +# SSLCertificateKeyFile "/path/to/client.key" +# VerifyPeer true +# +# +# EnableSSL false +# SSLCACertificateFile "/path/to/root.pem" +# SSLCertificateFile "/path/to/client.pem" +# SSLCertificateKeyFile "/path/to/client.key" +# +# + # # # Statement "SELECT category, COUNT(*) AS value FROM products WHERE in_stock = 0 GROUP BY category" @@ -2070,13 +2086,6 @@ # # -# -# -# Host "localhost" -# Port "4317" -# -# - # # Port "9103" # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 631c3ee927..09036da19b 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7153,6 +7153,101 @@ can be configured independently from that option. Defaults to B. =back +=head2 Plugin C + +The I implements an OpenTelemetry exporter and receiver +using OTLP. Specifically, it implements a gRPC C client and +server. 
+
+B<Synopsis:>
+
+  <Plugin open_telemetry>
+    <Receiver "0.0.0.0" "4317">
+      EnableSSL false
+      SSLCACertificateFile "/path/to/root.pem"
+      SSLCertificateFile "/path/to/client.pem"
+      SSLCertificateKeyFile "/path/to/client.key"
+      VerifyPeer true
+    </Receiver>
+    <Exporter "localhost" "4317">
+      EnableSSL false
+      SSLCACertificateFile "/path/to/root.pem"
+      SSLCertificateFile "/path/to/client.pem"
+      SSLCertificateKeyFile "/path/to/client.key"
+    </Exporter>
+  </Plugin>
+
+=over 4
+
+=item B<Receiver> I<Host> [I<Port>]
+
+The B<Receiver> statement sets the network address and port to which to bind.
+Multiple B<Receiver> blocks can be specified to listen on multiple
+interfaces/ports. When no B<Receiver> blocks are specified, the plugin will not
+receive any metrics.
+
+The argument I<Host> may be a hostname, an IPv4 address, or an IPv6 address.
+
+The I<Port> argument may be omitted. In that case the default C<"4317"> is
+used.
+
+Optionally, B<Receiver> blocks support the following options:
+
+=over 4
+
+=item B<EnableSSL> I<true>|I<false>
+
+Whether to enable SSL for incoming connections. Default: I<false>.
+
+=item B<SSLCACertificateFile> I<Filename>
+
+=item B<SSLCertificateFile> I<Filename>
+
+=item B<SSLCertificateKeyFile> I<Filename>
+
+Filenames specifying SSL certificate and key material to be used with SSL
+connections.
+
+=item B<VerifyPeer> B<true>|B<false>
+
+When enabled, a valid client certificate is required to connect to the server.
+When disabled, a client certificate is not requested and any unsolicited client
+certificate is accepted.
+Enabled by default.
+
+=back
+
+=item B<Exporter> I<Host> [I<Port>]
+
+The B<Exporter> option configures an OTLP export to the given collector
+address. Multiple collectors can be specified using multiple B<Exporter> lines.
+
+The argument I<Host> may be a hostname, an IPv4 address, or an IPv6 address.
+
+The I<Port> argument may be omitted. In that case the default C<"4317"> is
+used.
+
+Optionally, B<Exporter> blocks support the following options:
+
+=over 4
+
+=item B<EnableSSL> I<true>|I<false>
+
+Whether to require SSL when connecting. Default: I<false>.
+
+=item B<SSLCACertificateFile> I<Filename>
+
+=item B<SSLCertificateFile> I<Filename>
+
+=item B<SSLCertificateKeyFile> I<Filename>
+
+Filenames specifying SSL certificate and key material to be used with SSL
+connections.
+ +=back + +=back + =head2 Plugin C The "oracle" plugin uses the Oracle® Call Interface I<(OCI)> to connect to an @@ -11151,35 +11246,6 @@ want to use authentication all three fields must be set. =back -=head2 Plugin C - -The I will export metrics to an I using the gRPC based OTLP protocol. - -B - - - - Host "localhost" - Port "4317" - - - -The plugin can export metrics to multiple collectors by specifying multiple -B blocks. Within the B blocks, the following options are available: - -=over 4 - -=item B I
- -Hostname or address to connect to. Defaults to C. - -=item B I - -Service name or port number to connect to. Defaults to C<4317>. - -=back - =head2 Plugin C The I implements a tiny webserver that can be scraped diff --git a/src/daemon/utils_cache.c b/src/daemon/utils_cache.c index 780591c7e2..ea0a1a7dbd 100644 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@ -523,13 +523,13 @@ int uc_get_value_by_name(const char *name, value_t *ret_values) { /* remove missing values from getval */ if (ce->state == STATE_MISSING) { - status = -1; + status = EAGAIN; } else { *ret_values = ce->last_value; } } else { DEBUG("utils_cache: uc_get_value_by_name: No such value: %s", name); - status = -1; + status = ENOENT; } pthread_mutex_unlock(&cache_lock); @@ -739,7 +739,7 @@ int uc_get_history_by_name(const char *name, gauge_t *ret_history, status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) { pthread_mutex_unlock(&cache_lock); - return -ENOENT; + return ENOENT; } /* Check if there are enough values available. If not, increase the buffer * size. 
*/ @@ -749,7 +749,7 @@ int uc_get_history_by_name(const char *name, gauge_t *ret_history, tmp = realloc(ce->history, sizeof(*ce->history) * num_steps); if (tmp == NULL) { pthread_mutex_unlock(&cache_lock); - return -ENOMEM; + return ENOMEM; } for (size_t i = ce->history_length; i < num_steps; i++) diff --git a/src/open_telemetry.cc b/src/open_telemetry.cc new file mode 100644 index 0000000000..10aa2510f1 --- /dev/null +++ b/src/open_telemetry.cc @@ -0,0 +1,69 @@ +/** + * collectd - src/open_telemetry.cc + * Copyright (C) 2024 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. 
+ *
+ * Authors:
+ *   Florian octo Forster
+ **/
+
+extern "C" {
+#include "collectd.h"
+
+#include "daemon/configfile.h"
+#include "daemon/plugin.h"
+}
+
+#include "open_telemetry.h"
+
+/* Complex-config callback: hands every "Exporter" / "Receiver" block to the
+ * matching sub-module and aborts on the first failure or unknown key. */
+static int ot_config(oconfig_item_t *ci) {
+  for (int idx = 0; idx < ci->children_num; idx++) {
+    oconfig_item_t *block = &ci->children[idx];
+    bool const is_exporter = (strcasecmp("Exporter", block->key) == 0);
+    bool const is_receiver = (strcasecmp("Receiver", block->key) == 0);
+
+    if (!is_exporter && !is_receiver) {
+      ERROR("open_telemetry plugin: invalid config option: \"%s\"", block->key);
+      return EINVAL;
+    }
+    int status = is_exporter ? exporter_config(block) : receiver_config(block);
+    if (status == 0) {
+      continue;
+    }
+
+    if (is_exporter) {
+      ERROR("open_telemetry plugin: Configuring exporter failed "
+            "with status %d", status);
+    } else {
+      ERROR("open_telemetry plugin: Configuring receiver failed "
+            "with status %d", status);
+    }
+    return status;
+  }
+
+  return 0;
+}
+
+/* Registers the plugin's complex-config callback; declared with C linkage. */
+extern "C" void module_register(void) {
+  plugin_register_complex_config("open_telemetry", ot_config);
+}
diff --git a/src/open_telemetry.h b/src/open_telemetry.h
new file mode 100644
index 0000000000..2b332d9d7b
--- /dev/null
+++ b/src/open_telemetry.h
@@ -0,0 +1,42 @@
+/**
+ * collectd - src/open_telemetry.h
+ * Copyright (C) 2024 Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Florian octo Forster + **/ + +#ifndef OPEN_TELEMETRY_H +#define OPEN_TELEMETRY_H 1 + +extern "C" { +#include "daemon/collectd.h" +#include "daemon/configfile.h" +} + +#include + +int exporter_config(oconfig_item_t *ci); +int receiver_config(oconfig_item_t *ci); + +int config_get_file(oconfig_item_t const *ci, grpc::string *out); + +#endif diff --git a/src/write_open_telemetry.cc b/src/open_telemetry_exporter.cc similarity index 66% rename from src/write_open_telemetry.cc rename to src/open_telemetry_exporter.cc index 8491724c09..443a65f1fb 100644 --- a/src/write_open_telemetry.cc +++ b/src/open_telemetry_exporter.cc @@ -1,6 +1,6 @@ /** - * collectd - src/write_open_telemetry.cc - * Copyright (C) 2023 Florian octo Forster + * collectd - src/open_telemetry_exporter.cc + * Copyright (C) 2023-2024 Florian octo Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -24,21 +24,10 @@ * Florian octo Forster **/ -/* write_open_telemetry plugin configuration example - * - * - * - * Host "localhost" - * Port "8080" - * Path "/v1/metrics" - * - * - */ - extern "C" { #include "collectd.h" -#include "plugin.h" +#include "daemon/plugin.h" #include "utils/common/common.h" #include "utils/resource_metrics/resource_metrics.h" @@ -48,14 +37,13 @@ extern "C" { #include } +#include #include #include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h" 
#include "utils/format_open_telemetry/format_open_telemetry.h" -#ifndef OT_DEFAULT_HOST -#define OT_DEFAULT_HOST "localhost" -#endif +#include "open_telemetry.h" #ifndef OT_DEFAULT_PORT #define OT_DEFAULT_PORT "4317" @@ -66,15 +54,17 @@ using opentelemetry::proto::collector::metrics::v1:: using opentelemetry::proto::collector::metrics::v1::MetricsService; /* - * Private variables + * Private types */ typedef struct { - char *name; int reference_count; char *host; char *port; + bool use_ssl; + grpc::SslCredentialsOptions *ssl_opts; + resource_metrics_set_t resource_metrics; cdtime_t staged_time; @@ -85,14 +75,15 @@ typedef struct { static int export_metrics(ot_callback_t *cb) { if (cb->stub == NULL) { - strbuf_t buf = STRBUF_CREATE; - strbuf_printf(&buf, "%s:%s", cb->host, cb->port); + grpc::string addr = grpc::string(cb->host) + ":" + grpc::string(cb->port); - auto chan = - grpc::CreateChannel(buf.ptr, grpc::InsecureChannelCredentials()); - cb->stub = MetricsService::NewStub(chan); + auto creds = grpc::InsecureChannelCredentials(); + if (cb->use_ssl) { + creds = grpc::SslCredentials(*cb->ssl_opts); + } - STRBUF_DESTROY(buf); + auto chan = grpc::CreateChannel(addr, creds); + cb->stub = MetricsService::NewStub(chan); } auto req = format_open_telemetry_export_metrics_service_request( @@ -103,7 +94,7 @@ static int export_metrics(ot_callback_t *cb) { grpc::Status status = cb->stub->Export(&context, *req, &resp); if (!status.ok()) { - ERROR("write_open_telemetry plugin: Exporting metrics failed: %s", + ERROR("open_telemetry plugin: Exporting metrics failed: %s", status.error_message().c_str()); return -1; } @@ -111,8 +102,7 @@ static int export_metrics(ot_callback_t *cb) { if (resp.has_partial_success() && resp.partial_success().rejected_data_points() > 0) { auto ps = resp.partial_success(); - NOTICE("write_open_telemetry plugin: %" PRId64 - " data points were rejected: %s", + NOTICE("open_telemetry plugin: %" PRId64 " data points were rejected: %s", 
ps.rejected_data_points(), ps.error_message().c_str()); } @@ -156,10 +146,12 @@ static void ot_callback_decref(void *data) { cb->stub.reset(); - sfree(cb->name); sfree(cb->host); sfree(cb->port); + delete cb->ssl_opts; + cb->ssl_opts = NULL; + pthread_mutex_unlock(&cb->mu); pthread_mutex_destroy(&cb->mu); @@ -196,43 +188,81 @@ static int ot_write(metric_family_t const *fam, user_data_t *user_data) { return 0; } -static int ot_config_node(oconfig_item_t *ci) { +int config_get_file(oconfig_item_t const *ci, grpc::string *out) { + char *path = NULL; + int err = cf_util_get_string(ci, &path); + if (err) { + return err; + } + + std::ifstream f; + f.open(path); + if (!f.is_open()) { + ERROR("open_telemetry plugin: Failed to open \"%s\"", path); + return EPERM; + } + + grpc::string line; + while (std::getline(f, line)) { + out->append(line); + out->push_back('\n'); + } + f.close(); + return 0; +} /* config_get_file */ + +int exporter_config(oconfig_item_t *ci) { + if (ci->values_num < 1 || ci->values_num > 2 || + ci->values[0].type != OCONFIG_TYPE_STRING || + (ci->values_num > 1 && ci->values[1].type != OCONFIG_TYPE_STRING)) { + ERROR("open_telemetry plugin: The \"%s\" config option needs " + "one or two string arguments (address and port).", + ci->key); + return EINVAL; + } + ot_callback_t *cb = (ot_callback_t *)calloc(1, sizeof(*cb)); if (cb == NULL) { - ERROR("write_open_telemetry plugin: calloc failed."); - return -1; + ERROR("open_telemetry plugin: calloc failed."); + return ENOMEM; } cb->reference_count = 1; - cf_util_get_string(ci, &cb->name); - cb->host = strdup(OT_DEFAULT_HOST); - cb->port = strdup(OT_DEFAULT_PORT); - + cb->host = strdup(ci->values[0].value.string); + if (ci->values_num >= 2) { + cb->port = strdup(ci->values[1].value.string); + } else { + cb->port = strdup(OT_DEFAULT_PORT); + } + cb->ssl_opts = new grpc::SslCredentialsOptions; pthread_mutex_init(&cb->mu, /* attr = */ NULL); for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = 
ci->children + i; - int status = 0; - if (strcasecmp("Host", child->key) == 0) - status = cf_util_get_string(child, &cb->host); - else if (strcasecmp("Port", child->key) == 0) - status = cf_util_get_service(child, &cb->port); - else { - ERROR("write_open_telemetry plugin: Invalid configuration " - "option: %s.", - child->key); - status = -1; + int err = 0; + if (!strcasecmp("EnableSSL", child->key)) { + err = cf_util_get_boolean(child, &cb->use_ssl); + } else if (!strcasecmp("SSLCACertificateFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_root_certs); + } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_private_key); + } else if (!strcasecmp("SSLCertificateFile", child->key)) { + err = config_get_file(child, &cb->ssl_opts->pem_cert_chain); + } else { + ERROR("open_telemetry plugin: Option \"%s\" is not allowed inside a " + "\"%s\" block.", + child->key, ci->key); + err = EINVAL; } - if (status != 0) { - ot_callback_decref(cb); - return status; + if (err) { + return err; } } strbuf_t callback_name = STRBUF_CREATE; - strbuf_printf(&callback_name, "write_open_telemetry/%s", cb->name); + strbuf_printf(&callback_name, "open_telemetry/[%s]:%s", cb->host, cb->port); user_data_t user_data = { .data = cb, @@ -254,23 +284,3 @@ static int ot_config_node(oconfig_item_t *ci) { STRBUF_DESTROY(callback_name); return 0; } - -static int ot_config(oconfig_item_t *ci) { - for (int i = 0; i < ci->children_num; i++) { - oconfig_item_t *child = ci->children + i; - - if (strcasecmp("Node", child->key) == 0) - ot_config_node(child); - else { - ERROR("write_open_telemetry plugin: Invalid configuration " - "option: %s.", - child->key); - } - } - - return 0; -} - -void module_register(void) { - plugin_register_complex_config("write_open_telemetry", ot_config); -} diff --git a/src/open_telemetry_receiver.cc b/src/open_telemetry_receiver.cc new file mode 100644 index 0000000000..c0dd114fdf --- /dev/null +++ 
b/src/open_telemetry_receiver.cc
@@ -0,0 +1,543 @@
+/**
+ * collectd - src/open_telemetry_receiver.cc
+ * Copyright (C) 2015-2016 Sebastian Harl
+ * Copyright (C) 2016-2024 Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Sebastian Harl
+ *   Florian octo Forster
+ **/
+
+extern "C" {
+#include "daemon/collectd.h"
+#include "daemon/metric.h"
+#include "daemon/utils_cache.h"
+#include "utils/common/common.h"
+}
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <grpc++/grpc++.h>
+#if HAVE_GRPCPP_REFLECTION
+#include <grpc++/ext/proto_server_reflection_plugin.h>
+#endif
+
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.h"
+#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
+#include "opentelemetry/proto/common/v1/common.pb.h"
+#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
+#include "opentelemetry/proto/resource/v1/resource.pb.h"
+
+#include "open_telemetry.h"
+
+#ifndef OT_DEFAULT_PORT
+#define OT_DEFAULT_PORT "4317"
+#endif
+
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsPartialSuccess;
+using opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
+using opentelemetry::proto::collector::metrics::v1::
+    ExportMetricsServiceResponse;
+using opentelemetry::proto::collector::metrics::v1::MetricsService;
+using opentelemetry::proto::common::v1::AnyValue;
+using opentelemetry::proto::common::v1::KeyValue;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_CUMULATIVE;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_DELTA;
+using opentelemetry::proto::metrics::v1::AGGREGATION_TEMPORALITY_UNSPECIFIED;
+using opentelemetry::proto::metrics::v1::AggregationTemporality;
+using opentelemetry::proto::metrics::v1::Gauge;
+using opentelemetry::proto::metrics::v1::Metric;
+using opentelemetry::proto::metrics::v1::NumberDataPoint;
+using opentelemetry::proto::metrics::v1::ResourceMetrics;
+using opentelemetry::proto::metrics::v1::Sum;
+using opentelemetry::proto::resource::v1::Resource;
+
+/*
+ * private types
+ */
+
+/* One configured listening endpoint; "ssl" is nullptr for plaintext. */
+struct Listener {
+  grpc::string addr;
+  grpc::string port;
+
+  grpc::SslServerCredentialsOptions *ssl;
+};
+static std::vector<Listener> listeners;
+
+/*
+ * proto conversion
+ */
+/* Maps a status code to gRPC: zero becomes OK, anything else INTERNAL. */
+static grpc::Status wrap_error(int err) {
+  if (!err) {
+    return grpc::Status::OK;
+  }
+  return grpc::Status(grpc::StatusCode::INTERNAL, "wrapped internal error");
+}
+
+/* Adds "kv" to "labels". String, bool, int and double values are stored as
+ * text; array, key/value list and bytes values yield UNIMPLEMENTED. */
+static grpc::Status unmarshal_label_pair(const KeyValue &kv,
+                                         label_set_t *labels) {
+  switch (kv.value().value_case()) {
+  case AnyValue::kStringValue:
+    return wrap_error(label_set_add(labels, kv.key().c_str(),
+                                    kv.value().string_value().c_str()));
+  case AnyValue::kBoolValue:
+    return wrap_error(label_set_add(
+        labels, kv.key().c_str(), kv.value().bool_value() ? "true" : "false"));
+  case AnyValue::kIntValue: {
+    char buf[64] = {0};
+    snprintf(buf, sizeof(buf), "%" PRId64, kv.value().int_value());
+    return wrap_error(label_set_add(labels, kv.key().c_str(), buf));
+  }
+  case AnyValue::kDoubleValue: {
+    char buf[64] = {0};
+    snprintf(buf, sizeof(buf), GAUGE_FORMAT, kv.value().double_value());
+    return wrap_error(label_set_add(labels, kv.key().c_str(), buf));
+  }
+  case AnyValue::kArrayValue:
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
+                        "array labels are not supported");
+  case AnyValue::kKvlistValue:
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
+                        "key/value list labels are not supported");
+  case AnyValue::kBytesValue:
+    return grpc::Status(grpc::StatusCode::UNIMPLEMENTED,
+                        "byte labels are not supported");
+  default:
+    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                        "unexpected label value type");
+  }
+}
+
+/* Converts one NumberDataPoint into a metric_t and appends it to "fam". For
+ * delta temporality the value returned by uc_get_value() is used as offset,
+ * so that the stored counter keeps increasing across exports. */
+static grpc::Status unmarshal_data_point(const NumberDataPoint &dp,
+                                         metric_family_t *fam,
+                                         AggregationTemporality agg) {
+  metric_t m = {
+      .family = fam, // family needs to be populated for uc_get_value().
+      .time = NS_TO_CDTIME_T(dp.time_unix_nano()),
+  };
+
+  // Attach the labels first: the cache lookup below receives "m", so the
+  // metric should carry its labels before it is used as the lookup key.
+  for (const auto &kv : dp.attributes()) {
+    grpc::Status s = unmarshal_label_pair(kv, &m.label);
+    if (!s.ok()) {
+      metric_reset(&m);
+      return s;
+    }
+  }
+
+  bool is_cumulative = (agg == AGGREGATION_TEMPORALITY_DELTA ||
+                        agg == AGGREGATION_TEMPORALITY_CUMULATIVE);
+
+  value_t offset = {0};
+  if (agg == AGGREGATION_TEMPORALITY_DELTA) {
+    int err = uc_get_value(&m, &offset);
+    switch (err) {
+    case ENOENT:
+    case EAGAIN:
+      offset = (value_t){0};
+      break;
+    case 0:
+      // no-op
+      break;
+    default:
+      metric_reset(&m);
+      return wrap_error(err);
+    }
+  }
+
+  switch (dp.value_case()) {
+  case NumberDataPoint::kAsDouble:
+    if (is_cumulative) {
+      fam->type = METRIC_TYPE_FPCOUNTER;
+      m.value.fpcounter = offset.fpcounter + dp.as_double();
+      break;
+    }
+    m.value.gauge = dp.as_double();
+    break;
+  case NumberDataPoint::kAsInt:
+    if (is_cumulative) {
+      m.value.counter = offset.counter + (counter_t)dp.as_int();
+      break;
+    }
+    m.value.gauge = (gauge_t)dp.as_int();
+    break;
+  default:
+    metric_reset(&m);
+    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                        "unexpected data point value type");
+  }
+
+  // TODO(octo): get the first metric time from the cache and detect counter
+  // resets.
+
+  int err = metric_family_metric_append(fam, m);
+
+  metric_reset(&m);
+  return wrap_error(err);
+}
+
+/* Appends every data point of the gauge "g" to "fam". */
+static grpc::Status unmarshal_gauge_metric(const Gauge &g,
+                                           metric_family_t *fam) {
+  for (const auto &dp : g.data_points()) {
+    grpc::Status s =
+        unmarshal_data_point(dp, fam, AGGREGATION_TEMPORALITY_UNSPECIFIED);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  return grpc::Status::OK;
+}
+
+/* Appends every data point of the monotonic sum "sum" to "fam"; non-monotonic
+ * sums are refused with UNIMPLEMENTED. */
+static grpc::Status unmarshal_sum_metric(const Sum &sum, metric_family_t *fam) {
+  if (!sum.is_monotonic()) {
+    // TODO(octo): convert to gauge instead?
+    DEBUG("open_telemetry plugin: non-monotonic sums (aka. UpDownCounters) "
+          "are unsupported");
+    return grpc::Status(
+        grpc::StatusCode::UNIMPLEMENTED,
+        "non-monotonic sums (aka. UpDownCounters) are unsupported");
+  }
+
+  for (const auto &dp : sum.data_points()) {
+    grpc::Status s =
+        unmarshal_data_point(dp, fam, sum.aggregation_temporality());
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  return grpc::Status::OK;
+}
+
+/* Records "num" rejected data points in "ps" and appends "msg" to its error
+ * message (comma separated). Always returns OK. */
+static grpc::Status reject_data_points(const std::string &msg, int num,
+                                       ExportMetricsPartialSuccess *ps) {
+  int64_t rejected = ps->rejected_data_points();
+  rejected += (int64_t)num;
+  ps->set_rejected_data_points(rejected);
+
+  std::string *error_message = ps->mutable_error_message();
+  if (!error_message->empty()) {
+    error_message->append(", ");
+  }
+  error_message->append(msg);
+
+  return grpc::Status::OK;
+}
+
+/* Converts "mpb" into a metric family and dispatches it. Metrics that cannot
+ * be converted (including histograms and summaries) are counted in "ps"
+ * rather than failing the request. */
+static grpc::Status dispatch_metric(const Metric &mpb, label_set_t resource,
+                                    ExportMetricsPartialSuccess *ps) {
+  metric_family_t fam = {
+      .name = (char *)mpb.name().c_str(),
+      .help = (char *)mpb.description().c_str(),
+      .unit = (char *)mpb.unit().c_str(),
+      .resource = resource,
+  };
+
+  switch (mpb.data_case()) {
+  case Metric::kGauge: {
+    fam.type = METRIC_TYPE_GAUGE;
+    grpc::Status s = unmarshal_gauge_metric(mpb.gauge(), &fam);
+    if (!s.ok()) {
+      metric_family_metric_reset(&fam);
+      return reject_data_points(s.error_message(),
+                                mpb.gauge().data_points().size(), ps);
+    }
+    break;
+  }
+  case Metric::kSum: {
+    fam.type = METRIC_TYPE_COUNTER;
+    grpc::Status s = unmarshal_sum_metric(mpb.sum(), &fam);
+    if (!s.ok()) {
+      metric_family_metric_reset(&fam);
+      return reject_data_points(s.error_message(),
+                                mpb.sum().data_points().size(), ps);
+    }
+    break;
+  }
+  case Metric::kHistogram:
+    return reject_data_points(
+        std::string("histogram metrics are not supported"),
+        mpb.histogram().data_points().size(), ps);
+  case Metric::kExponentialHistogram:
+    return reject_data_points(
+        std::string("exponential histogram metrics are not supported"),
+        mpb.exponential_histogram().data_points().size(), ps);
+  case Metric::kSummary: {
+    return reject_data_points(std::string("summary metrics are not supported"),
+                              mpb.summary().data_points().size(), ps);
+  }
+  default:
+    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
+                        "unexpected data type");
+  }
+
+  int err = plugin_dispatch_metric_family(&fam);
+
+  metric_family_metric_reset(&fam);
+  return wrap_error(err);
+}
+
+/* Copies the attributes of "rpb" into "resource". On failure "resource" may
+ * be partially filled; the caller is expected to reset it. */
+static grpc::Status unmarshal_resource(const Resource &rpb,
+                                       label_set_t *resource) {
+  for (const auto &kv : rpb.attributes()) {
+    grpc::Status s = unmarshal_label_pair(kv, resource);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  return grpc::Status::OK;
+}
+
+/* Dispatches all metrics of "rm" with its resource attributes attached. The
+ * temporary resource label set is released on every return path. */
+static grpc::Status dispatch_resource_metrics(const ResourceMetrics &rm,
+                                              ExportMetricsPartialSuccess *ps) {
+  label_set_t resource = {0};
+
+  grpc::Status s = unmarshal_resource(rm.resource(), &resource);
+  if (!s.ok()) {
+    label_set_reset(&resource);
+    return s;
+  }
+
+  for (const auto &sm : rm.scope_metrics()) {
+    for (const auto &m : sm.metrics()) {
+      grpc::Status ms = dispatch_metric(m, resource, ps);
+      if (!ms.ok()) {
+        label_set_reset(&resource);
+        return ms;
+      }
+    }
+  }
+
+  label_set_reset(&resource);
+  return grpc::Status::OK;
+}
+
+/*
+ * OpenTelemetry MetricsService
+ */
+class OTMetricsService : public MetricsService::Service {
+public:
+  // Dispatches every ResourceMetrics entry of the request; rejected data
+  // points are reported through the response's partial_success field.
+  grpc::Status Export(grpc::ServerContext *context,
+                      const ExportMetricsServiceRequest *req,
+                      ExportMetricsServiceResponse *resp) override {
+    ExportMetricsPartialSuccess *ps = resp->mutable_partial_success();
+
+    for (const auto &rm : req->resource_metrics()) {
+      grpc::Status s = dispatch_resource_metrics(rm, ps);
+      if (!s.ok()) {
+        ERROR("open_telemetry plugin: dispatch_resource_metrics failed: %s",
+              s.error_message().c_str());
+        return s;
+      }
+    }
+
+    return grpc::Status::OK;
+  }
+};
+
+/*
+ * gRPC server implementation
+ */
+class CollectorServer final {
+public:
+  // Binds all configured listeners and starts the server. Returns false if
+  // BuildAndStart() did not produce a server.
+  bool Start() {
+#if HAVE_GRPCPP_REFLECTION
+    grpc::reflection::InitProtoReflectionServerBuilderPlugin();
+#endif
+    auto auth = grpc::InsecureServerCredentials();
+
+    grpc::ServerBuilder builder;
+
+    for (const auto &l : listeners) {
+      grpc::string addr = l.addr + ":" + l.port;
+
+      auto use_ssl = grpc::string("");
+      auto a = auth;
+      if (l.ssl != nullptr) {
+        use_ssl = grpc::string(" (SSL enabled)");
+        a = grpc::SslServerCredentials(*l.ssl);
+      }
+
+      builder.AddListeningPort(addr, a);
+      INFO("open_telemetry plugin: Listening on %s%s", addr.c_str(),
+           use_ssl.c_str());
+    }
+
+    builder.RegisterService(&metrics_service);
+
+    server = builder.BuildAndStart();
+    return server != nullptr;
+  }
+
+  // Safe to call even if Start() failed and no server exists.
+  void Shutdown() {
+    if (server) {
+      server->Shutdown();
+    }
+  }
+
+private:
+  OTMetricsService metrics_service;
+
+  std::unique_ptr<grpc::Server> server;
+};
+
+static CollectorServer *server = nullptr;
+
+/* Init callback: creates and starts the gRPC server once. Returns -1 if the
+ * server could not be started. */
+static int receiver_init(void) {
+  if (server) {
+    return 0;
+  }
+
+  server = new CollectorServer();
+  if (!server->Start()) {
+    ERROR("open_telemetry plugin: Failed to start the gRPC server");
+    delete server;
+    server = nullptr;
+    return -1;
+  }
+
+  return 0;
+} /* receiver_init() */
+
+/* Shutdown callback: stops and destroys the server, if one exists. */
+static int receiver_shutdown(void) {
+  if (!server)
+    return 0;
+
+  server->Shutdown();
+
+  delete server;
+  server = nullptr;
+
+  return 0;
+} /* receiver_shutdown() */
+
+/* Registers the init and shutdown callbacks; repeated calls are no-ops. */
+static void receiver_install_callbacks(void) {
+  static bool done;
+
+  if (!done) {
+    plugin_register_init("open_telemetry_receiver", receiver_init);
+    plugin_register_shutdown("open_telemetry_receiver", receiver_shutdown);
+  }
+
+  done = true;
+}
+
+/*
+ * collectd plugin interface
+ */
+int receiver_config(oconfig_item_t *ci) {
+  if (ci->values_num < 1 || ci->values_num > 2 ||
+      ci->values[0].type != OCONFIG_TYPE_STRING ||
+      (ci->values_num > 1 && ci->values[1].type != OCONFIG_TYPE_STRING)) {
+    ERROR("open_telemetry plugin: The \"%s\" config option needs "
+          "one or two string arguments (address and port).",
+          ci->key);
+    return EINVAL;
+  }
+
+  auto listener = Listener();
+  listener.addr = grpc::string(ci->values[0].value.string);
+  if (ci->values_num >= 2) {
+    listener.port = grpc::string(ci->values[1].value.string);
+  } else {
+    listener.port = grpc::string(OT_DEFAULT_PORT);
+  }
+  listener.ssl = nullptr;
+
+  auto ssl_opts = new grpc::SslServerCredentialsOptions(
+      GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
+  grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
+  bool use_ssl =
false; + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + int err = 0; + if (!strcasecmp("EnableSSL", child->key)) { + err = cf_util_get_boolean(child, &use_ssl); + } else if (!strcasecmp("SSLCACertificateFile", child->key)) { + err = config_get_file(child, &ssl_opts->pem_root_certs); + } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) { + err = config_get_file(child, &pkcp.private_key); + } else if (!strcasecmp("SSLCertificateFile", child->key)) { + err = config_get_file(child, &pkcp.cert_chain); + } else if (!strcasecmp("VerifyPeer", child->key)) { + bool verify = false; + err = cf_util_get_boolean(child, &verify); + if (!err) { + ssl_opts->client_certificate_request = + verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; + } + } else { + ERROR("open_telemetry plugin: Option \"%s\" is not allowed inside a " + "\"%s\" block.", + child->key, ci->key); + err = EINVAL; + } + + if (err) { + delete ssl_opts; + return err; + } + } + + if (use_ssl) { + ssl_opts->pem_key_cert_pairs.push_back(pkcp); + listener.ssl = ssl_opts; + } else { + delete ssl_opts; + } + + listeners.push_back(listener); + receiver_install_callbacks(); + return 0; +} /* receiver_config() */