Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing Java async client implementation #331

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/weaviate/client/WeaviateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public WeaviateClient(Config config, HttpClient httpClient, AccessTokenProvider
}

public WeaviateAsyncClient async() {
return new WeaviateAsyncClient(config);
return new WeaviateAsyncClient(config, tokenProvider);
}

public Misc misc() {
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/io/weaviate/client/v1/async/WeaviateAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.weaviate.client.base.http.async.AsyncHttpClient;
import io.weaviate.client.base.util.DbVersionProvider;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.base.util.GrpcVersionSupport;
import io.weaviate.client.v1.async.backup.Backup;
import io.weaviate.client.v1.async.batch.Batch;
import io.weaviate.client.v1.async.classifications.Classifications;
Expand All @@ -13,6 +14,7 @@
import io.weaviate.client.v1.async.graphql.GraphQL;
import io.weaviate.client.v1.async.misc.Misc;
import io.weaviate.client.v1.async.schema.Schema;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.misc.model.Meta;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand All @@ -23,30 +25,35 @@ public class WeaviateAsyncClient implements AutoCloseable {
private final Config config;
private final CloseableHttpAsyncClient client;
private final DbVersionSupport dbVersionSupport;
private final GrpcVersionSupport grpcVersionSupport;
private final AccessTokenProvider tokenProvider;

public WeaviateAsyncClient(Config config) {
public WeaviateAsyncClient(Config config, AccessTokenProvider tokenProvider) {
this.config = config;
this.client = AsyncHttpClient.create(config);
// auto start the client
this.start();
// init the db version provider and get the version info
this.dbVersionSupport = new DbVersionSupport(initDbVersionProvider());
DbVersionProvider dbVersionProvider = initDbVersionProvider();
this.dbVersionSupport = new DbVersionSupport(dbVersionProvider);
this.grpcVersionSupport = new GrpcVersionSupport(dbVersionProvider);
this.tokenProvider = tokenProvider;
}

public Misc misc() {
return new Misc(client, config);
}

public Schema schema() {
return new Schema(client, config);
return new Schema(client, config, dbVersionSupport);
}

public Data data() {
return new Data(client, config, dbVersionSupport);
}

public Batch batch() {
return new Batch(client, config, dbVersionSupport, data());
return new Batch(client, config, dbVersionSupport, grpcVersionSupport, tokenProvider, data());
}

public Cluster cluster() {
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/io/weaviate/client/v1/async/batch/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import io.weaviate.client.Config;
import io.weaviate.client.base.util.BeaconPath;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.base.util.GrpcVersionSupport;
import io.weaviate.client.v1.async.batch.api.ObjectsBatchDeleter;
import io.weaviate.client.v1.async.batch.api.ObjectsBatcher;
import io.weaviate.client.v1.async.data.Data;
import io.weaviate.client.v1.auth.provider.AccessTokenProvider;
import io.weaviate.client.v1.batch.api.ReferencePayloadBuilder;
import io.weaviate.client.v1.batch.util.ObjectsPath;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
Expand All @@ -16,12 +18,17 @@ public class Batch {
private final ObjectsPath objectsPath;
private final BeaconPath beaconPath;
private final Data data;
private final GrpcVersionSupport grpcVersionSupport;
private final AccessTokenProvider tokenProvider;

public Batch(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport, Data data) {
public Batch(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport,
GrpcVersionSupport grpcVersionSupport, AccessTokenProvider tokenProvider, Data data) {
this.client = client;
this.config = config;
this.objectsPath = new ObjectsPath();
this.beaconPath = new BeaconPath(dbVersionSupport);
this.grpcVersionSupport = grpcVersionSupport;
this.tokenProvider = tokenProvider;
this.data = data;
}

Expand All @@ -30,9 +37,7 @@ public ObjectsBatcher objectsBatcher() {
}

public ObjectsBatcher objectsBatcher(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig) {
// TODO: add support for missing arguments
// return ObjectsBatcher.create(client, config, data, objectsPath, tokenProvider, grpcVersionSupport, batchRetriesConfig);
return ObjectsBatcher.create(client, config, data, objectsPath, null, null, batchRetriesConfig);
return ObjectsBatcher.create(client, config, data, objectsPath, tokenProvider, grpcVersionSupport, batchRetriesConfig);
}

public ObjectsBatcher objectsAutoBatcher() {
Expand All @@ -58,9 +63,7 @@ public ObjectsBatcher objectsAutoBatcher(ObjectsBatcher.AutoBatchConfig autoBatc

public ObjectsBatcher objectsAutoBatcher(ObjectsBatcher.BatchRetriesConfig batchRetriesConfig,
ObjectsBatcher.AutoBatchConfig autoBatchConfig) {
// TODO: add support for missing arguments
// return ObjectsBatcher.create(client, config, data, objectsPath, tokenProvider, grpcVersionSupport, batchRetriesConfig);
return ObjectsBatcher.createAuto(client, config, data, objectsPath, null, null, batchRetriesConfig, autoBatchConfig);
return ObjectsBatcher.createAuto(client, config, data, objectsPath, tokenProvider, grpcVersionSupport, batchRetriesConfig, autoBatchConfig);
}

public ObjectsBatchDeleter objectsBatchDeleter() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/weaviate/client/v1/async/misc/Misc.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ public OpenIDConfigGetter openIDConfigGetter() {
return new OpenIDConfigGetter(client, config);
}

// TODO:async add support for dbVersionProvider
public LiveChecker liveChecker() {
return new LiveChecker(client, config);
}

// TODO:async add support for dbVersionProvider
public ReadyChecker readyChecker() {
return new ReadyChecker(client, config);
}
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/io/weaviate/client/v1/async/schema/Schema.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.weaviate.client.v1.async.schema;

import io.weaviate.client.Config;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.v1.async.schema.api.ClassCreator;
import io.weaviate.client.v1.async.schema.api.ClassDeleter;
import io.weaviate.client.v1.async.schema.api.ClassExists;
Expand All @@ -12,17 +13,19 @@
import io.weaviate.client.v1.async.schema.api.ShardsGetter;
import io.weaviate.client.v1.async.schema.api.ShardsUpdater;
import io.weaviate.client.v1.async.schema.api.TenantsCreator;
//import io.weaviate.client.v1.async.schema.api.TenantsUpdater;
import io.weaviate.client.v1.async.schema.api.TenantsGetter;
import io.weaviate.client.v1.async.schema.api.TenantsUpdater;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;

public class Schema {
private final CloseableHttpAsyncClient client;
private final Config config;
private final DbVersionSupport dbVersionSupport;

public Schema(CloseableHttpAsyncClient client, Config config) {
public Schema(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport) {
this.client = client;
this.config = config;
this.dbVersionSupport = dbVersionSupport;
}

public SchemaGetter getter() {
Expand Down Expand Up @@ -69,9 +72,9 @@ public TenantsCreator tenantsCreator() {
return new TenantsCreator(client, config);
}

// public TenantsUpdater tenantsUpdater() {
// return new TenantsUpdater(client, config);
// }
public TenantsUpdater tenantsUpdater() {
return new TenantsUpdater(client, config, dbVersionSupport);
}

public TenantsGetter tenantsGetter() {
return new TenantsGetter(client, config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.weaviate.client.v1.async.schema.api;

import io.weaviate.client.Config;
import io.weaviate.client.base.AsyncBaseClient;
import io.weaviate.client.base.AsyncClientResult;
import io.weaviate.client.base.Response;
import io.weaviate.client.base.Result;
import io.weaviate.client.base.http.async.ResponseParser;
import io.weaviate.client.base.util.DbVersionSupport;
import io.weaviate.client.base.util.UrlEncoder;
import io.weaviate.client.v1.schema.model.Tenant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;

public class TenantsUpdater extends AsyncBaseClient<Boolean> implements AsyncClientResult<Boolean> {

private final static int BATCH_SIZE = 100;
private final DbVersionSupport dbVersionSupport;
private String className;
private Tenant[] tenants;

public TenantsUpdater(CloseableHttpAsyncClient client, Config config, DbVersionSupport dbVersionSupport) {
super(client, config);
this.dbVersionSupport = dbVersionSupport;
}

public TenantsUpdater withClassName(String className) {
this.className = className;
return this;
}

public TenantsUpdater withTenants(Tenant... tenants) {
this.tenants = tenants;
return this;
}

@Override
public Future<Result<Boolean>> run(FutureCallback<Result<Boolean>> callback) {
if (dbVersionSupport.supportsOnly100TenantsInOneRequest() && tenants != null && tenants.length > BATCH_SIZE) {
CompletableFuture<Result<Boolean>> updateALl = CompletableFuture.supplyAsync(() -> chunkTenants(tenants, BATCH_SIZE)).thenApplyAsync(tenants -> {
for (List<Tenant> batch : tenants) {
try {
Result<Boolean> resp = updateTenants(batch.toArray(new Tenant[0]), null).get();
if (resp.hasErrors()) {
return resp;
}
} catch (InterruptedException | ExecutionException e) {
throw new CompletionException(e);
}
}
return new Result<>(200, true, null);
});
if (callback != null) {
return updateALl.whenComplete((booleanResult, e) -> {
callback.completed(booleanResult);
if (e != null) {
callback.failed(new Exception(e));
}
});
}
return updateALl;
}
return updateTenants(tenants, callback);
}

private Future<Result<Boolean>> updateTenants(Tenant[] tenants, FutureCallback<Result<Boolean>> callback) {
String path = String.format("/schema/%s/tenants", UrlEncoder.encodePathParam(className));
return sendPutRequest(path, tenants, callback, new ResponseParser<Boolean>() {
@Override
public Result<Boolean> parse(HttpResponse response, String body, ContentType contentType) {
Response<Tenant[]> resp = serializer.toResponse(response.getCode(), body, Tenant[].class);
return new Result<>(resp.getStatusCode(), resp.getStatusCode() == HttpStatus.SC_OK, resp.getErrors());
}
});
}

private Collection<List<Tenant>> chunkTenants(Tenant[] tenants, int chunkSize) {
AtomicInteger counter = new AtomicInteger();
return Stream.of(tenants).collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize)).values();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.weaviate.integration.client.async.schema;

import io.weaviate.client.Config;
import io.weaviate.client.WeaviateClient;
import io.weaviate.client.base.Result;
import io.weaviate.client.v1.async.WeaviateAsyncClient;
import io.weaviate.client.v1.schema.model.ActivityStatus;
import io.weaviate.client.v1.schema.model.Tenant;
import io.weaviate.integration.client.AssertMultiTenancy;
import io.weaviate.integration.client.WeaviateDockerCompose;
import io.weaviate.integration.client.WeaviateTestGenerics;
import static io.weaviate.integration.client.WeaviateTestGenerics.TENANT_1;
import static io.weaviate.integration.client.WeaviateTestGenerics.TENANT_2;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class ClientSchemaMultiTenancyTest {
private WeaviateClient client;
private WeaviateTestGenerics testGenerics;
private AssertMultiTenancy assertMT;

@ClassRule
public static WeaviateDockerCompose compose = new WeaviateDockerCompose();

@Before
public void before() {
String httpHost = compose.getHttpHostAddress();
Config config = new Config("http", httpHost);

client = new WeaviateClient(config);
testGenerics = new WeaviateTestGenerics();
assertMT = new AssertMultiTenancy(client);
}

@After
public void after() {
testGenerics.cleanupWeaviate(client);
}

@Test
public void shouldUpdateTenantsOfMTClass() throws ExecutionException, InterruptedException {
Tenant[] tenants = new Tenant[]{TENANT_1, TENANT_2};
testGenerics.createSchemaPizzaForTenants(client);
testGenerics.createTenantsPizza(client, tenants);

try (WeaviateAsyncClient asyncClient = client.async()) {
Result<Boolean> updateResult = asyncClient.schema().tenantsUpdater()
.withClassName("Pizza")
.withTenants(Arrays.stream(tenants)
.map(tenant -> Tenant.builder().name(tenant.getName()).activityStatus(ActivityStatus.COLD).build())
.toArray(Tenant[]::new))
.run().get();

assertThat(updateResult).isNotNull()
.returns(false, Result::hasErrors)
.returns(true, Result::getResult);
}
}
}
Loading