Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add span for connection phase #1134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
886ce6a
Add span for connection phase
shahargl Jun 15, 2022
0d7c028
Fix the tests
shahargl Jun 16, 2022
3310698
Merge branch 'main' into feature/sqlalchemy-connect-span
shahargl Jun 16, 2022
cd588a8
Fix the multiple engine test
shahargl Jun 16, 2022
1d33583
Add engine connection instrumentation test
shahargl Jun 16, 2022
269fada
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jun 16, 2022
6141cb7
Support SQLAlchemy 1.1 too
shahargl Jun 19, 2022
d86faf4
Merge branch 'feature/sqlalchemy-connect-span' of github.com:shahargl…
shahargl Jun 19, 2022
3ca0778
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jun 21, 2022
3a22a4a
Update instrumentation/opentelemetry-instrumentation-sqlalchemy/src/o…
shahargl Jun 22, 2022
7b539c5
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jun 25, 2022
59f1e6d
Don't create tracer for each connect
shahargl Jun 26, 2022
4ade098
Merge from main
shahargl Jun 26, 2022
72c82cc
Update instrumentation/opentelemetry-instrumentation-sqlalchemy/src/o…
shahargl Jun 27, 2022
46f3bc1
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jun 27, 2022
467e614
Fix the tests
shahargl Jun 28, 2022
0d580a0
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jul 3, 2022
64c93a6
tox generate
shahargl Jul 11, 2022
f958a59
Merge branch 'feature/sqlalchemy-connect-span' of github.com:shahargl…
shahargl Jul 11, 2022
d2df489
Fix tests
shahargl Jul 11, 2022
24d4bcc
fix tests
shahargl Jul 12, 2022
95fe38a
Merge branch 'main' into feature/sqlalchemy-connect-span
srikanthccv Jul 12, 2022
2751862
.
shahargl Jul 12, 2022
cc4513a
Merge branch 'main' into feature/sqlalchemy-connect-span
shahargl Jul 17, 2022
91fb04d
fixing typos
shahargl Jul 17, 2022
43229e0
Merge branch 'feature/sqlalchemy-connect-span' of github.com:shahargl…
shahargl Jul 17, 2022
bb53566
Fixing test_instrument
shahargl Jul 17, 2022
64c0ad5
revert change
shahargl Jul 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))
- `opentelemetry-instrumentation-sqlalchemy` Added span for the connection phase ([#1133](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1133))

## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@

import sqlalchemy
from packaging.version import parse as parse_version
from sqlalchemy.engine.base import Engine
from wrapt import wrap_function_wrapper as _w

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer,
_get_tracer,
_wrap_connect,
_wrap_create_async_engine,
_wrap_create_engine,
)
Expand Down Expand Up @@ -97,13 +99,17 @@ def _instrument(self, **kwargs):
"create_engine",
_wrap_create_engine(tracer_provider),
)
_w(
"sqlalchemy.engine.base",
"Engine.connect",
_wrap_connect(tracer_provider),
)
if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w(
"sqlalchemy.ext.asyncio",
"create_async_engine",
_wrap_create_async_engine(tracer_provider),
)

if kwargs.get("engine") is not None:
return EngineTracer(
_get_tracer(tracer_provider),
Expand All @@ -127,5 +133,6 @@ def _instrument(self, **kwargs):
def _uninstrument(self, **kwargs):
unwrap(sqlalchemy, "create_engine")
unwrap(sqlalchemy.engine, "create_engine")
unwrap(Engine, "connect")
if parse_version(sqlalchemy.__version__).release >= (1, 4):
unwrap(sqlalchemy.ext.asyncio, "create_async_engine")
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ def _wrap_create_engine_internal(func, module, args, kwargs):
return _wrap_create_engine_internal


def _wrap_connect(tracer_provider=None):
tracer = trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)

# pylint: disable=unused-argument
def _wrap_connect_internal(func, module, args, kwargs):
shahargl marked this conversation as resolved.
Show resolved Hide resolved
with tracer.start_as_current_span(
"connect", kind=trace.SpanKind.CLIENT
):
return func(*args, **kwargs)

return _wrap_connect_internal


class EngineTracer:
def __init__(self, tracer, engine, enable_commenter=False):
self.tracer = tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ def test_trace_integration(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query itself
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)

def test_instrument_two_engines(self):
engine_1 = create_engine("sqlite:///:memory:")
Expand All @@ -65,8 +69,20 @@ def test_instrument_two_engines(self):
cnx_2.execute("SELECT 1 + 1;").fetchall()

spans = self.memory_exporter.get_finished_spans()
# 2 queries + 2 engine connect
self.assertEqual(len(spans), 4)

self.assertEqual(len(spans), 2)
def test_instrument_engine_connect(self):
engine = create_engine("sqlite:///:memory:")

SQLAlchemyInstrumentor().instrument(
engine=engine,
tracer_provider=self.tracer_provider,
)

engine.connect()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

@pytest.mark.skipif(
not sqlalchemy.__version__.startswith("1.4"),
Expand All @@ -85,11 +101,15 @@ async def run():
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1 + 1;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -99,7 +119,10 @@ def test_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_span.__enter__ = mock.Mock(return_value=(mock.Mock(), None))
mock_span.__exit__ = mock.Mock(return_value=None)
mock_tracer.start_span.return_value = mock_span
mock_tracer.start_as_current_span.return_value = mock_span
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
engine = create_engine("sqlite:///:memory:")
Expand All @@ -123,11 +146,15 @@ def test_create_engine_wrapper(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -153,7 +180,7 @@ def test_custom_tracer_provider(self):
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()

self.assertEqual(len(spans), 1)
self.assertEqual(len(spans), 2)
self.assertEqual(spans[0].resource.attributes["service.name"], "test")
self.assertEqual(
spans[0].resource.attributes["deployment.environment"], "env"
Expand All @@ -177,11 +204,15 @@ async def run():
async with engine.connect() as cnx:
await cnx.execute(sqlalchemy.text("SELECT 1 + 1;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(spans[0].name, "SELECT :memory:")
self.assertEqual(len(spans), 2)
# first span - the connection to the db
self.assertEqual(spans[0].name, "connect")
self.assertEqual(spans[0].kind, trace.SpanKind.CLIENT)
# second span - the query
self.assertEqual(spans[1].name, "SELECT :memory:")
self.assertEqual(spans[1].kind, trace.SpanKind.CLIENT)
self.assertEqual(
spans[0].instrumentation_scope.name,
spans[1].instrumentation_scope.name,
"opentelemetry.instrumentation.sqlalchemy",
)

Expand All @@ -199,8 +230,8 @@ def test_generate_commenter(self):
cnx = engine.connect()
cnx.execute("SELECT 1 + 1;").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(len(spans), 2)
span = spans[1]
self.assertIn(
EngineTracer._generate_comment(span),
self.caplog.records[-2].getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ def test_orm_insert(self):
self.session.commit()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
stmt = "INSERT INTO players (id, name) VALUES "
if span.attributes.get(SpanAttributes.DB_SYSTEM) == "sqlite":
stmt += "(?, ?)"
Expand All @@ -148,8 +149,9 @@ def test_session_query(self):
self.assertEqual(len(out), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
stmt = "SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name = "
if span.attributes.get(SpanAttributes.DB_SYSTEM) == "sqlite":
stmt += "?"
Expand All @@ -170,8 +172,9 @@ def test_engine_connect_execute(self):
self.assertEqual(len(rows), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
self._check_span(span, "SELECT")
self.assertEqual(
span.attributes.get(SpanAttributes.DB_STATEMENT),
Expand All @@ -190,8 +193,9 @@ def test_parent(self):
self.assertEqual(len(rows), 0)

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans
# one span for the connection and two for the queries
self.assertEqual(len(spans), 3)
_, child_span, parent_span = spans

# confirm the parenting
self.assertIsNone(parent_span.parent)
Expand Down Expand Up @@ -247,5 +251,5 @@ def insert_players(session):
# batch inserts together which means `insert_players` only generates one span.
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#orm-batch-inserts-with-psycopg2-now-batch-statements-with-returning-in-most-cases
self.assertEqual(
len(spans), 5 if self.VENDOR not in ["postgresql"] else 3
len(spans), 8 if self.VENDOR not in ["postgresql"] else 6
)
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand All @@ -96,9 +97,9 @@ def test_orm_insert(self):
self.session.commit()

spans = self.memory_exporter.get_finished_spans()
# identity insert on before the insert, insert, and identity insert off after the insert
self.assertEqual(len(spans), 3)
span = spans[1]
# connect, identity insert on before the insert, insert, and identity insert off after the insert
self.assertEqual(len(spans), 4)
span = spans[2]
self._check_span(span, "INSERT")
self.assertIn(
"INSERT INTO players",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ def test_engine_execute_errors(self):
conn.execute("SELECT * FROM a_wrong_table").fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT opentelemetry-tests")
self.assertEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ def test_engine_execute_errors(self):
conn.execute(stmt).fetchall()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# one span for the connection and one span for the query
self.assertEqual(len(spans), 2)
span = spans[1]
# span fields
self.assertEqual(span.name, "SELECT :memory:")
self.assertEqual(
Expand Down