Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Confluent Schema Registry in Kafka Receiver #28745

Closed
muaft opened this issue Oct 30, 2023 · 6 comments
Closed

Support for Confluent Schema Registry in Kafka Receiver #28745

muaft opened this issue Oct 30, 2023 · 6 comments

Comments

@muaft
Copy link

muaft commented Oct 30, 2023

Is your feature request related to a problem? Please describe.
We are using the Kafka Receiver to read metrics data from a Confluent Cloud Kafka topic. The topic is schema validated and integrated with Confluent Cloud Schema Registry. The Kafka topic is using the official Protobuf metrics schema. However, I get the error illegal tag 0 (wire type 0) when I start OTEL collector:

2023-10-30T13:09:44.781+0100 info [email protected]/telemetry.go:84 Setting up own telemetry...
2023-10-30T13:09:44.781+0100 info [email protected]/telemetry.go:201 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}
2023-10-30T13:09:44.781+0100 info [email protected]/exporter.go:275 Development component. May change in the future. {"kind": "exporter", "data_type": "metrics", "name": "debug"}
2023-10-30T13:09:45.039+0100 info [email protected]/service.go:143 Starting otelcol... {"Version": "0.87.0", "NumCPU": 16}
2023-10-30T13:09:45.039+0100 info extensions/extensions.go:33 Starting extensions...
2023-10-30T13:09:47.719+0100 info [email protected]/service.go:169 Everything is ready. Begin running and processing data.
2023-10-30T13:09:48.006+0100 info [email protected]/kafka_receiver.go:561 Starting consumer group {"kind": "receiver", "name": "kafka", "data_type": "metrics", "partition": 0}
2023-10-30T13:10:02.795+0100 error [email protected]/kafka_receiver.go:588 failed to unmarshal message {"kind": "receiver", "name": "kafka", "data_type": "metrics", "error": "proto: MetricsData: illegal tag 0 (wire type 0)"}
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver.(*metricsConsumerGroupHandler).ConsumeClaim
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/[email protected]/kafka_receiver.go:588
github.com/IBM/sarama.(*consumerGroupSession).consume
github.com/IBM/[email protected]/consumer_group.go:949
github.com/IBM/sarama.newConsumerGroupSession.func2
github.com/IBM/[email protected]/consumer_group.go:874
2023-10-30T13:10:03.570+0100 info [email protected]/kafka_receiver.go:561 Starting consumer group {"kind": "receiver", "name": "kafka", "data_type": "metrics", "partition": 0}

It seems the error is encountered due to extra bytes appended by Confluent Schema Registry to the beginning of the Protobuf message as describe here. OTEL collector doesn't expect the extra bytes and, therefore, throws the error

Describe the solution you'd like
Add official support for Confluent Schema Registry in Kafka Receiver.

Describe alternatives you've considered
Currently, we are reading data from the source topic, discard the extra 6 bytes at the beginning, and write remaining bytes to a new topic. OTEL collector can then successfully read from the new topic, but this is temporary workaround.

Additional context
Here is the OTEL config

receivers:
  kafka:
    protocol_version: 3.6.0
    brokers: ["$KAFKA_BOOTSTRAP_SERVERS"]
    topic: "$KAFKA_TOPIC"
    encoding: otlp_proto
    group_id: "$KAFKA_CONSUMER_GROUP_ID"
    initial_offset: latest
    auth:
      sasl:
        username: "$KAFKA_SASL_USERNAME"
        password: "$KAFKA_SASL_PASSWORD"
        mechanism: PLAIN
      tls:
        insecure: false
        insecure_skip_verify: true
    metadata:
      full: true
      retry:
        max: 3
        backoff: 250ms
    autocommit:
      enable: true
      interval: 1s
    header_extraction:
      extract_headers: false

processors:
  batch:

exporters:
  debug:
    verbosity: detailed

service:
  pipelines:
    metrics:
      receivers: [kafka]
      processors: [batch]
      exporters: [debug]
@mx-psi mx-psi transferred this issue from open-telemetry/opentelemetry-collector Oct 30, 2023
@crobert-1 crobert-1 added receiver/kafka bug Something isn't working labels Nov 1, 2023
Copy link
Contributor

github-actions bot commented Nov 1, 2023

Pinging code owners for receiver/kafka: @pavolloffay @MovieStoreGuy. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@crobert-1
Copy link
Member

Hello @muaft, I apologize for the delayed response here, but I believe this is a valid request.

It should be pretty simple to implement this. We'd want to add a new option for the Encoding config option, and then add a new unmarshaller. This new unmarshaller could essentially just take the message.Value from here and call the default underlying otlp_proto unmarshaller on the contents of message.Value without the pre-pended magic byte and schema ID.

@crobert-1 crobert-1 added enhancement New feature or request and removed bug Something isn't working needs triage New item requiring triage labels Dec 9, 2023
Copy link
Contributor

github-actions bot commented Feb 7, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Feb 7, 2024
@crobert-1 crobert-1 removed the Stale label Feb 7, 2024
Copy link
Contributor

github-actions bot commented Apr 8, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Apr 8, 2024
@crobert-1 crobert-1 removed the Stale label Apr 8, 2024
Copy link
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Jun 10, 2024
Copy link
Contributor

github-actions bot commented Aug 9, 2024

This issue has been closed as inactive because it has been stale for 120 days with no activity.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Aug 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants