Skip to content

Commit

Permalink
Add reindex ITs
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo committed Oct 13, 2023
1 parent 04c1e05 commit 025c85e
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,60 @@ private String registerModelGroup() {
return modelGroupId;
}

protected void createIndexWithPipeline(String indexName, String indexMappingFileName, String pipelineName) throws Exception {
createIndexWithConfiguration(
indexName,
Files.readString(Path.of(classLoader.getResource("processor/" + indexMappingFileName).toURI())),
pipelineName
);
}

/**
* Ingest a document to index.
* @param indexName
* @param ingestDocument
* @throws Exception
*/
protected String ingestDocument(String indexName, String ingestDocument) throws Exception {
Response response = makeRequest(
client(),
"POST",
indexName + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
return (String) map.get("result");
}

/**
* Reindex from one index to another
* @param fromIndexName
* @param toIndexName
* @throws Exception
*/
protected void reindex(String fromIndexName, String toIndexName) throws Exception {
Response response = makeRequest(
client(),
"POST",
"/_reindex?refresh",
null,
toHttpEntity("{\"source\":{\"index\":\""+ fromIndexName +"\"},\"dest\":{\"index\":\"" + toIndexName + "\"}}"),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals(0, ((List)map.get("failures")).size());
}

/**
* Enumeration for types of pipeline processors, used to lookup resources like create
* processor request as those are type specific
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,31 @@

package org.opensearch.neuralsearch.processor;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import lombok.SneakyThrows;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.common.BaseSparseEncodingIT;

import com.google.common.collect.ImmutableList;

public class SparseEncodingProcessIT extends BaseSparseEncodingIT {

private static final String INDEX_NAME = "sparse_encoding_index";

private static final String PIPELINE_NAME = "pipeline-sparse-encoding";

private static final String INGEST_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@After
@SneakyThrows
public void tearDown() {
Expand All @@ -42,47 +44,27 @@ public void tearDown() {
public void testSparseEncodingProcessor() throws Exception {
String modelId = prepareModel();
createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING);
createSparseEncodingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "SparseEncodingIndexMappings.json", PIPELINE_NAME);
String result = ingestDocument(INDEX_NAME, INGEST_DOCUMENT);
assertEquals("created", result);
assertEquals(1, getDocCount(INDEX_NAME));
}

private void createSparseEncodingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/SparseEncodingIndexMappings.json").toURI())),
PIPELINE_NAME
);
public void testSparseEncodingProcessorWithReindex() throws Exception {
// create a simple index and indexing data into this index.
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String result = ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
assertEquals("created", result);
// create text embedding index for reindex
String modelId = prepareModel();
String toIndexName = "test-reindex-to";
String pipelineName = "pipeline-text-sparse-encoding";
createPipelineProcessor(modelId, pipelineName);
createIndexWithPipeline(toIndexName, "SparseEncodingIndexMappings.json", pipelineName);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,33 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import lombok.SneakyThrows;

import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.After;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.neuralsearch.common.BaseNeuralSearchIT;

import com.google.common.collect.ImmutableList;

public class TextEmbeddingProcessorIT extends BaseNeuralSearchIT {

private static final String INDEX_NAME = "text_embedding_index";

private static final String PIPELINE_NAME = "pipeline-hybrid";

private static final String TEXT_EMBEDDING_DOCUMENT = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";

@After
@SneakyThrows
public void tearDown() {
Expand All @@ -43,52 +49,32 @@ public void testTextEmbeddingProcessor() throws Exception {
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
createPipelineProcessor(modelId, PIPELINE_NAME);
createTextEmbeddingIndex();
ingestDocument();
createIndexWithPipeline(INDEX_NAME, "IndexMappings.json", PIPELINE_NAME);
String result = ingestDocument(INDEX_NAME, TEXT_EMBEDDING_DOCUMENT);
assertEquals("created", result);
assertEquals(1, getDocCount(INDEX_NAME));
}

public void testTextEmbeddingProcessorWithReindexOperation() throws Exception {
// create a simple index and indexing data into this index.
String fromIndexName = "test-reindex-from";
createIndexWithConfiguration(fromIndexName, "{ \"settings\": { \"number_of_shards\": 1, \"number_of_replicas\": 0 } }", null);
String result = ingestDocument(fromIndexName, "{ \"text\": \"hello world\" }");
assertEquals("created", result);
// create text embedding index for reindex
String modelId = uploadTextEmbeddingModel();
loadModel(modelId);
String toIndexName = "test-reindex-to";
String pipelineName = "pipeline-text-embedding";
createPipelineProcessor(modelId, pipelineName);
createIndexWithPipeline(toIndexName, "IndexMappings.json", pipelineName);
reindex(fromIndexName, toIndexName);
assertEquals(1, getDocCount(toIndexName));
}

private String uploadTextEmbeddingModel() throws Exception {
String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI()));
return uploadModel(requestBody);
}

private void createTextEmbeddingIndex() throws Exception {
createIndexWithConfiguration(
INDEX_NAME,
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())),
PIPELINE_NAME
);
}

private void ingestDocument() throws Exception {
String ingestDocument = "{\n"
+ " \"title\": \"This is a good day\",\n"
+ " \"description\": \"daily logging\",\n"
+ " \"favor_list\": [\n"
+ " \"test\",\n"
+ " \"hello\",\n"
+ " \"mock\"\n"
+ " ],\n"
+ " \"favorites\": {\n"
+ " \"game\": \"overwatch\",\n"
+ " \"movie\": null\n"
+ " }\n"
+ "}\n";
Response response = makeRequest(
client(),
"POST",
INDEX_NAME + "/_doc?refresh",
null,
toHttpEntity(ingestDocument),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
Map<String, Object> map = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
EntityUtils.toString(response.getEntity()),
false
);
assertEquals("created", map.get("result"));
}

}

0 comments on commit 025c85e

Please sign in to comment.