Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis exporter failures after enabling compression #30108

Closed
preetsarb opened this issue Dec 19, 2023 · 6 comments
Closed

Kinesis exporter failures after enabling compression #30108

preetsarb opened this issue Dec 19, 2023 · 6 comments
Labels
bug Something isn't working exporter/awskinesis

Comments

@preetsarb
Copy link

preetsarb commented Dec 19, 2023

Component(s)

exporter/awskinesis

What happened?

Description

After enabling compression using the following configuration, kinesis exporter errors out with index out of range runtime error and results in termination of collector agent.

encoding:
  compression: zlib

Steps to Reproduce

Use the collector config provided below

Expected Result

Traces should be exported to aws kinesis stream with compressed data

Actual Result

Kinesis exporter fails and shuts down the collector agent

Collector version

v0.91.0

Environment information

Environment

OS: MacOS 14.1
Compiler(if manually compiled): N/A

OpenTelemetry Collector configuration

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 127.0.0.1:4317
processors:
  batch/traces:
    timeout: 1s
    send_batch_size: 50
    send_batch_max_size: 50
exporters:
  awskinesis:
    aws:
      stream_name: "<stream>"
      region: "<region>"
    encoding:
      name: otlp_proto
      compression: zlib
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch/traces]
      exporters: [awskinesis]

  telemetry:
      logs:
        level: debug

Log output

2023-12-19T12:26:33.318-0800	INFO	traces/traces.go:48	stopping the exporter
2023-12-19T12:26:33.318-0800	info	zapgrpc/zapgrpc.go:178	[transport] [server-transport 0xc002882340] Closing: EOF	{"grpc_log": true}
2023-12-19T12:26:33.318-0800	info	zapgrpc/zapgrpc.go:178	[transport] [server-transport 0xc002882340] loopyWriter exiting with error: transport closed by client	{"grpc_log": true}
panic: runtime error: index out of range [35] with length 6

goroutine 99 [running]:
compress/flate.byLiteral.Less(...)
	compress/flate/huffman_code.go:320
sort.partition({0x9719f50, 0xc00ee2e078}, 0x21, 0x38, 0xc00ee2e078?)
	sort/zsortinterface.go:154 +0x176
sort.pdqsort({0x9719f50, 0xc00ee2e078}, 0x0?, 0x40?, 0x8500000045?)
	sort/zsortinterface.go:114 +0x22b
sort.pdqsort({0x9719f50, 0xc00ee2e078}, 0xfb000000e7?, 0xda?, 0x8aa6dd2?)
	sort/zsortinterface.go:125 +0x2ca
sort.Sort({0x9719f50, 0xc00ee2e078})
	sort/sort.go:51 +0x54
compress/flate.(*byLiteral).sort(...)
	compress/flate/huffman_code.go:314
compress/flate.(*huffmanEncoder).assignEncodingAndSize(0xc00ee2e000, {0xc00ee2e030, 0x10, 0xc002a1f228?}, {0xc00ee54000, 0xc002a1f238?, 0x11f})
	compress/flate/huffman_code.go:259 +0xcd
compress/flate.(*huffmanEncoder).generate(0xc00ee2e000, {0xc00ee28000?, 0xc00ee34000?, 0x40?}, 0xe56c6c0?)
	compress/flate/huffman_code.go:307 +0x1a7
compress/flate.(*huffmanBitWriter).indexTokens(0xc00ee32000, {0xc00f26c000, 0x564, 0x8?})
	compress/flate/huffman_bit_writer.go:567 +0x245
compress/flate.(*huffmanBitWriter).writeBlockDynamic(0xc00ee32000, {0xc00f26c000?, 0x1df3b26?, 0x12d987d?}, 0x0?, {0xc00ee34000, 0x1a64, 0xffff})
	compress/flate/huffman_bit_writer.go:506 +0xaf
compress/flate.(*compressor).encSpeed(0xc00f0c4000)
	compress/flate/deflate.go:362 +0x21b
compress/flate.(*compressor).syncFlush(0xc00f0c4000)
	compress/flate/deflate.go:558 +0x3b
compress/flate.(*Writer).Flush(...)
	compress/flate/deflate.go:725
compress/zlib.(*Writer).Flush(0xc0028d1da0)
	compress/zlib/writer.go:171 +0x65
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/compress.(*compressor).Do(0xc00296a7f0, {0xc002c3e000, 0x1a64, 0x1a64})
	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/[email protected]/internal/compress/compresser.go:73 +0x91
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch.(*Batch).AddRecord(0xc002c198c0, {0xc002c3e000?, 0xc002c15d1c?, 0xc002c15d1c?}, {0xc002b160c0, 0x24})
	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/[email protected]/internal/batch/batch.go:81 +0x4b
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter/internal/batch.(*batchMarshaller).Traces(0xc002970e10, {0xc00280d6e0?, 0xc0020c1220?})
	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/[email protected]/internal/batch/encode_marshaler.go:85 +0x27e
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awskinesisexporter.Exporter.consumeTraces({{0x96f0990?, 0xc0029633c0?}, {0x9714fa0?, 0xc002970e10?}}, {0x9734e68, 0xc002b2e3f0}, {0xc00280d6e0?, 0xc0020c1220?})
	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/[email protected]/exporter.go:125 +0x4f
go.opentelemetry.io/collector/exporter/exporterhelper.(*tracesRequest).Export(0x9734dc0?, {0x9734e68?, 0xc002b2e3f0?})
	go.opentelemetry.io/collector/[email protected]/exporterhelper/traces.go:58 +0x31
go.opentelemetry.io/collector/exporter/exporterhelper.(*timeoutSender).send(0xc002b2c800?, {0x9734dc0?, 0xc002b12c60?}, {0x96ed600?, 0xc00280d6f8?})
	go.opentelemetry.io/collector/[email protected]/exporterhelper/timeout_sender.go:38 +0x98
go.opentelemetry.io/collector/exporter/exporterhelper.(*retrySender).send(0xc0029275e0, {0x9734dc0, 0xc002b12c60}, {0x96ed600, 0xc00280d6f8})
	go.opentelemetry.io/collector/[email protected]/exporterhelper/retry_sender.go:119 +0x41f
go.opentelemetry.io/collector/exporter/exporterhelper.(*tracesExporterWithObservability).send(0xc00295dfb0, {0x9735390?, 0xc002821cd0?}, {0x96ed600?, 0xc00280d6f8?})
	go.opentelemetry.io/collector/[email protected]/exporterhelper/traces.go:171 +0x7e
go.opentelemetry.io/collector/exporter/exporterhelper.(*queueSender).consume(0xc00293bba0, {0x9735390, 0xc002821cd0}, {0x96ed600, 0xc00280d6f8})
	go.opentelemetry.io/collector/[email protected]/exporterhelper/queue_sender.go:115 +0x56
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*boundedMemoryQueue[...]).Consume(...)
	go.opentelemetry.io/collector/[email protected]/exporterhelper/internal/bounded_memory_queue.go:55
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*QueueConsumers[...]).Start.func1()
	go.opentelemetry.io/collector/[email protected]/exporterhelper/internal/consumers.go:43 +0x79
created by go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*QueueConsumers[...]).Start in goroutine 1
	go.opentelemetry.io/collector/[email protected]/exporterhelper/internal/consumers.go:39 +0x7d

Additional context

Note: This is happening for all the supported compression types (flate, zlib and gzip)

Sample traces were generated using the telemetrygen utility.
telemetrygen traces --otlp-insecure --traces 100

This might be relate to using a shared compression writer for multiple co-routines.
golang/go#40008 (comment)

The issue don't surface when the number of consumers for sending queue is limited to 1.

...
exporters:
  awskinesis:
    aws:
      stream_name: "<stream>"
      region: "<region>"
    encoding:
      name: otlp_proto
      compression: zlib
    sending_queue:
      num_consumers: 1
...
@preetsarb preetsarb added bug Something isn't working needs triage New item requiring triage labels Dec 19, 2023
Copy link
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@atoulme atoulme removed the needs triage New item requiring triage label Dec 21, 2023
Copy link
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@jordankay13
Copy link

We are experiencing the same thing with the awskinesisexporter, except we noticed the problem on the kinesis stream consumer side.

We also came to the same conclusion that the problem was the shared writer. The awss3exporter creates a new writer each time -- that would "solve" this in an inefficient way:
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/awss3exporter/s3_writer.go#L97

Http compression was doing that at one time but it was updated to make use of sync.Pool:
open-telemetry/opentelemetry-collector#7859

For searchability, these are exceptions from a .NET consumer of the output of the awskinesisexporter:

Exception: System.IO.InvalidDataException: The archive entry was compressed using an unsupported compression method.
   at System.IO.Compression.Inflater.Inflate(FlushCode flushCode)
   at System.IO.Compression.Inflater.ReadInflateOutput(Byte* bufPtr, Int32 length, FlushCode flushCode, Int32& bytesRead)
   at System.IO.Compression.Inflater.ReadOutput(Byte* bufPtr, Int32 length, Int32& bytesRead)
   at System.IO.Compression.Inflater.InflateVerified(Byte* bufPtr, Int32 length)
   at System.IO.Compression.DeflateStream.CopyToStream.Write(Byte[] buffer, Int32 offset, Int32 count)
   at System.IO.Compression.DeflateStream.CopyToStream.CopyFromSourceToDestination()
   at System.IO.Compression.DeflateStream.CopyTo(Stream destination, Int32 bufferSize)
   at <our application code>

and

Exception: Google.Protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
   at Google.Protobuf.SegmentedBufferHelper.RefillFromCodedInputStream(ReadOnlySpan`1& buffer, ParserInternalState& state, Boolean mustSucceed)
   at Google.Protobuf.ParsingPrimitives.ReadRawBytesIntoSpan(ReadOnlySpan`1& buffer, ParserInternalState& state, Int32 length, Span`1 byteSpan)
   at Google.Protobuf.ParsingPrimitives.ReadRawBytesSlow(ReadOnlySpan`1& buffer, ParserInternalState& state, Int32 size)
   at Google.Protobuf.ParsingPrimitives.ReadRawBytes(ReadOnlySpan`1& buffer, ParserInternalState& state, Int32 size)
   at OpenTelemetry.Proto.Trace.V1.Span.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) 
   at Google.Protobuf.ParsingPrimitivesMessages.ReadMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.FieldCodec.<>c__DisplayClass32_0`1.<ForMessage>b__0(ParseContext& ctx)
   at Google.Protobuf.Collections.RepeatedField`1.AddEntriesFrom(ParseContext& ctx, FieldCodec`1 codec)
   at OpenTelemetry.Proto.Trace.V1.ScopeSpans.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input)
   at Google.Protobuf.ParsingPrimitivesMessages.ReadMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.FieldCodec.<>c__DisplayClass32_0`1.<ForMessage>b__0(ParseContext& ctx)
   at Google.Protobuf.Collections.RepeatedField`1.AddEntriesFrom(ParseContext& ctx, FieldCodec`1 codec)
   at OpenTelemetry.Proto.Trace.V1.ResourceSpans.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input)
   at Google.Protobuf.ParsingPrimitivesMessages.ReadMessage(ParseContext& ctx, IMessage message)
   at Google.Protobuf.FieldCodec.<>c__DisplayClass32_0`1.<ForMessage>b__0(ParseContext& ctx)
   at Google.Protobuf.Collections.RepeatedField`1.AddEntriesFrom(ParseContext& ctx, FieldCodec`1 codec)
   at OpenTelemetry.Proto.Collector.Trace.V1.ExportTraceServiceRequest.pb::Google.Protobuf.IBufferMessage.InternalMergeFrom(ParseContext& input) 
   at Google.Protobuf.CodedInputStream.ReadRawMessage(IMessage message)
   at Google.Protobuf.MessageExtensions.MergeFrom(IMessage message, Stream input, Boolean discardUnknownFields, ExtensionRegistry registry)
   at Google.Protobuf.MessageParser`1.ParseFrom(Stream input)
   at <our application code>

@jordankay13
Copy link

This issue was mentioned in the PR here:

However, upon deploying the collector and removing the workaround (setting num_consumers: 1) - the bug is still present.

One of the errors that this parallel compression bug causes is a missing EOF reference - but that's a symptom, not the cause. Under a single-threaded context, the exporter properly sets the EOF.

Commenting just to make sure we don't close this issue out.

@jordankay13
Copy link

We've had v0.101.0 running for about a week now (with the num_consumers workaround removed) and we've encountered 0 errors (last attempt, errors came up within 24 hours in our system). I believe this issue is resolved by:

FYI @MovieStoreGuy in case you want to close this out (although I'm not the OP, so always a chance I've mistaken this issue as another)

@preetsarb
Copy link
Author

Tested with v0.102.0. I can verify that issue is no longer occurring.
Thanks @leorinat for the fix!

Resolving.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working exporter/awskinesis
Projects
None yet
Development

No branches or pull requests

4 participants