Skip to content

Commit

Permalink
[ISSUE apache#5067] Enhancement for eventmesh-admin-server
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Aug 2, 2024
1 parent c770607 commit 7af3d16
Show file tree
Hide file tree
Showing 22 changed files with 272 additions and 193 deletions.
10 changes: 8 additions & 2 deletions eventmesh-admin-server/conf/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,11 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
event-mesh:
admin-server:
service-name: DEFAULT_GROUP@@em_adm_server
port: 8081
serviceName: DEFAULT_GROUP@@em_adm_server
port: 8081
adminServerList:
region1:
- http://localhost:8081
region2:
- http://localhost:8082
region: region1
14 changes: 8 additions & 6 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`jobDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`transportType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`sourceData` int NOT NULL DEFAULT '0',
`targetData` int NOT NULL DEFAULT '0',
`state` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`jobState` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`jobType` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`runningRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`updateUid` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down Expand Up @@ -118,10 +119,11 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`taskName` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`taskDesc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`taskState` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`sourceRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`targetRegion` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down
40 changes: 21 additions & 19 deletions eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,33 @@
-->

<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper">

<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="desc" column="desc" jdbcType="VARCHAR"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
<result property="state" column="state" jdbcType="VARCHAR"/>
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="jobID" column="jobID" jdbcType="VARCHAR"/>
<result property="jobDesc" column="desc" jdbcType="VARCHAR"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="transportType" column="transportType" jdbcType="VARCHAR"/>
<result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
<result property="targetData" column="targetData" jdbcType="INTEGER"/>
<result property="jobState" column="state" jdbcType="VARCHAR"/>
<result property="jobType" column="jobType" jdbcType="VARCHAR"/>
<result property="fromRegion" column="sourceRegion" jdbcType="VARCHAR"/>
<result property="runningRegion" column="targetRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
</resultMap>

<sql id="Base_Column_List">
id,jobID,desc,
id,jobID,jobDesc,
taskID,transportType,sourceData,
targetData,state,jobType,
fromRegion,createTime,updateTime
targetData,jobState,jobType,
fromRegion,runningRegion,createUid,
updateUid,createTime,updateTime
</sql>
</mapper>
13 changes: 7 additions & 6 deletions eventmesh-admin-server/conf/mapper/EventMeshTaskInfoMapper.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@
<resultMap id="BaseResultMap" type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshTaskInfo">
<id property="id" column="id" jdbcType="INTEGER"/>
<result property="taskID" column="taskID" jdbcType="VARCHAR"/>
<result property="name" column="name" jdbcType="VARCHAR"/>
<result property="desc" column="desc" jdbcType="VARCHAR"/>
<result property="state" column="state" jdbcType="VARCHAR"/>
<result property="fromRegion" column="fromRegion" jdbcType="VARCHAR"/>
<result property="taskName" column="taskName" jdbcType="VARCHAR"/>
<result property="taskDesc" column="taskDesc" jdbcType="VARCHAR"/>
<result property="taskState" column="taskState" jdbcType="VARCHAR"/>
<result property="sourceRegion" column="sourceRegion" jdbcType="VARCHAR"/>
<result property="targetRegion" column="targetRegion" jdbcType="VARCHAR"/>
<result property="createUid" column="createUid" jdbcType="VARCHAR"/>
<result property="updateUid" column="updateUid" jdbcType="VARCHAR"/>
<result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
</resultMap>

<sql id="Base_Column_List">
id,taskID,name,
desc,state,fromRegion,
id,taskID,taskName,
taskDesc,taskState,sourceRegion,targetRegion,
createUid,updateUid,createTime,
updateTime
</sql>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.eventmesh.admin.server;

import java.util.List;
import java.util.Map;

import org.springframework.boot.context.properties.ConfigurationProperties;

import lombok.Getter;
Expand All @@ -32,4 +35,6 @@ public class AdminServerProperties {
private String configurationPath;
private String configurationFile;
private String serviceName;
private Map<String, List<String>> adminServerList;
private String region;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.druid.support.json.JSONUtils;

@RestController
@RequestMapping("/eventmesh/admin")
public class HttpServer {
@Autowired
private TaskBizService taskService;

@RequestMapping("/createTask")
public ResponseEntity<Response<String>> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
@RequestMapping(value = "/createTask", method = RequestMethod.POST)
public ResponseEntity<Object> createOrUpdateTask(@RequestBody CreateTaskRequest task) {
String uuid = taskService.createTask(task);
return ResponseEntity.ok(Response.success(uuid));
return ResponseEntity.ok(JSONUtils.toJSONString(Response.success(uuid)));
}

public boolean deleteTask(Long id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EventMeshJobInfo implements Serializable {

private String jobID;

private String desc;
private String jobDesc;

private String taskID;

Expand All @@ -47,12 +47,16 @@ public class EventMeshJobInfo implements Serializable {

private Integer targetData;

private String state;
private String jobState;

private String jobType;

// job request from region
private String fromRegion;

// job actually running region
private String runningRegion;

private String createUid;

private String updateUid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class EventMeshTaskInfo implements Serializable {

private String taskID;

private String name;
private String taskName;

private String desc;
private String taskDesc;

private String state;
private String taskState;

private String fromRegion;
private String sourceRegion;

private String targetRegion;

private String createUid;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,31 @@

import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;

import java.util.List;

import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* etx operator for table event_mesh_job_info
*/
@Mapper
public interface EventMeshJobInfoExtMapper extends BaseMapper<EventMeshJobInfo> {
@Insert("insert into event_mesh_job_info(`taskID`,`state`,`jobType`) values"
+ "<foreach collection= 'jobs' item='job' separator=','>(#{job.taskID},#{job.state},#{job.jobType})</foreach>")
@Options(useGeneratedKeys = true, keyProperty = "jobID")

@Insert("<script>"
+ "insert into event_mesh_job_info(jobID, jobDesc, taskID, transportType, sourceData, "
+ "targetData, jobState, jobType, fromRegion, runningRegion, "
+ "createUid, updateUid) values"
+ "<foreach collection= 'jobs' item='job' separator=','>"
+ "(#{job.jobID}, #{job.jobDesc}, #{job.taskID}, #{job.transportType}, "
+ "#{job.sourceData}, #{job.targetData}, #{job.jobState}, #{job.jobType}, "
+ "#{job.fromRegion}, #{job.runningRegion}, #{job.createUid}, #{job.updateUid})"
+ "</foreach>"
+ "</script>")
@Transactional(rollbackFor = Exception.class)
int saveBatch(@Param("jobs") List<EventMeshJobInfo> jobInfoList);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.admin.server.web.db.service.impl;

import org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify;
import org.apache.eventmesh.admin.server.web.db.mapper.EventMeshVerifyMapper;
import org.apache.eventmesh.admin.server.web.db.service.EventMeshVerifyService;

import org.springframework.stereotype.Service;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

/**
* event_mesh_verify
*/
@Service
public class EventMeshVerifyServiceImpl extends ServiceImpl<EventMeshVerifyMapper, EventMeshVerify>
implements EventMeshVerifyService {

}




Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class JobDetail {

private String jobID;

private String desc;
private String jobDesc;

private String taskID;

Expand All @@ -50,7 +50,11 @@ public class JobDetail {

private String updateUid;

private String region;
// job request from region
private String fromRegion;

// job actually running region
private String runningRegion;

private DataSource sourceDataSource;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.admin.server.web.service.job;

import org.apache.eventmesh.admin.server.AdminServerProperties;
import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
Expand Down Expand Up @@ -70,12 +71,15 @@ public class JobInfoBizService {
@Autowired
private PositionBizService positionBizService;

@Autowired
private AdminServerProperties properties;

public boolean updateJobState(String jobID, TaskState state) {
if (jobID == null || state == null) {
return false;
}
EventMeshJobInfo jobInfo = new EventMeshJobInfo();
jobInfo.setState(state.name());
jobInfo.setJobState(state.name());
return jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", TaskState.DELETE.name()));
}

Expand All @@ -86,34 +90,40 @@ public List<EventMeshJobInfo> createJobs(List<JobDetail> jobs) {
return null;
}
List<EventMeshJobInfo> entityList = new LinkedList<>();

for (JobDetail job : jobs) {
// if running region not equal with admin region continue
if (!job.getRunningRegion().equals(properties.getRegion())) {
continue;
}
EventMeshJobInfo entity = new EventMeshJobInfo();
entity.setState(TaskState.INIT.name());
entity.setJobState(TaskState.INIT.name());
entity.setTaskID(job.getTaskID());
entity.setJobType(job.getJobType().name());
entity.setDesc(job.getDesc());
entity.setJobDesc(job.getJobDesc());
String jobID = UUID.randomUUID().toString();
entity.setJobID(jobID);
entity.setTransportType(job.getTransportType().name());
entity.setCreateUid(job.getCreateUid());
entity.setUpdateUid(job.getUpdateUid());
entity.setFromRegion(job.getRegion());
entity.setFromRegion(job.getFromRegion());
entity.setRunningRegion(job.getRunningRegion());
CreateOrUpdateDataSourceReq source = new CreateOrUpdateDataSourceReq();
source.setType(job.getTransportType().getSrc());
source.setOperator(job.getCreateUid());
source.setRegion(job.getRegion());
source.setRegion(job.getSourceDataSource().getRegion());
source.setDesc(job.getSourceConnectorDesc());
source.setConfig(job.getSourceDataSource());
source.setConfig(job.getSourceDataSource().getConf());
EventMeshDataSource createdSource = dataSourceBizService.createDataSource(source);
entity.setSourceData(createdSource.getId());

CreateOrUpdateDataSourceReq sink = new CreateOrUpdateDataSourceReq();
sink.setType(job.getTransportType().getDst());
sink.setOperator(job.getCreateUid());
sink.setRegion(job.getRegion());
sink.setRegion(job.getSinkDataSource().getRegion());
sink.setDesc(job.getSinkConnectorDesc());
sink.setConfig(job.getSinkDataSource());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(source);
sink.setConfig(job.getSinkDataSource().getConf());
EventMeshDataSource createdSink = dataSourceBizService.createDataSource(sink);
entity.setTargetData(createdSink.getId());

entityList.add(entity);
Expand Down Expand Up @@ -167,7 +177,7 @@ public JobDetail getJobDetail(String jobID) {
detail.setSinkConnectorDesc(target.getDescription());
}

TaskState state = TaskState.fromIndex(job.getState());
TaskState state = TaskState.fromIndex(job.getJobState());
if (state == null) {
throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA, "illegal job state in db");
}
Expand Down
Loading

0 comments on commit 7af3d16

Please sign in to comment.