Submit DBM query samples via new aggregator API (#9045)
* update postgres & mysql checks

Follow-up to #9165: Update the postgres & mysql checks to submit DBM events via the new aggregator API.

Also improves the tests to handle background threads more reliably.

Motivation: Submit events using the more robust agent Go code, with proper batching, buffering, retries, error handling, and tracking of internal statistics (a sketch of the new submission path is included below).

* bump datadog-checks-base min version
djova authored Apr 16, 2021
1 parent befaeac commit b58de78
Showing 6 changed files with 51 additions and 39 deletions.
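Both checks now submit samples the same way: each sample event is serialized to JSON and handed to the agent through `AgentCheck.database_monitoring_query_sample`, which (per the tests below) lands on the "dbm-samples" event platform track. A minimal sketch of the pattern, assuming datadog-checks-base >= 18.1.0; the check class, method name, and metric name are illustrative, not the checks' actual code:

```python
from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db.utils import default_json_event_encoding
from datadog_checks.base.utils.serialization import json


class MyDatabaseCheck(AgentCheck):  # hypothetical check, for illustration only
    def _submit_query_samples(self, events):
        # one aggregator call per event; batching, buffering, retries, and
        # error handling happen downstream in the agent's Go aggregator
        submitted_count = 0
        for event in events:
            self.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding))
            submitted_count += 1
        # illustrative metric name, following the convention the real checks use
        self.count("dd.mydb.collect_statement_samples.events_submitted.count", submitted_count)
```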
19 changes: 6 additions & 13 deletions mysql/datadog_checks/mysql/statement_samples.py
@@ -17,12 +17,7 @@
from datadog_checks.base import is_affirmative
from datadog_checks.base.log import get_check_logger
from datadog_checks.base.utils.db.sql import compute_exec_plan_signature, compute_sql_signature
- from datadog_checks.base.utils.db.statement_samples import (
-     StatementSamplesClient,
-     StubStatementSamplesClient,
-     using_stub_datadog_agent,
- )
- from datadog_checks.base.utils.db.utils import ConstantRateLimiter, resolve_db_host
+ from datadog_checks.base.utils.db.utils import ConstantRateLimiter, default_json_event_encoding, resolve_db_host
from datadog_checks.base.utils.serialization import json

VALID_EXPLAIN_STATEMENTS = frozenset({'select', 'table', 'delete', 'insert', 'replace', 'update'})
@@ -203,10 +198,6 @@
)


- def _new_statement_samples_client():
-     return StubStatementSamplesClient() if using_stub_datadog_agent else StatementSamplesClient()
-
-
class MySQLStatementSamples(object):
"""
Collects statement samples and execution plans.
@@ -267,7 +258,6 @@ def __init__(self, check, config, connection_args):
}
self._preferred_explain_strategies = ['PROCEDURE', 'FQ_PROCEDURE', 'STATEMENT']
self._init_caches()
- self._statement_samples_client = _new_statement_samples_client()

def _init_caches(self):
self._collection_strategy_cache = TTLCache(
@@ -634,6 +624,7 @@ def _get_sample_collection_strategy(self):
return strategy

def _collect_statement_samples(self):
self._log.debug("collecting statement samples")
self._rate_limiter.sleep()
events_statements_table, rate_limit = self._get_sample_collection_strategy()
if not events_statements_table:
@@ -646,8 +637,10 @@ def _collect_statement_samples(self):
rows = self._get_new_events_statements(events_statements_table, self._events_statements_row_limit)
rows = self._filter_valid_statement_rows(rows)
events = self._collect_plans_for_statements(rows)
- submitted_count, failed_count = self._statement_samples_client.submit_events(events)
- self._check.count("dd.mysql.statement_samples.error", failed_count, tags=self._tags + ["error:submit-events"])
+ submitted_count = 0
+ for e in events:
+     self._check.database_monitoring_query_sample(json.dumps(e, default=default_json_event_encoding))
+     submitted_count += 1
self._check.histogram("dd.mysql.collect_statement_samples.time", (time.time() - start_time) * 1000, tags=tags)
self._check.count("dd.mysql.collect_statement_samples.events_submitted.count", submitted_count, tags=tags)
self._check.gauge(
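The `default_json_event_encoding` fallback passed to `json.dumps` above is imported but not shown in this diff. It is needed because rows read from the database can contain values such as decimals and timestamps that the stdlib encoder rejects. A plausible sketch, assuming only those two families of types need handling:

```python
import datetime
import decimal


def default_json_event_encoding(o):
    # json.dumps invokes this only for objects it cannot serialize natively
    if isinstance(o, decimal.Decimal):
        return float(o)
    if isinstance(o, (datetime.date, datetime.datetime)):
        return o.isoformat()
    raise TypeError("{!r} is not JSON serializable".format(o))
```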
2 changes: 1 addition & 1 deletion mysql/setup.py
@@ -27,7 +27,7 @@ def get_dependencies():
return f.readlines()


- CHECKS_BASE_REQ = 'datadog-checks-base>=16.6.0'
+ CHECKS_BASE_REQ = 'datadog-checks-base>=18.1.0'

setup(
name='datadog-mysql',
34 changes: 24 additions & 10 deletions mysql/tests/test_mysql.py
@@ -6,6 +6,7 @@
import subprocess
import time
from collections import Counter
+ from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import closing
from os import environ

@@ -18,6 +19,7 @@
from datadog_checks.base.utils.serialization import json
from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.mysql import MySql, statements
+ from datadog_checks.mysql.statement_samples import MySQLStatementSamples
from datadog_checks.mysql.version_utils import get_version

from . import common, tags, variables
@@ -30,10 +32,22 @@
def dbm_instance(instance_complex):
instance_complex['deep_database_monitoring'] = True
instance_complex['min_collection_interval'] = 1
- instance_complex['statement_samples'] = {'enabled': True, 'run_sync': False, 'collections_per_second': 1}
+ instance_complex['statement_samples'] = {
+     'enabled': True,
+     # set the default for tests to run synchronously to ensure we don't have orphaned threads running around
+     'run_sync': True,
+     'collections_per_second': 1,
+ }
return instance_complex


+ @pytest.fixture(autouse=True)
+ def stop_orphaned_threads():
+     # make sure we shut down any orphaned threads and create a new Executor for each test
+     MySQLStatementSamples.executor.shutdown(wait=True)
+     MySQLStatementSamples.executor = ThreadPoolExecutor()
+
+
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
def test_minimal_config(aggregator, instance_basic):
@@ -354,23 +368,21 @@ def test_generate_synthetic_rows():
],
)
def test_statement_samples_collect(
-     dbm_instance, bob_conn, events_statements_table, explain_strategy, schema, statement, caplog
+     aggregator, dbm_instance, bob_conn, events_statements_table, explain_strategy, schema, statement, caplog
):
caplog.set_level(logging.INFO, logger="datadog_checks.mysql.collection_utils")
caplog.set_level(logging.DEBUG, logger="datadog_checks")
caplog.set_level(logging.DEBUG, logger="tests.test_mysql")

# try to collect a sample from all supported events_statements tables using all possible strategies
dbm_instance['statement_samples']['events_statements_table'] = events_statements_table
- dbm_instance['statement_samples']['run_sync'] = True
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
if explain_strategy:
mysql_check._statement_samples._preferred_explain_strategies = [explain_strategy]

logger.debug("running first check")
mysql_check.check(dbm_instance)

- mysql_check._statement_samples._statement_samples_client._payloads = []
+ aggregator.reset()
mysql_check._statement_samples._init_caches()

# we deliberately want to keep the connection open for the duration of the test to ensure
@@ -384,7 +396,8 @@ def test_statement_samples_collect
cursor.execute(statement)
logger.debug("running second check")
mysql_check.check(dbm_instance)
- events = mysql_check._statement_samples._statement_samples_client.get_events()
+ logger.debug("done second check")
+ events = aggregator.get_event_platform_events("dbm-samples")
matching = [e for e in events if e['db']['statement'] == statement]
assert len(matching) > 0, "should have collected an event"
with_plans = [e for e in matching if e['db']['plan']['definition'] is not None]
@@ -411,6 +424,7 @@
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
def test_statement_samples_rate_limit(aggregator, bob_conn, dbm_instance):
+ dbm_instance['statement_samples']['run_sync'] = False
dbm_instance['statement_samples']['collections_per_second'] = 0.5
query = "select name as nam from testdb.users where name = 'hello'"
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
@@ -419,17 +433,19 @@ def test_statement_samples_rate_limit(aggregator, bob_conn, dbm_instance):
cursor.execute(query)
mysql_check.check(dbm_instance)
time.sleep(1)
- events = mysql_check._statement_samples._statement_samples_client.get_events()
+ events = aggregator.get_event_platform_events("dbm-samples")
matching = [e for e in events if e['db']['statement'] == query]
assert len(matching) == 1, "should have collected exactly one event due to sample rate limit"
metrics = aggregator.metrics("dd.mysql.collect_statement_samples.time")
assert 2 < len(metrics) < 6
+ mysql_check.cancel()


@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
def test_statement_samples_loop_inactive_stop(aggregator, dbm_instance):
# confirm that the collection loop stops on its own after the check has not been run for a while
+ dbm_instance['statement_samples']['run_sync'] = False
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
mysql_check.check(dbm_instance)
# make sure there were no unhandled exceptions
@@ -441,6 +457,7 @@ def test_statement_samples_check_cancel(aggregator, dbm_instance):
@pytest.mark.usefixtures('dd_environment')
def test_statement_samples_check_cancel(aggregator, dbm_instance):
# confirm that the collection loop stops on its own after the check has not been run for a while
+ dbm_instance['statement_samples']['run_sync'] = False
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
mysql_check.check(dbm_instance)
mysql_check.cancel()
@@ -455,7 +472,6 @@
@pytest.mark.usefixtures('dd_environment')
def test_statement_samples_max_per_digest(dbm_instance):
# clear out any events from previous test runs
- dbm_instance['statement_samples']['run_sync'] = True
dbm_instance['statement_samples']['events_statements_table'] = 'events_statements_history_long'
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
for _ in range(3):
@@ -469,7 +485,6 @@
@pytest.mark.integration
@pytest.mark.usefixtures('dd_environment')
def test_statement_samples_invalid_explain_procedure(aggregator, dbm_instance):
- dbm_instance['statement_samples']['run_sync'] = True
dbm_instance['statement_samples']['explain_procedure'] = 'hello'
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])
mysql_check.check(dbm_instance)
@@ -482,7 +497,6 @@
"events_statements_enable_procedure", ["datadog.enable_events_statements_consumers", "invalid_proc"]
)
def test_statement_samples_enable_consumers(dbm_instance, root_conn, events_statements_enable_procedure):
- dbm_instance['statement_samples']['run_sync'] = True
dbm_instance['statement_samples']['events_statements_enable_procedure'] = events_statements_enable_procedure
mysql_check = MySql(common.CHECK_NAME, {}, instances=[dbm_instance])

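The `stop_orphaned_threads` fixture above works because the collector keeps its `executor` as a class attribute, a single `ThreadPoolExecutor` shared by every instance, so shutting it down and replacing it between tests guarantees no collection thread outlives its test. A sketch of the assumed shape of the collector; only the `executor` attribute and `_collect_statement_samples` are confirmed by this diff, while `run_sampler` and the loop are illustrative:

```python
from concurrent.futures.thread import ThreadPoolExecutor


class MySQLStatementSamples(object):
    # one executor shared by every instance of the collector; this is why
    # tests shut it down and replace it to avoid cross-test thread leaks
    executor = ThreadPoolExecutor()

    def run_sampler(self, run_sync=False):
        # hypothetical entry point: run inline when run_sync is set (the new
        # test default), otherwise hand the loop to the shared executor
        if run_sync:
            self._collect_statement_samples()
        else:
            MySQLStatementSamples.executor.submit(self._collection_loop)

    def _collection_loop(self):
        # would repeat until the check stops scheduling it; elided here
        self._collect_statement_samples()

    def _collect_statement_samples(self):
        pass  # the real collection logic is in the diff above
```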
11 changes: 5 additions & 6 deletions postgres/datadog_checks/postgres/statement_samples.py
@@ -16,8 +16,7 @@
from datadog_checks.base import is_affirmative
from datadog_checks.base.log import get_check_logger
from datadog_checks.base.utils.db.sql import compute_exec_plan_signature, compute_sql_signature
- from datadog_checks.base.utils.db.statement_samples import statement_samples_client
- from datadog_checks.base.utils.db.utils import ConstantRateLimiter, resolve_db_host
+ from datadog_checks.base.utils.db.utils import ConstantRateLimiter, default_json_event_encoding, resolve_db_host
from datadog_checks.base.utils.serialization import json
from datadog_checks.base.utils.time import get_timestamp

@@ -224,15 +223,15 @@ def _collect_statement_samples(self):
rows = self._get_new_pg_stat_activity()
rows = self._filter_valid_statement_rows(rows)
events = self._explain_pg_stat_activity(rows)
- submitted_count, failed_count = statement_samples_client.submit_events(events)
+ submitted_count = 0
+ for e in events:
+     self._check.database_monitoring_query_sample(json.dumps(e, default=default_json_event_encoding))
+     submitted_count += 1
elapsed_ms = (time.time() - start_time) * 1000
self._check.histogram("dd.postgres.collect_statement_samples.time", elapsed_ms, tags=self._tags)
self._check.count(
"dd.postgres.collect_statement_samples.events_submitted.count", submitted_count, tags=self._tags
)
- self._check.count(
-     "dd.postgres.statement_samples.error", failed_count, tags=self._tags + ["error:submit-events"]
- )
self._check.gauge(
"dd.postgres.collect_statement_samples.seen_samples_cache.len",
len(self._seen_samples_cache),
2 changes: 1 addition & 1 deletion postgres/setup.py
@@ -28,7 +28,7 @@ def get_dependencies():
return f.readlines()


- CHECKS_BASE_REQ = 'datadog-checks-base>=16.6.0'
+ CHECKS_BASE_REQ = 'datadog-checks-base>=18.1.0'

setup(
name='datadog-postgres',
22 changes: 14 additions & 8 deletions postgres/tests/test_pg_integration.py
@@ -3,15 +3,16 @@
# Licensed under Simplified BSD License (see LICENSE)
import socket
import time
+ from concurrent.futures.thread import ThreadPoolExecutor

import mock
import psycopg2
import pytest
from semver import VersionInfo

- from datadog_checks.base.utils.db.statement_samples import statement_samples_client
from datadog_checks.base.utils.serialization import json
from datadog_checks.postgres import PostgreSql
+ from datadog_checks.postgres.statement_samples import PostgresStatementSamples
from datadog_checks.postgres.util import PartialFormatter, fmt

from .common import DB_NAME, HOST, PORT, POSTGRES_VERSION, check_bgw_metrics, check_common_metrics
@@ -45,6 +46,13 @@
pytestmark = [pytest.mark.integration, pytest.mark.usefixtures('dd_environment')]


+ @pytest.fixture(autouse=True)
+ def stop_orphaned_threads():
+     # make sure we shut down any orphaned threads and create a new Executor for each test
+     PostgresStatementSamples.executor.shutdown(wait=True)
+     PostgresStatementSamples.executor = ThreadPoolExecutor()
+
+
def test_common_metrics(aggregator, integration_check, pg_instance):
check = integration_check(pg_instance)
check.check(pg_instance)
@@ -285,12 +293,10 @@ def dbm_instance(pg_instance):


@pytest.mark.parametrize("pg_stat_activity_view", ["pg_stat_activity", "datadog.pg_stat_activity()"])
- def test_statement_samples_collect(integration_check, dbm_instance, bob_conn, pg_stat_activity_view):
+ def test_statement_samples_collect(aggregator, integration_check, dbm_instance, bob_conn, pg_stat_activity_view):
dbm_instance['pg_stat_activity_view'] = pg_stat_activity_view
check = integration_check(dbm_instance)
check._connect()
- # clear out any samples kept from previous runs
- statement_samples_client._payloads = []
query = "SELECT city FROM persons WHERE city = %s"
# we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg2 uses
# the simple query protocol, sending the whole query as a plain string to postgres.
@@ -301,7 +307,9 @@ def test_statement_samples_collect(integration_check, dbm_instance, bob_conn, pg
cursor = bob_conn.cursor()
cursor.execute(query, ("hello",))
check.check(dbm_instance)
- matching = [e for e in statement_samples_client.get_events() if e['db']['statement'] == expected_query]
+ matching = [
+     e for e in aggregator.get_event_platform_events("dbm-samples") if e['db']['statement'] == expected_query
+ ]
if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity":
# pg_monitor role exists only in version 10+
assert len(matching) == 0, "did not expect to catch any events"
@@ -321,8 +329,6 @@ def test_statement_samples_rate_limits(aggregator, integration_check, dbm_instan
dbm_instance['statement_samples']['collections_per_second'] = 0.5
check = integration_check(dbm_instance)
check._connect()
- # clear out any samples kept from previous runs
- statement_samples_client._payloads = []
query = "SELECT city FROM persons WHERE city = 'hello'"
# leave bob's connection open until after the check has run to ensure we're able to see the query in
# pg_stat_activity
@@ -333,7 +339,7 @@ def test_statement_samples_rate_limits(aggregator, integration_check, dbm_instan
time.sleep(1)
cursor.close()

- matching = [e for e in statement_samples_client.get_events() if e['db']['statement'] == query]
+ matching = [e for e in aggregator.get_event_platform_events("dbm-samples") if e['db']['statement'] == query]
assert len(matching) == 1, "should have collected exactly one event due to sample rate limit"

metrics = aggregator.metrics("dd.postgres.collect_statement_samples.time")
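Both test suites read submitted samples back through the stub aggregator's `get_event_platform_events("dbm-samples")` and filter on fields of the event body. From the assertions, a sample event carries at least `db.statement` and, when an explain succeeded, a non-null `db.plan.definition`. A hypothetical helper capturing the pattern the tests apply inline:

```python
def assert_sample_collected(aggregator, statement, expect_plan=True):
    # hypothetical test helper mirroring the inline filtering above
    events = aggregator.get_event_platform_events("dbm-samples")
    matching = [e for e in events if e['db']['statement'] == statement]
    assert len(matching) > 0, "should have collected at least one sample"
    if expect_plan:
        assert any(e['db']['plan']['definition'] is not None for e in matching)
    return matching
```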
