Skip to content

Commit

Permalink
fix: safe kafka partition extraction (#872)
Browse files Browse the repository at this point in the history
* safe partition extraction

* update changelog
  • Loading branch information
nozik authored Jan 28, 2022
1 parent e53c1da commit ef7769c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect`

- `opentelemetry-instrumentation-kafka` Kafka: safe kafka partition extraction
([#872](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/872))

## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,36 @@ def extract_send_headers(args, kwargs):
@staticmethod
def extract_send_partition(instance, args, kwargs):
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
topic = KafkaPropertiesExtractor.extract_send_topic(args)
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
partition = KafkaPropertiesExtractor._extract_argument(
"partition", 4, None, args, kwargs
)
key_bytes = instance._serialize(
instance.config["key_serializer"], topic, key
)
value_bytes = instance._serialize(
instance.config["value_serializer"], topic, value
)
valid_types = (bytes, bytearray, memoryview, type(None))
if (
type(key_bytes) not in valid_types
or type(value_bytes) not in valid_types
):
try:
topic = KafkaPropertiesExtractor.extract_send_topic(args)
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
partition = KafkaPropertiesExtractor._extract_argument(
"partition", 4, None, args, kwargs
)
key_bytes = instance._serialize(
instance.config["key_serializer"], topic, key
)
value_bytes = instance._serialize(
instance.config["value_serializer"], topic, value
)
valid_types = (bytes, bytearray, memoryview, type(None))
if (
type(key_bytes) not in valid_types
or type(value_bytes) not in valid_types
):
return None

all_partitions = instance._metadata.partitions_for_topic(topic)
if all_partitions is None or len(all_partitions) == 0:
return None

return instance._partition(
topic, partition, key, value, key_bytes, value_bytes
)
except Exception as exception: # pylint: disable=W0703
_LOG.debug("Unable to extract partition: %s", exception)
return None
return instance._partition(
topic, partition, key, value, key_bytes, value_bytes
)


ProduceHookT = Optional[Callable[[Span, List, Dict], None]]
Expand Down

0 comments on commit ef7769c

Please sign in to comment.