Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

astraea-974 Implement RecordHandler post request #1156

Merged
merged 5 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions app/src/main/java/org/astraea/app/web/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.stream.Collectors;
import org.astraea.common.EnumInfo;
import org.astraea.common.Utils;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;

interface Channel {

Expand All @@ -50,15 +52,21 @@ static Channel ofQueries(Map<String, String> queries) {
return builder().type(Type.GET).queries(queries).build();
}

@Deprecated
static Channel ofRequest(PostRequest request) {
return builder().type(Type.POST).request(request).build();
}

static Channel ofRequest(String json) {
return builder().type(Type.POST).request(json).build();
}

class Builder {
private Type type = Type.UNKNOWN;
private Optional<String> target = Optional.empty();
private Map<String, String> queries = Map.of();
private PostRequest request = PostRequest.EMPTY;
private Optional<String> body = Optional.empty();
@Deprecated private PostRequest request = PostRequest.EMPTY;
private Consumer<Response> sender = r -> {};

private Builder() {}
Expand All @@ -82,10 +90,17 @@ public Builder queries(Map<String, String> queries) {
return this;
}

@Deprecated
public Builder request(Map<String, Object> request) {
return request(PostRequest.of(request));
}

public Builder request(String json) {
this.body = Optional.ofNullable(json);
return this;
}

@Deprecated
public Builder request(PostRequest request) {
this.request = request;
return this;
Expand Down Expand Up @@ -113,6 +128,12 @@ public PostRequest request() {
return request;
}

@Override
public <T> T request(TypeRef<T> typeRef) {
var json = body.orElse("{}");
return JsonConverter.defaultConverter().fromJson(json, typeRef);
}

@Override
public Map<String, String> queries() {
return queries;
Expand Down Expand Up @@ -176,13 +197,20 @@ static Channel of(HttpExchange exchange) {
.collect(Collectors.toMap(p -> p.split("=")[0], p -> p.split("=")[1]));
};

Function<InputStream, PostRequest> parseRequest =
Function<InputStream, PostRequest> parsePostRequest =
stream -> {
var bs = Utils.packException(stream::readAllBytes);
if (bs == null || bs.length == 0) return PostRequest.EMPTY;
return PostRequest.of(new String(bs, StandardCharsets.UTF_8));
};

Function<InputStream, String> parseRequest =
stream -> {
var bs = Utils.packException(stream::readAllBytes);
if (bs == null || bs.length == 0) return null;
return new String(bs, StandardCharsets.UTF_8);
};

Function<String, Type> parseType =
name -> {
switch (name.toUpperCase(Locale.ROOT)) {
Expand All @@ -202,6 +230,7 @@ static Channel of(HttpExchange exchange) {
.type(parseType.apply(exchange.getRequestMethod()))
.target(parseTarget.apply(exchange.getRequestURI()))
.queries(parseQueries.apply(exchange.getRequestURI()))
.request(parsePostRequest.apply(exchange.getRequestBody()))
.request(parseRequest.apply(exchange.getRequestBody()))
.sender(
response -> {
Expand Down Expand Up @@ -235,8 +264,14 @@ static Channel of(HttpExchange exchange) {
/**
* @return body request
*/
@Deprecated
PostRequest request();

/**
* @return body request
*/
<T> T request(TypeRef<T> typeRef);

/**
* @return the queries appended to URL
*/
Expand Down
1 change: 1 addition & 0 deletions app/src/main/java/org/astraea/app/web/PostRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

@Deprecated
public interface PostRequest {

PostRequest EMPTY = PostRequest.of(Map.of());
Expand Down
138 changes: 66 additions & 72 deletions app/src/main/java/org/astraea/app/web/RecordHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.Base64;
import java.util.Collection;
Expand All @@ -40,6 +31,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.astraea.app.web.Request.RequestObject;
import org.astraea.common.Cache;
import org.astraea.common.EnumInfo;
import org.astraea.common.FutureUtils;
Expand All @@ -53,15 +45,14 @@
import org.astraea.common.consumer.Deserializer;
import org.astraea.common.consumer.SeekStrategy;
import org.astraea.common.consumer.SubscribedConsumer;
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.ProducerConfigs;
import org.astraea.common.producer.Serializer;

public class RecordHandler implements Handler {
static final String RECORDS = "records";
static final String TRANSACTION_ID = "transactionId";
static final String PARTITION = "partition";
static final String ASYNC = "async";
static final String DISTANCE_FROM_LATEST = "distanceFromLatest";
static final String DISTANCE_FROM_BEGINNING = "distanceFromBeginning";
static final String SEEK_TO = "seekTo";
Expand Down Expand Up @@ -179,19 +170,14 @@ GetResponse get(Consumer<byte[], byte[]> consumer, int limit, Duration timeout)

@Override
public CompletionStage<Response> post(Channel channel) {
var async = channel.request().getBoolean(ASYNC).orElse(false);
var timeout =
channel.request().get(TIMEOUT).map(Utils::toDuration).orElse(Duration.ofSeconds(5));
var records = channel.request().values(RECORDS, PostRecord.class);
var postRequest = channel.request(TypeRef.of(RecordPostRequest.class));

var records = postRequest.records();
if (records.isEmpty())
throw new IllegalArgumentException("records should contain at least one record");

var producer =
channel
.request()
.get(TRANSACTION_ID)
.map(transactionalProducerCache::get)
.orElse(this.producer);
postRequest.transactionId().map(transactionalProducerCache::get).orElse(this.producer);

var result =
CompletableFuture.supplyAsync(
Expand Down Expand Up @@ -224,10 +210,13 @@ public CompletionStage<Response> post(Channel channel) {
.map(CompletionStage::toCompletableFuture)
.collect(toList())));

if (async) return CompletableFuture.completedFuture(Response.ACCEPT);
if (postRequest.async()) return CompletableFuture.completedFuture(Response.ACCEPT);
return CompletableFuture.completedFuture(
Utils.packException(
() -> new PostResponse(result.get(timeout.toNanos(), TimeUnit.NANOSECONDS))));
() ->
new PostResponse(
result.get(
Utils.toDuration(postRequest.timeout()).toNanos(), TimeUnit.NANOSECONDS))));
}

@Override
Expand Down Expand Up @@ -325,7 +314,9 @@ public String toString() {
return alias();
}

/** (topic, json) convert to bytes */
final BiFunction<String, String, byte[]> serializer;

final Deserializer<?> deserializer;

SerDe(BiFunction<String, String, byte[]> serializer, Deserializer<?> deserializer) {
Expand All @@ -336,30 +327,48 @@ public String toString() {

private static org.astraea.common.producer.Record<byte[], byte[]> createRecord(
Producer<byte[], byte[]> producer, PostRecord postRecord) {
var topic =
Optional.ofNullable(postRecord.topic)
.orElseThrow(() -> new IllegalArgumentException("topic must be set"));
var topic = postRecord.topic;
var builder = org.astraea.common.producer.Record.<byte[], byte[]>builder().topic(topic);

// TODO: Support headers
// (https://github.com/skiptests/astraea/issues/422)
var keySerializer =
Optional.ofNullable(postRecord.keySerializer)
.map(name -> SerDe.ofAlias(name).serializer)
.orElse(SerDe.STRING.serializer);
var valueSerializer =
Optional.ofNullable(postRecord.valueSerializer)
.map(name -> SerDe.ofAlias(name).serializer)
.orElse(SerDe.STRING.serializer);

Optional.ofNullable(postRecord.key)
.ifPresent(key -> builder.key(keySerializer.apply(topic, PostRequest.handle(key))));
Optional.ofNullable(postRecord.value)
.ifPresent(value -> builder.value(valueSerializer.apply(topic, PostRequest.handle(value))));
Optional.ofNullable(postRecord.timestamp).ifPresent(builder::timestamp);
Optional.ofNullable(postRecord.partition).ifPresent(builder::partition);
var keySerializer = SerDe.ofAlias(postRecord.keySerializer).serializer;
var valueSerializer = SerDe.ofAlias(postRecord.valueSerializer).serializer;
postRecord.key.ifPresent(
key -> builder.key(keySerializer.apply(topic, PostRequest.handle(key))));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊簡單一點,就直接呼叫toString取代PostRequest.handle,然後留個TODO,描述說要處理只能傳入 number and string,並且開好對應的議題

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postRecord.value.ifPresent(
value -> builder.value(valueSerializer.apply(topic, PostRequest.handle(value))));
postRecord.timestamp.ifPresent(builder::timestamp);
postRecord.partition.ifPresent(builder::partition);
return builder.build();
}

static class RecordPostRequest implements Request {
private boolean async = false;
private String timeout = "5s";
private List<PostRecord> records = List.of();

private Optional<String> transactionId = Optional.empty();

public RecordPostRequest() {}

public boolean async() {
return async;
}

public String timeout() {
return timeout;
}

public List<PostRecord> records() {
return records;
}

public Optional<String> transactionId() {
return transactionId;
}
}

static class Metadata implements Response {
final String topic;
final int partition;
Expand Down Expand Up @@ -389,12 +398,7 @@ private GetResponse(Consumer<byte[], byte[]> consumer, Collection<Record> record

@Override
public String json() {
return new GsonBuilder()
// gson will do html escape by default (e.g. convert = to \u003d)
.disableHtmlEscaping()
.registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter())
.create()
.toJson(this);
return JsonConverter.defaultConverter().toJson(this);
}

@Override
Expand Down Expand Up @@ -445,26 +449,16 @@ static class Header {
}
}

static class ByteArrayToBase64TypeAdapter
implements JsonSerializer<byte[]>, JsonDeserializer<byte[]> {
public byte[] deserialize(JsonElement json, Type type, JsonDeserializationContext context)
throws JsonParseException {
return Base64.getDecoder().decode(json.getAsString());
}

public JsonElement serialize(byte[] src, Type type, JsonSerializationContext context) {
return new JsonPrimitive(Base64.getEncoder().encodeToString(src));
}
}
static class PostRecord implements RequestObject {
String topic;
Optional<Integer> partition = Optional.empty();
String keySerializer = "STRING";
String valueSerializer = "STRING";
Optional<Object> key = Optional.empty();
Optional<Object> value = Optional.empty();
Optional<Long> timestamp = Optional.empty();

static class PostRecord {
final String topic;
final Integer partition;
final String keySerializer;
final String valueSerializer;
final Object key;
final Object value;
final Long timestamp;
public PostRecord() {}

PostRecord(
String topic,
Expand All @@ -475,12 +469,12 @@ static class PostRecord {
Object value,
Long timestamp) {
this.topic = topic;
this.partition = partition;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.partition = Optional.ofNullable(partition);
this.keySerializer = Optional.ofNullable(keySerializer).orElse("STRING");
this.valueSerializer = Optional.ofNullable(valueSerializer).orElse("STRING");
this.key = Optional.ofNullable(key);
this.value = Optional.ofNullable(value);
this.timestamp = Optional.ofNullable(timestamp);
}
}

Expand Down
23 changes: 23 additions & 0 deletions app/src/main/java/org/astraea/app/web/Request.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.app.web;

public interface Request {

/** Indicates the object used in the request */
interface RequestObject {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不好意思,我不太確定這個介面的用途是什麼?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

主要想解決 之前提過的一個問題

在Request 中的field 是 generic 的 Object 的話 會拿不到他的Class
所以沒辦法做一些testing 上的驗證

像是以下

static class RecordPostRequest implements Request {
   List<PostRecord> records = List.of();
}

這裡直接在那些PostRecord上面加上這個interface 來做驗證

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不好意思,我最近記憶力有點薄弱,我講錯的話就請提醒我一下

現在的驗證是驗證一個「建構出來的物件」,因此 records 這個變數我們就是驗證它是否為 null 以及 它內涵的物件(而非generic type)是否有符合規範。

條列的地來說,一個 Request / Response 物件只能有三種 members

  1. primitive type
  2. Map optional collection
  3. 其他 Request

因此在我的想像中,PostRecord 應該也是繼承 Request 就好

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我先合併這隻PR,後面接著微調,以免擋著太多

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

瞭解
PostRecord 也是可以繼承 Request 的
只是原本定義的 Request 是user 會傳上來的 json 物件
所以才用別的interface 來代表是Request 中會使用到的其他物件

}
Loading