From ad0ed59a6b8418a8970c4195870d175a9d831b77 Mon Sep 17 00:00:00 2001 From: Martin Imre Date: Wed, 13 Sep 2023 12:45:43 +0200 Subject: [PATCH] feat(integrations): Add integration for clickhouse-driver (#2167) Adds an integration that automatically facilitates tracing/recording of all queries, their parameters, data, and results. --- .../test-integration-clickhouse_driver.yml | 85 ++ .../ci-yaml-test-snippet.txt | 1 + .../split-tox-gh-actions.py | 13 + sentry_sdk/integrations/clickhouse_driver.py | 150 +++ setup.py | 1 + .../clickhouse_driver/__init__.py | 3 + .../test_clickhouse_driver.py | 867 ++++++++++++++++++ tox.ini | 9 + 8 files changed, 1129 insertions(+) create mode 100644 .github/workflows/test-integration-clickhouse_driver.yml create mode 100644 sentry_sdk/integrations/clickhouse_driver.py create mode 100644 tests/integrations/clickhouse_driver/__init__.py create mode 100644 tests/integrations/clickhouse_driver/test_clickhouse_driver.py diff --git a/.github/workflows/test-integration-clickhouse_driver.yml b/.github/workflows/test-integration-clickhouse_driver.yml new file mode 100644 index 0000000000..49b26e1803 --- /dev/null +++ b/.github/workflows/test-integration-clickhouse_driver.yml @@ -0,0 +1,85 @@ +name: Test clickhouse_driver + +on: + push: + branches: + - master + - release/** + + pull_request: + +# Cancel in progress workflows on pull_requests. +# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +permissions: + contents: read + +env: + BUILD_CACHE_KEY: ${{ github.sha }} + CACHED_BUILD_PATHS: | + ${{ github.workspace }}/dist-serverless + +jobs: + test: + name: clickhouse_driver, python ${{ matrix.python-version }}, ${{ matrix.os }} + runs-on: ${{ matrix.os }} + timeout-minutes: 30 + + strategy: + fail-fast: false + matrix: + python-version: ["3.8","3.9","3.10","3.11"] + # python3.6 reached EOL and is no longer being supported on + # new versions of hosted runners on Github Actions + # ubuntu-20.04 is the last version that supported python3.6 + # see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877 + os: [ubuntu-20.04] + + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - uses: getsentry/action-clickhouse-in-ci@v1 + + - name: Setup Test Env + run: | + pip install coverage "tox>=3,<4" + + - name: Test clickhouse_driver + uses: nick-fields/retry@v2 + with: + timeout_minutes: 15 + max_attempts: 2 + retry_wait_seconds: 5 + shell: bash + command: | + set -x # print commands that are executed + coverage erase + + # Run tests + ./scripts/runtox.sh "py${{ matrix.python-version }}-clickhouse_driver" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch && + coverage combine .coverage* && + coverage xml -i + + - uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: coverage.xml + + + check_required_tests: + name: All clickhouse_driver tests passed or skipped + needs: test + # Always run this, even if a dependent job failed + if: always() + runs-on: ubuntu-20.04 + steps: + - name: Check for failures + if: contains(needs.test.result, 'failure') + run: | + echo "One of the dependent jobs has failed. You may need to re-run it." && exit 1 diff --git a/scripts/split-tox-gh-actions/ci-yaml-test-snippet.txt b/scripts/split-tox-gh-actions/ci-yaml-test-snippet.txt index 8a60a70167..c2d10596ea 100644 --- a/scripts/split-tox-gh-actions/ci-yaml-test-snippet.txt +++ b/scripts/split-tox-gh-actions/ci-yaml-test-snippet.txt @@ -10,6 +10,7 @@ - uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} +{{ additional_uses }} - name: Setup Test Env run: | diff --git a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index 3b40178082..15f85391ed 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -36,6 +36,10 @@ "asyncpg", ] +FRAMEWORKS_NEEDING_CLICKHOUSE = [ + "clickhouse_driver", +] + MATRIX_DEFINITION = """ strategy: fail-fast: false @@ -48,6 +52,11 @@ os: [ubuntu-20.04] """ +ADDITIONAL_USES_CLICKHOUSE = """\ + + - uses: getsentry/action-clickhouse-in-ci@v1 +""" + CHECK_NEEDS = """\ needs: test """ @@ -119,6 +128,10 @@ def write_yaml_file( f = open(TEMPLATE_FILE_SETUP_DB, "r") out += "".join(f.readlines()) + elif template_line.strip() == "{{ additional_uses }}": + if current_framework in FRAMEWORKS_NEEDING_CLICKHOUSE: + out += ADDITIONAL_USES_CLICKHOUSE + elif template_line.strip() == "{{ check_needs }}": if py27_supported: out += CHECK_NEEDS_PY27 diff --git a/sentry_sdk/integrations/clickhouse_driver.py b/sentry_sdk/integrations/clickhouse_driver.py new file mode 100644 index 0000000000..8a436022be --- /dev/null +++ b/sentry_sdk/integrations/clickhouse_driver.py @@ -0,0 +1,150 @@ +from sentry_sdk import Hub +from sentry_sdk.consts import OP, SPANDATA +from sentry_sdk.hub import _should_send_default_pii +from sentry_sdk.integrations import Integration, DidNotEnable +from sentry_sdk.tracing import Span +from sentry_sdk._types import TYPE_CHECKING +from sentry_sdk.utils import capture_internal_exceptions + +from typing import TypeVar + +# Hack to get new Python features working in older versions +# without introducing a hard dependency on `typing_extensions` +# from: https://stackoverflow.com/a/71944042/300572 +if TYPE_CHECKING: + from typing import ParamSpec, Callable +else: + # Fake ParamSpec + class ParamSpec: + def __init__(self, _): + self.args = None + self.kwargs = None + + # Callable[anything] will return None + class _Callable: + def __getitem__(self, _): + return None + + # Make instances + Callable = _Callable() + + +try: + import clickhouse_driver # type: ignore[import] + +except ImportError: + raise DidNotEnable("clickhouse-driver not installed.") + +if clickhouse_driver.VERSION < (0, 2, 0): + raise DidNotEnable("clickhouse-driver >= 0.2.0 required") + + +class ClickhouseDriverIntegration(Integration): + identifier = "clickhouse_driver" + + @staticmethod + def setup_once() -> None: + # Every query is done using the Connection's `send_query` function + clickhouse_driver.connection.Connection.send_query = _wrap_start( + clickhouse_driver.connection.Connection.send_query + ) + + # If the query contains parameters then the send_data function is used to send those parameters to clickhouse + clickhouse_driver.client.Client.send_data = _wrap_send_data( + clickhouse_driver.client.Client.send_data + ) + + # Every query ends either with the Client's `receive_end_of_query` (no result expected) + # or its `receive_result` (result expected) + clickhouse_driver.client.Client.receive_end_of_query = _wrap_end( + clickhouse_driver.client.Client.receive_end_of_query + ) + clickhouse_driver.client.Client.receive_result = _wrap_end( + clickhouse_driver.client.Client.receive_result + ) + + +P = ParamSpec("P") +T = TypeVar("T") + + +def _wrap_start(f: Callable[P, T]) -> Callable[P, T]: + def _inner(*args: P.args, **kwargs: P.kwargs) -> T: + hub = Hub.current + if hub.get_integration(ClickhouseDriverIntegration) is None: + return f(*args, **kwargs) + connection = args[0] + query = args[1] + query_id = args[2] if len(args) > 2 else kwargs.get("query_id") + params = args[3] if len(args) > 3 else kwargs.get("params") + + span = hub.start_span(op=OP.DB, description=query) + + connection._sentry_span = span # type: ignore[attr-defined] + + _set_db_data(span, connection) + + span.set_data("query", query) + + if query_id: + span.set_data("db.query_id", query_id) + + if params and _should_send_default_pii(): + span.set_data("db.params", params) + + # run the original code + ret = f(*args, **kwargs) + + return ret + + return _inner + + +def _wrap_end(f: Callable[P, T]) -> Callable[P, T]: + def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T: + res = f(*args, **kwargs) + instance = args[0] + span = instance.connection._sentry_span # type: ignore[attr-defined] + + if span is not None: + if res is not None and _should_send_default_pii(): + span.set_data("db.result", res) + + with capture_internal_exceptions(): + span.hub.add_breadcrumb( + message=span._data.pop("query"), category="query", data=span._data + ) + + span.finish() + + return res + + return _inner_end + + +def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]: + def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T: + instance = args[0] # type: clickhouse_driver.client.Client + data = args[2] + span = instance.connection._sentry_span + + _set_db_data(span, instance.connection) + + if _should_send_default_pii(): + db_params = span._data.get("db.params", []) + db_params.extend(data) + span.set_data("db.params", db_params) + + return f(*args, **kwargs) + + return _inner_send_data + + +def _set_db_data( + span: Span, connection: clickhouse_driver.connection.Connection +) -> None: + span.set_data(SPANDATA.DB_SYSTEM, "clickhouse") + span.set_data(SPANDATA.SERVER_ADDRESS, connection.host) + span.set_data(SPANDATA.SERVER_PORT, connection.port) + span.set_data(SPANDATA.DB_NAME, connection.database) + span.set_data(SPANDATA.DB_USER, connection.user) diff --git a/setup.py b/setup.py index f7ed4f4026..a70ebfc12d 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ def get_file_text(file_name): "bottle": ["bottle>=0.12.13"], "celery": ["celery>=3"], "chalice": ["chalice>=1.16.0"], + "clickhouse-driver": ["clickhouse-driver>=0.2.0"], "django": ["django>=1.8"], "falcon": ["falcon>=1.4"], "fastapi": ["fastapi>=0.79.0"], diff --git a/tests/integrations/clickhouse_driver/__init__.py b/tests/integrations/clickhouse_driver/__init__.py new file mode 100644 index 0000000000..602c4e553c --- /dev/null +++ b/tests/integrations/clickhouse_driver/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("clickhouse_driver") diff --git a/tests/integrations/clickhouse_driver/test_clickhouse_driver.py b/tests/integrations/clickhouse_driver/test_clickhouse_driver.py new file mode 100644 index 0000000000..6b0fa566d4 --- /dev/null +++ b/tests/integrations/clickhouse_driver/test_clickhouse_driver.py @@ -0,0 +1,867 @@ +""" +Tests need a local clickhouse instance running, this can best be done using +```sh +docker run -d -p 18123:8123 -p9000:9000 --name clickhouse-test --ulimit nofile=262144:262144 --rm clickhouse/clickhouse-server +``` +""" +import clickhouse_driver +from clickhouse_driver import Client, connect + +from sentry_sdk import start_transaction, capture_message +from sentry_sdk.integrations.clickhouse_driver import ClickhouseDriverIntegration + +EXPECT_PARAMS_IN_SELECT = True +if clickhouse_driver.VERSION < (0, 2, 6): + EXPECT_PARAMS_IN_SELECT = False + + +def test_clickhouse_client_breadcrumbs(sentry_init, capture_events) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + client = Client("localhost") + client.execute("DROP TABLE IF EXISTS test") + client.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + client.execute("INSERT INTO test (x) VALUES", [{"x": 100}]) + client.execute("INSERT INTO test (x) VALUES", [[170], [200]]) + + res = client.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + assert res[0][0] == 370 + + capture_message("hi") + + (event,) = events + + expected_breadcrumbs = [ + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "DROP TABLE IF EXISTS test", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "CREATE TABLE test (x Int32) ENGINE = Memory", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "SELECT sum(x) FROM test WHERE x > 150", + "type": "default", + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_breadcrumbs[-1]["data"].pop("db.params", None) + + for crumb in event["breadcrumbs"]["values"]: + crumb.pop("timestamp", None) + + assert event["breadcrumbs"]["values"] == expected_breadcrumbs + + +def test_clickhouse_client_breadcrumbs_with_pii(sentry_init, capture_events) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + send_default_pii=True, + _experiments={"record_sql_params": True}, + ) + events = capture_events() + + client = Client("localhost") + client.execute("DROP TABLE IF EXISTS test") + client.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + client.execute("INSERT INTO test (x) VALUES", [{"x": 100}]) + client.execute("INSERT INTO test (x) VALUES", [[170], [200]]) + + res = client.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + assert res[0][0] == 370 + + capture_message("hi") + + (event,) = events + + expected_breadcrumbs = [ + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [], + }, + "message": "DROP TABLE IF EXISTS test", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [], + }, + "message": "CREATE TABLE test (x Int32) ENGINE = Memory", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [{"x": 100}], + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [[170], [200]], + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [[370]], + "db.params": {"minv": 150}, + }, + "message": "SELECT sum(x) FROM test WHERE x > 150", + "type": "default", + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_breadcrumbs[-1]["data"].pop("db.params", None) + + for crumb in event["breadcrumbs"]["values"]: + crumb.pop("timestamp", None) + + assert event["breadcrumbs"]["values"] == expected_breadcrumbs + + +def test_clickhouse_client_spans( + sentry_init, capture_events, capture_envelopes +) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + _experiments={"record_sql_params": True}, + traces_sample_rate=1.0, + ) + events = capture_events() + + transaction_trace_id = None + transaction_span_id = None + + with start_transaction(name="test_clickhouse_transaction") as transaction: + transaction_trace_id = transaction.trace_id + transaction_span_id = transaction.span_id + + client = Client("localhost") + client.execute("DROP TABLE IF EXISTS test") + client.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + client.execute("INSERT INTO test (x) VALUES", [{"x": 100}]) + client.execute("INSERT INTO test (x) VALUES", [[170], [200]]) + + res = client.execute( + "SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150} + ) + assert res[0][0] == 370 + + (event,) = events + + expected_spans = [ + { + "op": "db", + "description": "DROP TABLE IF EXISTS test", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "CREATE TABLE test (x Int32) ENGINE = Memory", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "SELECT sum(x) FROM test WHERE x > 150", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_spans[-1]["data"].pop("db.params", None) + + for span in event["spans"]: + span.pop("span_id", None) + span.pop("start_timestamp", None) + span.pop("timestamp", None) + + assert event["spans"] == expected_spans + + +def test_clickhouse_client_spans_with_pii( + sentry_init, capture_events, capture_envelopes +) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + _experiments={"record_sql_params": True}, + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + transaction_trace_id = None + transaction_span_id = None + + with start_transaction(name="test_clickhouse_transaction") as transaction: + transaction_trace_id = transaction.trace_id + transaction_span_id = transaction.span_id + + client = Client("localhost") + client.execute("DROP TABLE IF EXISTS test") + client.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + client.execute("INSERT INTO test (x) VALUES", [{"x": 100}]) + client.execute("INSERT INTO test (x) VALUES", [[170], [200]]) + + res = client.execute( + "SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150} + ) + assert res[0][0] == 370 + + (event,) = events + + expected_spans = [ + { + "op": "db", + "description": "DROP TABLE IF EXISTS test", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "CREATE TABLE test (x Int32) ENGINE = Memory", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [{"x": 100}], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [[170], [200]], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "SELECT sum(x) FROM test WHERE x > 150", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": {"minv": 150}, + "db.result": [[370]], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_spans[-1]["data"].pop("db.params", None) + + for span in event["spans"]: + span.pop("span_id", None) + span.pop("start_timestamp", None) + span.pop("timestamp", None) + + assert event["spans"] == expected_spans + + +def test_clickhouse_dbapi_breadcrumbs(sentry_init, capture_events) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + ) + events = capture_events() + + conn = connect("clickhouse://localhost") + cursor = conn.cursor() + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + cursor.executemany("INSERT INTO test (x) VALUES", [{"x": 100}]) + cursor.executemany("INSERT INTO test (x) VALUES", [[170], [200]]) + cursor.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + res = cursor.fetchall() + + assert res[0][0] == 370 + + capture_message("hi") + + (event,) = events + + expected_breadcrumbs = [ + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "DROP TABLE IF EXISTS test", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "CREATE TABLE test (x Int32) ENGINE = Memory", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "message": "SELECT sum(x) FROM test WHERE x > 150", + "type": "default", + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_breadcrumbs[-1]["data"].pop("db.params", None) + + for crumb in event["breadcrumbs"]["values"]: + crumb.pop("timestamp", None) + + assert event["breadcrumbs"]["values"] == expected_breadcrumbs + + +def test_clickhouse_dbapi_breadcrumbs_with_pii(sentry_init, capture_events) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + send_default_pii=True, + ) + events = capture_events() + + conn = connect("clickhouse://localhost") + cursor = conn.cursor() + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + cursor.executemany("INSERT INTO test (x) VALUES", [{"x": 100}]) + cursor.executemany("INSERT INTO test (x) VALUES", [[170], [200]]) + cursor.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + res = cursor.fetchall() + + assert res[0][0] == 370 + + capture_message("hi") + + (event,) = events + + expected_breadcrumbs = [ + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [[], []], + }, + "message": "DROP TABLE IF EXISTS test", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [[], []], + }, + "message": "CREATE TABLE test (x Int32) ENGINE = Memory", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [{"x": 100}], + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [[170], [200]], + }, + "message": "INSERT INTO test (x) VALUES", + "type": "default", + }, + { + "category": "query", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": {"minv": 150}, + "db.result": [[["370"]], [["'sum(x)'", "'Int64'"]]], + }, + "message": "SELECT sum(x) FROM test WHERE x > 150", + "type": "default", + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_breadcrumbs[-1]["data"].pop("db.params", None) + + for crumb in event["breadcrumbs"]["values"]: + crumb.pop("timestamp", None) + + assert event["breadcrumbs"]["values"] == expected_breadcrumbs + + +def test_clickhouse_dbapi_spans(sentry_init, capture_events, capture_envelopes) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + _experiments={"record_sql_params": True}, + traces_sample_rate=1.0, + ) + events = capture_events() + + transaction_trace_id = None + transaction_span_id = None + + with start_transaction(name="test_clickhouse_transaction") as transaction: + transaction_trace_id = transaction.trace_id + transaction_span_id = transaction.span_id + + conn = connect("clickhouse://localhost") + cursor = conn.cursor() + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + cursor.executemany("INSERT INTO test (x) VALUES", [{"x": 100}]) + cursor.executemany("INSERT INTO test (x) VALUES", [[170], [200]]) + cursor.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + res = cursor.fetchall() + + assert res[0][0] == 370 + + (event,) = events + + expected_spans = [ + { + "op": "db", + "description": "DROP TABLE IF EXISTS test", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "CREATE TABLE test (x Int32) ENGINE = Memory", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "SELECT sum(x) FROM test WHERE x > 150", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_spans[-1]["data"].pop("db.params", None) + + for span in event["spans"]: + span.pop("span_id", None) + span.pop("start_timestamp", None) + span.pop("timestamp", None) + + assert event["spans"] == expected_spans + + +def test_clickhouse_dbapi_spans_with_pii( + sentry_init, capture_events, capture_envelopes +) -> None: + sentry_init( + integrations=[ClickhouseDriverIntegration()], + _experiments={"record_sql_params": True}, + traces_sample_rate=1.0, + send_default_pii=True, + ) + events = capture_events() + + transaction_trace_id = None + transaction_span_id = None + + with start_transaction(name="test_clickhouse_transaction") as transaction: + transaction_trace_id = transaction.trace_id + transaction_span_id = transaction.span_id + + conn = connect("clickhouse://localhost") + cursor = conn.cursor() + cursor.execute("DROP TABLE IF EXISTS test") + cursor.execute("CREATE TABLE test (x Int32) ENGINE = Memory") + cursor.executemany("INSERT INTO test (x) VALUES", [{"x": 100}]) + cursor.executemany("INSERT INTO test (x) VALUES", [[170], [200]]) + cursor.execute("SELECT sum(x) FROM test WHERE x > %(minv)i", {"minv": 150}) + res = cursor.fetchall() + + assert res[0][0] == 370 + + (event,) = events + + expected_spans = [ + { + "op": "db", + "description": "DROP TABLE IF EXISTS test", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [[], []], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "CREATE TABLE test (x Int32) ENGINE = Memory", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.result": [[], []], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [{"x": 100}], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "INSERT INTO test (x) VALUES", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": [[170], [200]], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + { + "op": "db", + "description": "SELECT sum(x) FROM test WHERE x > 150", + "data": { + "db.system": "clickhouse", + "db.name": "", + "db.user": "default", + "server.address": "localhost", + "server.port": 9000, + "db.params": {"minv": 150}, + "db.result": [[[370]], [["sum(x)", "Int64"]]], + }, + "same_process_as_parent": True, + "trace_id": transaction_trace_id, + "parent_span_id": transaction_span_id, + }, + ] + + if not EXPECT_PARAMS_IN_SELECT: + expected_spans[-1]["data"].pop("db.params", None) + + for span in event["spans"]: + span.pop("span_id", None) + span.pop("start_timestamp", None) + span.pop("timestamp", None) + + assert event["spans"] == expected_spans diff --git a/tox.ini b/tox.ini index fd9a0ca5a4..9e1c7a664f 100644 --- a/tox.ini +++ b/tox.ini @@ -55,6 +55,9 @@ envlist = # Chalice {py3.6,py3.7,py3.8}-chalice-v{1.18,1.20,1.22,1.24} + # Clickhouse Driver + {py3.8,py3.9,py3.10,py3.11}-clickhouse_driver-v{0.2.4,0.2.5,0.2.6} + # Cloud Resource Context {py3.6,py3.7,py3.8,py3.9,py3.10,py3.11}-cloud_resource_context @@ -248,6 +251,11 @@ deps = {py3.7}-chalice: botocore~=1.31 {py3.8}-chalice: botocore~=1.31 + # Clickhouse Driver + clickhouse_driver-v0.2.4: clickhouse_driver>=0.2.4,<0.2.5 + clickhouse_driver-v0.2.5: clickhouse_driver>=0.2.5,<0.2.6 + clickhouse_driver-v0.2.6: clickhouse_driver>=0.2.6,<0.2.7 + # Django django: psycopg2-binary django: Werkzeug<2.1.0 @@ -474,6 +482,7 @@ setenv = bottle: TESTPATH=tests/integrations/bottle celery: TESTPATH=tests/integrations/celery chalice: TESTPATH=tests/integrations/chalice + clickhouse_driver: TESTPATH=tests/integrations/clickhouse_driver cloud_resource_context: TESTPATH=tests/integrations/cloud_resource_context django: TESTPATH=tests/integrations/django falcon: TESTPATH=tests/integrations/falcon