Skip to content

Commit

Permalink
Update semantic convention mappings (#24366)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored May 12, 2022
1 parent 5699616 commit 54ef18d
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
# Licensed under the MIT License.
import json
import logging
from typing import Sequence, Any
from typing import Optional, Sequence, Any
from urllib.parse import urlparse

from opentelemetry.util.types import Attributes
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span, SpanKind
Expand Down Expand Up @@ -116,18 +117,33 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
response_code="0",
success=span.status.is_ok,
properties={},
measurements={},
)
envelope.data = MonitorBase(base_data=data, base_type="RequestData")
if SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
envelope.tags["ai.operation.name"] = span.name
if SpanAttributes.NET_PEER_IP in span.attributes:
envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
if "az.namespace" in span.attributes: # Azure specific resources
# Currently only eventhub and servicebus are supported (kind CONSUMER)
data.source = _get_azure_sdk_target_source(span.attributes)
if span.links:
total = 0
for link in span.links:
attributes = link.attributes
enqueued_time = attributes.get("enqueuedTime")
if enqueued_time:
difference = (span.start_time / 1000000) - int(enqueued_time)
total += difference
data.measurements["timeSinceEnqueued"] = max(0, total / len(span.links))
elif SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
url = ""
path = ""
if SpanAttributes.HTTP_USER_AGENT in span.attributes:
# TODO: Not exposed in Swagger, need to update def
envelope.tags["ai.user.userAgent"] = span.attributes[SpanAttributes.HTTP_USER_AGENT]
# http specific logic for ai.location.ip
if SpanAttributes.HTTP_CLIENT_IP in span.attributes:
envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.HTTP_CLIENT_IP]
elif SpanAttributes.NET_PEER_IP in span.attributes:
envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
# url
if SpanAttributes.HTTP_URL in span.attributes:
url = span.attributes[SpanAttributes.HTTP_URL]
Expand Down Expand Up @@ -177,38 +193,31 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
)
except Exception: # pylint: disable=broad-except
pass
else:
envelope.tags["ai.operation.name"] = span.name
if SpanAttributes.HTTP_STATUS_CODE in span.attributes:
status_code = span.attributes[SpanAttributes.HTTP_STATUS_CODE]
data.response_code = str(status_code)
elif SpanAttributes.MESSAGING_SYSTEM in span.attributes: # Messaging
envelope.tags["ai.operation.name"] = span.name
if SpanAttributes.NET_PEER_IP in span.attributes:
envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
if SpanAttributes.NET_PEER_NAME in span.attributes:
data.properties["source"] = "{}/{}".format(
data.source = "{}/{}".format(
span.attributes[SpanAttributes.NET_PEER_NAME],
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
)
elif SpanAttributes.NET_PEER_IP in span.attributes:
data.properties["source"] = "{}/{}".format(
data.source = "{}/{}".format(
span.attributes[SpanAttributes.NET_PEER_IP],
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
)
else:
data.properties["source"] = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
else: # Other
envelope.tags["ai.operation.name"] = span.name
if SpanAttributes.NET_PEER_IP in span.attributes:
envelope.tags["ai.location.ip"] = span.attributes[SpanAttributes.NET_PEER_IP]
data.source = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
# Apply truncation
if data.url:
data.url = data.url[:2048] # Breeze max length
if data.response_code:
data.response_code = data.response_code[:1024] # Breeze max length
if envelope.tags["ai.operation.name"]:
if envelope.tags.get("ai.operation.name"):
data.name = envelope.tags["ai.operation.name"][:1024] # Breeze max length
else: # INTERNAL, CLIENT, PRODUCER
envelope.name = "Microsoft.ApplicationInsights.RemoteDependency"
Expand Down Expand Up @@ -240,7 +249,12 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
port != _get_default_port_db(span.attributes.get(SpanAttributes.DB_SYSTEM)):
target = "{}:{}".format(target, port)
if span.kind is SpanKind.CLIENT:
if SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
if "az.namespace" in span.attributes: # Azure specific resources
# Currently only eventhub and servicebus are supported
# https://github.com/Azure/azure-sdk-for-python/issues/9256
data.type = span.attributes["az.namespace"]
data.target = _get_azure_sdk_target_source(span.attributes)
elif SpanAttributes.HTTP_METHOD in span.attributes: # HTTP
data.type = "HTTP"
if SpanAttributes.HTTP_USER_AGENT in span.attributes:
# TODO: Not exposed in Swagger, need to update def
Expand Down Expand Up @@ -319,10 +333,18 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
data.result_code = str(status_code)
elif SpanAttributes.DB_SYSTEM in span.attributes: # Database
db_system = span.attributes[SpanAttributes.DB_SYSTEM]
if not _is_sql_db(db_system):
data.type = db_system
else:
if db_system == DbSystemValues.MYSQL.value:
data.type = "mysql"
elif db_system == DbSystemValues.POSTGRESQL.value:
data.type = "postgresql"
elif db_system == DbSystemValues.MONGODB.value:
data.type = "mongodb"
elif db_system == DbSystemValues.REDIS.value:
data.type = "redis"
elif _is_sql_db(db_system):
data.type = "SQL"
else:
data.type = db_system
# data is the full statement or operation
if SpanAttributes.DB_STATEMENT in span.attributes:
data.data = span.attributes[SpanAttributes.DB_STATEMENT]
Expand All @@ -337,22 +359,38 @@ def _convert_span_to_envelope(span: Span) -> TelemetryItem:
target = "{}|{}".format(target, db_name)
if target is None:
target = db_system
elif SpanAttributes.MESSAGING_SYSTEM in span.attributes: # Messaging
data.type = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
if target is None:
if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
else:
target = span.attributes[SpanAttributes.MESSAGING_SYSTEM]
elif SpanAttributes.RPC_SYSTEM in span.attributes: # Rpc
data.type = SpanAttributes.RPC_SYSTEM
# TODO: data.data for rpc
if target is None:
target = span.attributes[SpanAttributes.RPC_SYSTEM]
else:
# TODO: Azure specific types
data.type = "N/A"
elif span.kind is SpanKind.PRODUCER: # Messaging
data.type = "Queue Message"
# TODO: data.data for messaging
# TODO: Special logic for data.target for messaging?
# Currently only eventhub and servicebus are supported that produce PRODUCER spans
if "az.namespace" in span.attributes:
data.type = "Queue Message | {}".format(span.attributes["az.namespace"])
data.target = _get_azure_sdk_target_source(span.attributes)
else:
data.type = "Queue Message"
msg_system = span.attributes.get(SpanAttributes.MESSAGING_SYSTEM)
if msg_system:
data.type += " | {}".format(msg_system)
if target is None:
if SpanAttributes.MESSAGING_DESTINATION in span.attributes:
target = span.attributes[SpanAttributes.MESSAGING_DESTINATION]
else:
target = msg_system
else: # SpanKind.INTERNAL
if span.parent:
data.type = "InProc"
data.success = True
data.type = "InProc"
if "az.namespace" in span.attributes:
data.type += " | {}".format(span.attributes["az.namespace"])
# Apply truncation
if data.result_code:
data.result_code = data.result_code[:1024]
Expand Down Expand Up @@ -436,7 +474,7 @@ def _convert_span_events_to_envelopes(span: Span) -> Sequence[TelemetryItem]:
return envelopes

# pylint:disable=too-many-return-statements
def _get_default_port_db(dbsystem):
def _get_default_port_db(dbsystem: str) -> int:
if dbsystem == DbSystemValues.POSTGRESQL.value:
return 5432
if dbsystem == DbSystemValues.CASSANDRA.value:
Expand All @@ -461,15 +499,15 @@ def _get_default_port_db(dbsystem):
return 0


def _get_default_port_http(scheme):
def _get_default_port_http(scheme: str) -> int:
if scheme == "http":
return 80
if scheme == "https":
return 443
return 0


def _is_sql_db(dbsystem):
def _is_sql_db(dbsystem: str) -> bool:
return dbsystem in (
DbSystemValues.DB2.value,
DbSystemValues.DERBY.value,
Expand All @@ -483,6 +521,14 @@ def _is_sql_db(dbsystem):
)


def _get_azure_sdk_target_source(attributes: Attributes) -> Optional[str]:
# Currently logic only works for ServiceBus and EventHub
peer_address = attributes.get("peer.address")
destination = attributes.get("message_bus.destination")
if peer_address and destination:
return peer_address + "/" + destination


def _get_trace_export_result(result: ExportResult) -> SpanExportResult:
if result == ExportResult.SUCCESS:
return SpanExportResult.SUCCESS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
get_log_emitter_provider().add_log_processor(BatchLogProcessor(exporter))

# Attach OTel handler to namespaced logger
# Attach LoggingHandler to namespaced logger
handler = LoggingHandler()
logger = logging.getLogger(__name__)
logger.addHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
get_log_emitter_provider().add_log_processor(BatchLogProcessor(exporter))

# Attach OTel handler to namespaced logger
# Attach LoggingHandler to namespaced logger
handler = LoggingHandler()
logger = logging.getLogger(__name__)
logger.addHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
get_log_emitter_provider().add_log_processor(BatchLogProcessor(exporter))

# Attach OTel handler to namespaced logger
# Attach LoggingHandler to namespaced logger
handler = LoggingHandler()
logger = logging.getLogger(__name__)
logger.addHandler(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
get_log_emitter_provider().add_log_processor(BatchLogProcessor(exporter))

# Attach OTel handler to namespaced logger
# Attach LoggingHandler to namespaced logger
handler = LoggingHandler()
logger = logging.getLogger(__name__)
logger.addHandler(handler)
Expand Down
Loading

0 comments on commit 54ef18d

Please sign in to comment.