Skip to content

Commit

Permalink
Add protobuf json format support for triple (#12996)
Browse files Browse the repository at this point in the history
  • Loading branch information
icodening authored Sep 2, 2023
1 parent cfd0be3 commit d596eeb
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 3 deletions.
4 changes: 4 additions & 0 deletions dubbo-remoting/dubbo-remoting-http12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
<artifactId>dubbo-remoting-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.apache.dubbo.remoting.http12.message;

import com.alibaba.fastjson2.JSONObject;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.JsonUtils;
import org.apache.dubbo.common.utils.MethodUtils;
import org.apache.dubbo.remoting.http12.exception.DecodeException;
import org.apache.dubbo.remoting.http12.exception.EncodeException;

Expand All @@ -45,6 +48,11 @@ public MediaType contentType() {
public void encode(OutputStream outputStream, Object unSerializedBody) throws EncodeException {
try {
try {
if (unSerializedBody instanceof Message) {
String jsonString = JsonFormat.printer().print((Message) unSerializedBody);
outputStream.write(jsonString.getBytes(StandardCharsets.UTF_8));
return;
}
String jsonString = JsonUtils.toJson(unSerializedBody);
outputStream.write(jsonString.getBytes(StandardCharsets.UTF_8));
} finally {
Expand Down Expand Up @@ -79,6 +87,11 @@ public Object decode(InputStream body, Class<?> targetType) throws DecodeExcepti
while ((len = body.read(data)) != -1) {
builder.append(new String(data, 0, len));
}
if (isProtobuf(targetType)) {
Message.Builder newBuilder = (Message.Builder) MethodUtils.findMethod(targetType, "newBuilder").invoke(null);
JsonFormat.parser().ignoringUnknownFields().merge(builder.toString(), newBuilder);
return newBuilder.build();
}
return JsonUtils.toJavaObject(builder.toString(), targetType);
} finally {
body.close();
Expand Down Expand Up @@ -124,4 +137,11 @@ public Object[] decode(InputStream dataInputStream, Class<?>[] targetTypes) thro
}
}

private boolean isProtobuf(Class<?> targetType) {
if (targetType == null) {
return false;
}
return Message.class.isAssignableFrom(targetType);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12.message;

import org.apache.dubbo.remoting.http12.exception.DecodeException;

import java.io.InputStream;

public class NoOpStreamingDecoder implements StreamingDecoder {

private FragmentListener listener;

@Override
public void request(int numMessages) {
// do nothing
}

@Override
public void decode(InputStream inputStream) throws DecodeException {
listener.onFragmentMessage(inputStream);
}

@Override
public void close() {
this.listener.onClose();
}

@Override
public void setFragmentListener(FragmentListener listener) {
this.listener = listener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,20 @@ public void onMetadata(HEADER metadata) {
try {
this.executor = initializeExecutor(metadata);
} catch (Throwable throwable) {
LOGGER.error("initialize executor fail.", throwable);
onError(throwable);
return;
}
if (this.executor == null) {
LOGGER.error("executor must be not null.");
onError(new NullPointerException("initializeExecutor return null"));
return;
}
executor.execute(() -> {
try {
doOnMetadata(metadata);
} catch (Throwable throwable) {
LOGGER.error("server internal error", throwable);
onError(throwable);
}
});
Expand Down Expand Up @@ -174,6 +177,7 @@ public void onData(MESSAGE message) {
try {
doOnData(message);
} catch (Throwable e) {
LOGGER.error("server internal error", e);
onError(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import org.apache.dubbo.remoting.http12.h2.Http2TransportListener;
import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
import org.apache.dubbo.remoting.http12.message.JsonCodec;
import org.apache.dubbo.remoting.http12.message.LengthFieldStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.ListeningDecoder;
import org.apache.dubbo.remoting.http12.message.MethodMetadata;
import org.apache.dubbo.remoting.http12.message.NoOpStreamingDecoder;
import org.apache.dubbo.remoting.http12.message.StreamingDecoder;
import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -116,8 +116,8 @@ public void cancelByRemote(long errorCode) {
}

protected StreamingDecoder newStreamingDecoder() {
//default lengthFieldLength = 4
return new LengthFieldStreamingDecoder();
//default no op
return new NoOpStreamingDecoder();
}

protected void doOnMetadata(Http2Header metadata) {
Expand Down

0 comments on commit d596eeb

Please sign in to comment.