Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add elasticsearch db.statement sanitization #1598

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- Add default query sanitization for elasticsearch db.statement attribute
([#1545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1545))
Copy link
Member

@shalevr shalevr Feb 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't your PR #1598?

- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization.
Copy link
Member

@shalevr shalevr Feb 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The merge with main accidentally added this entry.

([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
- Add metric instrumentation for urllib
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ def response_hook(span, response):
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer

from .utils import sanitize_body

logger = getLogger(__name__)


Expand Down Expand Up @@ -135,11 +137,16 @@ def _instrument(self, **kwargs):
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
sanitize_query = kwargs.get("sanitize_query", True)
_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(
tracer, self._span_name_prefix, request_hook, response_hook
tracer,
sanitize_query,
self._span_name_prefix,
request_hook,
response_hook,
),
)

Expand All @@ -154,7 +161,11 @@ def _uninstrument(self, **kwargs):


def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
tracer,
sanitize_query,
span_name_prefix,
request_hook=None,
response_hook=None,
):
# pylint: disable=R0912,R0914
def wrapper(wrapped, _, args, kwargs):
Expand Down Expand Up @@ -213,7 +224,10 @@ def wrapper(wrapped, _, args, kwargs):
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
statement = str(body)
if sanitize_query:
statement = sanitize_body(body)
attributes[SpanAttributes.DB_STATEMENT] = statement
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Key names whose values are masked before a request body is recorded.
# sanitize_body tests these with str.endswith against the flattened,
# dot-joined key path, so any path that ends in one of these strings matches.
sanitized_keys = (
"message",
"should",
"filter",
"query",
"queries",
"intervals",
"match",
)
# Placeholder written in place of every masked value.
sanitized_value = "?"


def _flatten_dict(d, parent_key=""):
items = []
for k, v in d.items():
new_key = parent_key + "." + k if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key).items())
else:
items.append((new_key, v))
return dict(items)


def _unflatten_dict(d):
res = {}
for k, v in d.items():
keys = k.split(".")
d = res
for key in keys[:-1]:
if key not in d:
d[key] = {}
d = d[key]
d[keys[-1]] = v
return res


def sanitize_body(body) -> str:
    """Return a string form of a request body with sensitive values masked.

    The body is flattened to dotted key paths, every path that ends with
    one of ``sanitized_keys`` has its value replaced by ``sanitized_value``,
    and the result is rebuilt into a nested dict and converted with ``str``.

    Args:
        body: The request body. Usually a dict; JSON text (``str``, ``bytes``
            or ``bytearray``) is parsed first.

    Returns:
        The sanitized body as a string. If the body cannot be interpreted
        as a mapping (unparseable text, a list, ...), ``sanitized_value``
        alone is returned so that nothing unsanitized is recorded.
    """
    # Function-scope import keeps this function self-contained.
    import json

    if isinstance(body, (str, bytes, bytearray)):
        try:
            body = json.loads(body)
        except ValueError:
            # Not a single JSON document (presumably e.g. a newline-delimited
            # payload -- TODO confirm against callers). It cannot be masked
            # selectively, so mask it as a whole instead of raising.
            return sanitized_value

    # Anything without a mapping-style .items() cannot be flattened.
    if not callable(getattr(body, "items", None)):
        return sanitized_value

    flatten_body = _flatten_dict(body)
    sanitized = {
        key: sanitized_value if key.endswith(sanitized_keys) else value
        for key, value in flatten_body.items()
    }
    return str(_unflatten_dict(sanitized))
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Unsanitized request bodies passed to sanitize_body by the body
# sanitization test.

# Intervals query: the outer "intervals" value is a dict, while the inner
# "intervals" values are lists of match rules.
interval_query = {
"query": {
"intervals": {
"my_text": {
"all_of": {
"ordered": True,
"intervals": [
{
"match": {
"query": "my favorite food",
"max_gaps": 0,
"ordered": True,
}
},
{
"any_of": {
"intervals": [
{"match": {"query": "hot water"}},
{"match": {"query": "cold porridge"}},
]
}
},
],
}
}
}
}
}

# Match query whose search text sits under a nested "query" key.
match_query = {"query": {"match": {"message": {"query": "this is a test"}}}}

# Bool query with both a "must" list and a "filter" list.
filter_query = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": [
{"term": {"status": "published"}},
{"range": {"publish_date": {"gte": "2015-01-01"}}},
],
}
}
}

# Expected results: the test compares sanitize_body(<query>) with
# str(<query>_sanitized).

# The inner "intervals" list is replaced as a whole by the placeholder.
interval_query_sanitized = {
"query": {"intervals": {"my_text": {"all_of": {"ordered": True, "intervals": "?"}}}}
}
# Only the innermost "query" value is replaced.
match_query_sanitized = {"query": {"match": {"message": {"query": "?"}}}}
# The "filter" list is replaced; the "must" list is expected unchanged.
filter_query_sanitized = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": "?",
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@

import opentelemetry.instrumentation.elasticsearch
from opentelemetry import trace
from opentelemetry.instrumentation.elasticsearch import (
ElasticsearchInstrumentor,
)
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
from opentelemetry.instrumentation.elasticsearch.utils import sanitize_body
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import StatusCode

major_version = elasticsearch.VERSION[0]

from . import sanitization_queries as queries

if major_version == 7:
from . import helpers_es7 as helpers # pylint: disable=no-name-in-module
elif major_version == 6:
Expand All @@ -42,18 +43,33 @@
else:
from . import helpers_es2 as helpers # pylint: disable=no-name-in-module


Article = helpers.Article


@mock.patch(
"elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"
)
class TestElasticsearchIntegration(TestBase):
search_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{"query": {"bool": {"filter": [{"term": {"author": "testing"}}]}}}
),
}

create_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
}

def setUp(self):
super().setUp()
self.tracer = self.tracer_provider.get_tracer(__name__)
ElasticsearchInstrumentor().instrument()
ElasticsearchInstrumentor().instrument(sanitize_query=False)

def tearDown(self):
super().tearDown()
Expand Down Expand Up @@ -140,9 +156,7 @@ def test_result_values(self, request_mock):
self.assertEqual(1, len(spans))
self.assertEqual(spans[0].name, "Elasticsearch/test-index/_doc/:id")
self.assertEqual("False", spans[0].attributes["elasticsearch.found"])
self.assertEqual(
"True", spans[0].attributes["elasticsearch.timed_out"]
)
self.assertEqual("True", spans[0].attributes["elasticsearch.timed_out"])
self.assertEqual("7", spans[0].attributes["elasticsearch.took"])

def test_trace_error_unknown(self, request_mock):
Expand All @@ -169,9 +183,7 @@ def _test_trace_error(self, code, exc):
span = spans[0]
self.assertFalse(span.status.is_ok)
self.assertEqual(span.status.status_code, code)
self.assertEqual(
span.status.description, f"{type(exc).__name__}: {exc}"
)
self.assertEqual(span.status.description, f"{type(exc).__name__}: {exc}")

def test_parent(self, request_mock):
request_mock.return_value = (1, {}, {})
Expand Down Expand Up @@ -241,21 +253,35 @@ def test_dsl_search(self, request_mock):
self.assertIsNotNone(span.end_time)
self.assertEqual(
span.attributes,
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{
"query": {
"bool": {
"filter": [{"term": {"author": "testing"}}]
}
}
}
),
},
self.search_attributes,
)

def test_dsl_search_sanitized(self, request_mock):
    """With default instrumentation, the search filter is masked in db.statement."""
    # Reset instrumentation to use sanitized query (default)
    ElasticsearchInstrumentor().uninstrument()
    ElasticsearchInstrumentor().instrument()

    # update expected attributes to match sanitized query
    sanitized_search_attributes = self.search_attributes.copy()
    sanitized_search_attributes.update(
        {SpanAttributes.DB_STATEMENT: "{'query': {'bool': {'filter': '?'}}}"}
    )

    request_mock.return_value = (1, {}, '{"hits": {"hits": []}}')
    client = Elasticsearch()
    search = Search(using=client, index="test-index").filter(
        "term", author="testing"
    )
    search.execute()
    spans = self.get_finished_spans()
    # Assert the span count before indexing, so a missing span is reported
    # as an assertion failure rather than an IndexError.
    self.assertEqual(1, len(spans))
    span = spans[0]
    self.assertEqual(span.name, "Elasticsearch/<target>/_search")
    self.assertIsNotNone(span.end_time)
    self.assertEqual(
        span.attributes,
        sanitized_search_attributes,
    )

def test_dsl_create(self, request_mock):
Expand All @@ -264,17 +290,14 @@ def test_dsl_create(self, request_mock):
Article.init(using=client)

spans = self.get_finished_spans()
assert spans
self.assertEqual(2, len(spans))
span1 = spans.by_attr(key="elasticsearch.method", value="HEAD")
span2 = spans.by_attr(key="elasticsearch.method", value="PUT")

self.assertEqual(
span1.attributes,
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
},
self.create_attributes,
)

attributes = {
Expand All @@ -288,6 +311,26 @@ def test_dsl_create(self, request_mock):
helpers.dsl_create_statement,
)

def test_dsl_create_sanitized(self, request_mock):
    """With sanitize_query=True, the HEAD span still matches create_attributes."""
    # Reset instrumentation to explicitly use sanitized query
    ElasticsearchInstrumentor().uninstrument()
    ElasticsearchInstrumentor().instrument(sanitize_query=True)

    request_mock.return_value = (1, {}, {})
    Article.init(using=Elasticsearch())

    finished = self.get_finished_spans()
    assert finished

    self.assertEqual(2, len(finished))
    # Only the HEAD span is checked here; its expected attributes contain
    # no db.statement entry.
    head_span = finished.by_attr(key="elasticsearch.method", value="HEAD")
    self.assertEqual(head_span.attributes, self.create_attributes)

def test_dsl_index(self, request_mock):
request_mock.return_value = helpers.dsl_index_result

Expand Down Expand Up @@ -349,9 +392,7 @@ def request_hook(span, method, url, kwargs):
spans = self.get_finished_spans()

self.assertEqual(1, len(spans))
self.assertEqual(
"GET", spans[0].attributes[request_hook_method_attribute]
)
self.assertEqual("GET", spans[0].attributes[request_hook_method_attribute])
self.assertEqual(
f"/{index}/_doc/{doc_id}",
spans[0].attributes[request_hook_url_attribute],
Expand All @@ -366,9 +407,7 @@ def test_response_hook(self, request_mock):

def response_hook(span, response):
if span and span.is_recording():
span.set_attribute(
response_attribute_name, json.dumps(response)
)
span.set_attribute(response_attribute_name, json.dumps(response))

ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(response_hook=response_hook)
Expand Down Expand Up @@ -412,3 +451,14 @@ def response_hook(span, response):
json.dumps(response_payload),
spans[0].attributes[response_attribute_name],
)

def test_body_sanitization(self, _):
    """sanitize_body must yield the string form of each expected fixture."""
    # (input body, expected sanitized body) pairs from the fixture module.
    cases = (
        (queries.interval_query, queries.interval_query_sanitized),
        (queries.match_query, queries.match_query_sanitized),
        (queries.filter_query, queries.filter_query_sanitized),
    )
    for raw_body, expected_body in cases:
        self.assertEqual(sanitize_body(raw_body), str(expected_body))