diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java index 7d5ed144f4..89e5c1ffe1 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java @@ -2,6 +2,7 @@ import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.baomidou.mybatisplus.extension.service.IService; +import org.apache.eventmesh.common.remote.JobState; /** * @author sodafang @@ -9,5 +10,5 @@ * @createDate 2024-05-09 15:51:45 */ public interface EventMeshJobInfoService extends IService { - + boolean updateJobState(Integer jobID, JobState state); } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java index b189adca6e..96d9041935 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java @@ -1,9 +1,12 @@ package com.apache.eventmesh.admin.server.web.db.service.impl; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.eventmesh.common.remote.JobState; import org.springframework.stereotype.Service; /** @@ -12,9 +15,21 @@ * @createDate 2024-05-09 15:51:45 */ @Service +@Slf4j public class EventMeshJobInfoServiceImpl extends ServiceImpl implements EventMeshJobInfoService{ + @Override + public boolean updateJobState(Integer jobID, JobState state) { + if (jobID == null || state == null) { + return false; + } + EventMeshJobInfo jobInfo = new EventMeshJobInfo(); + jobInfo.setJobID(jobID); + jobInfo.setState(state.ordinal()); + update(jobInfo, Wrappers.update().notIn("state",JobState.DELETE,JobState.COMPLETE)); + return true; + } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java index 59cbd00d46..96d19942be 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/position/impl/MysqlReportPositionHandler.java @@ -8,6 +8,7 @@ import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; import org.apache.eventmesh.common.remote.job.DataSourceType; +import org.apache.eventmesh.common.remote.offset.RecordPosition; import org.apache.eventmesh.common.remote.request.FetchPositionRequest; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.FetchPositionResponse; @@ -15,6 +16,8 @@ import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; +import java.util.List; + @Component @Slf4j public class MysqlReportPositionHandler extends PositionHandler { @@ -30,9 +33,11 @@ protected DataSourceType getSourceType() { public boolean handler(ReportPositionRequest request, Metadata metadata) { for (int i = 0; i < 3; i++) { try { + List recordPositionList = request.getRecordPositionList(); EventMeshMysqlPosition position = new EventMeshMysqlPosition(); position.setJobID(Integer.parseInt(request.getJobID())); position.setAddress(request.getAddress()); + if (!positionService.saveOrUpdateByJob(position)) { log.warn("update job position fail [{}]", request); return false; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java index f070af5ba8..26fa0dbabb 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/request/impl/ReportPositionHandler.java @@ -2,17 +2,15 @@ import com.apache.eventmesh.admin.server.AdminServerException; import com.apache.eventmesh.admin.server.web.db.DBThreadPool; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource; -import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo; import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService; import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService; import com.apache.eventmesh.admin.server.web.handler.position.IReportPositionHandler; import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory; import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; import org.apache.eventmesh.common.remote.exception.ErrorCode; -import org.apache.eventmesh.common.remote.job.DataSourceType; import org.apache.eventmesh.common.remote.request.ReportPositionRequest; import org.apache.eventmesh.common.remote.response.EmptyAckResponse; import org.springframework.beans.factory.annotation.Autowired; @@ -36,34 +34,40 @@ public class ReportPositionHandler extends BaseRequestHandler { + try { + jobInfoService.updateJobState(jobID, request.getState()); + } catch (Exception e) { + log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(), + request.getDataSourceType(), request.getState(), e); + } try { handler.handler(request, metadata); } catch (Exception e) { - log.warn("handle position request fail data base [{}] job id [{}] type [{}]", jobInfo.getSourceData() - , jobInfo.getJobID(), sourceDB.getDataType(), e); + log.warn("handle position request fail, job id [{}] type [{}]", request.getJobID(), + request.getDataSourceType(), e); } }); return new EmptyAckResponse();