From c71e2fe730c575876ab4c8776caa7cf8722dc30b Mon Sep 17 00:00:00 2001 From: geordie Date: Tue, 22 Nov 2022 01:04:33 +0800 Subject: [PATCH 1/4] astraea-974 Implement RecordHandler post request --- .../java/org/astraea/app/web/Channel.java | 39 +++- .../java/org/astraea/app/web/PostRequest.java | 1 + .../org/astraea/app/web/RecordHandler.java | 157 +++++++------- .../java/org/astraea/app/web/Request.java | 23 ++ .../java/org/astraea/app/EnumInfoTest.java | 44 +--- .../astraea/app/web/RecordHandlerTest.java | 176 +++++++-------- .../java/org/astraea/app/web/RequestTest.java | 165 ++++++++++++++ .../test/java/org/astraea/web/TestUtils.java | 62 ++++++ .../java/org/astraea/web/TestUtilsTest.java | 36 ++++ .../main/java/org/astraea/common/Utils.java | 11 + .../astraea/common/json/JsonConverter.java | 76 ++++++- .../java/org/astraea/common/json/TypeRef.java | 6 + .../common/json/JsonConverterTest.java | 201 ++++++++++++++++-- .../org/astraea/common/json/TypeRefTest.java | 5 + 14 files changed, 751 insertions(+), 251 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/web/Request.java create mode 100644 app/src/test/java/org/astraea/app/web/RequestTest.java create mode 100644 app/src/test/java/org/astraea/web/TestUtils.java create mode 100644 app/src/test/java/org/astraea/web/TestUtilsTest.java diff --git a/app/src/main/java/org/astraea/app/web/Channel.java b/app/src/main/java/org/astraea/app/web/Channel.java index a08f6ed6e3..b7785e7d1b 100644 --- a/app/src/main/java/org/astraea/app/web/Channel.java +++ b/app/src/main/java/org/astraea/app/web/Channel.java @@ -29,6 +29,8 @@ import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.Utils; +import org.astraea.common.json.JsonConverter; +import org.astraea.common.json.TypeRef; interface Channel { @@ -50,15 +52,21 @@ static Channel ofQueries(Map queries) { return builder().type(Type.GET).queries(queries).build(); } + @Deprecated static Channel ofRequest(PostRequest request) { return builder().type(Type.POST).request(request).build(); } + static Channel ofRequest(String json) { + return builder().type(Type.POST).request(json).build(); + } + class Builder { private Type type = Type.UNKNOWN; private Optional target = Optional.empty(); private Map queries = Map.of(); - private PostRequest request = PostRequest.EMPTY; + private Optional body = Optional.empty(); + @Deprecated private PostRequest request = PostRequest.EMPTY; private Consumer sender = r -> {}; private Builder() {} @@ -82,10 +90,17 @@ public Builder queries(Map queries) { return this; } + @Deprecated public Builder request(Map request) { return request(PostRequest.of(request)); } + public Builder request(String json) { + this.body = Optional.ofNullable(json); + return this; + } + + @Deprecated public Builder request(PostRequest request) { this.request = request; return this; @@ -113,6 +128,12 @@ public PostRequest request() { return request; } + @Override + public T request(TypeRef typeRef) { + var json = body.orElse("{}"); + return JsonConverter.defaultConverter().fromJson(json, typeRef); + } + @Override public Map queries() { return queries; @@ -176,13 +197,20 @@ static Channel of(HttpExchange exchange) { .collect(Collectors.toMap(p -> p.split("=")[0], p -> p.split("=")[1])); }; - Function parseRequest = + Function parsePostRequest = stream -> { var bs = Utils.packException(stream::readAllBytes); if (bs == null || bs.length == 0) return PostRequest.EMPTY; return PostRequest.of(new String(bs, StandardCharsets.UTF_8)); }; + Function parseRequest = + stream -> { + var bs = Utils.packException(stream::readAllBytes); + if (bs == null || bs.length == 0) return null; + return new String(bs, StandardCharsets.UTF_8); + }; + Function parseType = name -> { switch (name.toUpperCase(Locale.ROOT)) { @@ -202,6 +230,7 @@ static Channel of(HttpExchange exchange) { .type(parseType.apply(exchange.getRequestMethod())) .target(parseTarget.apply(exchange.getRequestURI())) .queries(parseQueries.apply(exchange.getRequestURI())) + .request(parsePostRequest.apply(exchange.getRequestBody())) .request(parseRequest.apply(exchange.getRequestBody())) .sender( response -> { @@ -235,8 +264,14 @@ static Channel of(HttpExchange exchange) { /** * @return body request */ + @Deprecated PostRequest request(); + /** + * @return body request + */ + T request(TypeRef typeRef); + /** * @return the queries appended to URL */ diff --git a/app/src/main/java/org/astraea/app/web/PostRequest.java b/app/src/main/java/org/astraea/app/web/PostRequest.java index 9f0e5dc859..45b178ca65 100644 --- a/app/src/main/java/org/astraea/app/web/PostRequest.java +++ b/app/src/main/java/org/astraea/app/web/PostRequest.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.stream.Collectors; +@Deprecated public interface PostRequest { PostRequest EMPTY = PostRequest.of(Map.of()); diff --git a/app/src/main/java/org/astraea/app/web/RecordHandler.java b/app/src/main/java/org/astraea/app/web/RecordHandler.java index 92c015e6ea..008d1977e9 100644 --- a/app/src/main/java/org/astraea/app/web/RecordHandler.java +++ b/app/src/main/java/org/astraea/app/web/RecordHandler.java @@ -19,17 +19,7 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; -import java.lang.reflect.Type; import java.time.Duration; -import java.util.Base64; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -40,6 +30,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import org.astraea.app.web.Request.RequestObject; import org.astraea.common.Cache; import org.astraea.common.EnumInfo; import org.astraea.common.FutureUtils; @@ -53,15 +44,14 @@ import org.astraea.common.consumer.Deserializer; import org.astraea.common.consumer.SeekStrategy; import org.astraea.common.consumer.SubscribedConsumer; +import org.astraea.common.json.JsonConverter; +import org.astraea.common.json.TypeRef; import org.astraea.common.producer.Producer; import org.astraea.common.producer.ProducerConfigs; import org.astraea.common.producer.Serializer; public class RecordHandler implements Handler { - static final String RECORDS = "records"; - static final String TRANSACTION_ID = "transactionId"; static final String PARTITION = "partition"; - static final String ASYNC = "async"; static final String DISTANCE_FROM_LATEST = "distanceFromLatest"; static final String DISTANCE_FROM_BEGINNING = "distanceFromBeginning"; static final String SEEK_TO = "seekTo"; @@ -179,19 +169,14 @@ GetResponse get(Consumer consumer, int limit, Duration timeout) @Override public CompletionStage post(Channel channel) { - var async = channel.request().getBoolean(ASYNC).orElse(false); - var timeout = - channel.request().get(TIMEOUT).map(Utils::toDuration).orElse(Duration.ofSeconds(5)); - var records = channel.request().values(RECORDS, PostRecord.class); + var postRequest = channel.request(TypeRef.of(RecordPostRequest.class)); + + var records = postRequest.records(); if (records.isEmpty()) throw new IllegalArgumentException("records should contain at least one record"); var producer = - channel - .request() - .get(TRANSACTION_ID) - .map(transactionalProducerCache::get) - .orElse(this.producer); + postRequest.transactionId().map(transactionalProducerCache::get).orElse(this.producer); var result = CompletableFuture.supplyAsync( @@ -224,10 +209,13 @@ public CompletionStage post(Channel channel) { .map(CompletionStage::toCompletableFuture) .collect(toList()))); - if (async) return CompletableFuture.completedFuture(Response.ACCEPT); + if (postRequest.async()) return CompletableFuture.completedFuture(Response.ACCEPT); return CompletableFuture.completedFuture( Utils.packException( - () -> new PostResponse(result.get(timeout.toNanos(), TimeUnit.NANOSECONDS)))); + () -> + new PostResponse( + result.get( + Utils.toDuration(postRequest.timeout()).toNanos(), TimeUnit.NANOSECONDS)))); } @Override @@ -274,39 +262,42 @@ public CompletionStage delete(Channel channel) { enum SerDe implements EnumInfo { BYTEARRAY( (topic, value) -> - Optional.ofNullable(value).map(v -> Base64.getDecoder().decode(v)).orElse(null), + Optional.ofNullable(value) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.bytes())) + .orElse(null), Deserializer.BYTE_ARRAY), STRING( (topic, value) -> Optional.ofNullable(value) - .map(v -> Serializer.STRING.serialize(topic, List.of(), value)) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(String.class))) + .map(v -> Serializer.STRING.serialize(topic, List.of(), v)) .orElse(null), Deserializer.STRING), LONG( (topic, value) -> Optional.ofNullable(value) - .map(Long::parseLong) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Long.class))) .map(longVal -> Serializer.LONG.serialize(topic, List.of(), longVal)) .orElse(null), Deserializer.LONG), INTEGER( (topic, value) -> Optional.ofNullable(value) - .map(Integer::parseInt) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Integer.class))) .map(intVal -> Serializer.INTEGER.serialize(topic, List.of(), intVal)) .orElse(null), Deserializer.INTEGER), FLOAT( (topic, value) -> Optional.ofNullable(value) - .map(Float::parseFloat) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Float.class))) .map(floatVal -> Serializer.FLOAT.serialize(topic, List.of(), floatVal)) .orElse(null), Deserializer.FLOAT), DOUBLE( (topic, value) -> Optional.ofNullable(value) - .map(Double::parseDouble) + .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Double.class))) .map(doubleVal -> Serializer.DOUBLE.serialize(topic, List.of(), doubleVal)) .orElse(null), Deserializer.DOUBLE); @@ -325,7 +316,9 @@ public String toString() { return alias(); } + /** (topic, json) convert to bytes */ final BiFunction serializer; + final Deserializer deserializer; SerDe(BiFunction serializer, Deserializer deserializer) { @@ -336,30 +329,51 @@ public String toString() { private static org.astraea.common.producer.Record createRecord( Producer producer, PostRecord postRecord) { - var topic = - Optional.ofNullable(postRecord.topic) - .orElseThrow(() -> new IllegalArgumentException("topic must be set")); + var topic = postRecord.topic; var builder = org.astraea.common.producer.Record.builder().topic(topic); + // TODO: Support headers // (https://github.com/skiptests/astraea/issues/422) - var keySerializer = - Optional.ofNullable(postRecord.keySerializer) - .map(name -> SerDe.ofAlias(name).serializer) - .orElse(SerDe.STRING.serializer); - var valueSerializer = - Optional.ofNullable(postRecord.valueSerializer) - .map(name -> SerDe.ofAlias(name).serializer) - .orElse(SerDe.STRING.serializer); - - Optional.ofNullable(postRecord.key) - .ifPresent(key -> builder.key(keySerializer.apply(topic, PostRequest.handle(key)))); - Optional.ofNullable(postRecord.value) - .ifPresent(value -> builder.value(valueSerializer.apply(topic, PostRequest.handle(value)))); - Optional.ofNullable(postRecord.timestamp).ifPresent(builder::timestamp); - Optional.ofNullable(postRecord.partition).ifPresent(builder::partition); + var keySerializer = SerDe.ofAlias(postRecord.keySerializer).serializer; + var valueSerializer = SerDe.ofAlias(postRecord.valueSerializer).serializer; + postRecord.key.ifPresent( + key -> + builder.key(keySerializer.apply(topic, JsonConverter.defaultConverter().toJson(key)))); + postRecord.value.ifPresent( + value -> + builder.value( + valueSerializer.apply(topic, JsonConverter.defaultConverter().toJson(value)))); + postRecord.timestamp.ifPresent(builder::timestamp); + postRecord.partition.ifPresent(builder::partition); return builder.build(); } + static class RecordPostRequest implements Request { + private boolean async = false; + private String timeout = "5s"; + private List records = List.of(); + + private Optional transactionId = Optional.empty(); + + public RecordPostRequest() {} + + public boolean async() { + return async; + } + + public String timeout() { + return timeout; + } + + public List records() { + return records; + } + + public Optional transactionId() { + return transactionId; + } + } + static class Metadata implements Response { final String topic; final int partition; @@ -389,12 +403,7 @@ private GetResponse(Consumer consumer, Collection record @Override public String json() { - return new GsonBuilder() - // gson will do html escape by default (e.g. convert = to \u003d) - .disableHtmlEscaping() - .registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()) - .create() - .toJson(this); + return JsonConverter.defaultConverter().toJson(this); } @Override @@ -445,26 +454,16 @@ static class Header { } } - static class ByteArrayToBase64TypeAdapter - implements JsonSerializer, JsonDeserializer { - public byte[] deserialize(JsonElement json, Type type, JsonDeserializationContext context) - throws JsonParseException { - return Base64.getDecoder().decode(json.getAsString()); - } + static class PostRecord implements RequestObject { + String topic; + Optional partition = Optional.empty(); + String keySerializer = "STRING"; + String valueSerializer = "STRING"; + Optional key = Optional.empty(); + Optional value = Optional.empty(); + Optional timestamp = Optional.empty(); - public JsonElement serialize(byte[] src, Type type, JsonSerializationContext context) { - return new JsonPrimitive(Base64.getEncoder().encodeToString(src)); - } - } - - static class PostRecord { - final String topic; - final Integer partition; - final String keySerializer; - final String valueSerializer; - final Object key; - final Object value; - final Long timestamp; + public PostRecord() {} PostRecord( String topic, @@ -475,12 +474,12 @@ static class PostRecord { Object value, Long timestamp) { this.topic = topic; - this.partition = partition; - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; - this.key = key; - this.value = value; - this.timestamp = timestamp; + this.partition = Optional.ofNullable(partition); + this.keySerializer = Optional.ofNullable(keySerializer).orElse("STRING"); + this.valueSerializer = Optional.ofNullable(valueSerializer).orElse("STRING"); + this.key = Optional.ofNullable(key); + this.value = Optional.ofNullable(value); + this.timestamp = Optional.ofNullable(timestamp); } } diff --git a/app/src/main/java/org/astraea/app/web/Request.java b/app/src/main/java/org/astraea/app/web/Request.java new file mode 100644 index 0000000000..22f0d0ae99 --- /dev/null +++ b/app/src/main/java/org/astraea/app/web/Request.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +public interface Request { + + /** Indicates the object used in the request */ + interface RequestObject {} +} diff --git a/app/src/test/java/org/astraea/app/EnumInfoTest.java b/app/src/test/java/org/astraea/app/EnumInfoTest.java index 61269bf40e..ffd9e63f31 100644 --- a/app/src/test/java/org/astraea/app/EnumInfoTest.java +++ b/app/src/test/java/org/astraea/app/EnumInfoTest.java @@ -16,17 +16,11 @@ */ package org.astraea.app; -import java.io.File; -import java.nio.file.Path; import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; import org.astraea.common.EnumInfo; -import org.astraea.common.Utils; +import org.astraea.web.TestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; @@ -101,13 +95,13 @@ public static class EnumClassProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { - return getProductionClass().stream().filter(Class::isEnum).map(Arguments::of); + return TestUtils.getProductionClass().stream().filter(Class::isEnum).map(Arguments::of); } } @Test void testProductionClass() { - var productionClasses = getProductionClass(); + var productionClasses = TestUtils.getProductionClass(); Assertions.assertTrue(productionClasses.size() > 100); Assertions.assertTrue( productionClasses.stream().allMatch(x -> x.getPackageName().startsWith("org.astraea"))); @@ -122,36 +116,4 @@ void testEnumClassProvider() { Assertions.assertTrue(enumCls.size() > 0); Assertions.assertTrue(enumCls.stream().map(x -> (Class) x.get()[0]).allMatch(Class::isEnum)); } - - private static List> getProductionClass() { - var pkg = "org/astraea"; - System.out.println(EnumInfoTest.class.getClassLoader()); - var mainDir = - Collections.list( - Utils.packException(() -> EnumInfoTest.class.getClassLoader().getResources(pkg))) - .stream() - .peek(x -> System.out.println(x.toExternalForm())) - .filter(x -> x.toExternalForm().contains("main/" + pkg)) - .findFirst() - .map(x -> Utils.packException(() -> Path.of(x.toURI()))) - .map(x -> x.resolve("../../").normalize()) - .orElseThrow(); - - var dirFiles = - FileUtils.listFiles(mainDir.toFile(), new String[] {"class"}, true).stream() - .map(File::toPath) - .map(mainDir::relativize) - .collect(Collectors.toList()); - - var classNames = - dirFiles.stream() - .map(Path::toString) - .map(FilenameUtils::removeExtension) - .map(x -> x.replace(File.separatorChar, '.')) - .collect(Collectors.toList()); - - return classNames.stream() - .map(x -> Utils.packException(() -> Class.forName(x))) - .collect(Collectors.toList()); - } } diff --git a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java index bb5223763d..3f6719cea6 100644 --- a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java @@ -18,7 +18,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; -import static org.astraea.app.web.RecordHandler.ASYNC; import static org.astraea.app.web.RecordHandler.DISTANCE_FROM_BEGINNING; import static org.astraea.app.web.RecordHandler.DISTANCE_FROM_LATEST; import static org.astraea.app.web.RecordHandler.GROUP_ID; @@ -26,15 +25,11 @@ import static org.astraea.app.web.RecordHandler.LIMIT; import static org.astraea.app.web.RecordHandler.OFFSET; import static org.astraea.app.web.RecordHandler.PARTITION; -import static org.astraea.app.web.RecordHandler.RECORDS; import static org.astraea.app.web.RecordHandler.SEEK_TO; import static org.astraea.app.web.RecordHandler.TIMEOUT; -import static org.astraea.app.web.RecordHandler.TRANSACTION_ID; import static org.astraea.app.web.RecordHandler.VALUE_DESERIALIZER; import static org.junit.jupiter.params.provider.Arguments.arguments; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Base64; @@ -46,15 +41,15 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.astraea.app.web.RecordHandler.ByteArrayToBase64TypeAdapter; import org.astraea.app.web.RecordHandler.Metadata; -import org.astraea.common.ExecutionRuntimeException; +import org.astraea.app.web.RecordHandler.PostRecord; import org.astraea.common.Header; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.common.consumer.Deserializer; +import org.astraea.common.json.JsonConverter; import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; import org.astraea.it.RequireBrokerCluster; @@ -68,6 +63,10 @@ public class RecordHandlerTest extends RequireBrokerCluster { + static final String RECORDS = "records"; + static final String TRANSACTION_ID = "transactionId"; + static final String ASYNC = "async"; + @Test void testInvalidPost() { var handler = getRecordHandler(); @@ -75,39 +74,46 @@ void testInvalidPost() { IllegalArgumentException.class, () -> handler - .post(Channel.ofRequest(PostRequest.of(Map.of(RECORDS, "[]")))) + .post( + Channel.ofRequest( + JsonConverter.defaultConverter().toJson(Map.of(RECORDS, List.of())))) .toCompletableFuture() .join(), "records should contain at least one record"); - var executionRuntimeException = - Assertions.assertThrows( - ExecutionRuntimeException.class, - () -> handler.post(Channel.ofRequest(PostRequest.of(Map.of(RECORDS, "[{}]")))), - "topic must be set"); - Assertions.assertEquals( - IllegalArgumentException.class, executionRuntimeException.getRootCause().getClass()); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + handler.post( + Channel.ofRequest( + JsonConverter.defaultConverter() + .toJson(Map.of(RECORDS, List.of(new PostRecord()))))), + "Value `$.records[].topic` is required."); } @Test void testPostTimeout() { Assertions.assertThrows( IllegalArgumentException.class, - () -> getRecordHandler().post(Channel.ofRequest(PostRequest.of(Map.of(TIMEOUT, "foo"))))); + () -> + getRecordHandler() + .post( + Channel.ofRequest( + JsonConverter.defaultConverter().toJson(Map.of(TIMEOUT, "foo"))))); Assertions.assertInstanceOf( RecordHandler.PostResponse.class, getRecordHandler() .post( Channel.ofRequest( - PostRequest.of( - new Gson() - .toJson( - Map.of( - TIMEOUT, - "10s", - RECORDS, - List.of( - new RecordHandler.PostRecord( - "test", null, null, null, null, null, null))))))) + JsonConverter.defaultConverter() + .toJson( + Map.of( + TIMEOUT, + "10s", + RECORDS, + List.of( + new RecordHandler.PostRecord( + "test", null, null, null, null, null, null)))))) .toCompletableFuture() .join()); } @@ -132,7 +138,7 @@ void testPost(boolean isTransaction) { Assertions.assertInstanceOf( RecordHandler.PostResponse.class, getRecordHandler() - .post(Channel.ofRequest(PostRequest.of(new Gson().toJson(requestParams)))) + .post(Channel.ofRequest(JsonConverter.defaultConverter().toJson(requestParams))) .toCompletableFuture() .join()); @@ -198,22 +204,21 @@ void testPostWithAsync() { handler .post( Channel.ofRequest( - PostRequest.of( - new Gson() - .toJson( - Map.of( - ASYNC, - "true", - RECORDS, - List.of( - new RecordHandler.PostRecord( - topic, - 0, - "string", - "integer", - "foo", - "100", - currentTimestamp))))))) + JsonConverter.defaultConverter() + .toJson( + Map.of( + ASYNC, + "true", + RECORDS, + List.of( + new RecordHandler.PostRecord( + topic, + 0, + "string", + "integer", + "foo", + "100", + currentTimestamp)))))) .toCompletableFuture() .join()); Assertions.assertEquals(Response.ACCEPT, result); @@ -248,14 +253,13 @@ void testSerializer(String serializer, String actual, byte[] expected) { handler .post( Channel.ofRequest( - PostRequest.of( - new Gson() - .toJson( - Map.of( - RECORDS, - List.of( - new RecordHandler.PostRecord( - topic, null, serializer, null, actual, null, null))))))) + JsonConverter.defaultConverter() + .toJson( + Map.of( + RECORDS, + List.of( + new RecordHandler.PostRecord( + topic, null, serializer, null, actual, null, null)))))) .toCompletableFuture() .join()); @@ -618,49 +622,30 @@ void testGetJsonResponse() { .toCompletableFuture() .join()); - Assertions.assertEquals( + var expected = "{\"records\":[{" - + "\"topic\":\"" - + topic + + "\"headers\":[{\"key\":\"a\"}]," + + "\"key\":\"" + + Base64.getEncoder().encodeToString("astraea".getBytes(UTF_8)) + "\"," - + "\"partition\":0," + + "\"leaderEpoch\":0," + "\"offset\":0," + + "\"partition\":0," + + "\"serializedKeySize\":7," + + "\"serializedValueSize\":4," + "\"timestamp\":" + timestamp + "," - + "\"serializedKeySize\":7," - + "\"serializedValueSize\":4," - + "\"headers\":[{\"key\":\"a\"}]," - + "\"key\":\"" - + Base64.getEncoder().encodeToString("astraea".getBytes(UTF_8)) + + "\"topic\":\"" + + topic + "\"," - + "\"value\":100," - + "\"leaderEpoch\":0" - + "}]}", - response.json()); + + "\"value\":100}]}"; + Assertions.assertEquals(expected, response.json()); // close consumer response.onComplete(null); } - @Test - void testByteArrayToBase64TypeAdapter() { - var foo = new Foo("test".getBytes()); - var gson = - new GsonBuilder() - .registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()) - .create(); - Assertions.assertArrayEquals(foo.bar, gson.fromJson(gson.toJson(foo), Foo.class).bar); - } - - private static class Foo { - final byte[] bar; - - public Foo(byte[] bar) { - this.bar = bar; - } - } - @Test void testPostAndGet() { var topic = Utils.randomString(10); @@ -671,20 +656,19 @@ void testPostAndGet() { handler .post( Channel.ofRequest( - PostRequest.of( - new Gson() - .toJson( - Map.of( - RECORDS, - List.of( - new RecordHandler.PostRecord( - topic, - 0, - "string", - "integer", - "foo", - "100", - currentTimestamp))))))) + JsonConverter.defaultConverter() + .toJson( + Map.of( + RECORDS, + List.of( + new RecordHandler.PostRecord( + topic, + 0, + "string", + "integer", + "foo", + 100, + currentTimestamp)))))) .toCompletableFuture() .join()); diff --git a/app/src/test/java/org/astraea/app/web/RequestTest.java b/app/src/test/java/org/astraea/app/web/RequestTest.java new file mode 100644 index 0000000000..3d327e096b --- /dev/null +++ b/app/src/test/java/org/astraea/app/web/RequestTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.web; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.astraea.app.web.RecordHandler.PostRecord; +import org.astraea.app.web.RecordHandler.RecordPostRequest; +import org.astraea.app.web.Request.RequestObject; +import org.astraea.common.Utils; +import org.astraea.web.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; + +class RequestTest { + + @ParameterizedTest + @ArgumentsSource(RequestClassProvider.class) + void testRequestImpl(Class cls) { + validateDefaultConstructor(cls); + validateDefaultAssign(cls); + } + + @Test + void testRequestClassProvider() { + var classes = + new RequestClassProvider() + .provideArguments(null) + .map(x -> (Class) x.get()[0]) + .collect(Collectors.toList()); + + Assertions.assertTrue(classes.contains(RecordPostRequest.class)); + Assertions.assertTrue(classes.contains(PostRecord.class)); + } + + @Test + void testValidateConstructor() { + Assertions.assertThrows( + Throwable.class, () -> validateDefaultConstructor(NoDefaultConstructor.class)); + + validateDefaultConstructor(NoDefaultAssignMap.class); + } + + @Test + void testValidateAssign() { + Assertions.assertThrows( + Throwable.class, () -> validateDefaultAssign(NoDefaultAssignList.class)); + Assertions.assertThrows( + Throwable.class, () -> validateDefaultAssign(NoDefaultAssignOptional.class)); + Assertions.assertThrows(Throwable.class, () -> validateDefaultAssign(NoDefaultAssignMap.class)); + + validateDefaultAssign(HasDefaultAssignList.class); + } + + private void validateDefaultConstructor(Class cls) { + Assertions.assertDoesNotThrow( + () -> { + cls.getDeclaredConstructor(); + }); + } + + /** test optional, map, list assigned */ + private void validateDefaultAssign(Class cls) { + var instance = Utils.packException(() -> cls.getDeclaredConstructor().newInstance()); + Arrays.stream(cls.getDeclaredFields()) + .peek(x -> x.setAccessible(true)) + .forEach( + x -> + Utils.packException( + () -> { + var innerCls = x.getType(); + if (Optional.class == innerCls + || Map.class.isAssignableFrom(innerCls) + || List.class.isAssignableFrom(innerCls)) { + Assertions.assertNotNull(x.get(instance)); + } + })); + } + + public static class RequestClassProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + return TestUtils.getProductionClass().stream() + .filter(x -> Request.class.isAssignableFrom(x) || RequestObject.class.isAssignableFrom(x)) + .map(RequestTest::getAllFieldPojoCls) + .flatMap(Collection::stream) + .map(Arguments::of); + } + } + + public static List> getAllFieldPojoCls(Class cls) { + if (isPojo(cls)) { + return Stream.concat( + Stream.of(cls), + Arrays.stream(cls.getDeclaredFields()) + .peek(x -> System.out.println(x.getName())) + .map(Field::getType) + .flatMap(x -> getAllFieldPojoCls(x).stream())) + .collect(Collectors.toList()); + } else { + return List.of(); + } + } + + static class NoDefaultConstructor { + String a; + + public NoDefaultConstructor(String a) { + this.a = a; + } + } + + static class NoDefaultAssignMap { + Map map; + } + + static class NoDefaultAssignList { + List list; + } + + static class NoDefaultAssignOptional { + Optional opt; + } + + static class HasDefaultAssignList { + List list = List.of(); + } + + public static boolean isPojo(Class cls) { + return !(cls.isPrimitive() + || Utils.isWrapper(cls) + || cls.isSynthetic() + || cls.isInterface() + || Collection.class.isAssignableFrom(cls) + || Map.class.isAssignableFrom(cls) + || String.class == cls + || Optional.class == cls + || Object.class == cls); + } +} diff --git a/app/src/test/java/org/astraea/web/TestUtils.java b/app/src/test/java/org/astraea/web/TestUtils.java new file mode 100644 index 0000000000..e767641c6a --- /dev/null +++ b/app/src/test/java/org/astraea/web/TestUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.web; + +import java.io.File; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; +import org.astraea.app.web.ResponseTest; +import org.astraea.common.Utils; + +public class TestUtils { + + private TestUtils() {} + + public static List> getProductionClass() { + var pkg = "org/astraea"; + var mainDir = + Collections.list( + Utils.packException(() -> ResponseTest.class.getClassLoader().getResources(pkg))) + .stream() + .filter(x -> x.toExternalForm().contains("main/" + pkg)) + .findFirst() + .map(x -> Utils.packException(() -> Path.of(x.toURI()))) + .map(x -> x.resolve("../../").normalize()) + .orElseThrow(); + + var dirFiles = + FileUtils.listFiles(mainDir.toFile(), new String[] {"class"}, true).stream() + .map(File::toPath) + .map(mainDir::relativize) + .collect(Collectors.toList()); + + var classNames = + dirFiles.stream() + .map(Path::toString) + .map(FilenameUtils::removeExtension) + .map(x -> x.replace(File.separatorChar, '.')) + .collect(Collectors.toList()); + + return classNames.stream() + .map(x -> Utils.packException(() -> Class.forName(x))) + .collect(Collectors.toList()); + } +} diff --git a/app/src/test/java/org/astraea/web/TestUtilsTest.java b/app/src/test/java/org/astraea/web/TestUtilsTest.java new file mode 100644 index 0000000000..0dcdbc627d --- /dev/null +++ b/app/src/test/java/org/astraea/web/TestUtilsTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.web; + +import org.astraea.app.version.Version; +import org.astraea.app.web.RecordHandler; +import org.astraea.app.web.RecordHandlerTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestUtilsTest { + + @Test + void testGetProductionClass() { + var productionClass = TestUtils.getProductionClass(); + Assertions.assertTrue(productionClass.size() > 100); + Assertions.assertTrue(productionClass.contains(RecordHandler.class)); + Assertions.assertTrue(productionClass.contains(Version.class)); + + Assertions.assertFalse(productionClass.contains(RecordHandlerTest.class)); + } +} diff --git a/common/src/main/java/org/astraea/common/Utils.java b/common/src/main/java/org/astraea/common/Utils.java index ba044485f5..070de8f530 100644 --- a/common/src/main/java/org/astraea/common/Utils.java +++ b/common/src/main/java/org/astraea/common/Utils.java @@ -340,5 +340,16 @@ public static Pattern wildcardToPattern(String string) { string.replaceAll("\\?", ".").replaceAll("\\*", ".*"), Pattern.CASE_INSENSITIVE); } + public static boolean isWrapper(Class cls) { + return cls == Double.class + || cls == Float.class + || cls == Long.class + || cls == Integer.class + || cls == Short.class + || cls == Character.class + || cls == Byte.class + || cls == Boolean.class; + } + private Utils() {} } diff --git a/common/src/main/java/org/astraea/common/json/JsonConverter.java b/common/src/main/java/org/astraea/common/json/JsonConverter.java index a751532b49..1f757b540d 100644 --- a/common/src/main/java/org/astraea/common/json/JsonConverter.java +++ b/common/src/main/java/org/astraea/common/json/JsonConverter.java @@ -18,6 +18,8 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -27,6 +29,11 @@ import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; import org.astraea.common.Utils; public interface JsonConverter { @@ -56,6 +63,14 @@ static JsonConverter jackson() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) .visibility(new Std(JsonAutoDetect.Visibility.NONE).with(JsonAutoDetect.Visibility.ANY)) + .withConfigOverride( + List.class, + o -> + o.setSetterInfo(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY, Nulls.AS_EMPTY))) + .withConfigOverride( + Map.class, + o -> + o.setSetterInfo(JsonSetter.Value.forValueNulls(Nulls.AS_EMPTY, Nulls.AS_EMPTY))) .serializationInclusion(Include.NON_EMPTY) .build(); @@ -67,17 +82,58 @@ public String toJson(Object src) { @Override public T fromJson(String json, TypeRef typeRef) { - return Utils.packException( - () -> - objectMapper.readValue( - json, - new TypeReference() { // astraea-986 diamond not work (jdk bug) - @Override - public Type getType() { - return typeRef.getType(); - } - })); + var t = + Utils.packException( + () -> + objectMapper.readValue( + json, + new TypeReference() { // astraea-986 diamond not work (jdk bug) + @Override + public Type getType() { + return typeRef.getType(); + } + })); + preventNull("$", t); + return t; } }; } + + /** + * User Optional to handle null value. + * + * @param name use prefix to locate the error key + */ + private static void preventNull(String name, Object obj) { + if (obj == null) throw new IllegalArgumentException(name + " can not be null"); + + var objClass = obj.getClass(); + + if (Collection.class.isAssignableFrom(objClass)) { + var i = 0; + for (var c : (Collection) obj) { + preventNull(name + "[" + i + "]", c); + i++; + } + } else if (Optional.class == objClass) { + var opt = (Optional) obj; + opt.ifPresent(o -> preventNull(name, o)); + } else if (Map.class.isAssignableFrom(objClass)) { + var map = (Map) obj; + map.forEach((k, v) -> preventNull(name + "." + k, v)); + } else if (objClass.isPrimitive() + || Utils.isWrapper(objClass) + || String.class == objClass + || Object.class == objClass) { + return; + } else { + var declaredFields = objClass.getDeclaredFields(); + Arrays.stream(declaredFields) + .forEach( + x -> { + x.setAccessible(true); + preventNull(name + "." + x.getName(), Utils.packException(() -> x.get(obj))); + }); + } + } } diff --git a/common/src/main/java/org/astraea/common/json/TypeRef.java b/common/src/main/java/org/astraea/common/json/TypeRef.java index 880a8ce2a4..c0d9532133 100644 --- a/common/src/main/java/org/astraea/common/json/TypeRef.java +++ b/common/src/main/java/org/astraea/common/json/TypeRef.java @@ -16,6 +16,7 @@ */ package org.astraea.common.json; +import java.lang.reflect.Array; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.List; @@ -43,6 +44,11 @@ public static TypeRef> array(Class clz) { return of(TypeUtils.parameterize(List.class, clz)); } + @SuppressWarnings("unchecked") + public static TypeRef bytes() { + return of((Class) Array.newInstance(byte.class, 0).getClass()); + } + public static TypeRef> map(Class clz) { return of(TypeUtils.parameterize(Map.class, String.class, clz)); } diff --git a/common/src/test/java/org/astraea/common/json/JsonConverterTest.java b/common/src/test/java/org/astraea/common/json/JsonConverterTest.java index 3b78ca6400..d6d82a2468 100644 --- a/common/src/test/java/org/astraea/common/json/JsonConverterTest.java +++ b/common/src/test/java/org/astraea/common/json/JsonConverterTest.java @@ -17,13 +17,15 @@ package org.astraea.common.json; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; class JsonConverterTest { @@ -96,31 +98,26 @@ void testOptional() { var testFieldClass = new TestOptionalClass(); testFieldClass.optValue = Optional.ofNullable("hello"); testFieldClass.nestedOpt = Optional.ofNullable(List.of("hello")); - testFieldClass.nonInitOpt = Optional.ofNullable("hello"); var json = jsonConverter.toJson(testFieldClass); - assertEquals( - "{\"nestedOpt\":[\"hello\"],\"nonInitOpt\":\"hello\",\"optValue\":\"hello\"}", json); + assertEquals("{\"nestedOpt\":[\"hello\"],\"optValue\":\"hello\"}", json); testFieldClass.optValue = Optional.empty(); testFieldClass.nestedOpt = Optional.empty(); - testFieldClass.nonInitOpt = Optional.empty(); json = jsonConverter.toJson(testFieldClass); assertEquals("{}", json); var convertedTestFieldClass = jsonConverter.fromJson( - "{\"nestedOpt\":[\"hello\"],\"nonInitOpt\":\"hello\",\"optValue\":\"hello\"}", + "{\"nestedOpt\":[\"hello\"],\"optValue\":\"hello\"}", TypeRef.of(TestOptionalClass.class)); assertEquals("hello", convertedTestFieldClass.optValue.get()); assertEquals(List.of("hello"), convertedTestFieldClass.nestedOpt.get()); - assertEquals("hello", convertedTestFieldClass.nonInitOpt.get()); convertedTestFieldClass = jsonConverter.fromJson("{\"optValue\":null}", TypeRef.of(TestOptionalClass.class)); assertTrue(convertedTestFieldClass.optValue.isEmpty()); assertTrue(convertedTestFieldClass.nestedOpt.isEmpty()); - assertNull(convertedTestFieldClass.nonInitOpt); } @Test @@ -130,11 +127,13 @@ void testNestedObject() { var testNestedObjectClass = new TestNestedObjectClass(); testNestedObjectClass.nestedList = List.of(Map.of("key", "value")); testNestedObjectClass.nestedObject = new TestPrimitiveClass(); + testNestedObjectClass.nestedObject.doubleValue = 5d; + testNestedObjectClass.nestedObject.stringValue = "value"; testNestedObjectClass.nestedMap = Map.of("helloKey", List.of("hello")); var json = jsonConverter.toJson(testNestedObjectClass); var expectedJson = - "{\"nestedList\":[{\"key\":\"value\"}],\"nestedMap\":{\"helloKey\":[\"hello\"]},\"nestedObject\":{\"intValue\":0}}"; + "{\"nestedList\":[{\"key\":\"value\"}],\"nestedMap\":{\"helloKey\":[\"hello\"]},\"nestedObject\":{\"doubleValue\":5.0,\"intValue\":0,\"stringValue\":\"value\"}}"; assertEquals(expectedJson, json); var fromJson = jsonConverter.fromJson(expectedJson, TypeRef.of(TestNestedObjectClass.class)); @@ -215,16 +214,173 @@ void testSerializeMapEquals() { } @Test - void testFieldNotInJson() { - var testFieldNameClass = - JsonConverter.defaultConverter() + void testFieldNotInJson() throws JsonProcessingException { + var testPrimitiveClass = + getConverter() .fromJson( - "{" + "\"actor\":123," + "\"apple\":\"notMatter\"" + "}", - TypeRef.of(TestFieldNameClass.class)); + "{\"doubleValue\":5.0,\"intValue\":0,\"stringValue\":\"value\",\"notInField\":\"value\"}", + TypeRef.of(TestPrimitiveClass.class)); + + assertEquals(5.0d, testPrimitiveClass.doubleValue); + assertEquals(0, testPrimitiveClass.intValue); + assertEquals("value", testPrimitiveClass.stringValue); + } + + @Test + void testByteArray() { + var foo = new Foo(); + foo.bar = "test".getBytes(); + var jsonConverter = getConverter(); + Assertions.assertArrayEquals( + "test".getBytes(), + jsonConverter.fromJson(jsonConverter.toJson(foo), TypeRef.of(Foo.class)).bar); + } + + @Test + void testNullObject() { + var jsonConverter = getConverter(); + var forTestValue = + jsonConverter.fromJson( + "{\"foo\":\"fooValue\",\"bar\":500}", TypeRef.of(ForTestConvert.class)); + Assertions.assertEquals("fooValue", forTestValue.foo); + Assertions.assertEquals(500, forTestValue.bar); + + var exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + jsonConverter.fromJson("{\"foo\":\"fooValue\"}", TypeRef.of(ForTestConvert.class))); + Assertions.assertEquals("$.bar can not be null", exception.getMessage()); + } - assertEquals(123, testFieldNameClass.actor); - assertEquals("notMatter", testFieldNameClass.apple); - assertNull(testFieldNameClass.banana); + @Test + void testNullNested() { + var jsonConverter = getConverter(); + var forTestNested = + jsonConverter.fromJson( + "{\"forTestValue\":{\"foo\":\"fooValue\",\"bar\":500}}", + TypeRef.of(ForTestNested.class)); + Assertions.assertEquals("fooValue", forTestNested.forTestValue.foo); + Assertions.assertEquals(500, forTestNested.forTestValue.bar); + + var exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + jsonConverter.fromJson( + "{\"forTestValue\":{\"foo\":\"fooValue\"}}", TypeRef.of(ForTestNested.class))); + Assertions.assertEquals("$.forTestValue.bar can not be null", exception.getMessage()); + } + + @Test + void testNullList() { + var jsonConverter = getConverter(); + var forTestList = + jsonConverter.fromJson( + "{\"forTestValues\":[{\"foo\":\"fooValue\",\"bar\":500}]}", + TypeRef.of(ForTestList.class)); + Assertions.assertEquals("fooValue", forTestList.forTestValues.get(0).foo); + Assertions.assertEquals(500, forTestList.forTestValues.get(0).bar); + + var exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + jsonConverter.fromJson( + "{\"forTestValues\":[{\"foo\":\"fooValue\"}]}", TypeRef.of(ForTestList.class))); + Assertions.assertEquals("$.forTestValues[0].bar can not be null", exception.getMessage()); + } + + @Test + void testNullMap() { + var jsonConverter = getConverter(); + var forTestMap = + jsonConverter.fromJson( + "{\"forTestValueMap\":{\"mapKey\":{\"foo\":\"fooValue\",\"bar\":500}}}", + TypeRef.of(ForTestMap.class)); + Assertions.assertEquals("fooValue", forTestMap.forTestValueMap.get("mapKey").foo); + Assertions.assertEquals(500, forTestMap.forTestValueMap.get("mapKey").bar); + + var exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + jsonConverter.fromJson( + "{\"forTestValueMap\":{\"mapKey\":{\"foo\":\"fooValue\"}}}", + TypeRef.of(ForTestMap.class))); + Assertions.assertEquals("$.forTestValueMap.mapKey.bar can not be null", exception.getMessage()); + } + + @Test + void testNullOptional() { + var jsonConverter = getConverter(); + var forTestOptional = + jsonConverter.fromJson( + "{\"forTestValue\":{\"foo\":\"fooValue\",\"bar\":500}}", + TypeRef.of(ForTestOptional.class)); + Assertions.assertEquals("fooValue", forTestOptional.forTestValue.get().foo); + Assertions.assertEquals(500, forTestOptional.forTestValue.get().bar); + + var exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + jsonConverter.fromJson( + "{\"forTestValue\":{\"foo\":\"fooValue\"}}", + TypeRef.of(ForTestOptional.class))); + Assertions.assertEquals("$.forTestValue.bar can not be null", exception.getMessage()); + } + + @Test + void testGenericDefault() { + var jsonConverter = getConverter(); + var forTestDefault = jsonConverter.fromJson("{}", TypeRef.of(ForTestDefault.class)); + + assertNotNull(forTestDefault.list); + assertNotNull(forTestDefault.map); + assertNotNull(forTestDefault.opt); + + forTestDefault = + jsonConverter.fromJson( + "{\"list\":null,\"map\":null,\"opt\":null}", TypeRef.of(ForTestDefault.class)); + assertNotNull(forTestDefault.list); + assertNotNull(forTestDefault.map); + assertNotNull(forTestDefault.opt); + } + + static class ForTestDefault { + Optional opt = Optional.empty(); + Map map = Map.of(); + List list = List.of(); + } + + static class ForTestOptional { + + Optional forTestValue = Optional.empty(); + } + + static class ForTestMap { + + Map forTestValueMap; + } + + static class ForTestList { + + List forTestValues; + } + + static class ForTestNested { + + ForTestConvert forTestValue; + } + + static class ForTestConvert { + String foo; + Integer bar; + } + + private static class Foo { + byte[] bar; } private static class V0 { @@ -249,14 +405,13 @@ private static class TestFieldNameClass { private static class TestOptionalClass { - private Optional optValue = Optional.empty(); - private Optional> nestedOpt = Optional.empty(); - /** - * if opt is not initialized with Optional.empty(), nonInitOpt will be null when nonInitOpt is - * not in json fields. + * if opt is not initialized with Optional.empty(), it will be null when nonInitOpt is not in + * json fields. */ - private Optional nonInitOpt; + private Optional optValue = Optional.empty(); + + private Optional> nestedOpt = Optional.empty(); } private static class TestNestedObjectClass { diff --git a/common/src/test/java/org/astraea/common/json/TypeRefTest.java b/common/src/test/java/org/astraea/common/json/TypeRefTest.java index 650ddb31bc..70e9ac6438 100644 --- a/common/src/test/java/org/astraea/common/json/TypeRefTest.java +++ b/common/src/test/java/org/astraea/common/json/TypeRefTest.java @@ -47,6 +47,11 @@ void testOf() { assertEquals("class java.lang.String", TypeRef.of(String.class).getType().toString()); } + @Test + void testBytes() { + assertEquals("class [B", TypeRef.bytes().getType().toString()); + } + @Test void testArray() { assertEquals( From d291d361a51cbfba804bd177fc943b7f2283b7ec Mon Sep 17 00:00:00 2001 From: geordie Date: Tue, 22 Nov 2022 23:22:27 +0800 Subject: [PATCH 2/4] revert serde --- .../org/astraea/app/web/RecordHandler.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/RecordHandler.java b/app/src/main/java/org/astraea/app/web/RecordHandler.java index 008d1977e9..c4abdf5b2c 100644 --- a/app/src/main/java/org/astraea/app/web/RecordHandler.java +++ b/app/src/main/java/org/astraea/app/web/RecordHandler.java @@ -20,6 +20,7 @@ import static java.util.stream.Collectors.toList; import java.time.Duration; +import java.util.Base64; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -262,42 +263,39 @@ public CompletionStage delete(Channel channel) { enum SerDe implements EnumInfo { BYTEARRAY( (topic, value) -> - Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.bytes())) - .orElse(null), + Optional.ofNullable(value).map(v -> Base64.getDecoder().decode(v)).orElse(null), Deserializer.BYTE_ARRAY), STRING( (topic, value) -> Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(String.class))) - .map(v -> Serializer.STRING.serialize(topic, List.of(), v)) + .map(v -> Serializer.STRING.serialize(topic, List.of(), value)) .orElse(null), Deserializer.STRING), LONG( (topic, value) -> Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Long.class))) + .map(Long::parseLong) .map(longVal -> Serializer.LONG.serialize(topic, List.of(), longVal)) .orElse(null), Deserializer.LONG), INTEGER( (topic, value) -> Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Integer.class))) + .map(Integer::parseInt) .map(intVal -> Serializer.INTEGER.serialize(topic, List.of(), intVal)) .orElse(null), Deserializer.INTEGER), FLOAT( (topic, value) -> Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Float.class))) + .map(Float::parseFloat) .map(floatVal -> Serializer.FLOAT.serialize(topic, List.of(), floatVal)) .orElse(null), Deserializer.FLOAT), DOUBLE( (topic, value) -> Optional.ofNullable(value) - .map(x -> JsonConverter.defaultConverter().fromJson(x, TypeRef.of(Double.class))) + .map(Double::parseDouble) .map(doubleVal -> Serializer.DOUBLE.serialize(topic, List.of(), doubleVal)) .orElse(null), Deserializer.DOUBLE); @@ -337,12 +335,9 @@ private static org.astraea.common.producer.Record createRecord( var keySerializer = SerDe.ofAlias(postRecord.keySerializer).serializer; var valueSerializer = SerDe.ofAlias(postRecord.valueSerializer).serializer; postRecord.key.ifPresent( - key -> - builder.key(keySerializer.apply(topic, JsonConverter.defaultConverter().toJson(key)))); + key -> builder.key(keySerializer.apply(topic, PostRequest.handle(key)))); postRecord.value.ifPresent( - value -> - builder.value( - valueSerializer.apply(topic, JsonConverter.defaultConverter().toJson(value)))); + value -> builder.value(valueSerializer.apply(topic, PostRequest.handle(value)))); postRecord.timestamp.ifPresent(builder::timestamp); postRecord.partition.ifPresent(builder::partition); return builder.build(); From 95493ca4f024642c8a8400db1169bf2fc28adc2c Mon Sep 17 00:00:00 2001 From: geordie Date: Tue, 22 Nov 2022 23:57:36 +0800 Subject: [PATCH 3/4] merge main --- app/src/test/java/org/astraea/app/web/RecordHandlerTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java index 2cf4b1c46f..8aaed0854d 100644 --- a/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/RecordHandlerTest.java @@ -121,16 +121,13 @@ void testPostTimeout() { @Test void testPostRawString() { var topic = "testPostRawString"; - var currentTimestamp = System.currentTimeMillis(); - var response = Assertions.assertInstanceOf( RecordHandler.PostResponse.class, getRecordHandler() .post( Channel.ofRequest( - PostRequest.of( - "{\"records\":[{\"topic\":\"testPostRawString\", \"partition\":0,\"keySerializer\":\"string\",\"valueSerializer\":\"string\",\"key\":\"abc\",\"value\":\"abcd\"}]}"))) + "{\"records\":[{\"topic\":\"testPostRawString\", \"partition\":0,\"keySerializer\":\"string\",\"valueSerializer\":\"string\",\"key\":\"abc\",\"value\":\"abcd\"}]}")) .toCompletableFuture() .join()); From 18516250d0d714359e4e15d7cd55eff69535a03f Mon Sep 17 00:00:00 2001 From: geordie Date: Wed, 23 Nov 2022 18:39:51 +0800 Subject: [PATCH 4/4] add todo --- app/src/main/java/org/astraea/app/web/RecordHandler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/RecordHandler.java b/app/src/main/java/org/astraea/app/web/RecordHandler.java index c4abdf5b2c..55764e6a63 100644 --- a/app/src/main/java/org/astraea/app/web/RecordHandler.java +++ b/app/src/main/java/org/astraea/app/web/RecordHandler.java @@ -335,9 +335,10 @@ private static org.astraea.common.producer.Record createRecord( var keySerializer = SerDe.ofAlias(postRecord.keySerializer).serializer; var valueSerializer = SerDe.ofAlias(postRecord.valueSerializer).serializer; postRecord.key.ifPresent( - key -> builder.key(keySerializer.apply(topic, PostRequest.handle(key)))); + // TODO: 2022-11-23 astraea-1163 key and value should be Json String or Json Number + key -> builder.key(keySerializer.apply(topic, key.toString()))); postRecord.value.ifPresent( - value -> builder.value(valueSerializer.apply(topic, PostRequest.handle(value)))); + value -> builder.value(valueSerializer.apply(topic, value.toString()))); postRecord.timestamp.ifPresent(builder::timestamp); postRecord.partition.ifPresent(builder::partition); return builder.build();