Skip to content

Commit

Permalink
Add codecs for JSON support with Protobuf
Browse files Browse the repository at this point in the history
Prior to this commit, WebFlux had Protobuf codecs for managing the
`Message`/`"application/x-protobuf"` encoding and decoding.
The `com.google.protobuf:protobuf-java-util` library has additional
support for JSON (de)serialization, but this is not supported by
existing codecs.

This commit adds the new `ProtobufJsonEncode` and `ProtobufJsonDecoder`
classes that support this use case. Note, the `ProtobufJsonDecoder` has
a significant limitation: it cannot decode JSON arrays as
`Flux<Message>` because there is no available non-blocking parser able
to tokenize JSON arrays into streams of `Databuffer`. Instead,
applications should decode to `Mono<List<Message>>` which causes
additional buffering but is properly supported.

Closes gh-25457
  • Loading branch information
bclozel committed Jun 18, 2024
1 parent 24bbc6d commit 17abb4d
Show file tree
Hide file tree
Showing 6 changed files with 560 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,7 +47,7 @@
*
* <p>To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
*
* <p>This encoder requires Protobuf 3 or higher, and supports
* <p>This encoder requires Protobuf 3.29 or higher, and supports
* {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
* {@code "com.google.protobuf:protobuf-java"} library.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,8 @@
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.EncodingException;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.EncoderHttpMessageWriter;
Expand Down Expand Up @@ -97,7 +97,7 @@ else if (!ProtobufEncoder.DELIMITED_VALUE.equals(mediaType.getParameters().get(P
return super.write(inputStream, elementType, mediaType, message, hints);
}
catch (Exception ex) {
return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
return Mono.error(new EncodingException("Could not write Protobuf message: " + ex.getMessage(), ex));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.codec.protobuf;

import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.MimeType;

/**
* A {@code Decoder} that reads a JSON byte stream and converts it to
* <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>
* {@link com.google.protobuf.Message}s.
*
* <p>Flux deserialized via
* {@link #decode(Publisher, ResolvableType, MimeType, Map)} are not supported because
* the Protobuf Java Util library does not provide a non-blocking parser
* that splits a JSON stream into tokens.
* Applications should consider decoding to {@code Mono<Message>} or
* {@code Mono<List<Message>>}, which will use the supported
* {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)}.
*
* <p>To generate {@code Message} Java classes, you need to install the
* {@code protoc} binary.
*
* <p>This decoder requires Protobuf 3.29 or higher, and supports
* {@code "application/json"} and {@code "application/*+json"} with
* the official {@code "com.google.protobuf:protobuf-java-util"} library.
*
* @author Brian Clozel
* @since 6.2
* @see ProtobufJsonEncoder
*/
public class ProtobufJsonDecoder implements Decoder<Message> {

/** The default max size for aggregating messages. */
protected static final int DEFAULT_MESSAGE_MAX_SIZE = 256 * 1024;

private static final List<MimeType> defaultMimeTypes = List.of(MediaType.APPLICATION_JSON,
new MediaType("application", "*+json"));

private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();

private final JsonFormat.Parser parser;

private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;

/**
* Construct a new {@link ProtobufJsonDecoder} using a default {@link JsonFormat.Parser} instance.
*/
public ProtobufJsonDecoder() {
this(JsonFormat.parser());
}

/**
* Construct a new {@link ProtobufJsonDecoder} using the given {@link JsonFormat.Parser} instance.
*/
public ProtobufJsonDecoder(JsonFormat.Parser parser) {
this.parser = parser;
}

/**
* Return the {@link #setMaxMessageSize configured} message size limit.
*/
public int getMaxMessageSize() {
return this.maxMessageSize;
}

/**
* The max size allowed per message.
* <p>By default, this is set to 256K.
* @param maxMessageSize the max size per message, or -1 for unlimited
*/
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

@Override
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) {
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

private static boolean supportsMimeType(@Nullable MimeType mimeType) {
if (mimeType == null) {
return false;
}
for (MimeType m : defaultMimeTypes) {
if (m.isCompatibleWith(mimeType)) {
return true;
}
}
return false;
}


@Override
public List<MimeType> getDecodableMimeTypes() {
return defaultMimeTypes;
}

@Override
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux.error(new UnsupportedOperationException("Protobuf decoder does not support Flux, use Mono<List<...>> instead."));
}

@Override
public Message decode(DataBuffer dataBuffer, ResolvableType targetType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
try {
Message.Builder builder = getMessageBuilder(targetType.toClass());
this.parser.merge(new InputStreamReader(dataBuffer.asInputStream()), builder);
return builder.build();
}
catch (Exception ex) {
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
}
finally {
DataBufferUtils.release(dataBuffer);
}
}

/**
* Create a new {@code Message.Builder} instance for the given class.
* <p>This method uses a ConcurrentHashMap for caching method lookups.
*/
private static Message.Builder getMessageBuilder(Class<?> clazz) throws Exception {
Method method = methodCache.get(clazz);
if (method == null) {
method = clazz.getMethod("newBuilder");
methodCache.put(clazz, method);
}
return (Message.Builder) method.invoke(clazz);
}

@Override
public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return DataBufferUtils.join(inputStream, this.maxMessageSize)
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints))
.onErrorMap(DataBufferLimitException.class, exc -> new DecodingException("Could not decode JSON as Protobuf message", exc));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.codec.protobuf;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.lang.Nullable;
import org.springframework.util.FastByteArrayOutputStream;
import org.springframework.util.MimeType;

/**
* A {@code Encoder} that writes {@link com.google.protobuf.Message}s as JSON.
*
* <p>To generate {@code Message} Java classes, you need to install the
* {@code protoc} binary.
*
* <p>This encoder requires Protobuf 3.29 or higher, and supports
* {@code "application/json"} and {@code "application/*+json"} with
* the official {@code "com.google.protobuf:protobuf-java-util"} library.
*
* @author Brian Clozel
* @since 6.2
* @see ProtobufJsonDecoder
*/
public class ProtobufJsonEncoder implements HttpMessageEncoder<Message> {

private static final byte[] EMPTY_BYTES = new byte[0];

private static final ResolvableType MESSAGE_TYPE = ResolvableType.forClass(Message.class);

private static final List<MimeType> defaultMimeTypes = List.of(
MediaType.APPLICATION_JSON,
new MediaType("application", "*+json"));

private final JsonFormat.Printer printer;


/**
* Construct a new {@link ProtobufJsonEncoder} using a default {@link JsonFormat.Printer} instance.
*/
public ProtobufJsonEncoder() {
this(JsonFormat.printer());
}

/**
* Construct a new {@link ProtobufJsonEncoder} using the given {@link JsonFormat.Printer} instance.
*/
public ProtobufJsonEncoder(JsonFormat.Printer printer) {
this.printer = printer;
}

@Override
public List<MediaType> getStreamingMediaTypes() {
return List.of(MediaType.APPLICATION_NDJSON);
}

@Override
public List<MimeType> getEncodableMimeTypes() {
return defaultMimeTypes;
}

@Override
public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) {
return Message.class.isAssignableFrom(elementType.toClass()) && supportsMimeType(mimeType);
}

private static boolean supportsMimeType(@Nullable MimeType mimeType) {
if (mimeType == null) {
return false;
}
for (MimeType m : defaultMimeTypes) {
if (m.isCompatibleWith(mimeType)) {
return true;
}
}
return false;
}

@Override
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
if (inputStream instanceof Mono) {
return Mono.from(inputStream)
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
JsonArrayJoinHelper helper = new JsonArrayJoinHelper();

// Do not prepend JSON array prefix until first signal is known, onNext vs onError
// Keeps response not committed for error handling
return Flux.from(inputStream)
.map(value -> {
byte[] prefix = helper.getPrefix();
byte[] delimiter = helper.getDelimiter();
DataBuffer dataBuffer = encodeValue(value, bufferFactory, MESSAGE_TYPE, mimeType, hints);
return (prefix.length > 0 ?
bufferFactory.join(List.of(bufferFactory.wrap(prefix), bufferFactory.wrap(delimiter), dataBuffer)) :
bufferFactory.join(List.of(bufferFactory.wrap(delimiter), dataBuffer)));
})
.switchIfEmpty(Mono.fromCallable(() -> bufferFactory.wrap(helper.getPrefix())))
.concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix())));
}

@Override
public DataBuffer encodeValue(Message message, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(bos, StandardCharsets.UTF_8);
try {
this.printer.appendTo(message, writer);
writer.flush();
byte[] bytes = bos.toByteArrayUnsafe();
return bufferFactory.wrap(bytes);
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
}
}

private static class JsonArrayJoinHelper {

private static final byte[] COMMA_SEPARATOR = {','};

private static final byte[] OPEN_BRACKET = {'['};

private static final byte[] CLOSE_BRACKET = {']'};

private boolean firstItemEmitted;

public byte[] getDelimiter() {
if (this.firstItemEmitted) {
return COMMA_SEPARATOR;
}
this.firstItemEmitted = true;
return EMPTY_BYTES;
}

public byte[] getPrefix() {
return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET);
}

public byte[] getSuffix() {
return CLOSE_BRACKET;
}
}
}
Loading

0 comments on commit 17abb4d

Please sign in to comment.