Skip to content

Commit

Permalink
Add support for typed-key arrays, refactor and add tests (elastic#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez committed Jan 24, 2022
1 parent c869daf commit 8bc1f21
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParsingException;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -57,13 +59,13 @@ public Deserializer(Map<String, JsonpDeserializer<? extends Member>> deserialize
/**
* Deserialize a union value, given its type.
*/
public Union deserialize(String type, JsonParser parser, JsonpMapper mapper) {
public Union deserialize(String type, JsonParser parser, JsonpMapper mapper, Event event) {
JsonpDeserializer<? extends Member> deserializer = deserializers.get(type);
if (deserializer == null) {
throw new JsonParsingException("Unknown variant type '" + type + "'", parser.getLocation());
}

return unionCtor.apply(type, deserializer.deserialize(parser, mapper));
return unionCtor.apply(type, deserializer.deserialize(parser, mapper, event));
}

/**
Expand Down Expand Up @@ -104,10 +106,44 @@ public void deserializeEntry(String key, JsonParser parser, JsonpMapper mapper,
String type = key.substring(0, hashPos);
String name = key.substring(hashPos + 1);

targetMap.put(name, deserializer.deserialize(type, parser, mapper));
targetMap.put(name, deserializer.deserialize(type, parser, mapper, parser.next()));
}
}

static <T extends TaggedUnion<?, ?>> JsonpDeserializer<Map<String, List<T>>> arrayMapDeserializer(
TypedKeysDeserializer<T> deserializer
) {
return JsonpDeserializer.of(
EnumSet.of(Event.START_OBJECT),
(parser, mapper, event) -> {
Map<String, List<T>> result = new HashMap<>();
while ((event = parser.next()) != Event.END_OBJECT) {
JsonpUtils.expectEvent(parser, event, Event.KEY_NAME);
// Split key and type
String key = parser.getString();
int hashPos = key.indexOf('#');
if (hashPos == -1) {
throw new JsonParsingException(
"Property name '" + key + "' is not in the 'type#name' format. Make sure the request has 'typed_keys' set.",
parser.getLocation()
);
}

String type = key.substring(0, hashPos);
String name = key.substring(hashPos + 1);

List<T> list = new ArrayList<>();
JsonpUtils.expectNextEvent(parser, Event.START_ARRAY);
while ((event = parser.next()) != Event.END_ARRAY) {
list.add(deserializer.deserializer.deserialize(type, parser, mapper, event));
}
result.put(name, list);
}
return result;
}
);
}

/**
* Serialize an externally tagged union using the typed keys encoding.
*/
Expand All @@ -119,6 +155,26 @@ public void deserializeEntry(String key, JsonParser parser, JsonpMapper mapper,
generator.writeEnd();
}

static <T extends JsonpSerializable & TaggedUnion<? extends JsonEnum, ?>> void serializeTypedKeysArray(
Map<String, List<T>> map, JsonGenerator generator, JsonpMapper mapper
) {
generator.writeStartObject();
for (Map.Entry<String, List<T>> entry: map.entrySet()) {
List<T> list = entry.getValue();
if (list.isEmpty()) {
continue; // We can't know the kind, skip this entry
}

generator.writeKey(list.get(0)._kind().jsonValue() + "#" + entry.getKey());
generator.writeStartArray();
for (T value: list) {
value.serialize(generator, mapper);
}
generator.writeEnd();
}
generator.writeEnd();
}

/**
* Serialize an externally tagged union using the typed keys encoding, without the enclosing start/end object.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch;

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jsonb.JsonbJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.time.Duration;

public class ElasticsearchTestServer implements AutoCloseable {

private volatile ElasticsearchContainer container;
private int port;
private final JsonpMapper mapper = new JsonbJsonpMapper();
private RestClient restClient;
private ElasticsearchTransport transport;
private ElasticsearchClient client;

private static ElasticsearchTestServer global;

public static synchronized ElasticsearchTestServer global() {
if (global == null) {
System.out.println("Starting global ES test server.");
global = new ElasticsearchTestServer();
global.setup();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Stopping global ES test server.");
global.close();
}));
}
return global;
}

private synchronized void setup() {
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.2")
.withEnv("ES_JAVA_OPTS", "-Xms256m -Xmx256m")
.withEnv("path.repo", "/tmp") // for snapshots
.withStartupTimeout(Duration.ofSeconds(30))
.withPassword("changeme");
container.start();
port = container.getMappedPort(9200);

BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
credsProv.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme")
);
restClient = RestClient.builder(new HttpHost("localhost", port))
.setHttpClientConfigCallback(hc -> hc.setDefaultCredentialsProvider(credsProv))
.build();
transport = new RestClientTransport(restClient, mapper);
client = new ElasticsearchClient(transport);
}

@Override
public void close() {
if (this == global) {
// Closed with a shutdown hook
return;
}

if (container != null) {
container.stop();
}
container = null;
}

public int port() {
return port;
}

public RestClient restClient() {
return restClient;
}

public ElasticsearchTransport transport() {
return transport;
}

public JsonpMapper mapper() {
return mapper;
}

public ElasticsearchClient client() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.ElasticsearchTestServer;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.aggregations.HistogramAggregate;
Expand All @@ -42,26 +43,12 @@
import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.elasticsearch.model.ModelTestCase;
import co.elastic.clients.elasticsearch.snapshot.CreateRepositoryResponse;
import co.elastic.clients.elasticsearch.snapshot.CreateSnapshotResponse;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jsonb.JsonbJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -70,38 +57,11 @@

public class RequestTest extends Assert {

private static ElasticsearchContainer container;
private static final JsonpMapper mapper = new JsonbJsonpMapper();
private static RestClient restClient;
private static ElasticsearchTransport transport;
private static ElasticsearchClient client;
static ElasticsearchClient client;

@BeforeClass
public static void setup() {
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.2")
.withEnv("ES_JAVA_OPTS", "-Xms256m -Xmx256m")
.withEnv("path.repo", "/tmp") // for snapshots
.withStartupTimeout(Duration.ofSeconds(30))
.withPassword("changeme");
container.start();
int port = container.getMappedPort(9200);

BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
credsProv.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme")
);
restClient = RestClient.builder(new HttpHost("localhost", port))
.setHttpClientConfigCallback(hc -> hc.setDefaultCredentialsProvider(credsProv))
.build();
transport = new RestClientTransport(restClient, mapper);
client = new ElasticsearchClient(transport);
}

@AfterClass
public static void tearDown() {
if (container != null) {
container.stop();
}
client = ElasticsearchTestServer.global().client();
}

@Test
Expand All @@ -112,7 +72,7 @@ public void testCount() throws Exception {

@Test
public void testIndexCreation() throws Exception {
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(client._transport());

// Ping the server
assertTrue(client.ping().value());
Expand Down Expand Up @@ -222,7 +182,7 @@ public void testDataIngestion() throws Exception {
public void testCatRequest() throws IOException {
// Cat requests should have the "format=json" added by the transport
NodesResponse nodes = client.cat().nodes(_0 -> _0);
System.out.println(ModelTestCase.toJson(nodes, mapper));
System.out.println(ModelTestCase.toJson(nodes, client._transport().jsonpMapper()));

assertEquals(1, nodes.valueBody().size());
assertEquals("*", nodes.valueBody().get(0).master());
Expand All @@ -247,15 +207,25 @@ public void testBulkRequest() throws IOException {
.id("def")
.document(appData)
))
.operations(_1 -> _1
.update(_2 -> _2
.index("foo")
.id("gh")
.action(_3 -> _3
.docAsUpsert(true)
.doc(appData))
)
)
);

assertFalse(bulk.errors());
assertEquals(2, bulk.items().size());
assertEquals(3, bulk.items().size());
assertEquals(OperationType.Create, bulk.items().get(0).operationType());
assertEquals("foo", bulk.items().get(0).index());
assertEquals(1L, bulk.items().get(0).version().longValue());
assertEquals("foo", bulk.items().get(1).index());
assertEquals(1L, bulk.items().get(1).version().longValue());
assertEquals(42, client.get(b -> b.index("foo").id("gh"), AppData.class).source().intValue);
}

@Test
Expand Down Expand Up @@ -291,7 +261,7 @@ public void testRefresh() throws IOException {


ExecutionException ee = assertThrows(ExecutionException.class, () -> {
ElasticsearchAsyncClient aClient = new ElasticsearchAsyncClient(transport);
ElasticsearchAsyncClient aClient = new ElasticsearchAsyncClient(client._transport());
GetResponse<String> response = aClient.get(
_0 -> _0.index("doesnotexist").id("reallynot"), String.class
).get();
Expand Down Expand Up @@ -398,30 +368,6 @@ public void testDefaultIndexSettings() throws IOException {
assertNull(settings.get(index).defaults());
}

@Test
public void testSnapshotCreation() throws IOException {
// https://github.com/elastic/elasticsearch-java/issues/74
// https://github.com/elastic/elasticsearch/issues/82358

CreateRepositoryResponse repo = client.snapshot().createRepository(b1 -> b1
.name("test")
.type("fs")
.settings(b2 -> b2
.location("/tmp/test-repo")
)
);

assertTrue(repo.acknowledged());

CreateSnapshotResponse snapshot = client.snapshot().create(b -> b
.repository("test")
.snapshot("1")
.waitForCompletion(true)
);

assertNotNull(snapshot.snapshot());
}

@Test
public void testValueBodyResponse() throws Exception {
DiskUsageResponse resp = client.indices().diskUsage(b -> b
Expand Down
Loading

0 comments on commit 8bc1f21

Please sign in to comment.