Skip to content

Commit

Permalink
chore(telemetry): ensure instrumentation telemetry is compatible with…
Browse files Browse the repository at this point in the history
… python 3.12 (#6859)

## Motivation

- Make the instrumentation telemetry client compatible with python3.12:
python/cpython#104826

## Description

 - Start telemetry worker thread as early as possible.
 - Delays sending all telemetry events until app-started is queued.
 - Refactors tests to align with this new logic.
 

## Risk 

- Telemetry events (metrics/logs/integrations) are queued as early as
possible but these events are only sent when the trace agent writer is
started. This **may** result in a memory leak if high cardinality
telemetry metrics and logs are added in the future. This is not a
concern right now.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- [x] If this PR touches code that signs or publishes builds or
packages, or handles credentials of any kind, I've requested a review
from `@DataDog/security-design-and-guidance`.
- [x] This PR doesn't touch any of that.

---------

Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Tahir H. Butt <[email protected]>
Co-authored-by: Gabriele N. Tornetta <[email protected]>
Co-authored-by: Yun Kim <[email protected]>
Co-authored-by: Emmett Butler <[email protected]>
Co-authored-by: ZStriker19 <[email protected]>
  • Loading branch information
7 people committed Sep 13, 2023
1 parent 4575165 commit 51f3bac
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
workflows:
setup:
jobs:
- setup
- setup
5 changes: 5 additions & 0 deletions ddtrace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@


telemetry.install_excepthook()
# In order to support 3.12, we start the writer upon initialization.
# See https://github.com/python/cpython/pull/104826.
# Telemetry events will only be sent after the `app-started` is queued.
# This will occur when the agent writer starts.
telemetry.telemetry_writer.enable()

from ._monkey import patch # noqa: E402
from ._monkey import patch_all # noqa: E402
Expand Down
7 changes: 7 additions & 0 deletions ddtrace/internal/telemetry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def _excepthook(tp, value, root_traceback):
error_msg = "{}:{} {}".format(filename, lineno, str(value))
telemetry_writer.add_integration(integration_name, True, error_msg=error_msg)

if telemetry_writer.started is False:
telemetry_writer._app_started_event(False)
telemetry_writer._app_dependencies_loaded_event()

telemetry_writer.app_shutdown()
telemetry_writer.disable()

return _ORIGINAL_EXCEPTHOOK(tp, value, root_traceback)


Expand Down
6 changes: 3 additions & 3 deletions ddtrace/internal/telemetry/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from typing import List
from typing import Tuple

import ddtrace
from ddtrace.internal.compat import PY3
from ddtrace.internal.constants import DEFAULT_SERVICE_NAME
from ddtrace.internal.packages import get_distributions
from ddtrace.internal.runtime.container import get_container_info
from ddtrace.internal.utils.cache import cached
from ddtrace.version import get_version

from ...settings import _config as config
from ..hostname import get_hostname
Expand Down Expand Up @@ -63,7 +63,7 @@ def _get_application(key):
"env": env or "",
"language_name": "python",
"language_version": _format_version_info(sys.version_info),
"tracer_version": ddtrace.__version__,
"tracer_version": get_version(),
"runtime_name": platform.python_implementation(),
"runtime_version": _format_version_info(sys.implementation.version) if PY3 else "",
"products": _get_products(),
Expand All @@ -88,7 +88,7 @@ def get_application(service, version, env):
def _get_products():
# type: () -> Dict
return {
"appsec": {"version": ddtrace.__version__, "enabled": config._appsec_enabled},
"appsec": {"version": get_version(), "enabled": config._appsec_enabled},
}


Expand Down
21 changes: 7 additions & 14 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ def enable(self):

if self._is_periodic:
self.start()
atexit.register(self.app_shutdown)
return True

self.status = ServiceStatus.RUNNING
Expand Down Expand Up @@ -290,14 +289,18 @@ def add_error(self, code, msg, filename, line_number):
msg = "%s:%s: %s" % (filename, line_number, msg)
self._error = (code, msg)

def _app_started_event(self):
# type: () -> None
def _app_started_event(self, register_app_shutdown=True):
# type: (bool) -> None
"""Sent when TelemetryWriter is enabled or forks"""
if self._forked:
# app-started events should only be sent by the main process
return
# List of configurations to be collected

self.started = True
if register_app_shutdown:
atexit.register(self.app_shutdown)

self.add_configurations(
[
(TELEMETRY_TRACING_ENABLED, config._tracing_enabled, "unknown"),
Expand Down Expand Up @@ -593,15 +596,6 @@ def periodic(self, force_flush=False):
for telemetry_event in telemetry_events:
self._client.send_event(telemetry_event)

def start(self, *args, **kwargs):
# type: (...) -> None
super(TelemetryWriter, self).start(*args, **kwargs)
# Queue app-started event after the telemetry worker thread is running
if self.started is False:
self._app_started_event()
self._app_dependencies_loaded_event()
self.started = True

def app_shutdown(self):
self._app_closing_event()
self.periodic(force_flush=True)
Expand Down Expand Up @@ -634,8 +628,7 @@ def _fork_writer(self):

# Enable writer service in child process to avoid interpreter shutdown
# error in Python 3.12
if sys.version_info >= (3, 12):
self.enable()
self.enable()

def _restart_sequence(self):
self._sequence = itertools.count(1)
Expand Down
3 changes: 2 additions & 1 deletion ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,8 @@ def _send_payload(self, payload, count, client):
def start(self):
super(AgentWriter, self).start()
try:
telemetry_writer.enable()
telemetry_writer._app_started_event()
telemetry_writer._app_dependencies_loaded_event()

# appsec remote config should be enabled/started after the global tracer and configs
# are initialized
Expand Down
82 changes: 45 additions & 37 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
def test_enable(test_agent_session, run_python_code_in_subprocess):
code = """
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.service import ServiceStatus
telemetry_writer.enable()
assert telemetry_writer.status == ServiceStatus.RUNNING
assert telemetry_writer._worker is not None
"""

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -17,26 +22,10 @@ def test_enable(test_agent_session, run_python_code_in_subprocess):
assert stdout == b"", stderr
assert stderr == b""

events = test_agent_session.get_events()
assert len(events) == 3

# Same runtime id is used
assert events[0]["runtime_id"] == events[1]["runtime_id"]
assert events[0]["request_type"] == "app-closing"
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[2]["payload"]["error"] == {"code": 0, "message": ""}


@pytest.mark.snapshot
def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run_python_code_in_subprocess):
"""assert telemetry events are generated after the first trace is flushed to the agent"""
# Using ddtrace-run and/or importing ddtrace alone should not enable telemetry
# Telemetry data should only be sent after the first trace to the agent
_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import ddtrace")
assert status == 0, stderr
# No trace and No Telemetry
assert len(test_agent_session.get_events()) == 0

# Submit a trace to the agent in a subprocess
code = 'from ddtrace import tracer; span = tracer.trace("test-telemetry"); span.finish()'
Expand All @@ -58,13 +47,19 @@ def test_telemetry_enabled_on_first_tracer_flush(test_agent_session, ddtrace_run
def test_enable_fork(test_agent_session, run_python_code_in_subprocess):
"""assert app-started/app-closing events are only sent in parent process"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)
import os
from ddtrace.internal.runtime import get_runtime_id
from ddtrace.internal.telemetry import telemetry_writer
# We have to start before forking since fork hooks are not enabled until after enabling
telemetry_writer.enable()
telemetry_writer._app_started_event()
if os.fork() == 0:
# Send multiple started events to confirm none get sent
Expand All @@ -78,27 +73,29 @@ def test_enable_fork(test_agent_session, run_python_code_in_subprocess):

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

requests = test_agent_session.get_requests()

# We expect 2 events from the parent process to get sent, but none from the child process
assert len(requests) == 3
assert len(requests) == 2
# Validate that the runtime id sent for every event is the parent processes runtime id
assert requests[0]["body"]["runtime_id"] == runtime_id
assert requests[0]["body"]["request_type"] == "app-closing"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-dependencies-loaded"
assert requests[1]["body"]["runtime_id"] == runtime_id
assert requests[2]["body"]["request_type"] == "app-started"
assert requests[2]["body"]["runtime_id"] == runtime_id
assert requests[1]["body"]["request_type"] == "app-started"


def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess):
"""assert app-heartbeat events are only sent in parent process when no other events are queued"""
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)
import os
from ddtrace.internal.runtime import get_runtime_id
Expand All @@ -120,7 +117,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess

stdout, stderr, status, _ = run_python_code_in_subprocess(code)
assert status == 0, stderr
assert stderr == b""
assert stderr == b"", stderr

runtime_id = stdout.strip().decode("utf-8")

Expand All @@ -138,6 +135,11 @@ def test_heartbeat_interval_configuration(run_python_code_in_subprocess):
env = os.environ.copy()
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "61"
code = """
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)
from ddtrace import config
assert config._telemetry_heartbeat_interval == 61
Expand All @@ -156,6 +158,11 @@ def test_logs_after_fork(run_python_code_in_subprocess):
# Regression test: telemetry writer should not log an error when a process forks
_, err, status, _ = run_python_code_in_subprocess(
"""
import warnings
# This test logs the following warning in py3.12:
# This process (pid=402) is multi-threaded, use of fork() may lead to deadlocks in the child
warnings.filterwarnings("ignore", category=DeprecationWarning)
import ddtrace
import logging
import os
Expand All @@ -167,7 +174,7 @@ def test_logs_after_fork(run_python_code_in_subprocess):
)

assert status == 0, err
assert err == b""
assert err == b"", err


def test_app_started_error_handled_exception(test_agent_session, run_python_code_in_subprocess):
Expand Down Expand Up @@ -250,6 +257,9 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
from ddtrace import patch, tracer
patch(raise_errors=False, sqlite3=True)
# Create a span to start the telemetry writer
tracer.trace("hi").finish()
"""

_, stderr, status, _ = run_python_code_in_subprocess(code)
Expand All @@ -260,15 +270,11 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro

events = test_agent_session.get_events()

assert len(events) == 5
# Same runtime id is used
assert (
events[0]["runtime_id"]
== events[1]["runtime_id"]
== events[2]["runtime_id"]
== events[3]["runtime_id"]
== events[4]["runtime_id"]
)
assert len(events) > 1
for event in events:
# Same runtime id is used
assert event["runtime_id"] == events[0]["runtime_id"]

integrations_events = [event for event in events if event["request_type"] == "app-integrations-change"]

assert len(integrations_events) == 1
Expand All @@ -277,12 +283,14 @@ def test_handled_integration_error(test_agent_session, run_python_code_in_subpro
== "failed to import ddtrace module 'ddtrace.contrib.sqlite3' when patching on import"
)

metric_events = [event for event in events if event["request_type"] == "generate-metrics"]

metric_events = [
event
for event in events
if event["request_type"] == "generate-metrics"
and event["payload"]["series"][0]["metric"] == "integration_errors"
]
assert len(metric_events) == 1
assert metric_events[0]["payload"]["namespace"] == "tracers"
assert len(metric_events[0]["payload"]["series"]) == 1
assert metric_events[0]["payload"]["series"][0]["metric"] == "integration_errors"
assert metric_events[0]["payload"]["series"][0]["type"] == "count"
assert len(metric_events[0]["payload"]["series"][0]["points"]) == 1
assert metric_events[0]["payload"]["series"][0]["points"][0][1] == 1
Expand Down
11 changes: 2 additions & 9 deletions tests/telemetry/test_telemetry_metrics_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import subprocess
import sys
import time

import pytest

Expand All @@ -28,8 +27,6 @@ def _build_env():
def gunicorn_server(telemetry_metrics_enabled="true", token=None):
cmd = ["ddtrace-run", "gunicorn", "-w", "1", "-b", "0.0.0.0:8000", "tests.telemetry.app:app"]
env = _build_env()
env["DD_TELEMETRY_METRICS_ENABLED"] = telemetry_metrics_enabled
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "1.0"
env["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = "X-Datadog-Test-Session-Token:{}".format(token)
env["DD_TRACE_AGENT_URL"] = os.environ.get("DD_TRACE_AGENT_URL", "")
env["DD_TRACE_DEBUG"] = "true"
Expand Down Expand Up @@ -90,19 +87,15 @@ def test_telemetry_metrics_enabled_on_gunicorn_child_process(test_agent_session)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200
# DD_TELEMETRY_HEARTBEAT_INTERVAL is set to 1 second
time.sleep(1)
gunicorn_client.get("/count_metric")
response = gunicorn_client.get("/count_metric")
assert response.status_code == 200

events = test_agent_session.get_events()
metrics = list(filter(lambda event: event["request_type"] == "generate-metrics", events))
assert len(metrics) == 2
assert len(metrics) == 1
assert metrics[0]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 2.0
assert metrics[1]["payload"]["series"][0]["metric"] == "test_metric"
assert metrics[1]["payload"]["series"][0]["points"][0][1] == 3.0
assert metrics[0]["payload"]["series"][0]["points"][0][1] == 5


def test_span_creation_and_finished_metrics_datadog(test_agent_session, ddtrace_run_python_code_in_subprocess):
Expand Down
11 changes: 5 additions & 6 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ def test_send_failing_request(mock_status, telemetry_writer):
with httpretty.enabled():
httpretty.register_uri(httpretty.POST, telemetry_writer._client.url, status=mock_status)
with mock.patch("ddtrace.internal.telemetry.writer.log") as log:
# sends failing app-closing event
telemetry_writer.app_shutdown()
# sends failing app-heartbeat event
telemetry_writer.periodic()
# asserts unsuccessful status code was logged
log.debug.assert_called_with(
"failed to send telemetry to the Datadog Agent at %s. response: %s",
Expand All @@ -392,13 +392,11 @@ def test_telemetry_graceful_shutdown(telemetry_writer, test_agent_session, mock_
telemetry_writer.app_shutdown()

events = test_agent_session.get_events()
assert len(events) == 3
assert len(events) == 1

# Reverse chronological order
assert events[0]["request_type"] == "app-closing"
assert events[0] == _get_request_body({}, "app-closing", 3)
assert events[1]["request_type"] == "app-dependencies-loaded"
assert events[2]["request_type"] == "app-started"
assert events[0] == _get_request_body({}, "app-closing", 1)


def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_session):
Expand All @@ -407,6 +405,7 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se

# Ensure telemetry writer is initialized to send periodic events
telemetry_writer._is_periodic = True
telemetry_writer.started = True
# Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set
assert telemetry_writer.interval == 10
assert telemetry_writer._periodic_threshold == 5
Expand Down

0 comments on commit 51f3bac

Please sign in to comment.