Skip to content

Commit

Permalink
Merge branch 'master' into grpc_metrics_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams authored Jul 28, 2020
2 parents c8ab7d3 + 8bc7786 commit c140121
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 96 deletions.
27 changes: 27 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Description

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

Fixes # (issue)

## Type of change

Please delete options that are not relevant.

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

# How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration

- [ ] Test A

# Checklist:

- [ ] Followed the style guidelines of this project
- [ ] Changelogs have been updated
- [ ] Unit tests have been added
- [ ] Documentation has been updated
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ Meeting notes are available as a public [Google doc](https://docs.google.com/doc

Approvers ([@open-telemetry/python-approvers](https://github.com/orgs/open-telemetry/teams/python-approvers)):

- [Aaron Abbott](https://github.com/aabmass), Google
- [Carlos Alberto Cortez](https://github.com/carlosalberto), Lightstep
- [Tahir H. Butt](https://github.com/majorgreys) DataDog
- [Chris Kleinknecht](https://github.com/c24t), Google
- [Diego Hurtado](https://github.com/ocelotl)
- [Hector Hernandez](https://github.com/hectorhdzg), Microsoft
- [Mauricio Vásquez](https://github.com/mauriciovasquezbernal), Kinvolk
- [Reiley Yang](https://github.com/reyang), Microsoft
- [Yusuke Tsutsumi](https://github.com/toumorokoshi), Google

Expand Down
3 changes: 3 additions & 0 deletions ext/opentelemetry-ext-asyncpg/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Shouldn't capture query parameters by default
([#854](https://github.com/open-telemetry/opentelemetry-python/pull/854))

## Version 0.10b0

Released 2020-06-23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,11 @@ def _hydrate_span_from_args(connection, query, parameters) -> dict:
return span_attributes


async def _do_execute(func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(instance, args[0], args[1:])
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result


class AsyncPGInstrumentor(BaseInstrumentor):
def __init__(self, capture_parameters=False):
super().__init__()
self.capture_parameters = capture_parameters

def _instrument(self, **kwargs):
tracer_provider = kwargs.get(
"tracer_provider", trace.get_tracer_provider()
Expand All @@ -113,6 +88,7 @@ def _instrument(self, **kwargs):
_APPLIED,
tracer_provider.get_tracer("asyncpg", __version__),
)

for method in [
"Connection.execute",
"Connection.executemany",
Expand All @@ -121,7 +97,7 @@ def _instrument(self, **kwargs):
"Connection.fetchrow",
]:
wrapt.wrap_function_wrapper(
"asyncpg.connection", method, _do_execute
"asyncpg.connection", method, self._do_execute
)

def _uninstrument(self, **__):
Expand All @@ -134,3 +110,33 @@ def _uninstrument(self, **__):
"fetchrow",
]:
unwrap(asyncpg.Connection, method)

async def _do_execute(self, func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(
instance, args[0], args[1:] if self.capture_parameters else None,
)
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def _await(coro):
return loop.run_until_complete(coro)


class TestFunctionalPsycopg(TestBase):
class TestFunctionalAsyncPG(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
Expand Down Expand Up @@ -58,24 +58,6 @@ def test_instrumented_execute_method_without_arguments(self, *_, **__):
},
)

def test_instrumented_execute_method_with_arguments(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_fetch_method_without_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -90,52 +72,6 @@ def test_instrumented_fetch_method_without_arguments(self, *_, **__):
},
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
)

def test_instrumented_transaction_method(self, *_, **__):
async def _transaction_execute():
async with self._connection.transaction():
Expand Down Expand Up @@ -229,3 +165,113 @@ async def _transaction_execute():
self.assertEqual(
StatusCanonicalCode.OK, spans[2].status.canonical_code
)

def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
# This shouldn't be set because we don't capture parameters by
# default
#
# "db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)


class TestFunctionalAsyncPG_CaptureParameters(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AsyncPGInstrumentor(capture_parameters=True).instrument(
tracer_provider=cls.tracer_provider
)
cls._connection = _await(
asyncpg.connect(
database=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)

@classmethod
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()

def test_instrumented_execute_method_with_arguments(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_fetch_method_with_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)

def test_instrumented_executemany_method_with_arguments(self, *_, **__):
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
)

def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
)

0 comments on commit c140121

Please sign in to comment.