
kafka-influxdb-sink cannot handle Avro type array anymore #737

Closed

afausti opened this issue Jan 26, 2021 · 7 comments

afausti commented Jan 26, 2021

Issue Guidelines

What version of the Stream Reactor are you reporting this issue for?

2.1.3

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes, with Confluent Kafka 5.5.2

Have you read the docs?

Yes

What is the expected behaviour?

I would expect kafka-influxdb-sink 2.1.3 to support Avro records of type array (see PR #522). In fact, I contributed that PR in the past, but I didn't include a test for it :(

I can try to fix this with some guidance.

What was observed?

docker-compose exec schema-registry kafka-avro-console-producer --bootstrap-server broker:29092 --topic foo --property value.schema='{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":{"type":"array","items":"float"}}]}'

{"bar": "John Doe","baz": [1,2,3]}

What is your Connect cluster configuration (connect-avro-distributed.properties)?

$ docker-compose exec connect cat ./etc/schema-registry/connect-avro-distributed.properties

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

What is your connector properties configuration (my-connector.properties)?

docker-compose run kafkaconnect config influxdb-sink
Creating angelofausti_kafkaconnect_run ... done
{
    "connect.influx.db": "mydb",
    "connect.influx.error.policy": "THROW",
    "connect.influx.kcql": "INSERT INTO foo SELECT * FROM foo WITHTIMESTAMP sys_time()",
    "connect.influx.max.retries": "10",
    "connect.influx.password": "",
    "connect.influx.retry.interval": "60000",
    "connect.influx.timestamp": "sys_time()",
    "connect.influx.url": "http://influxdb:8086",
    "connect.influx.username": "-",
    "connect.progress.enabled": "false",
    "connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
    "name": "influxdb-sink",
    "tasks.max": "1",
    "topics": "foo"
}

Please provide full log files (redact any sensitive information)

connect            | [2021-01-26 23:12:40,756] INFO Empty list of records received. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask)
connect            | [2021-01-26 23:12:42,631] ERROR Encountered error Can't select field:'baz' because it leads to value:'[1.0, 2.0, 3.0]' (java.util.ArrayList)is not a valid type for InfluxDb. (com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter)
connect            | java.lang.RuntimeException: Can't select field:'baz' because it leads to value:'[1.0, 2.0, 3.0]' (java.util.ArrayList)is not a valid type for InfluxDb.
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.writeField(InfluxPoint.scala:91)
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.$anonfun$addValuesAndTags$6(InfluxPoint.scala:36)
connect            |    at scala.util.Success.flatMap(Try.scala:251)
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.$anonfun$addValuesAndTags$5(InfluxPoint.scala:35)
connect            |    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
connect            |    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
connect            |    at scala.collection.immutable.List.foldLeft(List.scala:89)
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.addValuesAndTags(InfluxPoint.scala:34)
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.$anonfun$build$6(InfluxPoint.scala:23)
connect            |    at scala.util.Success.flatMap(Try.scala:251)
connect            |    at com.datamountaineer.streamreactor.connect.influx.converters.InfluxPoint$.build(InfluxPoint.scala:20)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.$anonfun$build$4(InfluxBatchPointsBuilder.scala:94)
connect            |    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
connect            |    at scala.collection.Iterator.foreach(Iterator.scala:943)
connect            |    at scala.collection.Iterator.foreach$(Iterator.scala:943)
connect            |    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
connect            |    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
connect            |    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
connect            |    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
connect            |    at scala.collection.TraversableLike.map(TraversableLike.scala:273)
connect            |    at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
connect            |    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.$anonfun$build$3(InfluxBatchPointsBuilder.scala:94)
connect            |    at scala.Option.map(Option.scala:230)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.$anonfun$build$2(InfluxBatchPointsBuilder.scala:94)
connect            |    at scala.util.Success.flatMap(Try.scala:251)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.$anonfun$build$1(InfluxBatchPointsBuilder.scala:91)
connect            |    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
connect            |    at scala.collection.Iterator.foreach(Iterator.scala:943)
connect            |    at scala.collection.Iterator.foreach$(Iterator.scala:943)
connect            |    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
connect            |    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
connect            |    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
connect            |    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
connect            |    at scala.collection.TraversableLike.map(TraversableLike.scala:273)
connect            |    at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
connect            |    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.build(InfluxBatchPointsBuilder.scala:88)
connect            |    at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.write(InfluxDbWriter.scala:45)
connect            |    at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.$anonfun$put$2(InfluxSinkTask.scala:77)
connect            |    at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.$anonfun$put$2$adapted(InfluxSinkTask.scala:77)
connect            |    at scala.Option.foreach(Option.scala:407)
connect            |    at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.put(InfluxSinkTask.scala:77)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:549)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)

stheppi commented Jan 28, 2021

Hi @afausti,
The linked InfluxDB client does not have an API for adding an array as a point field.

This code was not touched (even if you look at the history):

def writeField(builder: Point.Builder)(field: String, v: Any): Try[Point.Builder] = v match {

So I am not sure the InfluxDB sink ever supported inserting an array.

I see that you did add the point handling at a higher level when array support was added, so it's best to inject the handling lower down, in the code I shared. We will add it back.
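
For context, a minimal sketch of what injecting the array handling into writeField could look like. The scalar cases and the error message follow the signature quoted above; the list case, the recursion, and the fieldN naming scheme (baz -> baz0, baz1, ...) are assumptions based on how PR #522 flattened arrays, not the connector's actual implementation.

```scala
import scala.collection.JavaConverters._
import scala.util.{Failure, Try}

import org.influxdb.dto.Point

object ArrayFieldSketch {
  // Sketch only: recurse into java.util.List values and write each element
  // as its own field, suffixing the field name with the element index.
  def writeField(builder: Point.Builder)(field: String, v: Any): Try[Point.Builder] = v match {
    case list: java.util.List[_] =>
      list.asScala.toList.zipWithIndex.foldLeft(Try(builder)) {
        case (acc, (element, index)) =>
          acc.flatMap(b => writeField(b)(s"$field$index", element))
      }
    case value: Long    => Try(builder.addField(field, value))
    case value: Double  => Try(builder.addField(field, value))
    case value: Float   => Try(builder.addField(field, value.toDouble))
    case value: String  => Try(builder.addField(field, value))
    case value: Boolean => Try(builder.addField(field, value))
    case other =>
      Failure(new RuntimeException(
        s"Can't select field:'$field' because it leads to value:'$other' " +
          s"(${other.getClass}) is not a valid type for InfluxDb."))
  }
}
```

With something along these lines, the record from the reproduction above would yield fields bar, baz0, baz1, baz2 on the foo measurement.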


afausti commented Jan 28, 2021

@stheppi nice, thank you! Yes, the solution in that PR was to extract the elements of the array and write them as new fields in InfluxDB. It used to work like this:

docker-compose exec schema-registry kafka-avro-console-producer --bootstrap-server broker:29092 --topic foo --property value.schema='{"type":"record", "name":"foo", "fields":[{"name":"bar","type":"string"}, {"name":"baz","type":{"type":"array","items":"float"}}]}'
{"bar": "John Doe","baz": [1.0,2.0,3.0]}
Ctrl+D

docker-compose exec influxdb influx -database mydb -execute "SELECT * FROM foo"
name: foo
time                bar      baz0 baz1 baz2
----                ---      ---- ---- ----
1611707507555316950 John Doe 1.0  2.0  3.0


afausti commented Feb 13, 2021

Please consider adding a test to ensure that the connector can handle arrays with NaN values. An array like baz=[1.0, NaN, 3.0] should create the field set baz0=1.0, baz2=3.0 in InfluxDB. According to PR #734, influxdb-java is expected to skip fields with NaN values before writing to InfluxDB.
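
For illustration, a rough sketch (hypothetical helper, not the connector's code) of the flattening behaviour such a test would assert, with NaN elements dropped so their indices never become fields:

```scala
// Hypothetical helper for illustration only: flatten an array into indexed
// fields, skipping NaN elements so only the remaining indices are written.
def flattenSkippingNaN(field: String, values: Seq[Any]): Seq[(String, Any)] =
  values.zipWithIndex.collect {
    case (v: Double, i) if !v.isNaN => (s"$field$i", v)
    case (v: Float, i)  if !v.isNaN => (s"$field$i", v)
    case (v, i) if !v.isInstanceOf[Double] && !v.isInstanceOf[Float] => (s"$field$i", v)
  }

// flattenSkippingNaN("baz", Seq(1.0, Double.NaN, 3.0))
// => List((baz0,1.0), (baz2,3.0))
```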


afausti commented May 26, 2021

@stheppi just checking if there are plans to add this feature back. Thank you!


stheppi commented Jun 4, 2021

@afausti yes, we will be working to add it back


afausti commented Jun 8, 2021

I can confirm that the new array handling implementation works fine for me. I built kafka-connect-influxdb from master, followed these steps to produce an Avro-encoded message with an array, and verified that it is flattened in InfluxDB.

This issue can be closed.

@andrewstevenson

@afausti thanks!
