Skip to content

Commit

Permalink
Changed to the propagator API
Browse files Browse the repository at this point in the history
Adding a UnifiedContext, composing DistributedContext and SpanContext.
This will enable propagators to extract and inject values from either system,
enabling more sophisticated schemes and standards to propagate data.

This also removes the need for generics and propagators that only
consume one or the other, requiring integrators to do extra work to
wire propagators appropriately.

Modifying the API of the propagators to consume the context as a mutable
argument. By passing in the context rather than returning, this enables the
chained use of propagators, allowing for situations such as supporting
multiple trace propagation standards simulatenously.
  • Loading branch information
toumorokoshi committed Sep 2, 2019
1 parent 4aea780 commit a8dc9b3
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 78 deletions.
17 changes: 7 additions & 10 deletions opentelemetry-api/src/opentelemetry/context/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
The OpenTelemetry context module provides abstraction layer on top of
thread-local storage and contextvars. The long term direction is to switch to
contextvars provided by the Python runtime library.
A global object ``Context`` is provided to access all the context related
functionalities::
functionalities:
>>> from opentelemetry.context import Context
>>> Context.foo = 1
>>> Context.foo = 2
>>> Context.foo
2
When explicit thread is used, a helper function
``Context.with_current_context`` can be used to carry the context across
threads::
When explicit thread is used, a helper function `Context.with_current_context`
can be used to carry the context across threads:
from threading import Thread
from opentelemetry.context import Context
Expand Down Expand Up @@ -62,7 +59,7 @@ def work(name):
print('Main thread:', Context)
Here goes another example using thread pool::
Here goes another example using thread pool:
import time
import threading
Expand Down Expand Up @@ -97,7 +94,7 @@ def work(name):
pool.join()
println('Main thread: {}'.format(Context))
Here goes a simple demo of how async could work in Python 3.7+::
Here goes a simple demo of how async could work in Python 3.7+:
import asyncio
Expand Down Expand Up @@ -141,9 +138,9 @@ async def main():
import typing

from .base_context import BaseRuntimeContext
from .unified_context import UnifiedContext

__all__ = ["Context"]

__all__ = ["Context", "UnifiedContext"]

Context = None # type: typing.Optional[BaseRuntimeContext]

Expand Down
6 changes: 2 additions & 4 deletions opentelemetry-api/src/opentelemetry/context/base_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def set(self, value: "object") -> None:
raise NotImplementedError

_lock = threading.Lock()
_slots = {} # type: typing.Dict[str, 'BaseRuntimeContext.Slot']
_slots: typing.Dict[str, Slot] = {}

@classmethod
def clear(cls) -> None:
Expand All @@ -48,9 +48,7 @@ def clear(cls) -> None:
slot.clear()

@classmethod
def register_slot(
cls, name: str, default: "object" = None
) -> "BaseRuntimeContext.Slot":
def register_slot(cls, name: str, default: "object" = None) -> "Slot":
"""Register a context slot with an optional default value.
:type name: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
# limitations under the License.

import abc
import typing

from opentelemetry.trace import SpanContext
from opentelemetry.context import UnifiedContext


class BinaryFormat(abc.ABC):
Expand All @@ -27,14 +26,14 @@ class BinaryFormat(abc.ABC):

@staticmethod
@abc.abstractmethod
def to_bytes(context: SpanContext) -> bytes:
def to_bytes(context: UnifiedContext) -> bytes:
"""Creates a byte representation of a SpanContext.
to_bytes should read values from a SpanContext and return a data
format to represent it, in bytes.
Args:
context: the SpanContext to serialize
context: the SpanContext to serialize.
Returns:
A bytes representation of the SpanContext.
Expand All @@ -43,15 +42,17 @@ def to_bytes(context: SpanContext) -> bytes:

@staticmethod
@abc.abstractmethod
def from_bytes(byte_representation: bytes) -> typing.Optional[SpanContext]:
"""Return a SpanContext that was represented by bytes.
def from_bytes(context: UnifiedContext,
byte_representation: bytes) -> None:
"""Populate UnifiedContext that was represented by bytes.
from_bytes should return back a SpanContext that was constructed from
the data serialized in the byte_representation passed. If it is not
from_bytes should populated UnifiedContext with data that was
serialized in the byte_representation passed. If it is not
possible to read in a proper SpanContext, return None.
Args:
byte_representation: the bytes to deserialize
context: The UnifiedContext to populate.
byte_representation: the bytes to deserialize.
Returns:
A bytes representation of the SpanContext if it is valid.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import abc
import typing

from opentelemetry.context import UnifiedContext
from opentelemetry.trace import SpanContext

Setter = typing.Callable[[object, str, str], None]
Expand All @@ -35,11 +36,12 @@ class HTTPTextFormat(abc.ABC):
import flask
import requests
from opentelemetry.context.propagation import HTTPTextFormat
from opentelemetry.trace import tracer
from opentelemetry.context import UnifiedContext
PROPAGATOR = HTTPTextFormat()
def get_header_from_flask_request(request, key):
return request.headers.get_all(key)
Expand All @@ -48,15 +50,17 @@ def set_header_into_requests_request(request: requests.Request,
request.headers[key] = value
def example_route():
span_context = PROPAGATOR.extract(
get_header_from_flask_request,
span = tracer().create_span("")
context = UnifiedContext.create(span)
PROPAGATOR.extract(
context, get_header_from_flask_request,
flask.request
)
request_to_downstream = requests.Request(
"GET", "http://httpbin.org/get"
)
PROPAGATOR.inject(
span_context,
context,
set_header_into_requests_request,
request_to_downstream
)
Expand All @@ -70,15 +74,20 @@ def example_route():

@abc.abstractmethod
def extract(
self, get_from_carrier: Getter, carrier: object
) -> SpanContext:
"""Create a SpanContext from values in the carrier.
self,
context: UnifiedContext,
get_from_carrier: Getter,
carrier: object,
) -> None:
"""Extract values from the carrier into the context.
The extract function should retrieve values from the carrier
object using get_from_carrier, and use values to populate a
SpanContext value and return it.
object using get_from_carrier, and use values to populate
attributes of the UnifiedContext passed in.
Args:
context: A UnifiedContext instance that will be
populated with values from the carrier.
get_from_carrier: a function that can retrieve zero
or more values from the carrier. In the case that
the value does not exist, return an empty list.
Expand Down
65 changes: 65 additions & 0 deletions opentelemetry-api/src/opentelemetry/context/unified_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2019, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry.distributedcontext import DistributedContext
from opentelemetry.trace import SpanContext


class UnifiedContext:
"""A unified context object that contains all context relevant to
telemetry.
The UnifiedContext is a single object that composes all contexts that
are needed by the various forms of telemetry. It is intended to be an
object that can be passed as the argument to any component that needs
to read or modify content values (such as propagators). By unifying
all context in a composed data structure, it expands the flexibility
of the APIs that modify it.
As it is designed to carry context specific to all telemetry use
cases, it's schema is explicit. Note that this is not intended to
be an object that acts as a singleton that returns different results
based on the thread or coroutine of execution. For that, see `Context`.
Args:
distributed: The DistributedContext for this instance.
span: The SpanContext for this instance.
"""
__slots__ = ["distributed", "span"]

def __init__(self, distributed: DistributedContext, span: SpanContext):
self.distributed = distributed
self.span = span

@staticmethod
def create(span: SpanContext) -> "UnifiedContext":
"""Create an unpopulated UnifiedContext object.
Example:
from opentelemetry.trace import tracer
span = tracer.create_span("")
context = UnifiedContext.create(span)
Args:
parent_span: the parent SpanContext that will be the
parent of the span in the UnifiedContext.
"""
return UnifiedContext(DistributedContext(), span)

def __repr__(self) -> str:
return "{}(distributed={}, span={})".format(
type(self).__name__, repr(self.distributed), repr(self.span))
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class B3Format(HTTPTextFormat):
_SAMPLE_PROPAGATE_VALUES = set(["1", "True", "true", "d"])

@classmethod
def extract(cls, get_from_carrier, carrier):
def extract(cls, context, get_from_carrier, carrier):
trace_id = format_trace_id(trace.INVALID_TRACE_ID)
span_id = format_span_id(trace.INVALID_SPAN_ID)
sampled = 0
Expand All @@ -57,7 +57,7 @@ def extract(cls, get_from_carrier, carrier):
elif len(fields) == 4:
trace_id, span_id, sampled, _parent_span_id = fields
else:
return trace.INVALID_SPAN_CONTEXT
return
else:
trace_id = (
_extract_first_element(
Expand Down Expand Up @@ -92,7 +92,7 @@ def extract(cls, get_from_carrier, carrier):
if sampled in cls._SAMPLE_PROPAGATE_VALUES or flags == "1":
options |= trace.TraceOptions.RECORDED

return trace.SpanContext(
context.span = trace.SpanContext(
# trace an span ids are encoded in hex, so must be converted
trace_id=int(trace_id, 16),
span_id=int(span_id, 16),
Expand Down
Loading

0 comments on commit a8dc9b3

Please sign in to comment.