Skip to content

Commit

Permalink
Complete fixes after rebase.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed May 29, 2023
1 parent b0dfb66 commit 07debe3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ public class OpenSearchScrollRequest implements OpenSearchRequest {
@EqualsAndHashCode.Exclude
@ToString.Exclude
private OpenSearchExprValueFactory exprValueFactory;

/**
* Scroll id which is set after first request issued. Because ElasticsearchClient is shared by
* multi-thread so this state has to be maintained here.
* Scroll id which is set after first request issued. Because OpenSearchClient is shared by
* multiple threads so this state has to be maintained here.
*/
@Setter
@Getter
private String scrollId = NO_SCROLL_ID;

public static final String NO_SCROLL_ID = "";

@EqualsAndHashCode.Exclude
private boolean needClean = true;

@Getter
Expand Down Expand Up @@ -164,16 +166,7 @@ public boolean hasAnotherBatch() {
public void writeTo(StreamOutput out) throws IOException {
initialSearchRequest.writeTo(out);
out.writeTimeValue(scrollTimeout);
out.writeBoolean(needClean);
if (!needClean) {
// If needClean is true, there is no more data to get from OpenSearch and scrollId is
// used only to clean up OpenSearch context.

out.writeString(scrollId);

//out.writeString(scrollId == null ? "" : scrollId);

}
out.writeString(scrollId);
out.writeStringCollection(includes);
indexName.writeTo(out);
}
Expand All @@ -188,10 +181,7 @@ public OpenSearchScrollRequest(StreamInput in, OpenSearchStorageEngine engine)
throws IOException {
initialSearchRequest = new SearchRequest(in);
scrollTimeout = in.readTimeValue();
needClean = in.readBoolean();
if (!needClean) {
scrollId = in.readString();
}
scrollId = in.readString();
includes = in.readStringList();
indexName = new IndexName(in);
OpenSearchIndex index = (OpenSearchIndex) engine.getTable(null, indexName.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import lombok.SneakyThrows;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.lucene.search.TotalHits;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.DisplayNameGenerator;
Expand Down Expand Up @@ -186,9 +187,11 @@ void search_withoutIncludes() {
OpenSearchResponse searchResponse = request.search(searchAction, scrollAction);
assertFalse(searchResponse.isEmpty());
}
/*

@Test
@SneakyThrows
void hasAnotherBatch() {
FieldUtils.writeField(request, "needClean", false, true);
request.setScrollId("scroll123");
assertTrue(request.hasAnotherBatch());

Expand All @@ -198,7 +201,6 @@ void hasAnotherBatch() {
request.setScrollId("");
assertFalse(request.hasAnotherBatch());
}
}*/

@Test
void clean_on_empty_response() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.opensearch.sql.ast.expression.DataType;
Expand Down Expand Up @@ -114,14 +116,21 @@ void serialize() {
var request = new OpenSearchScrollRequest(
INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory);
request.setScrollId("valid-id");
// make a response, so OpenSearchResponse::isEmpty would return true and unset needClean
var response = mock(SearchResponse.class);
when(response.getAggregations()).thenReturn(mock());
var hits = mock(SearchHits.class);
when(response.getHits()).thenReturn(hits);
when(response.getScrollId()).thenReturn("valid-id");
when(hits.getHits()).thenReturn(new SearchHit[]{ mock() });
request.search(null, (req) -> response);

try (var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, request)) {
var planSerializer = new PlanSerializer(engine);
var cursor = planSerializer.convertToCursor(indexScan);
var newPlan = planSerializer.convertToPlan(cursor.toString());
assertEquals(indexScan, newPlan);
}

}

@Test
Expand Down

0 comments on commit 07debe3

Please sign in to comment.