
[Bug] [Connectors] SeaTunnel hybrid cluster mode: after restarting the service, the connector throws ClassCastException #8180

AceGain opened this issue Nov 30, 2024 · 0 comments
AceGain commented Nov 30, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Step 1:
Deployed SeaTunnel in hybrid cluster mode on a single machine, following the official documentation.
Step 2:
Submitted a job to SeaTunnel via the REST API; it ran without errors and data was written to the sink normally.
Step 3:
Stopped the job via the REST API, then shut down the SeaTunnel service with the shell script under bin.
Step 4:
Adjusted the JVM configuration, restarted SeaTunnel, and resubmitted the same job via the REST API (with a different jobId and jobName); the exception below occurred.

SeaTunnel Version

2.3.7

SeaTunnel Config

{
    "env": {
        "job.mode": "STREAMING"
    },
    "source": [
        {
            "plugin_name": "Kafka",
            "schema": {
                "fields": {
                    "productCode": "string",
                    "productName": "string",
                    "deviceType": "string",
                    "deviceTypeName": "string",
                    "deviceSn": "string",
                    "deviceName": "string",
                    "time": "string",
                    "liquidlevel": "string",
                    "flowrate": "string",
                    "flowrateA": "string",
                    "flowrateB": "string"
                }
            },
            "start_mode": "earliest",
            "bootstrap.servers": "10.136.11.70:9092",
            "topic": "zysbf_sc_JCMY",
            "result_table_name": "kafka_table",
            "group": "SeaTunnel-Consumer-Group"
        }
    ],
    "transform": [
        {
            "plugin_name": "Sql",
            "query": "SELECT 'admin' AS create_by, FORMATDATETIME(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS create_time, 'admin' AS update_by, FORMATDATETIME(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS update_time, 'admin' AS obj, 'admin' AS obj_type, 'admin' AS note, productCode, productName, deviceType, deviceTypeName, deviceSn, deviceName, time, IFNULL(liquidlevel,'') AS liquidlevel, IFNULL(flowrate,'') AS flowrate, IFNULL(flowrateA,'') AS flowrateA, IFNULL(flowrateB,'') AS flowrateB FROM kafka_table;",
            "source_table_name": "kafka_table",
            "result_table_name": "clickhouse_table"
        }
    ],
    "sink": [
        {
            "plugin_name": "Console",
            "source_table_name": "clickhouse_table"
        }
    ]
}

Running Command

http://192.168.10.84:5801/hazelcast/rest/maps/submit-job?jobId=173260734713252320&jobName=jc_zysbf_sc_n_173260734713252320&isStartWithSavePoint=false

(The POST request body is identical to the configuration shown in the SeaTunnel Config section above.)

Error Exception

2024-11-30 15:43:07,449 ERROR [o.a.s.e.s.d.p.PhysicalVertex  ] [hz.main.seaTunnel.task.thread-21] - Job jc_zysbf_sc_n_173260734713252320 (173260734713252320), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-Kafka]-SplitEnumerator (1/1)] end with state FAILED and Exception: java.util.concurrent.CompletionException: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.common.TopicPartition to field org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit.topicPartition of type org.apache.kafka.common.TopicPartition in instance of org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit
        at com.hazelcast.spi.impl.AbstractInvocationFuture.returnOrThrowWithJoinConventions(AbstractInvocationFuture.java:819)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.resolveAndThrowWithJoinConvention(AbstractInvocationFuture.java:835)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.join(AbstractInvocationFuture.java:553)
        at org.apache.seatunnel.engine.server.task.context.SeaTunnelSplitEnumeratorContext.assignSplit(SeaTunnelSplitEnumeratorContext.java:86)
        at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.lambda$assignSplit$13(KafkaSourceSplitEnumerator.java:330)
        at java.base/java.util.HashMap.forEach(HashMap.java:1337)
        at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.assignSplit(KafkaSourceSplitEnumerator.java:328)
        at org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator.run(KafkaSourceSplitEnumerator.java:131)
        at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.stateProcess(SourceSplitEnumeratorTask.java:319)
        at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.call(SourceSplitEnumeratorTask.java:138)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.common.TopicPartition to field org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit.topicPartition of type org.apache.kafka.common.TopicPartition in instance of org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.seatunnel.common.utils.SerializationUtils.deserialize(SerializationUtils.java:73)
        at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:41)
        at org.apache.seatunnel.api.serialization.DefaultSerializer.deserialize(DefaultSerializer.java:25)
        at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.lambda$run$0(AssignSplitOperation.java:67)
        at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
        at org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation.run(AssignSplitOperation.java:54)
        at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
        at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
        at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
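The message "cannot assign instance of org.apache.kafka.common.TopicPartition to field ... of type org.apache.kafka.common.TopicPartition" (same fully-qualified name on both sides) is the classic signature of one class being defined by two different classloaders, e.g. the connector plugin classloader used after restart versus the one that produced the serialized split state. A minimal, self-contained sketch of that mechanism, independent of SeaTunnel (the class and method names here are hypothetical, chosen only for illustration):

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch: the same class defined by two different ClassLoaders is two
// distinct runtime types, so a cast (or a deserialization field
// assignment, as in the stack trace above) throws ClassCastException.
public class ClassLoaderCastDemo {

    /** Loads this class a second time in an isolated loader and tries the cast. */
    public static boolean castAcrossLoadersFails() {
        try {
            // A loader with no parent: it never delegates, so it re-defines
            // the class from its bytecode instead of reusing the existing one.
            ClassLoader isolated = new ClassLoader(null) {
                @Override
                protected Class<?> findClass(String name) throws ClassNotFoundException {
                    String path = "/" + name.replace('.', '/') + ".class";
                    try (InputStream in = ClassLoaderCastDemo.class.getResourceAsStream(path)) {
                        byte[] bytes = in.readAllBytes();
                        return defineClass(name, bytes, 0, bytes.length);
                    } catch (IOException | NullPointerException e) {
                        throw new ClassNotFoundException(name, e);
                    }
                }
            };
            Object instance = isolated.loadClass("ClassLoaderCastDemo")
                    .getDeclaredConstructor().newInstance();
            // Same class name, different defining loader: this cast fails.
            ClassLoaderCastDemo same = (ClassLoaderCastDemo) instance;
            return false; // not reached
        } catch (ClassCastException expected) {
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast failed: " + castAcrossLoadersFails());
    }
}
```

If this is what is happening here, a fix would need the split state to be deserialized with the same (or a compatible) classloader as the connector that consumes it, rather than a change to the job config.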

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

java 11.0.2

Screenshots

[Screenshot attached: 微信截图_20241130155930]

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

@AceGain AceGain added the bug label Nov 30, 2024