Kafka Header Still Added To Older Clients using Hystrix #944

Closed
ctoestreich opened this issue Aug 8, 2019 · 7 comments
Labels: tag: needs investigation, type: bug

ctoestreich commented Aug 8, 2019

We were seeing errors with version 0.24.0 of the Java APM agent on an older Kafka stack. We upgraded to 0.31.0 to pick up the fix from 0.28.0, but we still see the issue where it tries to add a header. This MAY be due to the addition of Hystrix instrumentation as well. We are still investigating. Stack trace from our logs:

2019-08-08 09:11:21.596-05:00  ERROR- |[org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#74-2] [com.xxxxxxxxxxxx.event.core.service.EventLogService] [mthd]  | null - EventLogService-2019-08-08-09-11-00
Step: Post message to Kafka
EventLog-0024906627 EventLogID: xxxxxx, EventName: xxxxxxxx, EventStatus: 1, TransID: xxxxxxx, AttemptsToProcessCount: 4, PreviousEventLogID: 0, NextEventLogID: 0, Payload: xxxxxxxxx
Try again later exception: java.util.concurrent.ExecutionException: Observable onError
java.util.concurrent.ExecutionException: Observable onError
	at rx.internal.operators.BlockingOperatorToFuture$2.getValue(BlockingOperatorToFuture.java:118)
	at rx.internal.operators.BlockingOperatorToFuture$2.get(BlockingOperatorToFuture.java:102)
	at com.netflix.hystrix.HystrixCommand$4.get(HystrixCommand.java:423)
	at org.springframework.util.concurrent.FutureAdapter.get(FutureAdapter.java:81)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.lambda$10(EventLogServiceImpl.java:229)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.dt_access$739(EventLogServiceImpl.java)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.lambda$7(EventLogServiceImpl.java:130)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.dt_access$723(EventLogServiceImpl.java)
	at java.util.Optional.orElseGet(Optional.java:267)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.runStep(EventLogServiceImpl.java:130)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.lambda$3(EventLogServiceImpl.java:104)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.dt_access$694(EventLogServiceImpl.java)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.processEventList(EventLogServiceImpl.java:107)
	at com.xxxxxxxxxxxx.event.core.service.impl.EventLogServiceImpl.pushEventLogsToQueue(EventLogServiceImpl.java:80)
	at com.xxxxxxxxxxxx.event.core.util.EventLogPushTask.runTask(EventLogPushTask.java:18)
	at com.xxxxxxxxxxxx.common.scheduler.util.TaskQueueReceiverImpl.runTaskThrows(TaskQueueReceiverImpl.java:167)
	at com.xxxxxxxxxxxx.common.scheduler.util.TaskQueueReceiverImpl.runTask(TaskQueueReceiverImpl.java:127)
	at com.xxxxxxxxxxxx.common.scheduler.util.TaskQueueReceiverImpl.runScheduledJob(TaskQueueReceiverImpl.java:78)
	at com.xxxxxxxxxxxx.common.scheduler.util.TaskQueueReceiverImpl.processMessage(TaskQueueReceiverImpl.java:68)
	at com.xxxxxxxxxxxx.common.scheduler.util.TaskQueueReceiverImpl.handleMessage(TaskQueueReceiverImpl.java:64)
	at com.xxxxxxxxxxxx.amqp.AMQPNonTransactionalMessageListener.handleMessage(AMQPNonTransactionalMessageListener.java:44)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:273)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: ResilientKafkaEventNotifierCommand failed and no fallback available.
	at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
	at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
	at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedSubscriber.onError(HystrixInstrumentation.java:222)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedOnSubscribe.call(HystrixInstrumentation.java:136)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedOnSubscribe.call(HystrixInstrumentation.java:99)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194)
	at rx.internal.operators.OperatorSubscribeOn$1$1.onError(OperatorSubscribeOn.java:59)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at com.netflix.hystrix.AbstractCommand$DeprecatedOnRunHookApplication$1.onError(AbstractCommand.java:1431)
	at com.netflix.hystrix.AbstractCommand$ExecutionHookApplication$1.onError(AbstractCommand.java:1362)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedSubscriber.onError(HystrixInstrumentation.java:222)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.observers.Subscribers$5.onError(Subscribers.java:230)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)
	at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedOnSubscribe.call(HystrixInstrumentation.java:136)
	at datadog.trace.instrumentation.hystrix.HystrixInstrumentation$TracedOnSubscribe.call(HystrixInstrumentation.java:99)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
	at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
	at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
	at rx.Observable.unsafeSubscribe(Observable.java:10151)
	at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
	at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:56)
	at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:47)
	at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction.call(HystrixContexSchedulerAction.java:69)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:424)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:481)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:504)
	at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:106)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:745)
	at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:358)
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:355)
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:181)
	at com.xxxxxxxxxxxx.event.core.messaging.impl.ResilientKafkaEventNotifierImpl.lambda$0(ResilientKafkaEventNotifierImpl.java:28)
	at com.xxxxxxxxxxxx.event.core.messaging.impl.ResilientKafkaEventNotifierImpl.dt_access$80(ResilientKafkaEventNotifierImpl.java)
	at com.xxxxxxxxxxxx.event.core.messaging.impl.ResilientKafkaEventNotifierImpl$ResilientKafkaEventNotifierCommand.run(ResilientKafkaEventNotifierImpl.java:56)
	at com.xxxxxxxxxxxx.event.core.messaging.impl.ResilientKafkaEventNotifierImpl$ResilientKafkaEventNotifierCommand.run(ResilientKafkaEventNotifierImpl.java:1)
	at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:302)
	at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:298)
	at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
	... 28 more
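
For readers following the trace: the innermost cause is a check on the record format version ("magic"). Record headers arrived with message format v2 in Kafka 0.11, so a producer that has to write v0 or v1 batches rejects any record carrying headers, including ones a tracer adds. The sketch below is a simplified stand-alone model of that check, not Kafka's actual code; the class name, method name, and the `x-trace-id` header are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

/** Simplified model of the format-version check; hypothetical names, not Kafka's code. */
class MagicHeaderCheck {
    static final byte MAGIC_V2 = 2; // first record format version that can carry headers

    /** Returns the number of headers written, or throws if the format cannot carry them. */
    static int append(byte magic, Map<String, String> headers) {
        if (magic < MAGIC_V2 && !headers.isEmpty()) {
            throw new IllegalArgumentException(
                "Magic v" + magic + " does not support record headers");
        }
        return headers.size();
    }

    public static void main(String[] args) {
        Map<String, String> traceHeaders = Collections.singletonMap("x-trace-id", "123");
        Map<String, String> noHeaders = Collections.<String, String>emptyMap();

        System.out.println(append((byte) 2, traceHeaders)); // v2 accepts headers
        System.out.println(append((byte) 1, noHeaders));    // v1 is fine without headers
        try {
            append((byte) 1, traceHeaders);                  // v1 with headers is rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the application's own record is valid for the older format; it only fails once something upstream of the append adds a header to it.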
@ctoestreich changed the title from "Kafka Header Still Added To Older Clients" to "Kafka Header Still Added To Older Clients using Hystrix" on Aug 8, 2019
ctoestreich commented Aug 8, 2019

We are going to try combinations of these settings to see which one resolves the issue:

DD_INTEGRATION_KAFKA_ENABLED=false
DD_INTEGRATION_KAFKA_STREAMS_ENABLED=false
DD_INTEGRATION_HYSTRIX_ENABLED=false

Setting all three to false causes the error to disappear.

@ctoestreich (Author)

We next tried:

DD_INTEGRATION_KAFKA_ENABLED=true
DD_INTEGRATION_KAFKA_STREAMS_ENABLED=true
DD_INTEGRATION_HYSTRIX_ENABLED=false

and the issue returned.

@tylerbenson (Contributor)

What Kafka client and broker versions are you using?

@nnordrum

He’s using 0.10.2.0-cp1

@randomanderson added the type: bug and tag: needs investigation labels Oct 24, 2019
@ctoestreich (Author)

Of our 100+ services, only one has this issue, because of its older Kafka drivers, but it is the monolith and upgrading is not on the horizon. Is anything on the roadmap for further investigation of this issue?

@randomanderson (Contributor)

With #1448, you can disable just the header injection while leaving all other instrumentation active.
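
To illustrate what "disable just the header injection" means in practice, here is a minimal sketch, assuming a boolean switch that gates only the propagation step. The class, method, and header names are hypothetical; they do not reflect the agent's actual code or the name of the setting introduced by #1448.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: tracing stays on, header propagation is optional. */
class TracedSend {
    /** Outcome of an instrumented send: whether a span was recorded and which headers went out. */
    static final class Result {
        final boolean spanRecorded;
        final Map<String, String> headers;

        Result(boolean spanRecorded, Map<String, String> headers) {
            this.spanRecorded = spanRecorded;
            this.headers = headers;
        }
    }

    static Result send(Map<String, String> userHeaders, boolean propagationEnabled) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);
        if (propagationEnabled) {
            headers.put("x-trace-id", "123"); // placeholder trace context
        }
        // The span is recorded in both cases; only the header injection is gated.
        return new Result(true, headers);
    }

    public static void main(String[] args) {
        Map<String, String> none = new LinkedHashMap<>();
        System.out.println(send(none, true).headers);  // trace header added
        System.out.println(send(none, false).headers); // record left untouched
    }
}
```

With the switch off, a producer writing an older record format never receives headers from the tracer, at the cost of losing trace continuity across the Kafka hop.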

@github-actions (Contributor)

🤖 This issue has been addressed in the latest release. See full details in the Release Notes.
