Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instrumentation-dbapi): add experimental sql commenter capability #908

Merged
merged 7 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@

from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.dbapi.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.utils import unwrap, _generate_sql_comment
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


def trace_integration(
Expand All @@ -59,6 +59,7 @@ def trace_integration(
connection_attributes: typing.Dict = None,
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
Expand All @@ -84,6 +85,7 @@ def trace_integration(
version=__version__,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter,
db_api_integration_factory=db_api_integration_factory,
)

Expand All @@ -97,6 +99,7 @@ def wrap_connect(
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
Expand Down Expand Up @@ -132,6 +135,7 @@ def wrap_connect_(
version=version,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter,
)
return db_integration.wrapped_connection(wrapped, args, kwargs)

Expand All @@ -140,7 +144,7 @@ def wrap_connect_(
connect_module, connect_method_name, wrap_connect_
)
except Exception as ex: # pylint: disable=broad-except
logger.warning("Failed to integrate with DB API. %s", str(ex))
_logger.warning("Failed to integrate with DB API. %s", str(ex))


def unwrap_connect(
Expand All @@ -163,7 +167,8 @@ def instrument_connection(
connection_attributes: typing.Dict = None,
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters=False,
capture_parameters: bool = False,
enable_commenter: bool = False,
):
"""Enable instrumentation in a database connection.

Expand All @@ -180,7 +185,7 @@ def instrument_connection(
An instrumented connection.
"""
if isinstance(connection, wrapt.ObjectProxy):
logger.warning("Connection already instrumented")
_logger.warning("Connection already instrumented")
return connection

db_integration = DatabaseApiIntegration(
Expand All @@ -190,6 +195,7 @@ def instrument_connection(
version=version,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
enable_commenter=enable_commenter
)
db_integration.get_connection_attributes(connection)
return get_traced_connection_proxy(connection, db_integration)
Expand All @@ -207,7 +213,7 @@ def uninstrument_connection(connection):
if isinstance(connection, wrapt.ObjectProxy):
return connection.__wrapped__

logger.warning("Connection is not instrumented")
_logger.warning("Connection is not instrumented")
return connection


Expand All @@ -220,6 +226,7 @@ def __init__(
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
enable_commenter: bool = False,
):
self.connection_attributes = connection_attributes
if self.connection_attributes is None:
Expand All @@ -237,6 +244,7 @@ def __init__(
tracer_provider=tracer_provider,
)
self.capture_parameters = capture_parameters
self.enable_commenter = enable_commenter
self.database_system = database_system
self.connection_props = {}
self.span_attributes = {}
Expand Down Expand Up @@ -313,8 +321,9 @@ def __exit__(self, *args, **kwargs):


class CursorTracer:
def __init__(self, db_api_integration: DatabaseApiIntegration):
def __init__(self, db_api_integration: DatabaseApiIntegration) -> None:
self._db_api_integration = db_api_integration
self._commenter_enabled = self._db_api_integration.enable_commenter

def _populate_span(
self,
Expand Down Expand Up @@ -355,6 +364,20 @@ def get_statement(self, cursor, args): # pylint: disable=no-self-use
return statement.decode("utf8", "replace")
return statement

@staticmethod
def _generate_comment(span: Span) -> str:
span_context = span.get_span_context()
meta = {}
if span_context.is_valid:
meta.update({
'trace_id': span_context.trace_id,
'span_id': span_context.span_id,
'trace_flags': span_context.trace_flags,
'trace_state': span_context.trace_state.to_header()
})
## TODO(schekuri): revisit to enrich with info such as route, db_driver etc...
return _generate_sql_comment(**meta)

def traced_execution(
self,
cursor,
Expand All @@ -374,6 +397,16 @@ def traced_execution(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, cursor, *args)
if args and self._commenter_enabled:
try:
comment = self._generate_comment(span)
if isinstance(args[0], bytes):
comment = comment.encode("utf8")
args_list = list(args)
args_list[0] += comment
args = tuple(args_list)
except Exception as exc:
_logger.warning("Exception while generating sql comment: %s", exc)
srikanthccv marked this conversation as resolved.
Show resolved Hide resolved
return query_method(*args, **kwargs)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,21 @@ def test_executemany(self):
span.attributes[SpanAttributes.DB_STATEMENT], "Test query"
)

def test_executemany_comment(self):
db_integration = dbapi.DatabaseApiIntegration(
"testname", "testcomponent", enable_commenter=True
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
cursor = mock_connection.cursor()
cursor.executemany("Test query")
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
comment = dbapi.CursorTracer._generate_comment(span)
self.assertIn(comment, cursor.query)

def test_callproc(self):
db_integration = dbapi.DatabaseApiIntegration(
"testname", "testcomponent"
Expand Down Expand Up @@ -317,6 +332,8 @@ def execute(self, query, params=None, throw_exception=False):
def executemany(self, query, params=None, throw_exception=False):
if throw_exception:
raise Exception("Test Exception")
self.query = query
self.params = params

# pylint: disable=unused-argument, no-self-use
def callproc(self, query, params=None, throw_exception=False):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import urllib.parse
from typing import Dict, Sequence

from wrapt import ObjectProxy
Expand Down Expand Up @@ -115,3 +116,38 @@ def _start_internal_or_server_span(
attributes=attributes,
)
return span, token


_KEY_VALUE_DELIMITER = ","
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single-use constant



def _generate_sql_comment(**meta):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really necessary to add this to utils? This is a single-use function, its contents can just be used directly where the function is called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, there will be many sql db instrumentations that will rely on this.

"""
Return a SQL comment with comma delimited key=value pairs created from
**meta kwargs.
"""
if not meta: # No entries added.
return ""

# Sort the keywords to ensure that caching works and that testing is
# deterministic. It eases visual inspection as well.
return (
" /*"
+ _KEY_VALUE_DELIMITER.join(
"{}={!r}".format(_url_quote(key), _url_quote(value))
for key, value in sorted(meta.items())
if value is not None
)
+ "*/"
)


def _url_quote(s):
if not isinstance(s, (str, bytes)):
return s
quoted = urllib.parse.quote(s)
# Since SQL uses '%' as a keyword, '%' is a by-product of url quoting
# e.g. foo,bar --> foo%2Cbar
# thus in our quoting, we need to escape it too to finally give
# foo,bar --> foo%%2Cbar
return quoted.replace("%", "%%")