Fix bug in get datasource API and improve memory usage #313

Merged: 1 commit, May 19, 2023

@@ -5,15 +5,18 @@

package org.opensearch.geospatial.ip2geo.action;

import java.util.Collections;
import java.util.List;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

@@ -57,7 +60,8 @@ private boolean shouldGetAllDatasource(final GetDatasourceRequest request) {
return request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]));
}

-private ActionListener<List<Datasource>> newActionListener(final ActionListener<GetDatasourceResponse> listener) {
+@VisibleForTesting
+protected ActionListener<List<Datasource>> newActionListener(final ActionListener<GetDatasourceResponse> listener) {
return new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
@@ -66,6 +70,10 @@ public void onResponse(final List<Datasource> datasources) {

@Override
public void onFailure(final Exception e) {
+if (e instanceof IndexNotFoundException) {
+listener.onResponse(new GetDatasourceResponse(Collections.emptyList()));
+return;
+}
listener.onFailure(e);
}
};
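Taken together, the hunks above are the bug-fix half of this PR: a get-datasource call made before the datasource system index exists now resolves to an empty list instead of surfacing an IndexNotFoundException. A minimal, hypothetical sketch of that listener-wrapping pattern (the class name and generic shape are illustrative, not the plugin's actual types):

    import java.util.Collections;
    import java.util.List;

    import org.opensearch.action.ActionListener;
    import org.opensearch.index.IndexNotFoundException;

    // Wraps a caller's listener so a missing index is reported as "no results" rather than a failure.
    final class EmptyOnMissingIndexListener<T> implements ActionListener<List<T>> {
        private final ActionListener<List<T>> delegate;

        EmptyOnMissingIndexListener(ActionListener<List<T>> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onResponse(List<T> results) {
            delegate.onResponse(results);                       // real results pass through unchanged
        }

        @Override
        public void onFailure(Exception e) {
            if (e instanceof IndexNotFoundException) {
                delegate.onResponse(Collections.emptyList());   // index not created yet: empty result, not an error
                return;
            }
            delegate.onFailure(e);                              // every other failure still propagates
        }
    }
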
@@ -20,8 +20,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@@ -35,6 +37,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.SpecialPermission;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
@@ -51,8 +54,10 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.shared.Constants;
import org.opensearch.geospatial.shared.StashedThreadContext;
@@ -187,7 +192,7 @@ protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest,
}

/**
-* Create a document in json string format to ingest in datasource database index
+* Create a document to ingest in datasource database index
*
* It assumes the first field as ip_range. The rest is added under data field.
*
@@ -204,31 +209,23 @@ protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest,
* @param fields a list of field name
* @param values a list of values
* @return Document in json string format
+* @throws IOException the exception
*/
-public String createDocument(final String[] fields, final String[] values) {
+public XContentBuilder createDocument(final String[] fields, final String[] values) throws IOException {
if (fields.length != values.length) {
throw new OpenSearchException("header[{}] and record[{}] length does not match", fields, values);
}
-StringBuilder sb = new StringBuilder();
-sb.append("{\"");
-sb.append(IP_RANGE_FIELD_NAME);
-sb.append("\":\"");
-sb.append(values[0]);
-sb.append("\",\"");
-sb.append(DATA_FIELD_NAME);
-sb.append("\":{");
+XContentBuilder builder = XContentFactory.jsonBuilder();
Member:
Can you please add a comment with an example of the JSON doc that we're building here?

Collaborator Author (@heemin32, May 19, 2023):
There is already a comment on top.

    /**
     * Create a document to ingest in datasource database index
     *
     * It assumes the first field as ip_range. The rest is added under data field.
     *
     * Document example
     * {
     *   "_cidr":"1.0.0.1/25",
     *   "_data":{
     *       "country": "USA",
     *       "city": "Seattle",
     *       "location":"13.23,42.12"
     *   }
     * }
     *
     * @param fields a list of field name
     * @param values a list of values
     * @return Document in json string format
     * @throws IOException the exception
     */

+builder.startObject();
+builder.field(IP_RANGE_FIELD_NAME, values[0]);
+builder.startObject(DATA_FIELD_NAME);
for (int i = 1; i < fields.length; i++) {
-if (i != 1) {
-sb.append(",");
-}
-sb.append("\"");
-sb.append(fields[i]);
-sb.append("\":\"");
-sb.append(values[i]);
-sb.append("\"");
+builder.field(fields[i], values[i]);
}
-sb.append("}}");
-return sb.toString();
+builder.endObject();
+builder.endObject();
+builder.close();
+return builder;
}
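The switch from StringBuilder string concatenation to XContentBuilder is the memory-usage half of this PR: the document is built as structured content and only serialized when it is attached to an index request. A rough usage sketch of the new method's shape (a standalone illustration, not the plugin's code; the literal "_cidr"/"_data" field names stand in for IP_RANGE_FIELD_NAME/DATA_FIELD_NAME, as in the Javadoc example quoted above):

    import java.io.IOException;

    import org.opensearch.common.Strings;
    import org.opensearch.common.xcontent.XContentFactory;
    import org.opensearch.core.xcontent.XContentBuilder;

    public class CreateDocumentSketch {
        public static void main(String[] args) throws IOException {
            String[] fields = { "ip", "country", "city" };
            String[] values = { "1.0.0.1/25", "USA", "Seattle" };

            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            builder.field("_cidr", values[0]);      // first column is the IP range
            builder.startObject("_data");           // remaining columns go under the data object
            for (int i = 1; i < fields.length; i++) {
                builder.field(fields[i], values[i]);
            }
            builder.endObject();
            builder.endObject();
            builder.close();

            // Prints {"_cidr":"1.0.0.1/25","_data":{"country":"USA","city":"Seattle"}}
            System.out.println(Strings.toString(builder));
        }
    }
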

/**
@@ -368,14 +365,20 @@ public void putGeoIpData(
@NonNull final Iterator<CSVRecord> iterator,
final int bulkSize,
@NonNull final Runnable renewLock
-) {
+) throws IOException {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest();
+Queue<DocWriteRequest> requests = new LinkedList<>();
+for (int i = 0; i < bulkSize; i++) {
+requests.add(Requests.indexRequest(indexName));
+}
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
-String document = createDocument(fields, record.values());
-IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON);
-bulkRequest.add(request);
+XContentBuilder document = createDocument(fields, record.values());
+IndexRequest indexRequest = (IndexRequest) requests.poll();
+indexRequest.source(document);
+indexRequest.id(record.get(0));
Member:
Is this get-by-index safe, or should we check the size first?

Collaborator Author:
It is safe. If the size is wrong, it will fail in createDocument.

+bulkRequest.add(indexRequest);
if (iterator.hasNext() == false || bulkRequest.requests().size() == bulkSize) {
BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout));
if (response.hasFailures()) {
@@ -385,6 +388,7 @@ public void putGeoIpData(
response.buildFailureMessage()
);
}
+requests.addAll(bulkRequest.requests());
bulkRequest.requests().clear();
}
renewLock.run();
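The other memory change is in putGeoIpData: instead of building a fresh IndexRequest per CSV record, the method pre-allocates bulkSize requests in a queue, polls one per record, and returns them to the queue after each bulk flush, so the number of live request objects is bounded by the bulk size rather than by the size of the GeoIP database. A framework-free sketch of that pooling pattern (the Request class, ingest signature, and flush step are placeholders, not OpenSearch APIs):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Queue;

    public class BulkPoolSketch {

        // Stand-in for IndexRequest: just enough state to show the reuse.
        static final class Request {
            String id;
            String source;
        }

        static void ingest(Iterator<String[]> rows, int bulkSize) {
            // Allocate exactly bulkSize request objects once, up front.
            Queue<Request> pool = new ArrayDeque<>();
            for (int i = 0; i < bulkSize; i++) {
                pool.add(new Request());
            }

            List<Request> batch = new ArrayList<>();
            while (rows.hasNext()) {
                String[] row = rows.next();
                Request request = pool.poll();   // recycle an existing object instead of allocating per row
                request.id = row[0];
                request.source = String.join(",", row);
                batch.add(request);

                if (!rows.hasNext() || batch.size() == bulkSize) {
                    // A real implementation would send the bulk request here; afterwards
                    // the same request objects go back into the pool for the next batch.
                    pool.addAll(batch);
                    batch.clear();
                }
            }
        }
    }
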
@@ -52,6 +52,7 @@
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.geospatial.plugin.GeospatialPlugin;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.jobscheduler.spi.LockModel;
@@ -5,18 +5,20 @@

package org.opensearch.geospatial.ip2geo.action;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.action.ActionListener;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.tasks.Task;

public class GetDatasourceTransportActionTests extends Ip2GeoTestCase {
@@ -27,7 +29,7 @@ public void init() {
action = new GetDatasourceTransportAction(transportService, actionFilters, datasourceFacade);
}

-public void testDoExecute_whenAll_thenSucceed() throws Exception {
+public void testDoExecute_whenAll_thenSucceed() {
Task task = mock(Task.class);
GetDatasourceRequest request = new GetDatasourceRequest(new String[] { "_all" });
ActionListener<GetDatasourceResponse> listener = mock(ActionListener.class);
@@ -36,22 +38,7 @@ public void testDoExecute_whenAll_thenSucceed() throws Exception {
action.doExecute(task, request, listener);

// Verify
-ArgumentCaptor<ActionListener<List<Datasource>>> captor = ArgumentCaptor.forClass(ActionListener.class);
-verify(datasourceFacade).getAllDatasources(captor.capture());
-
-// Run
-List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());
-captor.getValue().onResponse(datasources);
-
-// Verify
-verify(listener).onResponse(new GetDatasourceResponse(datasources));
-
-// Run
-RuntimeException exception = new RuntimeException();
-captor.getValue().onFailure(exception);
-
-// Verify
-verify(listener).onFailure(exception);
+verify(datasourceFacade).getAllDatasources(any(ActionListener.class));
}

public void testDoExecute_whenNames_thenSucceed() {
@@ -66,20 +53,37 @@ public void testDoExecute_whenNames_thenSucceed() {
action.doExecute(task, request, listener);

// Verify
-ArgumentCaptor<ActionListener<List<Datasource>>> captor = ArgumentCaptor.forClass(ActionListener.class);
-verify(datasourceFacade).getDatasources(eq(datasourceNames), captor.capture());
+verify(datasourceFacade).getDatasources(eq(datasourceNames), any(ActionListener.class));
}

public void testNewActionListener_whenOnResponse_thenSucceed() {
List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());
ActionListener<GetDatasourceResponse> actionListener = mock(ActionListener.class);

// Run
action.newActionListener(actionListener).onResponse(datasources);

// Verify
verify(actionListener).onResponse(new GetDatasourceResponse(datasources));
}

public void testNewActionListener_whenOnFailureWithNoSuchIndexException_thenEmptyDatasource() {
ActionListener<GetDatasourceResponse> actionListener = mock(ActionListener.class);

// Run
-captor.getValue().onResponse(datasources);
+action.newActionListener(actionListener).onFailure(new IndexNotFoundException("no index"));

// Verify
-verify(listener).onResponse(new GetDatasourceResponse(datasources));
+verify(actionListener).onResponse(new GetDatasourceResponse(Collections.emptyList()));
}

public void testNewActionListener_whenOnFailure_thenFails() {
ActionListener<GetDatasourceResponse> actionListener = mock(ActionListener.class);

// Run
-RuntimeException exception = new RuntimeException();
-captor.getValue().onFailure(exception);
+action.newActionListener(actionListener).onFailure(new RuntimeException());

// Verify
-verify(listener).onFailure(exception);
+verify(actionListener).onFailure(any(RuntimeException.class));
}
}
@@ -52,6 +52,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
Member:
Please make sure it works with the latest update from core; they have started moving classes around (opensearch-project/OpenSearch#7508).

Collaborator Author:
Yes. There is another PR to fix it: #314.

import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.geospatial.GeospatialTestHelper;
@@ -103,12 +104,13 @@ public void testCreateIndexIfNotExistsWithoutExistingIndex() {
verifyingGeoIpDataFacade.createIndexIfNotExists(index);
}

+@SneakyThrows
public void testCreateDocument() {
String[] names = { "ip", "country", "city" };
String[] values = { "1.0.0.0/25", "USA", "Seattle" };
assertEquals(
"{\"_cidr\":\"1.0.0.0/25\",\"_data\":{\"country\":\"USA\",\"city\":\"Seattle\"}}",
-noOpsGeoIpDataFacade.createDocument(names, values)
+Strings.toString(noOpsGeoIpDataFacade.createDocument(names, values))
);
}
