Skip to content

Commit

Permalink
report position
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed May 17, 2024
1 parent 2bd3e57 commit d6c21ec
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.baomidou.mybatisplus.extension.service.IService;
import org.apache.eventmesh.common.remote.JobState;

/**
* @author sodafang
* @description 针对表【event_mesh_job_info】的数据库操作Service
* @createDate 2024-05-09 15:51:45
*/
public interface EventMeshJobInfoService extends IService<EventMeshJobInfo> {

boolean updateJobState(Integer jobID, JobState state);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.apache.eventmesh.admin.server.web.db.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.remote.JobState;
import org.springframework.stereotype.Service;

/**
Expand All @@ -12,9 +15,21 @@
* @createDate 2024-05-09 15:51:45
*/
@Service
@Slf4j
public class EventMeshJobInfoServiceImpl extends ServiceImpl<EventMeshJobInfoMapper, EventMeshJobInfo>
implements EventMeshJobInfoService{

@Override
public boolean updateJobState(Integer jobID, JobState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setJobID(jobID);
jobInfo.setState(state.ordinal());
update(jobInfo, Wrappers.<EventMeshJobInfo>update().notIn("state",JobState.DELETE,JobState.COMPLETE));
return true;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.job.DataSourceType;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.remote.response.FetchPositionResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class MysqlReportPositionHandler extends PositionHandler {
Expand All @@ -30,9 +33,11 @@ protected DataSourceType getSourceType() {
public boolean handler(ReportPositionRequest request, Metadata metadata) {
for (int i = 0; i < 3; i++) {
try {
List<RecordPosition> recordPositionList = request.getRecordPositionList();
EventMeshMysqlPosition position = new EventMeshMysqlPosition();
position.setJobID(Integer.parseInt(request.getJobID()));
position.setAddress(request.getAddress());

if (!positionService.saveOrUpdateByJob(position)) {
log.warn("update job position fail [{}]", request);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@

import com.apache.eventmesh.admin.server.AdminServerException;
import com.apache.eventmesh.admin.server.web.db.DBThreadPool;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
import com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
import com.apache.eventmesh.admin.server.web.handler.position.IReportPositionHandler;
import com.apache.eventmesh.admin.server.web.handler.position.PositionHandlerFactory;
import com.apache.eventmesh.admin.server.web.handler.request.BaseRequestHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
import org.apache.eventmesh.common.remote.exception.ErrorCode;
import org.apache.eventmesh.common.remote.job.DataSourceType;
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -36,34 +34,40 @@ public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequ

@Override
protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) {
EventMeshJobInfo jobInfo = jobInfoService.getById(request.getJobID());
if (jobInfo == null) {
throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("job id [%s] not exists in db",
request.getJobID()));
if (request.getDataSourceType() == null) {
throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty");
}
EventMeshDataSource sourceDB = dataSourceService.getById(jobInfo.getSourceData());
if (sourceDB == null) {
throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("data base [%s] job id [%s] not " +
"exists in db", jobInfo.getSourceData(), jobInfo.getJobID()));
if (StringUtils.isBlank(request.getJobID())) {
throw new AdminServerException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
}
DataSourceType type = DataSourceType.getDataSourceType(sourceDB.getDataType());
if (type == null) {
throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " +
"type [%d]", jobInfo.getSourceData(), jobInfo.getJobID(), sourceDB.getDataType()));
int jobID;

try {
jobID = Integer.parseInt(request.getJobID());
} catch (NumberFormatException e) {
throw new AdminServerException(ErrorCode.BAD_REQUEST, String.format("illegal job id [%s] format",
request.getJobID()));
}
IReportPositionHandler handler = positionHandlerFactory.getHandler(type);

IReportPositionHandler handler = positionHandlerFactory.getHandler(request.getDataSourceType());
if (handler == null) {
throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base [%s] job id [%s] " +
"type [%d], it not match any handler", jobInfo.getSourceData(), jobInfo.getJobID(),
sourceDB.getDataType()));
throw new AdminServerException(ErrorCode.BAD_DB_DATA, String.format("illegal data base job id [%s] " +
"type [%s], it not match any handler", request.getJobID(),
request.getDataSourceType().getName()));
}

executor.getExecutors().execute(() -> {
try {
jobInfoService.updateJobState(jobID, request.getState());
} catch (Exception e) {
log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(),
request.getDataSourceType(), request.getState(), e);
}
try {
handler.handler(request, metadata);
} catch (Exception e) {
log.warn("handle position request fail data base [{}] job id [{}] type [{}]", jobInfo.getSourceData()
, jobInfo.getJobID(), sourceDB.getDataType(), e);
log.warn("handle position request fail, job id [{}] type [{}]", request.getJobID(),
request.getDataSourceType(), e);
}
});
return new EmptyAckResponse();
Expand Down

0 comments on commit d6c21ec

Please sign in to comment.