From 8656a8b60bfe2250bbb90d862509353cb0db35f0 Mon Sep 17 00:00:00 2001 From: xwm1992 Date: Thu, 18 Apr 2024 18:04:41 +0800 Subject: [PATCH] update connector runtime --- .../runtime/boot/RuntimeInstanceStarter.java | 22 +-- .../runtime/rpc/EventMeshAdminService.java | 138 +++++++++--------- 2 files changed, 77 insertions(+), 83 deletions(-) diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java index e2061a8813..5d0a525211 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/boot/RuntimeInstanceStarter.java @@ -12,6 +12,9 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import com.google.protobuf.Any; +import com.google.protobuf.StringValue; + import lombok.extern.slf4j.Slf4j; @Slf4j @@ -23,22 +26,18 @@ public static void main(String[] args) { // TODO:添加shutDownHook try { - ConfigService.getInstance() - .setConfigPath(EventMeshConstants.EVENTMESH_CONF_HOME + File.separator) - .setRootConfig(EventMeshConstants.EVENTMESH_CONF_FILE); - EventMeshServer server = new EventMeshServer(); BannerUtil.generateBanner(); server.init(); server.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { - log.info("eventMesh shutting down hook begin."); + log.info("runtime shutting down hook begin."); long start = System.currentTimeMillis(); server.shutdown(); long end = System.currentTimeMillis(); - log.info("eventMesh shutdown cost {}ms", end - start); + log.info("runtime shutdown cost {}ms", end - start); } catch (Exception e) { log.error("exception when shutdown.", e); } @@ -60,27 +59,28 @@ public static void main(String[] args) { StreamObserver responseObserver = new StreamObserver() { @Override public void onNext(Payload response) { - System.out.println("Received response: " + response.getBody()); + log.info("runtime receive message: {} ", response); } @Override public void onError(Throwable t) { - System.out.println("Error: " + t.getMessage()); + log.error("runtime receive error message: {}", t.getMessage()); } @Override public void onCompleted() { - System.out.println("Stream completed"); + log.info("runtime finished receive message and completed"); } }; // 创建一个请求观察者 StreamObserver requestObserver = stub.invokeBiStream(responseObserver); - + StringValue stringValue = StringValue.newBuilder().setValue("test").build(); + Any test = Any.pack(stringValue); // 发送请求 for (int i = 0; i < 10; i++) { Payload request = Payload.newBuilder() - .setBody("t") + .setBody(test) .build(); requestObserver.onNext(request); } diff --git a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/rpc/EventMeshAdminService.java b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/rpc/EventMeshAdminService.java index 82c1f60749..f188113ce4 100644 --- a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/rpc/EventMeshAdminService.java +++ b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/rpc/EventMeshAdminService.java @@ -2,82 +2,76 @@ // source: event_mesh_admin_service.proto public final class EventMeshAdminService { + private EventMeshAdminService() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistryLite registry) { + } - private EventMeshAdminService() { - } - - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistryLite registry) { - } - - public static void registerAllExtensions( - com.google.protobuf.ExtensionRegistry registry) { - registerAllExtensions( - (com.google.protobuf.ExtensionRegistryLite) registry); - } - - static final com.google.protobuf.Descriptors.Descriptor - internal_static_Metadata_descriptor; - static final + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + registerAllExtensions( + (com.google.protobuf.ExtensionRegistryLite) registry); + } + static final com.google.protobuf.Descriptors.Descriptor + internal_static_Metadata_descriptor; + static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_Metadata_fieldAccessorTable; - static final com.google.protobuf.Descriptors.Descriptor - internal_static_Metadata_HeadersEntry_descriptor; - static final + internal_static_Metadata_fieldAccessorTable; + static final com.google.protobuf.Descriptors.Descriptor + internal_static_Metadata_HeadersEntry_descriptor; + static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_Metadata_HeadersEntry_fieldAccessorTable; - static final com.google.protobuf.Descriptors.Descriptor - internal_static_Payload_descriptor; - static final + internal_static_Metadata_HeadersEntry_fieldAccessorTable; + static final com.google.protobuf.Descriptors.Descriptor + internal_static_Payload_descriptor; + static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable - internal_static_Payload_fieldAccessorTable; - - public static com.google.protobuf.Descriptors.FileDescriptor - getDescriptor() { - return descriptor; - } - - private static com.google.protobuf.Descriptors.FileDescriptor - descriptor; + internal_static_Payload_fieldAccessorTable; - static { - String[] descriptorData = { - "\n\036event_mesh_admin_service.proto\032\031google" + - "/protobuf/any.proto\"q\n\010Metadata\022\014\n\004type\030" + - "\003 \001(\t\022\'\n\007headers\030\007 \003(\0132\026.Metadata.Header" + - "sEntry\032.\n\014HeadersEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005v" + - "alue\030\002 \001(\t:\0028\001\"J\n\007Payload\022\033\n\010metadata\030\002 " + - "\001(\0132\t.Metadata\022\"\n\004body\030\003 \001(\0132\024.google.pr" + - "otobuf.Any2B\n\024AdminBiStreamService\022*\n\016in" + - "vokeBiStream\022\010.Payload\032\010.Payload\"\000(\0010\00126" + - "\n\014AdminService\022&\n\016invokeBiStream\022\010.Paylo" + - "ad\032\010.Payload\"\000B\002P\001b\006proto3" - }; - descriptor = com.google.protobuf.Descriptors.FileDescriptor - .internalBuildGeneratedFileFrom(descriptorData, - new com.google.protobuf.Descriptors.FileDescriptor[] { - com.google.protobuf.AnyProto.getDescriptor(), - }); - internal_static_Metadata_descriptor = - getDescriptor().getMessageTypes().get(0); - internal_static_Metadata_fieldAccessorTable = new - com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_Metadata_descriptor, - new String[] {"Type", "Headers",}); - internal_static_Metadata_HeadersEntry_descriptor = - internal_static_Metadata_descriptor.getNestedTypes().get(0); - internal_static_Metadata_HeadersEntry_fieldAccessorTable = new - com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_Metadata_HeadersEntry_descriptor, - new String[] {"Key", "Value",}); - internal_static_Payload_descriptor = - getDescriptor().getMessageTypes().get(1); - internal_static_Payload_fieldAccessorTable = new - com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( - internal_static_Payload_descriptor, - new String[] {"Metadata", "Body",}); - com.google.protobuf.AnyProto.getDescriptor(); - } + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + String[] descriptorData = { + "\n\036event_mesh_admin_service.proto\032\031google" + + "/protobuf/any.proto\"q\n\010Metadata\022\014\n\004type\030" + + "\003 \001(\t\022\'\n\007headers\030\007 \003(\0132\026.Metadata.Header" + + "sEntry\032.\n\014HeadersEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005v" + + "alue\030\002 \001(\t:\0028\001\"J\n\007Payload\022\033\n\010metadata\030\002 " + + "\001(\0132\t.Metadata\022\"\n\004body\030\003 \001(\0132\024.google.pr" + + "otobuf.Any2B\n\024AdminBiStreamService\022*\n\016in" + + "vokeBiStream\022\010.Payload\032\010.Payload\"\000(\0010\0012." + + "\n\014AdminService\022\036\n\006invoke\022\010.Payload\032\010.Pay" + + "load\"\000B\002P\001b\006proto3" + }; + descriptor = com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + com.google.protobuf.AnyProto.getDescriptor(), + }); + internal_static_Metadata_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_Metadata_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_Metadata_descriptor, + new String[] { "Type", "Headers", }); + internal_static_Metadata_HeadersEntry_descriptor = + internal_static_Metadata_descriptor.getNestedTypes().get(0); + internal_static_Metadata_HeadersEntry_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_Metadata_HeadersEntry_descriptor, + new String[] { "Key", "Value", }); + internal_static_Payload_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_Payload_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_Payload_descriptor, + new String[] { "Metadata", "Body", }); + com.google.protobuf.AnyProto.getDescriptor(); + } - // @@protoc_insertion_point(outer_class_scope) + // @@protoc_insertion_point(outer_class_scope) }