From dc81a50a9fc5f2a5ce6978aa064fdfab1618328b Mon Sep 17 00:00:00 2001 From: Tim Pansino Date: Sat, 6 May 2023 14:16:14 -0700 Subject: [PATCH] Wiring dimensional metrics --- newrelic/api/application.py | 8 ++ newrelic/api/transaction.py | 52 +++++++++- newrelic/common/metric_utils.py | 31 ++++++ newrelic/core/agent.py | 27 ++++++ newrelic/core/application.py | 53 +++++++++++ newrelic/core/data_collector.py | 21 +++++ newrelic/core/stats_engine.py | 151 +++++++++++++++++++++++++++++- newrelic/core/transaction_node.py | 1 + 8 files changed, 342 insertions(+), 2 deletions(-) create mode 100644 newrelic/common/metric_utils.py diff --git a/newrelic/api/application.py b/newrelic/api/application.py index ea57829f28..3210cecbf1 100644 --- a/newrelic/api/application.py +++ b/newrelic/api/application.py @@ -142,6 +142,14 @@ def record_custom_metrics(self, metrics): if self.active and metrics: self._agent.record_custom_metrics(self._name, metrics) + def record_dimensional_metric(self, name, value, tags=None): + if self.active: + self._agent.record_dimensional_metric(self._name, name, value, tags) + + def record_dimensional_metrics(self, metrics): + if self.active and metrics: + self._agent.record_dimensional_metrics(self._name, metrics) + def record_custom_event(self, event_type, params): if self.active: self._agent.record_custom_event(self._name, event_type, params) diff --git a/newrelic/api/transaction.py b/newrelic/api/transaction.py index f04bcba849..461a78d67a 100644 --- a/newrelic/api/transaction.py +++ b/newrelic/api/transaction.py @@ -64,7 +64,7 @@ from newrelic.core.custom_event import create_custom_event from newrelic.core.log_event_node import LogEventNode from newrelic.core.stack_trace import exception_stack -from newrelic.core.stats_engine import CustomMetrics, SampledDataSet +from newrelic.core.stats_engine import CustomMetrics, DimensionalMetrics, SampledDataSet from newrelic.core.thread_utilization import utilization_tracker from newrelic.core.trace_cache import ( TraceCacheActiveTraceError, @@ -307,6 +307,7 @@ def __init__(self, application, enabled=None, source=None): self.synthetics_header = None self._custom_metrics = CustomMetrics() + self._dimensional_metrics = DimensionalMetrics() global_settings = application.global_settings @@ -588,6 +589,7 @@ def __exit__(self, exc, value, tb): apdex_t=self.apdex, suppress_apdex=self.suppress_apdex, custom_metrics=self._custom_metrics, + dimensional_metrics=self._dimensional_metrics, guid=self.guid, cpu_time=self._cpu_user_time_value, suppress_transaction_trace=self.suppress_transaction_trace, @@ -1600,6 +1602,16 @@ def record_custom_metrics(self, metrics): for name, value in metrics: self._custom_metrics.record_custom_metric(name, value) + def record_dimensional_metric(self, name, value, tags=None): + self._dimensional_metrics.record_dimensional_metric(name, value, tags) + + def record_dimensional_metrics(self, metrics): + for metric in metrics: + name, value = metric[:2] + tags = metric[2] if len(metric) >= 3 else None + + self._dimensional_metrics.record_dimensional_metric(name, value, tags) + def record_custom_event(self, event_type, params): settings = self._settings @@ -1898,6 +1910,44 @@ def record_custom_metrics(metrics, application=None): application.record_custom_metrics(metrics) +def record_dimensional_metric(name, value, tags=None, application=None): + if application is None: + transaction = current_transaction() + if transaction: + transaction.record_dimensional_metric(name, value, tags) + else: + _logger.debug( + "record_dimensional_metric has been called but no " + "transaction was running. As a result, the following metric " + "has not been recorded. Name: %r Value: %r Tags: %r. To correct this " + "problem, supply an application object as a parameter to this " + "record_dimensional_metrics call.", + name, + value, + tags, + ) + elif application.enabled: + application.record_dimensional_metric(name, value, tags) + + +def record_dimensional_metrics(metrics, application=None): + if application is None: + transaction = current_transaction() + if transaction: + transaction.record_dimensional_metrics(metrics) + else: + _logger.debug( + "record_dimensional_metrics has been called but no " + "transaction was running. As a result, the following metrics " + "have not been recorded: %r. To correct this problem, " + "supply an application object as a parameter to this " + "record_dimensional_metric call.", + list(metrics), + ) + elif application.enabled: + application.record_dimensional_metrics(metrics) + + def record_custom_event(event_type, params, application=None): """Record a custom event. diff --git a/newrelic/common/metric_utils.py b/newrelic/common/metric_utils.py new file mode 100644 index 0000000000..a5db2a9a69 --- /dev/null +++ b/newrelic/common/metric_utils.py @@ -0,0 +1,31 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module implements functions for creating a unique identity from a name and set of tags for use in dimensional metrics. +""" + +from newrelic.packages import six + + +def create_metric_identity(name, tags=None): + if tags: + if isinstance(tags, dict): + tags = frozenset(six.iteritems(tags)) if tags is not None else None + elif not isinstance(tags, frozenset): + tags = frozenset(tags) + elif tags is not None: + tags = None # Set empty iterables to None + + return (name, tags) diff --git a/newrelic/core/agent.py b/newrelic/core/agent.py index 6ab9571a45..d5227d474c 100644 --- a/newrelic/core/agent.py +++ b/newrelic/core/agent.py @@ -524,6 +524,33 @@ def record_custom_metrics(self, app_name, metrics): application.record_custom_metrics(metrics) + def record_dimensional_metric(self, app_name, name, value, tags=None): + """Records a basic metric for the named application. If there has + been no prior request to activate the application, the metric is + discarded. + + """ + + application = self._applications.get(app_name, None) + if application is None or not application.active: + return + + application.record_dimensional_metric(name, value, tags) + + def record_dimensional_metrics(self, app_name, metrics): + """Records the metrics for the named application. If there has + been no prior request to activate the application, the metric is + discarded. The metrics should be an iterable yielding tuples + consisting of the name and value. + + """ + + application = self._applications.get(app_name, None) + if application is None or not application.active: + return + + application.record_dimensional_metrics(metrics) + def record_custom_event(self, app_name, event_type, params): application = self._applications.get(app_name, None) if application is None or not application.active: diff --git a/newrelic/core/application.py b/newrelic/core/application.py index 7be2174280..f8f0480ae9 100644 --- a/newrelic/core/application.py +++ b/newrelic/core/application.py @@ -106,6 +106,9 @@ def __init__(self, app_name, linked_applications=None): self._stats_custom_lock = threading.RLock() self._stats_custom_engine = StatsEngine() + self._stats_dimensional_lock = threading.RLock() + self._stats_dimensional_engine = StatsEngine() + self._agent_commands_lock = threading.Lock() self._data_samplers_lock = threading.Lock() self._data_samplers_started = False @@ -510,6 +513,9 @@ def connect_to_data_collector(self, activate_agent): with self._stats_custom_lock: self._stats_custom_engine.reset_stats(configuration) + with self._stats_dimensional_lock: + self._stats_dimensional_engine.reset_stats(configuration) + # Record an initial start time for the reporting period and # clear record of last transaction processed. @@ -860,6 +866,50 @@ def record_custom_metrics(self, metrics): self._global_events_account += 1 self._stats_custom_engine.record_custom_metric(name, value) + def record_dimensional_metric(self, name, value, tags=None): + """Record a dimensional metric against the application independent + of a specific transaction. + + NOTE that this will require locking of the stats engine for + dimensional metrics and so under heavy use will have performance + issues. It is better to record the dimensional metric against an + active transaction as they will then be aggregated at the end of + the transaction when all other metrics are aggregated and so no + additional locking will be required. + + """ + + if not self._active_session: + return + + with self._stats_dimensional_lock: + self._global_events_account += 1 + self._stats_dimensional_engine.record_dimensional_metric(name, value, tags) + + def record_dimensional_metrics(self, metrics): + """Record a set of dimensional metrics against the application + independent of a specific transaction. + + NOTE that this will require locking of the stats engine for + dimensional metrics and so under heavy use will have performance + issues. It is better to record the dimensional metric against an + active transaction as they will then be aggregated at the end of + the transaction when all other metrics are aggregated and so no + additional locking will be required. + + """ + + if not self._active_session: + return + + with self._stats_dimensional_lock: + for metric in metrics: + name, value = metric[:2] + tags = metric[2] if len(metric) >= 3 else None + + self._global_events_account += 1 + self._stats_dimensional_engine.record_dimensional_metric(name, value, tags) + def record_custom_event(self, event_type, params): if not self._active_session: return @@ -1416,11 +1466,14 @@ def harvest(self, shutdown=False, flexible=False): _logger.debug("Normalizing metrics for harvest of %r.", self._app_name) metric_data = stats.metric_data(metric_normalizer) + dimensional_metric_data = stats.dimensional_metric_data(metric_normalizer) _logger.debug("Sending metric data for harvest of %r.", self._app_name) # Send metrics self._active_session.send_metric_data(self._period_start, period_end, metric_data) + if dimensional_metric_data: + self._active_session.send_dimensional_metric_data(self._period_start, period_end, dimensional_metric_data) _logger.debug("Done sending data for harvest of %r.", self._app_name) diff --git a/newrelic/core/data_collector.py b/newrelic/core/data_collector.py index 985e37240d..58303aa3be 100644 --- a/newrelic/core/data_collector.py +++ b/newrelic/core/data_collector.py @@ -31,6 +31,8 @@ _logger = logging.getLogger(__name__) +DIMENSIONAL_METRIC_DATA_TEMP = [] # TODO: REMOVE THIS + class Session(object): PROTOCOL = AgentProtocol @@ -128,6 +130,25 @@ def send_metric_data(self, start_time, end_time, metric_data): payload = (self.agent_run_id, start_time, end_time, metric_data) return self._protocol.send("metric_data", payload) + def send_dimensional_metric_data(self, start_time, end_time, metric_data): + """Called to submit dimensional metric data for specified period of time. + Time values are seconds since UNIX epoch as returned by the + time.time() function. The metric data should be iterable of + specific metrics. + + NOTE: This data is sent not sent to the normal agent endpoints but is sent + to the MELT API endpoints to keep the entity separate. This is for use + with the machine learning integration only. + """ + + payload = (self.agent_run_id, start_time, end_time, metric_data) + # return self._protocol.send("metric_data", payload) + + # TODO: REMOVE THIS. Replace with actual protocol. + DIMENSIONAL_METRIC_DATA_TEMP.append(payload) + _logger.debug("Dimensional Metrics: %r" % metric_data) + return 200 + def send_log_events(self, sampling_info, log_event_data): """Called to submit sample set for log events.""" diff --git a/newrelic/core/stats_engine.py b/newrelic/core/stats_engine.py index 203e3e7960..38a9ff466e 100644 --- a/newrelic/core/stats_engine.py +++ b/newrelic/core/stats_engine.py @@ -35,6 +35,7 @@ from newrelic.api.settings import STRIP_EXCEPTION_MESSAGE from newrelic.api.time_trace import get_linking_metadata from newrelic.common.encoding_utils import json_encode +from newrelic.common.metric_utils import create_metric_identity from newrelic.common.object_names import parse_exc_info from newrelic.common.streaming_utils import StreamBuffer from newrelic.core.attribute import ( @@ -180,6 +181,11 @@ def merge_custom_metric(self, value): self.merge_raw_time_metric(value) + def merge_dimensional_metric(self, value): + """Merge data value.""" + + self.merge_raw_time_metric(value) + class CountStats(TimeStats): def merge_stats(self, other): @@ -234,6 +240,25 @@ def reset_metric_stats(self): """ self.__stats_table = {} +class DimensionalMetrics(CustomMetrics): + + """Extends CustomMetrics to allow a set of tags for metrics.""" + + def __contains__(self, key): + if not isinstance(key[1], frozenset): + # Convert tags dict to a frozen set for proper comparisons + create_metric_identity(*key) + return key in self.__stats_table + + def record_dimensional_metric(self, name, value, tags=None): + """Record a single value metric, merging the data with any data + from prior value metrics with the same name. + + """ + + key = create_metric_identity(name, tags) + self.record_custom_metric(key, value) + class SlowSqlStats(list): def __init__(self): @@ -433,6 +458,7 @@ class StatsEngine(object): def __init__(self): self.__settings = None self.__stats_table = {} + self.__dimensional_stats_table = {} self._transaction_events = SampledDataSet() self._error_events = SampledDataSet() self._custom_events = SampledDataSet() @@ -456,6 +482,10 @@ def settings(self): def stats_table(self): return self.__stats_table + @property + def dimensional_stats_table(self): + return self.__dimensional_stats_table + @property def transaction_events(self): return self._transaction_events @@ -494,7 +524,7 @@ def metrics_count(self): """ - return len(self.__stats_table) + return len(self.__stats_table) + len(self.__dimensional_stats_table) def record_apdex_metric(self, metric): """Record a single apdex metric, merging the data with any data @@ -865,6 +895,44 @@ def record_custom_metrics(self, metrics): for name, value in metrics: self.record_custom_metric(name, value) + def record_dimensional_metric(self, name, value, tags=None): + """Record a single value metric, merging the data with any data + from prior value metrics with the same name. + + """ + if isinstance(value, dict): + if len(value) == 1 and "count" in value: + new_stats = CountStats(call_count=value["count"]) + else: + new_stats = TimeStats(*c2t(**value)) + else: + new_stats = TimeStats(1, value, value, value, value, value**2) + + key = create_metric_identity(name, tags=None) + stats = self.__dimensional_stats_table.get(key) + if stats is None: + self.__dimensional_stats_table[key] = new_stats + else: + stats.merge_stats(new_stats) + + return key + + def record_dimensional_metrics(self, metrics): + """Record the value metrics supplied by the iterable, merging + the data with any data from prior value metrics with the same + name. + + """ + + if not self.__settings: + return + + for metric in metrics: + name, value = metric[:2] + tags = metric[2] if len(metric) >= 3 else None + + self.record_dimensional_metric(name, value, tags) + def record_slow_sql_node(self, node): """Record a single sql metric, merging the data with any data from prior sql metrics for the same sql key. @@ -975,6 +1043,8 @@ def record_transaction(self, transaction): self.merge_custom_metrics(transaction.custom_metrics.metrics()) + self.merge_dimensional_metrics(transaction.dimensional_metrics.metrics()) + self.record_time_metrics(transaction.time_metrics(self)) # Capture any errors if error collection is enabled. @@ -1159,6 +1229,67 @@ def metric_data_count(self): return len(self.__stats_table) + def dimensional_metric_data(self, normalizer=None): + """Returns a list containing the low level metric data for + sending to the core application pertaining to the reporting + period. This consists of tuple pairs where first is dictionary + with name and scope keys with corresponding values, or integer + identifier if metric had an entry in dictionary mapping metric + (name, tags) as supplied from core application. The second is + the list of accumulated metric data, the list always being of + length 6. + + """ + + if not self.__settings: + return [] + + result = [] + normalized_stats = {} + + # Metric Renaming and Re-Aggregation. After applying the metric + # renaming rules, the metrics are re-aggregated to collapse the + # metrics with same names after the renaming. + + if self.__settings.debug.log_raw_metric_data: + _logger.info( + "Raw dimensional metric data for harvest of %r is %r.", + self.__settings.app_name, + list(six.iteritems(self.__dimensional_stats_table)), + ) + + if normalizer is not None: + for key, value in six.iteritems(self.__dimensional_stats_table): + key = (normalizer(key[0])[0], key[1]) + stats = normalized_stats.get(key) + if stats is None: + normalized_stats[key] = copy.copy(value) + else: + stats.merge_stats(value) + else: + normalized_stats = self.__dimensional_stats_table + + if self.__settings.debug.log_normalized_metric_data: + _logger.info( + "Normalized metric data for harvest of %r is %r.", + self.__settings.app_name, + list(six.iteritems(normalized_stats)), + ) + + for key, value in six.iteritems(normalized_stats): + key = dict(name=key[0], scope=key[1]) + result.append((key, value)) + + return result + + def dimensional_metric_data_count(self): + """Returns a count of the number of unique metrics.""" + + if not self.__settings: + return 0 + + return len(self.__dimensional_stats_table) + def error_data(self): """Returns a to a list containing any errors collected during the reporting period. @@ -1785,6 +1916,24 @@ def merge_custom_metrics(self, metrics): else: stats.merge_stats(other) + def merge_dimensional_metrics(self, metrics): + """ + Merges in a set of dimensional metrics. The metrics should be + provide as an iterable where each item is a tuple of the metric + key and the accumulated stats for the metric. The metric key should + also be a tuple, containing a name and attribute filtered frozenset of tags. + """ + + if not self.__settings: + return + + for key, other in metrics: + stats = self.__dimensional_stats_table.get(key) + if not stats: + self.__dimensional_stats_table[key] = other + else: + stats.merge_stats(other) + def _snapshot(self): copy = object.__new__(StatsEngineSnapshot) copy.__dict__.update(self.__dict__) diff --git a/newrelic/core/transaction_node.py b/newrelic/core/transaction_node.py index 0faae37909..fec351c756 100644 --- a/newrelic/core/transaction_node.py +++ b/newrelic/core/transaction_node.py @@ -64,6 +64,7 @@ "apdex_t", "suppress_apdex", "custom_metrics", + "dimensional_metrics", "guid", "cpu_time", "suppress_transaction_trace",