diff --git a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py index 37fad3e20..bfa1aa638 100644 --- a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py +++ b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from opentelemetry.propagators.textmap import Setter +from typing import Optional, List + +from opentelemetry.propagators.textmap import Setter, Getter from google.pubsub_v1 import PubsubMessage @@ -37,3 +39,17 @@ def set(self, carrier: PubsubMessage, key: str, value: str) -> None: None """ carrier.attributes["googclient_" + key] = value + + +class OpenTelemetryContextGetter(Getter): + """ + Used by Open Telemetry for context propagation. + """ + + def get(self, carrier: PubsubMessage, key: str) -> Optional[List[str]]: + if ("googclient_" + key) not in carrier.attributes: + return None + return [carrier.attributes["googclient_" + key]] + + def keys(self, carrier: PubsubMessage) -> List[str]: + return list(map(str, carrier.attributes.keys())) diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index 3f0d65dce..7c0ebfd83 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -30,6 +30,10 @@ from google.cloud.pubsub_v1.subscriber import futures from google.pubsub_v1.services.subscriber import client as subscriber_client from google.pubsub_v1.services.subscriber.transports.grpc import SubscriberGrpcTransport +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextGetter, +) +from google.pubsub_v1.types import PubsubMessage def test_init_default_client_info(creds): @@ -341,3 +345,24 @@ def test_opentelemetry_subscriber_setting(creds, enable_open_telemetry): ): client = subscriber.Client(credentials=creds, subscriber_options=options) assert client._open_telemetry_enabled is False + + +def test_opentelemetry_propagator_get(): + message = PubsubMessage(data=b"foo") + message.attributes["key1"] = "value1" + message.attributes["googclient_key2"] = "value2" + + assert OpenTelemetryContextGetter().get(message, "key2") == ["value2"] + + assert OpenTelemetryContextGetter().get(message, "key1") is None + + +def test_opentelemetry_propagator_keys(): + message = PubsubMessage(data=b"foo") + message.attributes["key1"] = "value1" + message.attributes["googclient_key2"] = "value2" + + assert sorted(OpenTelemetryContextGetter().keys(message)) == [ + "googclient_key2", + "key1", + ]