Unhandled exception when JSON is not valid #46
Comments
Sounds like a bug in the Cosmos DB ChangeFeed logic, either internally in their Java SDK or in our implementation of the ChangeFeed consumer. Have you tracked down where this is happening?
Was able to reproduce this outside our connector source code by using this sample application. To do so, add the _lsn property to the document before it is read by the ChangeFeedProcessor.
However, I am able to get the source and sink connectors working together, which should be the original use case behind this issue. The sink connector pushed a message to Cosmos DB, the document was stored there with the Cosmos DB system properties added, and in the source connector I then get that document back in the topic with those properties included (an illustrative round trip is sketched below).
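As a rough, made-up illustration of that round trip (the property values below are invented, not taken from the original documents), a document written by the sink connector such as:

```json
{ "id": "order-1", "amount": 42 }
```

is stored by Cosmos DB with system properties attached, so the source connector reads back something along these lines from the change feed:

```json
{
  "id": "order-1",
  "amount": 42,
  "_rid": "k5ZMAL0eGADkAAAAAAAAAA==",
  "_etag": "\"00000000-0000-0000-0000-000000000000\"",
  "_ts": 1600000000,
  "_lsn": 12
}
```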
Now, if you take that document out of Kafka again with the sink connector and write it back to Cosmos DB, it ends up in Cosmos DB with _lsn added. Then, if you read it out again with the source connector, it fails because the document already has _lsn in it and the Cosmos DB SDK tries to add it again. A complicated and unlikely, but not impossible, scenario. The solution would be to remove the _lsn (and the other system properties, since writing a doc that contains system properties will probably fail anyway) with an SMT before writing it back to Cosmos DB. You could also configure an SMT on the source connector to remove all _* properties when reading, before writing to Kafka (an example configuration is sketched below). Still, we should raise this with the Cosmos DB team to see if they can address it in their SDK.
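A hedged sketch of that SMT workaround using Kafka Connect's built-in ReplaceField transform. ReplaceField takes an explicit field list (it cannot match a _* wildcard), the list below is an assumption rather than a definitive set, and older Connect versions spell the property blacklist instead of exclude:

```json
{
  "transforms": "dropSysProps",
  "transforms.dropSysProps.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropSysProps.exclude": "_lsn,_rid,_self,_etag,_attachments,_ts"
}
```

The same transform block can be attached to the source connector (to drop the properties before they reach Kafka) or to the sink connector (to drop them before the document is written back to Cosmos DB).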
We should also ensure that the source connector handles this invalid JSON error correctly. Need to think about what "correctly" is. Can we log the error, dead-letter this document (if configured), and proceed with the next document instead of going into a failed state?
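A minimal, self-contained sketch of that skip-and-log behaviour, assuming the task has the raw document strings in hand; the class and method names are hypothetical, not the connector's actual code:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical sketch: parse a batch of change feed documents, logging and
 *  skipping any document that fails instead of failing the whole task. */
public class SkipInvalidDocuments {
    private static final Logger log = LoggerFactory.getLogger(SkipInvalidDocuments.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    public static List<JsonNode> parseBatch(List<String> rawDocuments) {
        List<JsonNode> parsed = new ArrayList<>();
        for (String raw : rawDocuments) {
            try {
                parsed.add(mapper.readTree(raw));
            } catch (Exception e) {
                // Log and continue with the next document; a configurable
                // dead-letter topic could also be produced to at this point.
                log.error("Skipping document with invalid JSON: {}", e.getMessage());
            }
        }
        return parsed;
    }
}
```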
Thanks for your responses Ryan, especially over the weekend. I understand the scenario now and am able to properly reproduce it. I think it should definitely be possible to remove the _lsn and other system properties. Since dead-letter queues only work for sink connectors, I don't think that's a viable approach for this issue.
Design decision: filter out system properties
will implement additional code in the source connector to filter system properties from the Jackson object property bag before passing records to Kafka (see the sketch after these notes); the exact list of properties is still to be defined, as some system properties might actually be useful, like _ts
suggested workaround for now, until this is implemented: use a Single Message Transform (as documented) to filter out the properties you wish to drop, OR have Connect output data to a KSQL stream and use filtering there to drop the system properties before writing to Kafka.
follow the pattern used by the Spark 3 connector: make inclusion of system properties and timestamps configurable
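A minimal Java sketch of that filtering with Jackson, assuming the record value is available as an ObjectNode; the class name and the property list are illustrative assumptions, not the connector's actual code:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: strip Cosmos DB system properties from the Jackson
 *  property bag before the record is handed to Kafka. _ts is deliberately
 *  not dropped here, since the thread notes it may be useful. */
public class SystemPropertyFilter {
    // Assumed set of properties to drop; the exact list is still to be defined.
    private static final List<String> SYSTEM_PROPERTIES =
            Arrays.asList("_rid", "_self", "_etag", "_attachments", "_lsn");

    public static ObjectNode stripSystemProperties(ObjectNode document) {
        ObjectNode copy = document.deepCopy();
        copy.remove(SYSTEM_PROPERTIES);
        return copy;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode doc = (ObjectNode) mapper.readTree(
                "{\"id\":\"order-1\",\"amount\":42,\"_rid\":\"abc\",\"_ts\":1600000000,\"_lsn\":12}");
        System.out.println(stripSystemProperties(doc)); // keeps id, amount, _ts
    }
}
```

Following the Spark 3 connector pattern, a hypothetical configuration flag (for example, something like source.includeSystemProperties) could then decide whether this filter runs at all.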
When reading from the Cosmos DB Change Feed, if the original document already has an _lsn property, the ChangeFeedProcessor will add an additional one, creating an invalid JSON document. When the document is then converted to JSON, the code crashes. Here's the significant stack trace:
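For context, a made-up illustration of the resulting document: the _lsn name ends up appearing twice at the same level, which is what the JSON handling then trips over (values invented):

```json
{
  "id": "order-1",
  "amount": 42,
  "_lsn": 12,
  "_lsn": 17
}
```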