MongoDB CDC will appear "the resume token was not found" error #1879

Closed
bigjar opened this issue Jan 18, 2023 · 4 comments · Fixed by #1938
Assignees: Jiabao-Sun
Labels: bug (Something isn't working)

Comments

bigjar commented Jan 18, 2023

When using MongoDB CDC (SQL) to transfer a MongoDB table to a Flink Table Store table with scan.incremental.snapshot enabled, the job fails immediately with the error "the resume token was not found".

Environment:

  • Flink version: 1.16.0
  • Flink CDC version: 2.3.0
  • Database and version: MongoDB 4.4, Flink Table Store 0.3

The SQL is below:

SET table.exec.resource.default-parallelism=10;

DROP TABLE IF EXISTS dim_mongo_xxx;
CREATE TABLE IF NOT EXISTS dim_mongo_xxx (
    _id STRING,
    threaten_type  STRING,
    credit_level STRING,
    threaten_score STRING,
    threaten_source STRING,
    created_time TIMESTAMP(3),
    updated_time TIMESTAMP(3),
    attack_time STRING,
    source STRING,
    ip STRING,
    domain STRING,
    device STRING,
    iphone STRING,
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'xxxxx',
    'username' = 'xxxxx',
    'password' = 'xxxx',
    'database' = 'xxxx',
    'collection' = 'xxxxxxx',
    'heartbeat.interval.ms' = '3000',
    'poll.max.batch.size' = '2048',
    'scan.incremental.snapshot.enabled' = 'true'
);
CREATE CATALOG fts_catalog WITH (
    'type'='table-store',
    'warehouse'='hdfs://xxx/table_store'
);
USE CATALOG fts_catalog;
DROP TABLE IF EXISTS dim_fts_xxx;
CREATE TABLE IF NOT EXISTS dim_fts_xxx(
    _id STRING,
    threaten_type  STRING,
    credit_level STRING,
    threaten_score STRING,
    threaten_source STRING,
    created_time TIMESTAMP(3),
    updated_time TIMESTAMP(3),
    attack_time STRING,
    source STRING,
    ip STRING,
    domain STRING,
    device STRING,
    iphone STRING,
    PRIMARY KEY(_id) NOT ENFORCED
);
INSERT INTO fts_catalog.`default`.dim_fts_xxx SELECT * FROM default_catalog.default_database.dim_mongo_xxx;

The exception is below:

java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split SnapshotSplit{tableId=Furion.ThreatenScore, splitId='xxx.xxxxxx:2', splitKeyType=[_id INT], splitStart=[{"_id": 1}, {"_id": "04gif.com"}], splitEnd=[{"_id": 1}, {"_id": "09166544.7cmw.com"}], highWatermark=null} error due to Open change stream failed.
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:181)
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:128)
at com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:73)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Open change stream failed
at com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:265)
at com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:108)
at com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask.execute(MongoDBScanFetchTask.java:183)
at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:94)
... 5 more
Caused by: com.mongodb.MongoCommandException: Command failed with error 280 (ChangeStreamFatalError): 'cannot resume stream; the resume token was not found. {_data: "8263C7B78200001B832B022C0100296E5A100444A181EBC5C44EE98FC7F7B47B7FB72B463C5F6964003C3131342E34362E34312E313139000004"}' on server xxx.xxx.xx.x:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "operationTime": {"$timestamp": {"t": 1674033032, "i": 17}}, "ok": 0.0, "errmsg": "cannot resume stream; the resume token was not found. {_data: "8263C7B78200001B832B022C0100296E5A100444A181EBC5C44EE98FC7F7B47B7FB72B463C5F6964003C3131342E34362E34312E313139000004"}", "code": 280, "codeName": "ChangeStreamFatalError", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1674033032, "i": 30}}, "signature": {"hash": {"$binary": {"base64": "9xZDZYaihfU1bsbUm1t+yovaEbU=", "subType": "00"}}, "keyId": 7184636505522962435}}}
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:195)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:398)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:319)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:603)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:81)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:252)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:214)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:123)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:113)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:328)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:318)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:201)
at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeCommand$4(CommandOperationHelper.java:189)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:189)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:323)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:319)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:319)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:58)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:53)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:129)
at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:121)
at com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:237)
... 8 more
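
For readers diagnosing this: MongoDB returns error 280 (ChangeStreamFatalError, labeled NonResumableChangeStreamError) when a change stream is opened with resumeAfter set to a token the server can no longer locate. Below is a minimal standalone sketch with the MongoDB Java driver that reproduces the same failure mode; it is not the connector's code, the connection string, database, and collection names are placeholders, and the token is truncated for illustration.

import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.BsonDocument;
import org.bson.Document;

public class ResumeTokenCheck {
    public static void main(String[] args) {
        // Placeholder connection string and namespace.
        try (MongoClient client = MongoClients.create("mongodb://xxxxx:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("xxxx").getCollection("xxxxxxx");
            // A stale token in the shape reported above, truncated for illustration.
            BsonDocument staleToken =
                    BsonDocument.parse("{\"_data\": \"8263C7B782...\"}");
            // cursor() sends the aggregate command to the server; this is
            // where the MongoCommandException with code 280 surfaces.
            coll.watch().resumeAfter(staleToken).cursor();
        } catch (MongoCommandException e) {
            System.err.println(e.getErrorCode() + " " + e.getErrorCodeName()
                    + ": " + e.getErrorMessage());
        }
    }
}

If the same failure occurs with a freshly obtained token, the replica set's oplog retention window is worth checking, since a token whose event has rolled off the oplog is no longer resumable.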

bigjar added the bug label on Jan 18, 2023
@ai-smalleryu

@bigjar How did you trigger this? I tried to reproduce it here, but could not.


gj-zhang commented Feb 7, 2023

+1

@carlvine500

+1

Jiabao-Sun changed the title from 'the mongodb cdc will appear "the resume token was not found" error' to 'MongoDB CDC will appear "the resume token was not found" error' on Feb 23, 2023
Jiabao-Sun self-assigned this on Feb 23, 2023
Jiabao-Sun added a commit to Jiabao-Sun/flink-cdc-connectors that referenced this issue Feb 23, 2023
vanliu-tx pushed a commit to BKBASE-Plugin/flink-cdc-connectors that referenced this issue Mar 22, 2023
wking1986 pushed a commit to BKBASE-Plugin/flink-cdc-connectors that referenced this issue Mar 22, 2023
* [mongodb][hotfix] Fix authentication failed when using a non admin auth source and duplicated usernames apache#1934 (apache#1935)

* [hotfix][mongodb] Fix resume token not found apache#1879 (apache#1938)

---------

Co-authored-by: Jiabao Sun <[email protected]>
rawlinxx pushed a commit to rawlinxx/flink-cdc-connectors that referenced this issue May 29, 2023
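
The fix landed as apache#1938, referenced above. As a hedged sketch of one common mitigation pattern for this class of failure (not necessarily what the patch itself does): when the server reports a token as non-resumable, a client can fall back from resumeAfter to startAtOperationTime, both of which the MongoDB Java driver exposes on ChangeStreamIterable. The helper below is hypothetical.

import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;

public class ChangeStreamFallback {
    // Hypothetical helper, not code from apache#1938: try to resume from a
    // stored token; on the fatal "resume token was not found" error (code
    // 280), reopen the stream from a server operation time instead.
    static MongoChangeStreamCursor<ChangeStreamDocument<Document>> open(
            MongoCollection<Document> coll,
            BsonDocument storedToken,
            BsonTimestamp fallbackTime) {
        try {
            return coll.watch().resumeAfter(storedToken).cursor();
        } catch (MongoCommandException e) {
            if (e.getErrorCode() == 280) { // ChangeStreamFatalError
                return coll.watch().startAtOperationTime(fallbackTime).cursor();
            }
            throw e;
        }
    }
}

startAtOperationTime requires MongoDB 4.0+, which the 4.4 deployment here satisfies. Choosing a safe fallback timestamp is the hard part: a timestamp before the dead token replays events, while one after it can drop them.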
@legend91325

Can anyone explain why this error happens?
