Skip to content

Commit

Permalink
more grpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN committed Apr 26, 2024
1 parent 7c50e7f commit a2307c6
Show file tree
Hide file tree
Showing 39 changed files with 314 additions and 171 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.apache.eventmesh.admin.server;

import org.apache.eventmesh.common.adminserver.HeartBeat;
import org.apache.eventmesh.common.remote.HeartBeat;
import org.apache.eventmesh.common.utils.PagedList;

import org.apache.eventmesh.common.adminserver.Task;
import org.apache.eventmesh.common.remote.Task;

public interface Admin extends ComponentLifeCycle {
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.apache.eventmesh.admin.server;

import org.apache.eventmesh.common.adminserver.Task;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.adminserver.HeartBeat;
import org.apache.eventmesh.common.remote.HeartBeat;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.PagedList;
import org.apache.eventmesh.registry.RegisterServerInfo;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,48 +1,65 @@
package com.apache.eventmesh.admin.server.web;

import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory;
import com.google.protobuf.Any;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.adminserver.response.BaseResponse;
import org.apache.eventmesh.common.adminserver.response.FailResponse;
import org.apache.eventmesh.common.remote.payload.PayloadUtil;
import org.apache.eventmesh.common.remote.request.BaseGrpcRequest;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.remote.response.FailResponse;
import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase {
@Autowired
RequestHandlerFactory handlerFactory;

private Payload process(Payload value) {
if (value == null || StringUtils.isBlank(value.getMetadata().getType())) {

return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN, "bad request"));
}
BaseRequestHandler<BaseGrpcRequest, BaseGrpcResponse> handler =
handlerFactory.getHandler(value.getMetadata().getType());
if (handler == null) {
return PayloadUtil.from(FailResponse.build(BaseGrpcResponse.UNKNOWN,
"not match any request handler"));
}
return PayloadUtil.from(handler.handlerRequest(PayloadUtil.parse(value), value.getMetadata()));
}

public StreamObserver<Payload> invokeBiStream(StreamObserver<Payload> responseObserver) {
return new StreamObserver<Payload>() {
@Override
public void onNext(Payload value) {
if (value == null || StringUtils.isBlank(value.getMetadata().getType())) {
responseObserver.onNext(Payload.newBuilder().setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(JsonUtils.toJSONBytes(FailResponse.build(BaseResponse.UNKNOWN, "bad " +
"request"))))).build());
return;
}
handlerFactory.getHandler(value.getMetadata().getType());
responseObserver.onNext();
responseObserver.onNext(process(value));
}

@Override
public void onError(Throwable t) {

if (responseObserver instanceof ServerCallStreamObserver) {
if (!((ServerCallStreamObserver<Payload>) responseObserver).isCancelled()) {
log.warn("admin gRPC server fail", t);
}
}
}

@Override
public void onCompleted() {

responseObserver.onCompleted();
}
};
}

public void invoke(Payload request, StreamObserver<Payload> responseObserver) {
responseObserver.onNext(process(request));
responseObserver.onCompleted();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.apache.eventmesh.admin.server.web.handler;

import org.apache.eventmesh.common.remote.request.BaseGrpcRequest;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;

public abstract class BaseRequestHandler<T extends BaseGrpcRequest, S extends BaseGrpcResponse> {
public BaseGrpcResponse handlerRequest(T request, Metadata metadata) {
return handler(request, metadata);
}

protected abstract S handler(T request, Metadata metadata);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.apache.eventmesh.admin.server.web.handler;

import org.apache.eventmesh.common.adminserver.request.FetchJobRequest;
import org.apache.eventmesh.common.adminserver.response.FetchJobResponse;
import org.apache.eventmesh.common.remote.request.FetchJobRequest;
import org.apache.eventmesh.common.remote.response.FetchJobResponse;
import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;

public class FetchJobRequestHandler extends AbstractRequestHandler<FetchJobRequest, FetchJobResponse> {
public class FetchJobRequestHandler extends BaseRequestHandler<FetchJobRequest, FetchJobResponse> {

@Override
public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
package com.apache.eventmesh.admin.server.web.handler;

import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import org.apache.eventmesh.common.adminserver.request.BaseRequest;
import org.apache.eventmesh.common.adminserver.response.BaseResponse;
import org.apache.eventmesh.common.remote.request.BaseGrpcRequest;
import org.apache.eventmesh.common.remote.response.BaseGrpcResponse;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Component
public class RequestHandlerFactory implements ApplicationListener<ContextRefreshedEvent> {

private final Map<String, AbstractRequestHandler<BaseRequest,BaseResponse>> handlers =
private final Map<String, BaseRequestHandler<BaseGrpcRequest, BaseGrpcResponse>> handlers =
new ConcurrentHashMap<>();

public AbstractRequestHandler<BaseRequest, BaseResponse> getHandler(String type) {
public BaseRequestHandler<BaseGrpcRequest, BaseGrpcResponse> getHandler(String type) {
return handlers.get(type);
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public void onApplicationEvent(ContextRefreshedEvent event) {
Map<String, AbstractRequestHandler> beans =
event.getApplicationContext().getBeansOfType(AbstractRequestHandler.class);
Map<String, BaseRequestHandler> beans =
event.getApplicationContext().getBeansOfType(BaseRequestHandler.class);

for (AbstractRequestHandler<BaseRequest, BaseResponse> requestHandler : beans.values()) {
for (BaseRequestHandler<BaseGrpcRequest, BaseGrpcResponse> requestHandler : beans.values()) {
Class<?> clazz = requestHandler.getClass();
boolean skip = false;
while (!clazz.getSuperclass().equals(AbstractRequestHandler.class)) {
while (!clazz.getSuperclass().equals(BaseRequestHandler.class)) {
if (clazz.getSuperclass().equals(Object.class)) {
skip = true;
break;
Expand All @@ -43,11 +40,6 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
continue;
}

try {
Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
} catch (Exception e) {
//ignore.
}
Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
handlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver;
package org.apache.eventmesh.common.remote;

import lombok.Data;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver;
package org.apache.eventmesh.common.remote;

public class Job {
private long id;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver;
package org.apache.eventmesh.common.remote;

public enum JobState {
INIT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver;
package org.apache.eventmesh.common.remote;

public enum JobType {
FULL,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.eventmesh.common.remote;

public class Position {

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver;
package org.apache.eventmesh.common.remote;

// task : job = 1 : m
public class Task {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.eventmesh.common.remote.exception;

public class ErrorCode {
public static final int TYPE_IN_METADATA_NOT_EXISTS = 4001;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.eventmesh.common.remote.exception;

public class PayloadFormatException extends RemoteRuntimeException {
public PayloadFormatException(int code, String desc) {
super(code, desc);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.apache.eventmesh.common.remote.exception;

public class RemoteRuntimeException extends RuntimeException{
protected final int code;
protected final String message;
public RemoteRuntimeException(int code, String message) {
this.code = code;
this.message = message;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver.job;
package org.apache.eventmesh.common.remote.job;

public enum DataSourceClassify {
// relationship db
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver.job;
package org.apache.eventmesh.common.remote.job;

public enum DataSourceDriverType {
MYSQL,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver.job;
package org.apache.eventmesh.common.remote.job;

public enum DataSourceType {
MYSQL("MySQL", DataSourceDriverType.MYSQL, DataSourceClassify.RDB),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.apache.eventmesh.common.adminserver.job;
package org.apache.eventmesh.common.remote.job;

public enum JobTransportType {
MYSQL_MYSQL(DataSourceType.MYSQL, DataSourceType.MYSQL),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.apache.eventmesh.common.remote.payload;

public interface IPayload {
}
Loading

0 comments on commit a2307c6

Please sign in to comment.