From a2307c647998e2701ae2729988708bf7dd7321fa Mon Sep 17 00:00:00 2001 From: sodaRyCN <757083350@qq.com> Date: Fri, 26 Apr 2024 17:47:09 +0800 Subject: [PATCH] more grpc server --- .../apache/eventmesh/admin/server/Admin.java | 4 +- .../eventmesh/admin/server/AdminServer.java | 4 +- .../admin/server/web/AdminGrpcServer.java | 45 +++++++++++----- .../web/handler/AbstractRequestHandler.java | 13 ----- .../web/handler/BaseRequestHandler.java | 13 +++++ .../web/handler/FetchJobRequestHandler.java | 6 +-- .../web/handler/RequestHandlerFactory.java | 28 ++++------ .../common/adminserver/IPayload.java | 4 -- .../common/adminserver/PayloadUtil.java | 15 ------ .../common/adminserver/Position.java | 5 -- .../adminserver/request/BaseRequest.java | 7 --- .../adminserver/request/FetchJobRequest.java | 8 --- .../request/ReportPositionRequest.java | 17 ------ .../adminserver/response/BaseResponse.java | 12 ----- .../{adminserver => remote}/HeartBeat.java | 2 +- .../common/{adminserver => remote}/Job.java | 2 +- .../{adminserver => remote}/JobState.java | 2 +- .../{adminserver => remote}/JobType.java | 2 +- .../eventmesh/common/remote/Position.java | 5 ++ .../common/{adminserver => remote}/Task.java | 2 +- .../common/remote/exception/ErrorCode.java | 5 ++ .../exception/PayloadFormatException.java | 7 +++ .../exception/RemoteRuntimeException.java | 10 ++++ .../job/DataSourceClassify.java | 2 +- .../job/DataSourceDriverType.java | 2 +- .../job/DataSourceType.java | 2 +- .../job/JobTransportType.java | 2 +- .../common/remote/payload/IPayload.java | 4 ++ .../common/remote/payload/PayloadFactory.java | 54 +++++++++++++++++++ .../common/remote/payload/PayloadUtil.java | 33 ++++++++++++ .../remote/request/BaseGrpcRequest.java | 31 +++++++++++ .../remote/request/FetchJobRequest.java | 8 +++ .../remote/request/ReportPositionRequest.java | 17 ++++++ .../remote/response/BaseGrpcResponse.java | 40 ++++++++++++++ .../response/FailResponse.java | 6 +-- .../response/FetchJobResponse.java | 11 ++-- .../eventmesh/common/utils/JsonUtils.java | 29 ++++++---- .../runtime/boot/RuntimeInstance.java | 20 ------- .../runtime/connector/ConnectorRuntime.java | 6 +-- 39 files changed, 314 insertions(+), 171 deletions(-) delete mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/AbstractRequestHandler.java create mode 100644 eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/IPayload.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/PayloadUtil.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Position.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/BaseRequest.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/FetchJobRequest.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/ReportPositionRequest.java delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/BaseResponse.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/HeartBeat.java (75%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/Job.java (71%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/JobState.java (66%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/JobType.java (57%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Position.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/Task.java (86%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/ErrorCode.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/PayloadFormatException.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/RemoteRuntimeException.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/job/DataSourceClassify.java (62%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/job/DataSourceDriverType.java (58%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/job/DataSourceType.java (95%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/job/JobTransportType.java (94%) create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/IPayload.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadFactory.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadUtil.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/response/FailResponse.java (71%) rename eventmesh-common/src/main/java/org/apache/eventmesh/common/{adminserver => remote}/response/FetchJobResponse.java (53%) diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java index 0af629368e..171d141457 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java @@ -1,9 +1,9 @@ package com.apache.eventmesh.admin.server; -import org.apache.eventmesh.common.adminserver.HeartBeat; +import org.apache.eventmesh.common.remote.HeartBeat; import org.apache.eventmesh.common.utils.PagedList; -import org.apache.eventmesh.common.adminserver.Task; +import org.apache.eventmesh.common.remote.Task; public interface Admin extends ComponentLifeCycle { /** diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java index 7e71a49aed..e65e335296 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java @@ -1,8 +1,8 @@ package com.apache.eventmesh.admin.server; -import org.apache.eventmesh.common.adminserver.Task; +import org.apache.eventmesh.common.remote.Task; import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.adminserver.HeartBeat; +import org.apache.eventmesh.common.remote.HeartBeat; import org.apache.eventmesh.common.utils.IPUtils; import org.apache.eventmesh.common.utils.PagedList; import org.apache.eventmesh.registry.RegisterServerInfo; diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java index 5f84ab0ae3..a5c49de353 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java @@ -1,48 +1,65 @@ package com.apache.eventmesh.admin.server.web; +import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler; import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory; -import com.google.protobuf.Any; -import com.google.protobuf.UnsafeByteOperations; +import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.eventmesh.common.adminserver.response.BaseResponse; -import org.apache.eventmesh.common.adminserver.response.FailResponse; +import org.apache.eventmesh.common.remote.payload.PayloadUtil; +import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.remote.response.FailResponse; import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc; import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; -import org.apache.eventmesh.common.utils.JsonUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service +@Slf4j public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase { @Autowired RequestHandlerFactory handlerFactory; + private Payload process(Payload value) { + if (value == null || StringUtils.isBlank(value.getMetadata().getType())) { + + return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, "bad request")); + } + BaseRequestHandler handler = + handlerFactory.getHandler(value.getMetadata().getType()); + if (handler == null) { + return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, + "not match any request handler")); + } + return PayloadUtil.from(handler.handlerRequest(PayloadUtil.parse(value), value.getMetadata())); + } + public StreamObserver invokeBiStream(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(Payload value) { - if (value == null || StringUtils.isBlank(value.getMetadata().getType())) { - responseObserver.onNext(Payload.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(JsonUtils.toJSONBytes(FailResponse.build(BaseResponse.UNKNOWN, "bad " + - "request"))))).build()); - return; - } - handlerFactory.getHandler(value.getMetadata().getType()); - responseObserver.onNext(); + responseObserver.onNext(process(value)); } @Override public void onError(Throwable t) { - + if (responseObserver instanceof ServerCallStreamObserver) { + if (!((ServerCallStreamObserver) responseObserver).isCancelled()) { + log.warn("admin gRPC server fail", t); + } + } } @Override public void onCompleted() { - + responseObserver.onCompleted(); } }; } public void invoke(Payload request, StreamObserver responseObserver) { + responseObserver.onNext(process(request)); + responseObserver.onCompleted(); } } diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/AbstractRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/AbstractRequestHandler.java deleted file mode 100644 index a21eeaf6b4..0000000000 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/AbstractRequestHandler.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.apache.eventmesh.admin.server.web.handler; - -import org.apache.eventmesh.common.adminserver.request.BaseRequest; -import org.apache.eventmesh.common.adminserver.response.BaseResponse; -import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; - -public abstract class AbstractRequestHandler { - public BaseResponse handlerRequest(T request, Metadata metadata) { - return handler(request, metadata); - } - - public abstract S handler(T request, Metadata metadata); -} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java new file mode 100644 index 0000000000..4906da7353 --- /dev/null +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java @@ -0,0 +1,13 @@ +package com.apache.eventmesh.admin.server.web.handler; + +import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; + +public abstract class BaseRequestHandler { + public BaseGrpcResponse handlerRequest(T request, Metadata metadata) { + return handler(request, metadata); + } + + protected abstract S handler(T request, Metadata metadata); +} diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java index 1fd2c7fe65..15ad700f1b 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/FetchJobRequestHandler.java @@ -1,10 +1,10 @@ package com.apache.eventmesh.admin.server.web.handler; -import org.apache.eventmesh.common.adminserver.request.FetchJobRequest; -import org.apache.eventmesh.common.adminserver.response.FetchJobResponse; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; -public class FetchJobRequestHandler extends AbstractRequestHandler { +public class FetchJobRequestHandler extends BaseRequestHandler { @Override public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) { diff --git a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java index 6d37d32ac8..2f44d8e0de 100644 --- a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java +++ b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java @@ -1,38 +1,35 @@ package com.apache.eventmesh.admin.server.web.handler; -import com.alibaba.nacos.api.remote.request.Request; -import com.alibaba.nacos.api.remote.request.RequestMeta; -import org.apache.eventmesh.common.adminserver.request.BaseRequest; -import org.apache.eventmesh.common.adminserver.response.BaseResponse; +import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.stereotype.Service; +import org.springframework.stereotype.Component; -import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -@Service +@Component public class RequestHandlerFactory implements ApplicationListener { - private final Map> handlers = + private final Map> handlers = new ConcurrentHashMap<>(); - public AbstractRequestHandler getHandler(String type) { + public BaseRequestHandler getHandler(String type) { return handlers.get(type); } @Override @SuppressWarnings({"rawtypes", "unchecked"}) public void onApplicationEvent(ContextRefreshedEvent event) { - Map beans = - event.getApplicationContext().getBeansOfType(AbstractRequestHandler.class); + Map beans = + event.getApplicationContext().getBeansOfType(BaseRequestHandler.class); - for (AbstractRequestHandler requestHandler : beans.values()) { + for (BaseRequestHandler requestHandler : beans.values()) { Class clazz = requestHandler.getClass(); boolean skip = false; - while (!clazz.getSuperclass().equals(AbstractRequestHandler.class)) { + while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) { if (clazz.getSuperclass().equals(Object.class)) { skip = true; break; @@ -43,11 +40,6 @@ public void onApplicationEvent(ContextRefreshedEvent event) { continue; } - try { - Method method = clazz.getMethod("handle", Request.class, RequestMeta.class); - } catch (Exception e) { - //ignore. - } Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0]; handlers.putIfAbsent(tClass.getSimpleName(), requestHandler); } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/IPayload.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/IPayload.java deleted file mode 100644 index be09f958dc..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/IPayload.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.apache.eventmesh.common.adminserver; - -public interface IPayload { -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/PayloadUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/PayloadUtil.java deleted file mode 100644 index c01b163d90..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/PayloadUtil.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.eventmesh.common.adminserver; - -import org.apache.eventmesh.common.adminserver.request.BaseRequest; -import org.apache.eventmesh.common.adminserver.response.BaseResponse; -import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; - -public class PayloadUtil { - public static Payload from(BaseRequest request) { - return null; - } - - public static Payload from(BaseResponse response) { - return null; - } -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Position.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Position.java deleted file mode 100644 index 8a00052752..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Position.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.apache.eventmesh.common.adminserver; - -public class Position { - -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/BaseRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/BaseRequest.java deleted file mode 100644 index a6c80d2fbb..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/BaseRequest.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.eventmesh.common.adminserver.request; - -import org.apache.eventmesh.common.adminserver.IPayload; - -public abstract class BaseRequest implements IPayload { - -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/FetchJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/FetchJobRequest.java deleted file mode 100644 index 68d5ab5b3a..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/FetchJobRequest.java +++ /dev/null @@ -1,8 +0,0 @@ -package org.apache.eventmesh.common.adminserver.request; - -import lombok.Data; - -@Data -public class FetchJobRequest extends BaseRequest { - String primaryKey; -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/ReportPositionRequest.java deleted file mode 100644 index 0741ef94b1..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/request/ReportPositionRequest.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.eventmesh.common.adminserver.request; - -import org.apache.eventmesh.common.adminserver.JobState; -import org.apache.eventmesh.common.adminserver.Position; - -import lombok.Data; - -@Data -public class ReportPositionRequest { - - private String jobID; - - private Position position; - - private JobState state; - -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/BaseResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/BaseResponse.java deleted file mode 100644 index 2ff7b8809a..0000000000 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/BaseResponse.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.eventmesh.common.adminserver.response; - -import lombok.Data; -import org.apache.eventmesh.common.adminserver.IPayload; - -@Data -public abstract class BaseResponse implements IPayload { - public static final int UNKNOWN = -1; - private boolean success = true; - private int errorCode; - private String desc; -} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/HeartBeat.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/HeartBeat.java similarity index 75% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/HeartBeat.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/HeartBeat.java index 90dc33900c..bc82e4e67f 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/HeartBeat.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/HeartBeat.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver; +package org.apache.eventmesh.common.remote; import lombok.Data; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Job.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java similarity index 71% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Job.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java index ed56441699..ab5e6363d2 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Job.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Job.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver; +package org.apache.eventmesh.common.remote; public class Job { private long id; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobState.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java similarity index 66% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobState.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java index 05f6736fab..0547c50334 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobState.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver; +package org.apache.eventmesh.common.remote; public enum JobState { INIT, diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java similarity index 57% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java index 93045a188f..e4dbddbf6e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/JobType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobType.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver; +package org.apache.eventmesh.common.remote; public enum JobType { FULL, diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Position.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Position.java new file mode 100644 index 0000000000..8550d623b4 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Position.java @@ -0,0 +1,5 @@ +package org.apache.eventmesh.common.remote; + +public class Position { + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Task.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java similarity index 86% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Task.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java index 0aa9a452dc..06128c7c08 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/Task.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/Task.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver; +package org.apache.eventmesh.common.remote; // task : job = 1 : m public class Task { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/ErrorCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/ErrorCode.java new file mode 100644 index 0000000000..a940bd7c2e --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/ErrorCode.java @@ -0,0 +1,5 @@ +package org.apache.eventmesh.common.remote.exception; + +public class ErrorCode { + public static final int TYPE_IN_METADATA_NOT_EXISTS = 4001; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/PayloadFormatException.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/PayloadFormatException.java new file mode 100644 index 0000000000..0160fc0cd6 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/PayloadFormatException.java @@ -0,0 +1,7 @@ +package org.apache.eventmesh.common.remote.exception; + +public class PayloadFormatException extends RemoteRuntimeException { + public PayloadFormatException(int code, String desc) { + super(code, desc); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/RemoteRuntimeException.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/RemoteRuntimeException.java new file mode 100644 index 0000000000..299a2c9383 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/exception/RemoteRuntimeException.java @@ -0,0 +1,10 @@ +package org.apache.eventmesh.common.remote.exception; + +public class RemoteRuntimeException extends RuntimeException{ + protected final int code; + protected final String message; + public RemoteRuntimeException(int code, String message) { + this.code = code; + this.message = message; + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceClassify.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java similarity index 62% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceClassify.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java index b3f9a749ef..cb3b77d18d 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceClassify.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceClassify.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver.job; +package org.apache.eventmesh.common.remote.job; public enum DataSourceClassify { // relationship db diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceDriverType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java similarity index 58% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceDriverType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java index fc849fcfb0..43d8e6e77b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceDriverType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceDriverType.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver.job; +package org.apache.eventmesh.common.remote.job; public enum DataSourceDriverType { MYSQL, diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java similarity index 95% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java index 12277134cc..902f7e898e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/DataSourceType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/DataSourceType.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver.job; +package org.apache.eventmesh.common.remote.job; public enum DataSourceType { MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB), diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/JobTransportType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java similarity index 94% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/JobTransportType.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java index a7a16a2fa0..e049528731 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/job/JobTransportType.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/job/JobTransportType.java @@ -1,4 +1,4 @@ -package org.apache.eventmesh.common.adminserver.job; +package org.apache.eventmesh.common.remote.job; public enum JobTransportType { MYSQL_MYSQL(DataSourceType.MYSQL, DataSourceType.MYSQL), diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/IPayload.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/IPayload.java new file mode 100644 index 0000000000..7224da76ec --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/IPayload.java @@ -0,0 +1,4 @@ +package org.apache.eventmesh.common.remote.payload; + +public interface IPayload { +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadFactory.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadFactory.java new file mode 100644 index 0000000000..0c98d69dd3 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadFactory.java @@ -0,0 +1,54 @@ +package org.apache.eventmesh.common.remote.payload; + +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; + +import java.lang.reflect.Modifier; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; + +public class PayloadFactory { + private PayloadFactory(){ + } + + private static class PayloadFactoryHolder { + private static final PayloadFactory INSTANCE = new PayloadFactory(); + } + + public static PayloadFactory getInstance(){ + return PayloadFactoryHolder.INSTANCE; + } + + private final Map> registryPayload = new ConcurrentHashMap<>(); + + private boolean initialized = false; + + public void init() { + scan(); + } + + private synchronized void scan() { + if (initialized) { + return; + } + ServiceLoader payloads = ServiceLoader.load(Payload.class); + for (Payload payload : payloads) { + register(payload.getClass().getSimpleName(), payload.getClass()); + } + initialized = true; + } + + public void register(String type, Class clazz) { + if (Modifier.isAbstract(clazz.getModifiers())) { + return; + } + if (registryPayload.containsKey(type)) { + throw new RuntimeException(String.format("Fail to register, type:%s ,clazz:%s ", type, clazz.getName())); + } + registryPayload.put(type, clazz); + } + + public Class getClassByType(String type) { + return registryPayload.get(type); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadUtil.java new file mode 100644 index 0000000000..59230a2872 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/payload/PayloadUtil.java @@ -0,0 +1,33 @@ +package org.apache.eventmesh.common.remote.payload; + +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; +import com.google.protobuf.Any; +import com.google.protobuf.UnsafeByteOperations; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata; +import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload; +import org.apache.eventmesh.common.remote.exception.ErrorCode; +import org.apache.eventmesh.common.remote.exception.PayloadFormatException; +import org.apache.eventmesh.common.remote.request.BaseGrpcRequest; +import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; +import org.apache.eventmesh.common.utils.JsonUtils; + +public class PayloadUtil { + public static Payload from(BaseGrpcRequest request) { + return null; + } + + public static Payload from(BaseGrpcResponse response) { + byte[] responseBytes = JsonUtils.toJSONBytes(response); + Metadata.Builder metadata = Metadata.newBuilder().setType(response.getClass().getSimpleName()); + return Payload.newBuilder().setMetadata(metadata).setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(responseBytes))).build(); + } + + public static BaseGrpcRequest parse(Payload payload) { + Class targetClass = PayloadFactory.getInstance().getClassByType(payload.getMetadata().getType()); + if (targetClass == null) { + throw new PayloadFormatException(ErrorCode.TYPE_IN_METADATA_NOT_EXISTS, + "Unknown payload type:" + payload.getMetadata().getType()); + } + return (BaseGrpcRequest)JsonUtils.parseObject(new ByteBufferBackedInputStream(payload.getBody().getValue().asReadOnlyByteBuffer()), targetClass); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java new file mode 100644 index 0000000000..360054a01e --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/BaseGrpcRequest.java @@ -0,0 +1,31 @@ +package org.apache.eventmesh.common.remote.request; + +import lombok.Getter; +import org.apache.eventmesh.common.remote.payload.IPayload; + +import java.util.HashMap; +import java.util.Map; + +@Getter +public abstract class BaseGrpcRequest implements IPayload { + private Map header = new HashMap<>(); + + public void addHeader(String key, String value) { + if (key == null || value == null) { + return; + } + header.put(key,value); + } + + public void addHeaders(Map map) { + if (map == null || map.isEmpty()) { + return; + } + map.forEach((k,v) -> { + if (k == null || v == null) { + return; + } + this.header.put(k,v); + }); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java new file mode 100644 index 0000000000..1f5de1ab58 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/FetchJobRequest.java @@ -0,0 +1,8 @@ +package org.apache.eventmesh.common.remote.request; + +import lombok.Data; + +@Data +public class FetchJobRequest extends BaseGrpcRequest { + String primaryKey; +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java new file mode 100644 index 0000000000..68659f912f --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportPositionRequest.java @@ -0,0 +1,17 @@ +package org.apache.eventmesh.common.remote.request; + +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.Position; + +import lombok.Data; + +@Data +public class ReportPositionRequest { + + private String jobID; + + private Position position; + + private JobState state; + +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java new file mode 100644 index 0000000000..d9791baa87 --- /dev/null +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/BaseGrpcResponse.java @@ -0,0 +1,40 @@ +package org.apache.eventmesh.common.remote.response; + +import lombok.Getter; +import lombok.Setter; +import org.apache.eventmesh.common.remote.payload.IPayload; + +import java.util.HashMap; +import java.util.Map; + +@Getter +public abstract class BaseGrpcResponse implements IPayload { + public static final int UNKNOWN = -1; + @Setter + private boolean success = true; + @Setter + private int errorCode; + @Setter + private String desc; + + private Map header = new HashMap<>(); + + public void addHeader(String key, String value) { + if (key == null || value == null) { + return; + } + header.put(key,value); + } + + public void addHeaders(Map map) { + if (map == null || map.isEmpty()) { + return; + } + map.forEach((k,v) -> { + if (k == null || v == null) { + return; + } + this.header.put(k,v); + }); + } +} diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FailResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java similarity index 71% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FailResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java index 1455106fea..0eec63173b 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FailResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FailResponse.java @@ -1,6 +1,6 @@ -package org.apache.eventmesh.common.adminserver.response; +package org.apache.eventmesh.common.remote.response; -public class FailResponse extends BaseResponse { +public class FailResponse extends BaseGrpcResponse { public static FailResponse build(int errorCode, String msg) { FailResponse response = new FailResponse(); response.setErrorCode(errorCode); @@ -17,6 +17,6 @@ public static FailResponse build(int errorCode, String msg) { * @return response */ public static FailResponse build(Throwable exception) { - return build(BaseResponse.UNKNOWN, exception.getMessage()); + return build(BaseGrpcResponse.UNKNOWN, exception.getMessage()); } } diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FetchJobResponse.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java similarity index 53% rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FetchJobResponse.java rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java index f5eb13e993..9e7b6da3c8 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/adminserver/response/FetchJobResponse.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/response/FetchJobResponse.java @@ -1,15 +1,16 @@ -package org.apache.eventmesh.common.adminserver.response; +package org.apache.eventmesh.common.remote.response; -import org.apache.eventmesh.common.adminserver.JobState; -import org.apache.eventmesh.common.adminserver.Position; -import org.apache.eventmesh.common.adminserver.job.JobTransportType; +import org.apache.eventmesh.common.remote.JobState; +import org.apache.eventmesh.common.remote.Position; +import org.apache.eventmesh.common.remote.job.JobTransportType; import java.util.Map; import lombok.Data; +import org.apache.eventmesh.common.remote.response.BaseGrpcResponse; @Data -public class FetchJobResponse extends BaseResponse { +public class FetchJobResponse extends BaseGrpcResponse { private long id; diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java index 71d42e3452..c7504a776e 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java @@ -17,17 +17,6 @@ package org.apache.eventmesh.common.utils; -import org.apache.eventmesh.common.Constants; -import org.apache.eventmesh.common.EventMeshDateFormat; -import org.apache.eventmesh.common.exception.JsonException; - -import org.apache.commons.lang3.StringUtils; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.Map; -import java.util.Objects; - import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -36,6 +25,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.Constants; +import org.apache.eventmesh.common.EventMeshDateFormat; +import org.apache.eventmesh.common.exception.JsonException; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Type; +import java.util.Map; +import java.util.Objects; /** * Json serialize or deserialize utils. @@ -108,6 +107,14 @@ public static T parseObject(String text, Class clazz) { } } + public static T parseObject(InputStream inputStream, Class clazz) { + try { + return OBJECT_MAPPER.readValue(inputStream, clazz); + } catch (IOException e) { + throw new JsonException("deserialize input stream to object error",e); + } + } + public static T parseObject(String text, Type type) { if (StringUtils.isEmpty(text)) { return null; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java index 87dcc31507..0c7b054c51 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstance.java @@ -1,13 +1,5 @@ package org.apache.eventmesh.runtime.boot; -import static org.apache.eventmesh.common.enums.ComponentType.CONNECTOR; -import static org.apache.eventmesh.common.enums.ComponentType.FUNCTION; -import static org.apache.eventmesh.common.enums.ComponentType.MESH; - -import org.apache.eventmesh.common.adminserver.HeartBeat; -import org.apache.eventmesh.common.enums.ComponentType; -import org.apache.eventmesh.common.utils.IPUtils; -import org.apache.eventmesh.common.utils.JsonUtils; import org.apache.eventmesh.registry.QueryInstances; import org.apache.eventmesh.registry.RegisterServerInfo; import org.apache.eventmesh.registry.RegistryFactory; @@ -30,19 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; - -import com.google.protobuf.Any; -import com.google.protobuf.StringValue; -import com.google.protobuf.UnsafeByteOperations; import lombok.extern.slf4j.Slf4j; diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java index afbdba7c1c..74c26be8c2 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java @@ -4,9 +4,9 @@ import org.apache.eventmesh.api.factory.StoragePluginFactory; import org.apache.eventmesh.api.producer.Producer; import org.apache.eventmesh.common.ThreadPoolFactory; -import org.apache.eventmesh.common.adminserver.HeartBeat; -import org.apache.eventmesh.common.adminserver.request.FetchJobRequest; -import org.apache.eventmesh.common.adminserver.response.FetchJobResponse; +import org.apache.eventmesh.common.remote.HeartBeat; +import org.apache.eventmesh.common.remote.request.FetchJobRequest; +import org.apache.eventmesh.common.remote.response.FetchJobResponse; import org.apache.eventmesh.common.config.ConfigService; import org.apache.eventmesh.common.config.connector.SinkConfig; import org.apache.eventmesh.common.config.connector.SourceConfig;