[Bug] [MySQL-CDC] Restore task failed due to the deletion of binlog #8586

Open
3 tasks done
lm-ylj opened this issue Jan 24, 2025 · 0 comments · May be fixed by #8587
lm-ylj commented Jan 24, 2025

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

MySQL automatically deletes binlog files once they expire. On Tencent Cloud, the default retention is 120 hours. Suppose the captured table has not been modified for longer than that (for example, 7 days) while MySQL itself is still receiving writes. If the task then fails because of a connection interruption or some other reason, SeaTunnel automatically restarts it and recovers from the checkpoint. However, the checkpoint only updates startupOffset when a data change arrives for the captured table, so it still records a binlog file and position from long ago. By the time of the restart, that binlog file has already been deleted by the system, and the task cannot be restored.
The root cause is that only data change records update startupOffset, as seen in IncrementalSourceRecordEmitter#processElement:

    //  Only data changes will update startupOffset
    protected void processElement(
            SourceRecord element, Collector<T> output, SourceSplitStateBase splitState)
            throws Exception {
        if (isWatermarkEvent(element)) {
            Offset watermark = getWatermark(element);
            if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setLowWatermark(watermark);
            } else if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            } else if ((isSchemaChangeBeforeWatermarkEvent(element)
                            || isSchemaChangeAfterWatermarkEvent(element))
                    && splitState.isIncrementalSplitState()) {
                emitElement(element, output);
            }
        } else if (isSchemaChangeEvent(element) && splitState.isIncrementalSplitState()) {
            emitElement(element, output);
        } else if (isDataChangeRecord(element)) {
            if (splitState.isIncrementalSplitState()) {
                Offset position = getOffsetPosition(element);
                splitState.asIncrementalSplitState().setStartupOffset(position);
            }
            emitElement(element, output);
        } else {
            emitElement(element, output);
        }
    }

To solve this problem, startupOffset should also be updated when binlog heartbeat events are received.
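
The proposed change can be sketched in isolation as follows. This is a minimal, self-contained model and not the actual SeaTunnel code: the `Kind`, `Event` and `IncrementalSplitState` types are simplified stand-ins invented for illustration, and the sketch assumes heartbeat events are actually being produced by the source and carry the current binlog file and position. Whether heartbeats should also be emitted downstream is left open; here they only advance the offset.

```java
import java.util.ArrayList;
import java.util.List;

public class HeartbeatOffsetSketch {

    /** Hypothetical event kinds; the real emitter inspects a SourceRecord instead. */
    enum Kind { DATA_CHANGE, HEARTBEAT, OTHER }

    /** Simplified stand-in for a record carrying a binlog file name and position. */
    static final class Event {
        final Kind kind;
        final String file;
        final long pos;

        Event(Kind kind, String file, long pos) {
            this.kind = kind;
            this.file = file;
            this.pos = pos;
        }
    }

    /** Simplified stand-in for the incremental split state stored in a checkpoint. */
    static final class IncrementalSplitState {
        String startupFile;
        long startupPos;

        void setStartupOffset(String file, long pos) {
            this.startupFile = file;
            this.startupPos = pos;
        }
    }

    /** Advances the startup offset on data changes AND on heartbeats. */
    static void processElement(Event element, IncrementalSplitState state, List<Event> output) {
        if (element.kind == Kind.DATA_CHANGE) {
            state.setStartupOffset(element.file, element.pos);
            output.add(element);
        } else if (element.kind == Kind.HEARTBEAT) {
            // Assumption of this sketch: heartbeats only move the offset forward
            // and are not forwarded to the sink.
            state.setStartupOffset(element.file, element.pos);
        } else {
            output.add(element);
        }
    }

    public static void main(String[] args) {
        IncrementalSplitState state = new IncrementalSplitState();
        List<Event> output = new ArrayList<>();

        // Last real change to the captured table (file and position taken from the error log).
        processElement(new Event(Kind.DATA_CHANGE, "mysql-bin.001566", 177960575L), state, output);
        // Later heartbeat while the table is idle; this file name is hypothetical.
        processElement(new Event(Kind.HEARTBEAT, "mysql-bin.001570", 4L), state, output);

        System.out.println(state.startupFile + ":" + state.startupPos);
        System.out.println("emitted records: " + output.size());
    }
}
```

With this behaviour, a checkpoint taken while the table is idle would point at a recent binlog file rather than at the file of the last data change.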

SeaTunnel Version

2.3.7

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 30000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://xxx/xxx"
    username = "xxx"
    password = "xxx"
    table-names = ["xxx"]
    startup.mode = "initial"
  }
}

sink {
  Redis {
    host = "xxx"
    port = xxx
    auth = "xxx"
    key = "xxx"
    data_type = "key"
    db_num = 3
  }
}

Running Command

bin/seatunnel.sh --config task-config/mysqlcdc-to-redis.conf --async -n mysqlcdc-to-redis.conf

Error Exception

2025-01-22 16:03:08,401 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.generic-operation.thread-32] - Job pay_blacklist.conf (928945040787505157), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MySQL-CDC]-SourceTask (1/1)] end with state FAILED and Exception: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:721)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1043)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1737532988174,db=,server_id=0,file=mysql-bin.001566,pos=177960575,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
	at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.loadStartingOffsetState(MySqlSourceFetchTaskContext.java:281)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext.configure(MySqlSourceFetchTaskContext.java:127)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:97)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
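
The IllegalStateException above reports that the checkpointed binlog file is no longer on the server. A simplified model of that kind of check is sketched below; it is an illustration under assumptions, not the connector's actual implementation. The list of server-side files is hard-coded with hypothetical names here, whereas on a real server it could be obtained with the MySQL statement `SHOW BINARY LOGS`.

```java
import java.util.Arrays;
import java.util.List;

public class BinlogAvailabilitySketch {

    /**
     * Returns true if the binlog file recorded in the checkpoint is still
     * among the files the server reports.
     */
    static boolean isBinlogAvailable(String checkpointedFile, List<String> serverBinlogs) {
        return serverBinlogs.contains(checkpointedFile);
    }

    public static void main(String[] args) {
        // Hypothetical result of SHOW BINARY LOGS after older files have expired.
        List<String> onServer = Arrays.asList("mysql-bin.001601", "mysql-bin.001602");

        // File name taken from the error message in this report.
        System.out.println(isBinlogAvailable("mysql-bin.001566", onServer));
        // A file that is still retained.
        System.out.println(isBinlogAvailable("mysql-bin.001602", onServer));
    }
}
```

Comparing the `file=` value in the exception with the server's current binlog list is a quick way to confirm that a failed restore is caused by binlog expiry rather than by a connectivity problem.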

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@lm-ylj lm-ylj added the bug label Jan 24, 2025