Sink connector fails when id value is number #370
Comments
Details of dead-letter queue support and configurations: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
The problem with this sample message is that all 3 values are numbers, so I am not sure whether the non-string id is causing the issue or whether a non-string PK value is. If the id field is causing the problem, we should ensure that id is always a string.
If the PK path is NOT /id, then the PK can be any data type.
See if #390 fixes this issue.
Description
The sink connector fails when the PK value in the topic is a long (the long -> string data type conversion is not happening correctly).
Message:
{
"id": 2804,
"product_number": 2804,
"product_name": "2804"
}
Error:
[2021-03-05 14:06:31,844] ERROR WorkerSinkTask{id=cosmosdb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.azure.cosmos.kafka.connect.sink.CosmosDBWriteException: Unable to write record to CosmosDB: null (value schema:Schema{datagen.product:STRUCT}
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
Caused by: {"ClassName":"BadRequestException","userAgent":"azsdk-java-cosmos/4.11.0 MacOSX/10.15.7 JRE/11.0.10","statusCode":400,"resourceAddress":"rntbd://cdb-ms-prod-centralindia1-fd0.documents.azure.com:14092/apps/bbb89637-58d3-4009-a8d0-7803963a3701/services/4f9e883b-fd5b-42cf-8297-6d60f0f2f74d/partitions/cd221f07-7f90-4c7e-b614-bc9ecfa2b727/replicas/132593325004050143p/","error":"{"Errors":["The input name '2804' is invalid. Ensure to provide a unique non-empty string less than '1024' characters."]}","innerErrorMessage":"["The input name '2804' is invalid. Ensure to provide a unique non-empty string less than '1024' characters."]","causeInfo":null,"responseHeaders":"{x-ms-current-replica-set-size=4, x-ms-last-state-change-utc=Thu, 04 Mar 2021 11:55:23.110 GMT, x-ms-session-token=0:-1#1, lsn=1, x-ms-request-charge=1.24, x-ms-schemaversion=1.11, x-ms-transport-request-id=3, x-ms-number-of-read-regions=0, x-ms-current-write-quorum=3, x-ms-cosmos-quorum-acked-llsn=1, x-ms-quorum-acked-lsn=1, x-ms-activity-id=e1ed2d86-7d8d-11eb-a4e9-131f486068f5, x-ms-xp-role=1, x-ms-global-Committed-lsn=1, x-ms-cosmos-llsn=1, x-ms-serviceversion= version=2.11.0.0}","cosmosDiagnostics":{"userAgent":"azsdk-java-cosmos/4.11.0 MacOSX/10.15.7 JRE/11.0.10","requestLatencyInMs":1865,"requestStartTimeUTC":"2021-03-05T08:36:29.916147Z","requestEndTimeUTC":"2021-03-05T08:36:31.781581Z","connectionMode":"DIRECT","responseStatisticsList":[{"storeResult":{"storePhysicalAddress":"rntbd://cdb-ms-prod-centralindia1-fd0.documents.azure.com:14092/apps/bbb89637-58d3-4009-a8d0-7803963a3701/services/4f9e883b-fd5b-42cf-8297-6d60f0f2f74d/partitions/cd221f07-7f90-4c7e-b614-bc9ecfa2b727/replicas/132593325004050143p/","lsn":1,"globalCommittedLsn":1,"partitionKeyRangeId":"0","isValid":true,"statusCode":400,"subStatusCode":0,"isGone":false,"isNotFound":false,"isInvalidPartition":false,"requestCharge":1.24,"itemLSN":-1,"sessionToken":"-1#1","exception":"["The input name '2804' is invalid. 
Ensure to provide a unique non-empty string less than '1024' characters."]","transportRequestTimeline":[{"eventName":"created","startTimeUTC":"2021-03-05T08:36:30.987993Z","durationInMicroSec":386},{"eventName":"queued","startTimeUTC":"2021-03-05T08:36:30.988379Z","durationInMicroSec":3},{"eventName":"channelAcquisitionStarted","startTimeUTC":"2021-03-05T08:36:30.988382Z","durationInMicroSec":658516},{"eventName":"pipelined","startTimeUTC":"2021-03-05T08:36:31.646898Z","durationInMicroSec":1832},{"eventName":"transitTime","startTimeUTC":"2021-03-05T08:36:31.648730Z","durationInMicroSec":124787},{"eventName":"received","startTimeUTC":"2021-03-05T08:36:31.773517Z","durationInMicroSec":7567},{"eventName":"completed","startTimeUTC":"2021-03-05T08:36:31.781084Z","durationInMicroSec":1982}],"rntbdRequestLengthInBytes":468,"rntbdResponseLengthInBytes":325,"requestPayloadLengthInBytes":55,"responsePayloadLengthInBytes":null,"channelTaskQueueSize":1,"pendingRequestsCount":1,"serviceEndpointStatistics":{"availableChannels":0,"acquiredChannels":0,"executorTaskQueueSize":0,"inflightRequests":1,"lastSuccessfulRequestTime":"2021-03-05T08:36:30.983Z","lastRequestTime":"2021-03-05T08:36:30.983Z","createdTime":"2021-03-05T08:36:30.988044Z","isClosed":false}},"requestResponseTimeUTC":"2021-03-05T08:36:31.781581Z","requestResourceType":"Document","requestOperationType":"Create"}],"supplementalResponseStatisticsList":[],"addressResolutionStatistics":{"e281f37a-7d8d-11eb-a4e9-131f486068f5":{"startTimeUTC":"2021-03-05T08:36:30.896711Z","endTimeUTC":"2021-03-05T08:36:30.985504Z","targetEndpoint":"https://rankesh-partner-demo-centralindia.documents.azure.com:443/addresses/?$resolveFor=dbs%2FLYY4AA%3D%3D%2Fcolls%2FLYY4ANz1SXE%3D%2Fdocs&$filter=protocol%20eq%20rntbd&$partitionKeyRangeIds=0","errorMessage":null,"inflightRequest":false}},"regionsContacted":["https://rankesh-partner-demo-centralindia.documents.azure.com:443/"],"retryContext":{"retryCount":0,"statusAndSubStatusCodes":null,"retry
Latency":0},"metadataDiagnosticsContext":{"metadataDiagnosticList":[{"metaDataName":"CONTAINER_LOOK_UP","startTimeUTC":"2021-03-05T08:36:29.925804Z","endTimeUTC":"2021-03-05T08:36:30.503833Z","durationinMS":578},{"metaDataName":"PARTITION_KEY_RANGE_LOOK_UP","startTimeUTC":"2021-03-05T08:36:30.520209Z","endTimeUTC":"2021-03-05T08:36:30.890028Z","durationinMS":369},{"metaDataName":"SERVER_ADDRESS_LOOKUP","startTimeUTC":"2021-03-05T08:36:30.896720Z","endTimeUTC":"2021-03-05T08:36:30.985493Z","durationinMS":88}]},"serializationDiagnosticsContext":{"serializationDiagnosticsList":[{"serializationType":"ITEM_SERIALIZATION","startTimeUTC":"2021-03-05T08:36:29.918371Z","endTimeUTC":"2021-03-05T08:36:29.921127Z","durationInMicroSec":2756},{"serializationType":"PARTITION_KEY_FETCH_SERIALIZATION","startTimeUTC":"2021-03-05T08:36:30.505873Z","endTimeUTC":"2021-03-05T08:36:30.506922Z","durationInMicroSec":1049}]},"gatewayStatistics":null,"systemInformation":{"usedMemory":"662544 KB","availableMemory":"1434608 KB","systemCpuLoad":"(2021-03-05T08:36:02.928443Z 5.4%), (2021-03-05T08:36:07.928489Z 4.3%), (2021-03-05T08:36:12.930572Z 5.4%), (2021-03-05T08:36:17.928052Z 6.3%), (2021-03-05T08:36:22.930060Z 3.9%), (2021-03-05T08:36:27.925883Z 4.7%)"},"clientCfgs":{"id":0,"numberOfClients":1,"connCfg":{"rntbd":"(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:false)","gw":"(cps:1000, rto:PT5S, icto:null, p:false)","other":"(ed: true, cs: false)"},"consistencyCfg":"(consistency: null, mm: true, prgns: [])"}}}
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.messageReceived(RntbdRequestManager.java:768)
at com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdRequestManager.channelRead(RntbdRequestManager.java:181)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1533)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1282)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1329)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1685)
at com.azure.cosmos.CosmosContainer.blockItemResponse(CosmosContainer.java:232)
at com.azure.cosmos.CosmosContainer.createItem(CosmosContainer.java:153)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.addItemToContainer(CosmosDBSinkTask.java:96)
at com.azure.cosmos.kafka.connect.sink.CosmosDBSinkTask.put(CosmosDBSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
[2021-03-05 14:06:31,846] INFO Attempting to close client 0 (com.azure.cosmos.implementation.RxDocumentClientImpl:3714)
[2021-03-05 14:06:31,846] INFO Shutting down ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3716)
[2021-03-05 14:06:31,846] INFO Closing Global Endpoint Manager ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3717)
[2021-03-05 14:06:31,846] INFO Closing StoreClientFactory ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3719)
[2021-03-05 14:06:31,846] INFO Shutting down RntbdClientChannelPoolMonitoringProvider ... (com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider:591)
[2021-03-05 14:06:31,851] INFO Shutting down reactorHttpClient ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3721)
[2021-03-05 14:06:31,852] INFO Shutting down CpuMonitor ... (com.azure.cosmos.implementation.RxDocumentClientImpl:3723)
[2021-03-05 14:06:31,852] INFO Shutting down completed. (com.azure.cosmos.implementation.RxDocumentClientImpl:3725)
[2021-03-05 14:06:31,853] INFO [Consumer clientId=connector-consumer-cosmosdb-sink-0, groupId=connect-cosmosdb-sink] Revoke previously assigned partitions products-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[2021-03-05 14:06:31,853] INFO [Consumer clientId=connector-consumer-cosmosdb-sink-0, groupId=connect-cosmosdb-sink] Member connector-consumer-cosmosdb-sink-0-60127476-9318-4d62-bfe3-e6db62da93a8 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
[2021-03-0
Expected Behavior
The long value should be converted into a string before being used as the id field.
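A minimal sketch of the conversion described above, assuming the record value has already been turned into a `Map<String, Object>` before the write. The class and method names (`IdCoercion`, `withStringId`) are hypothetical; this is neither the connector's actual code nor the change in #390.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: illustrates coercing "id" to a string.
final class IdCoercion {
    private IdCoercion() {}

    /**
     * Returns a copy of the record value in which a non-null, non-string "id"
     * is replaced by its string form. All other fields keep their original types.
     */
    static Map<String, Object> withStringId(Map<String, Object> value) {
        Map<String, Object> copy = new LinkedHashMap<>(value);
        Object id = copy.get("id");
        if (id != null && !(id instanceof String)) {
            copy.put("id", String.valueOf(id));
        }
        return copy;
    }

    public static void main(String[] args) {
        // Same shape as the sample message in this issue.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 2804L);
        record.put("product_number", 2804L);
        record.put("product_name", "2804");
        System.out.println(withStringId(record));
    }
}
```

Only `id` is touched, which matches the comment that a PK on a path other than /id can be any data type.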
Reproduce
Additional Context
Consider rejecting any bad messages (due to data type conversions etc.) and using the dead-letter APIs to push these bad messages to the dead-letter topic.
Awaiting further documentation and/or examples on how to push to the dead-letter topic from within a connector.
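One possible shape for the reject-and-route idea, sketched with the standard library only. The `reporter` callback stands in for whatever dead-letter API the connector ends up using; in Kafka Connect 2.6 and later that role is played by `ErrantRecordReporter`, obtained from `SinkTaskContext.errantRecordReporter()`. The class name, the validation rule, and the `writer` callback are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch: validate each record, write the good ones, report the bad ones.
final class DeadLetterSketch {
    private DeadLetterSketch() {}

    /** Assumed rule for this sketch: "id" must be present and be a non-empty string. */
    static void validate(Map<String, Object> value) {
        Object id = value.get("id");
        if (!(id instanceof String) || ((String) id).isEmpty()) {
            throw new IllegalArgumentException("id must be a non-empty string, got: " + id);
        }
    }

    /**
     * Passes valid records to writer and invalid ones to reporter instead of
     * failing the whole batch. Returns the number of records written.
     */
    static int putAll(List<Map<String, Object>> records,
                      Consumer<Map<String, Object>> writer,
                      BiConsumer<Map<String, Object>, Throwable> reporter) {
        int written = 0;
        for (Map<String, Object> record : records) {
            try {
                validate(record);
            } catch (IllegalArgumentException e) {
                // Bad record: hand it to the dead-letter stand-in and keep going.
                reporter.accept(record, e);
                continue;
            }
            writer.accept(record);
            written++;
        }
        return written;
    }
}
```

With this structure a single bad message no longer kills the task; whether rejected records actually reach a dead-letter topic depends on the worker's `errors.tolerance` and `errors.deadletterqueue.topic.name` settings.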