Primitive Keys: Ensure create table gracefully handles null key in topic #4109

Closed
big-andy-coates opened this issue Dec 10, 2019 · 6 comments · Fixed by #4351
Labels
P0 Denotes must-have for a given milestone

@big-andy-coates
Contributor

big-andy-coates commented Dec 10, 2019

To recreate, add this test to table.json:

    {
      "name": "should not blow up on null key",
      "statements": [
        "CREATE TABLE INPUT (ID bigint) WITH (kafka_topic='test_topic', value_format='DELIMITED');",
        "CREATE TABLE OUTPUT as SELECT * FROM INPUT;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": "1", "value": "1"},
        {"topic": "test_topic", "key": null, "value": "2"},
        {"topic": "test_topic", "key": "1", "value": "3"}
      ],
      "outputs": [
        {"topic": "OUTPUT", "key": "1", "value": "1"},
        {"topic": "OUTPUT", "key": "1", "value": "3"}
      ]
    }

This results in an NPE:

java.lang.NullPointerException
	at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:296)
	at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:76)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:107)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:821)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:385)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:821)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:385)
	at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:467)
	at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:429)
	at io.confluent.ksql.test.tools.TestExecutor.processSingleRecord(TestExecutor.java:344)
	at io.confluent.ksql.test.tools.TestExecutor.pipeRecordsFromProvidedInput(TestExecutor.java:299)
	at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:166)
	at io.confluent.ksql.test.EndToEndEngineTestUtil.shouldBuildAndExecuteQuery(EndToEndEngineTestUtil.java:46)
	at io.confluent.ksql.test.QueryTranslationTest.shouldBuildAndExecuteQueries(QueryTranslationTest.java:83)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
@big-andy-coates big-andy-coates added this to the 0.7.0 milestone Dec 10, 2019
@apurvam apurvam changed the title Ensure CT gracefully handles null key in topic Primitive Keys: Ensure CT gracefully handles null key in topic Dec 17, 2019
@apurvam apurvam added the P0 Denotes must-have for a given milestone label Dec 17, 2019
@apurvam
Contributor

apurvam commented Jan 14, 2020

Adding a null value to a changelog causes QTT (QueryTranslationTest) to blow up.

@purplefox
Contributor

A couple of questions:

  • What does CT stand for?
  • Do you have any steps to reproduce this?

@agavra
Contributor

agavra commented Jan 15, 2020

@purplefox CT is CREATE TABLE. I'm not sure exactly how to reproduce it (I haven't seen this one before).

@purplefox
Contributor

@big-andy-coates Hey Andy, could you add a little more info here please?

@purplefox purplefox changed the title Primitive Keys: Ensure CT gracefully handles null key in topic Primitive Keys: Ensure create table gracefully handles null key in topic Jan 15, 2020
@mjsax
Member

mjsax commented Jan 18, 2020

It’s unclear to me how this could happen. If key == null, the record should be dropped by Kafka Streams:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L97-L104
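A minimal sketch of the behaviour described above, assuming a simplified table source. The class TableSourceSketch is hypothetical and is not the Kafka Streams KTableSource code; it uses a TreeMap as a stand-in for RocksDBStore because TreeMap.get(null) and TreeMap.put(null, ...) also throw a NullPointerException. It only shows why a record whose key is genuinely null never reaches the store.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Kafka Streams code) of a table source that drops null-keyed records.
class TableSourceSketch {
    // Stand-in for RocksDBStore: a TreeMap with natural ordering throws NPE on a null key.
    private final Map<String, String> store = new TreeMap<>();
    private int droppedRecords = 0;

    void process(String key, String value) {
        if (key == null) {
            // Skip the record before any store access, so the lookup can never see a null key.
            droppedRecords++;
            return;
        }
        // Upsert: the latest value for a key replaces the previous one.
        store.put(key, value);
    }

    int droppedRecords() {
        return droppedRecords;
    }

    String valueFor(String key) {
        return store.get(key);
    }
}
```

Feeding it the inputs from the table.json test case (keys "1", null, "1" with values "1", "2", "3") drops one record and leaves "3" as the value for key "1", which matches the expected outputs of that test.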

Hence, it seems you don't pass a null key. Digging into the code, it seems you pass an empty Struct instead. Later, when this empty Struct is serialized to do the lookup into RocksDB, the serialized byte[] is null again, and thus RocksDBStore.get() fails with an NPE. You need to make sure that a null key is passed to Kafka Streams as null.
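The failure mode described above can be illustrated with a small, self-contained sketch. Everything in it is hypothetical: an "empty Struct" is modelled as an empty List of key fields, serializeKey stands in for the ksql key serializer, and lookup stands in for the state store, failing with a NullPointerException on null bytes the way RocksDBStore.get() does in the stack trace. The null-key check only inspects the key object, so an empty struct passes it and fails later.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical illustration of the empty-Struct failure mode; not ksqlDB or Kafka Streams code.
class EmptyStructKeySketch {

    // Hypothetical key serializer: an empty key struct serializes to null bytes, like a null key.
    static byte[] serializeKey(List<String> keyFields) {
        if (keyFields == null || keyFields.isEmpty()) {
            return null;
        }
        return String.join(",", keyFields).getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for the state store lookup: dereferencing null bytes throws an NPE.
    static int lookup(byte[] keyBytes) {
        return keyBytes.length;
    }

    // Mirrors the processing order: null check on the key object, then serialize and look up.
    static String handle(List<String> key) {
        if (key == null) {
            return "skipped"; // a real null key is dropped before any lookup
        }
        lookup(serializeKey(key)); // an empty struct reaches this point and fails
        return "ok";
    }
}
```

Here handle(null) returns "skipped", while handle(Collections.emptyList()) throws a NullPointerException; passing the key as a real null is what lets the existing guard do its job.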

@purplefox
Contributor

purplefox commented Jan 19, 2020

+1 @mjsax, looks like you're right.

I've created a fix which makes the serialization/deserialization of null in ksql symmetrical (currently it's asymmetrical, i.e. serializing a null then deserializing doesn't get you back to null). This means the value passed to streams is null and the streams null checks work as intended.
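A sketch of what "symmetrical" means here, under stated assumptions: SymmetricKeySerde and its nested KeyStruct are hypothetical names, the key is assumed to be a single BIGINT field encoded as 8 bytes, and deserializeAsymmetric models the problem described in this thread by turning null bytes into an empty struct. It is not the code from #4351.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical sketch of null-symmetric key serialization; names do not come from ksqlDB.
class SymmetricKeySerde {

    // Hypothetical key struct with a single nullable BIGINT field.
    static final class KeyStruct {
        final Long id;

        KeyStruct(Long id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof KeyStruct && Objects.equals(id, ((KeyStruct) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id);
        }
    }

    // A null key (or a key with a null field) serializes to null bytes.
    static byte[] serialize(KeyStruct key) {
        if (key == null || key.id == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(key.id).array();
    }

    // Asymmetric variant (the problem): null bytes come back as a non-null, empty struct.
    static KeyStruct deserializeAsymmetric(byte[] bytes) {
        if (bytes == null) {
            return new KeyStruct(null);
        }
        return new KeyStruct(ByteBuffer.wrap(bytes).getLong());
    }

    // Symmetric variant: null bytes come back as null, so the Streams null-key check applies.
    static KeyStruct deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return new KeyStruct(ByteBuffer.wrap(bytes).getLong());
    }
}
```

With the symmetric variant, serializing null and deserializing the result yields null again, while a non-null key such as 42 round-trips to an equal struct.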

The fix is here: #4351

It seems to work, but I don't know if it's the best way to fix it, as most of this area of the code is new to me. @big-andy-coates wdyt?

5 participants