Skip to content

Commit

Permalink
Adjust the CloudImport method to align with the logic of SDKs in othe…
Browse files Browse the repository at this point in the history
…r languages (#1099)

Signed-off-by: lentitude2tk <[email protected]>
  • Loading branch information
lentitude2tk authored Oct 14, 2024
1 parent 63b27f2 commit c9b566c
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 392 deletions.
66 changes: 37 additions & 29 deletions examples/main/java/io/milvus/v1/BulkWriterExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.BulkWriter;
import io.milvus.bulkwriter.CloudImportV2;
import io.milvus.bulkwriter.CloudImport;
import io.milvus.bulkwriter.LocalBulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.RemoteBulkWriter;
Expand All @@ -40,12 +40,9 @@
import io.milvus.bulkwriter.connect.AzureConnectParam;
import io.milvus.bulkwriter.connect.S3ConnectParam;
import io.milvus.bulkwriter.connect.StorageConnectParam;
import io.milvus.bulkwriter.request.v2.BulkImportV2Request;
import io.milvus.bulkwriter.request.v2.GetImportProgressV2Request;
import io.milvus.bulkwriter.request.v2.ListImportJobsV2Request;
import io.milvus.bulkwriter.response.v2.BulkImportV2Response;
import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
import io.milvus.bulkwriter.response.v2.ListImportJobsV2Response;
import io.milvus.bulkwriter.request.BulkImportRequest;
import io.milvus.bulkwriter.request.GetImportProgressRequest;
import io.milvus.bulkwriter.request.ListImportJobsRequest;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.utils.ExceptionUtils;
Expand Down Expand Up @@ -81,7 +78,6 @@

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -561,7 +557,7 @@ private void callBulkInsert(CollectionSchemaParam collectionSchema, List<List<St
System.out.println("Collection row number: " + getCollectionStatistics());
}

private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException, MalformedURLException {
private void callCloudImport(List<List<String>> batchFiles, String collectionName, String partitionName) throws InterruptedException {
System.out.println("\n===================== call cloudImport ====================");

String objectUrl = StorageConsts.cloudStorage == CloudStorage.AZURE
Expand All @@ -570,28 +566,34 @@ private void callCloudImport(List<List<String>> batchFiles, String collectionNam
String accessKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_NAME : StorageConsts.STORAGE_ACCESS_KEY;
String secretKey = StorageConsts.cloudStorage == CloudStorage.AZURE ? StorageConsts.AZURE_ACCOUNT_KEY : StorageConsts.STORAGE_SECRET_KEY;

BulkImportV2Request bulkImportRequest = BulkImportV2Request.builder()
BulkImportRequest bulkImportRequest = BulkImportRequest.builder()
.objectUrl(objectUrl).accessKey(accessKey).secretKey(secretKey)
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(collectionName).partitionName(partitionName)
.build();
BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
String jobId = bulkImportResponse.getJobId();
String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, bulkImportRequest);
JsonObject bulkImportObject = convertDataMap(bulkImportResult);

String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
System.out.println("Create a cloudImport job, job id: " + jobId);

while (true) {
System.out.println("Wait 5 second to check bulkInsert job state...");
TimeUnit.SECONDS.sleep(5);

GetImportProgressV2Request request = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
if ("Completed".equals(getImportProgressResponse.getState())) {
GetImportProgressRequest request = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
JsonObject getImportProgressObject = convertDataMap(getImportProgressResult);
String importProgressState = getImportProgressObject.getAsJsonObject("data").get("state").getAsString();
String reason = getImportProgressObject.getAsJsonObject("data").get("reason").getAsString();
String progress = getImportProgressObject.getAsJsonObject("data").get("progress").getAsString();
if ("Completed".equals(importProgressState)) {
System.out.printf("The job %s completed%n", jobId);
break;
} else if (StringUtils.isNotEmpty(getImportProgressResponse.getReason())) {
System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, getImportProgressResponse.getReason());
} else if (StringUtils.isNotEmpty(reason)) {
System.out.printf("The job %s failed or canceled, reason: %s%n", jobId, reason);
break;
} else {
System.out.printf("The job %s is running, progress:%s%n", jobId, getImportProgressResponse.getProgress());
System.out.printf("The job %s is running, progress:%s%n", jobId, progress);
}
}

Expand Down Expand Up @@ -768,25 +770,27 @@ private Long getCollectionStatistics() {
return wrapper.getRowCount();
}

private static void exampleCloudImport() throws MalformedURLException {
private static void exampleCloudImport() {
System.out.println("\n===================== import files to cloud vectordb ====================");
BulkImportV2Request request = BulkImportV2Request.builder()
BulkImportRequest request = BulkImportRequest.builder()
.objectUrl(CloudImportConsts.OBJECT_URL).accessKey(CloudImportConsts.OBJECT_ACCESS_KEY).secretKey(CloudImportConsts.OBJECT_SECRET_KEY)
.clusterId(CloudImportConsts.CLUSTER_ID).collectionName(CloudImportConsts.COLLECTION_NAME).partitionName(CloudImportConsts.PARTITION_NAME)
.build();
BulkImportV2Response bulkImportResponse = CloudImportV2.createImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
System.out.println(GSON_INSTANCE.toJson(bulkImportResponse));
String bulkImportResult = CloudImport.bulkImport(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, request);
System.out.println(bulkImportResult);

System.out.println("\n===================== get import job progress ====================");
String jobId = bulkImportResponse.getJobId();
GetImportProgressV2Request getImportProgressRequest = GetImportProgressV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
GetImportProgressV2Response getImportProgressResponse = CloudImportV2.getImportJobProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
System.out.println(GSON_INSTANCE.toJson(getImportProgressResponse));

JsonObject bulkImportObject = convertDataMap(bulkImportResult);
String jobId = bulkImportObject.getAsJsonObject("data").get("jobId").getAsString();
GetImportProgressRequest getImportProgressRequest = GetImportProgressRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).jobId(jobId).build();
String getImportProgressResult = CloudImport.getImportProgress(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, getImportProgressRequest);
System.out.println(getImportProgressResult);

System.out.println("\n===================== list import jobs ====================");
ListImportJobsV2Request listImportJobsRequest = ListImportJobsV2Request.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
ListImportJobsV2Response listImportJobsResponse = CloudImportV2.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
System.out.println(GSON_INSTANCE.toJson(listImportJobsResponse));
ListImportJobsRequest listImportJobsRequest = ListImportJobsRequest.builder().clusterId(CloudImportConsts.CLUSTER_ID).currentPage(1).pageSize(10).build();
String listImportJobsResult = CloudImport.listImportJobs(CloudImportConsts.CLOUD_ENDPOINT, CloudImportConsts.API_KEY, listImportJobsRequest);
System.out.println(listImportJobsResult);
}

private CollectionSchemaParam buildSimpleSchema() {
Expand Down Expand Up @@ -968,4 +972,8 @@ private void checkMilvusClientIfExist() {
throw new RuntimeException(msg);
}
}

private static JsonObject convertDataMap(String result) {
return GSON_INSTANCE.fromJson(result, JsonObject.class);
}
}
23 changes: 2 additions & 21 deletions src/main/java/io/milvus/bulkwriter/BaseCloudImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package io.milvus.bulkwriter;

import com.google.gson.Gson;
import io.milvus.bulkwriter.response.RestfulResponse;
import io.milvus.common.utils.ExceptionUtils;
import kong.unirest.Unirest;
Expand All @@ -28,16 +27,14 @@
import java.util.Map;

public class BaseCloudImport {
private static final Gson GSON_INSTANCE = new Gson();

protected static String postRequest(String url, String apiKey, Map<String, Object> params, int timeout) {
try {
kong.unirest.HttpResponse<String> response = Unirest.post(url)
.connectTimeout(timeout)
.headers(httpHeaders(apiKey))
.body(params).asString();
if (response.getStatus() != 200) {
ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, status code: %s", url, response.getStatus()));
ExceptionUtils.throwUnExpectedException(String.format("Failed to post url: %s, status code: %s, msg: %s", url, response.getStatus(), response.getBody()));
} else {
return response.getBody();
}
Expand All @@ -54,7 +51,7 @@ protected static String getRequest(String url, String apiKey, Map<String, Object
.headers(httpHeaders(apiKey))
.queryString(params).asString();
if (response.getStatus() != 200) {
ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, status code: %s", url, response.getStatus()));
ExceptionUtils.throwUnExpectedException(String.format("Failed to get url: %s, status code: %s, msg: %s", url, response.getStatus(), response.getBody()));
} else {
return response.getBody();
}
Expand Down Expand Up @@ -85,20 +82,4 @@ protected static void handleResponse(String url, RestfulResponse res) {
ExceptionUtils.throwUnExpectedException(String.format("Failed to request url: %s, code: %s, message: %s", url, innerCode, innerMessage));
}
}

protected static String convertToV2ControlBaseURL(String url) {
/**
* Compatible with the original v1 API format: https://controller.api.{region-name}.cloud.zilliz.com
* Unified overseas domain for control center calls to reduce user perception
*/
if (url.endsWith(".com")) {
return "https://api.cloud.zilliz.com";
}

/**
* Compatible with the original v1 API format: https://controller.api.{region-name}.cloud.zilliz.com.cn
* Unified domestic domain for control center calls to reduce user perception
*/
return "https://api.cloud.zilliz.com.cn";
}
}
55 changes: 16 additions & 39 deletions src/main/java/io/milvus/bulkwriter/CloudImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,67 +21,44 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.milvus.bulkwriter.response.BulkImportResponse;
import io.milvus.bulkwriter.response.GetImportProgressResponse;
import io.milvus.bulkwriter.response.ListImportJobsResponse;
import io.milvus.bulkwriter.request.BulkImportRequest;
import io.milvus.bulkwriter.request.GetImportProgressRequest;
import io.milvus.bulkwriter.request.ListImportJobsRequest;
import io.milvus.bulkwriter.response.RestfulResponse;
import io.milvus.bulkwriter.response.v2.GetImportProgressV2Response;
import io.milvus.bulkwriter.response.v2.ListImportJobsV2Response;

import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;

@Deprecated
// use CloudImportV2 replace
public class CloudImport extends BaseCloudImport {
private static final Gson GSON_INSTANCE = new Gson();

public static BulkImportResponse bulkImport(String url, String apiKey, String objectUrl,
String accessKey, String secretKey, String clusterId, String collectionName) throws MalformedURLException {
url = convertToV2ControlBaseURL(url);
public static String bulkImport(String url, String apiKey, BulkImportRequest request) {
String requestURL = url + "/v2/vectordb/jobs/import/create";

Map<String, Object> params = new HashMap<>();
params.put("objectUrl", objectUrl);
params.put("accessKey", accessKey);
params.put("secretKey", secretKey);
params.put("clusterId", clusterId);
params.put("collectionName", collectionName);

Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
String body = postRequest(requestURL, apiKey, params, 60 * 1000);
RestfulResponse<BulkImportResponse> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<BulkImportResponse>>(){}.getType());
RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
handleResponse(requestURL, response);
return response.getData();
return body;
}

public static GetImportProgressResponse getImportProgress(String url, String apiKey, String jobId, String clusterId) throws MalformedURLException {
url = convertToV2ControlBaseURL(url);
String requestURL = url + "/v2/vectordb/jobs/import/getProgress";

Map<String, Object> params = new HashMap<>();
params.put("clusterId", clusterId);
params.put("jobId", jobId);
public static String getImportProgress(String url, String apiKey, GetImportProgressRequest request) {
String requestURL = url + "/v2/vectordb/jobs/import/describe";

Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
String body = postRequest(requestURL, apiKey, params, 60 * 1000);
RestfulResponse<GetImportProgressV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<GetImportProgressV2Response>>(){}.getType());
RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
handleResponse(requestURL, response);
return response.getData().toGetImportProgressResponse();
return body;
}

public static ListImportJobsResponse listImportJobs(String url, String apiKey, String clusterId, int pageSize, int currentPage) throws MalformedURLException {
url = convertToV2ControlBaseURL(url);
public static String listImportJobs(String url, String apiKey, ListImportJobsRequest request) {
String requestURL = url + "/v2/vectordb/jobs/import/list";

Map<String, Object> params = new HashMap<>();
params.put("clusterId", clusterId);
params.put("pageSize", pageSize);
params.put("currentPage", currentPage);

Map<String, Object> params = GSON_INSTANCE.fromJson(GSON_INSTANCE.toJson(request), new TypeToken<Map<String, Object>>() {}.getType());
String body = postRequest(requestURL, apiKey, params, 60 * 1000);
RestfulResponse<ListImportJobsV2Response> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<ListImportJobsV2Response>>(){}.getType());
RestfulResponse<Object> response = GSON_INSTANCE.fromJson(body, new TypeToken<RestfulResponse<Object>>(){}.getType());
handleResponse(requestURL, response);
return response.getData().toListImportJobsResponse();
return body;
}

}
69 changes: 0 additions & 69 deletions src/main/java/io/milvus/bulkwriter/CloudImportV2.java

This file was deleted.

Loading

0 comments on commit c9b566c

Please sign in to comment.