Skip to content

Commit

Permalink
Wait until GeoIP data to be replicated to all data nodes (#348)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 21, 2023
1 parent 04e6d6d commit 90f3b7c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.dao.DatasourceDao;
Expand All @@ -30,6 +31,8 @@

@Log4j2
public class DatasourceUpdateService {
private static final int SLEEP_TIME_IN_MILLIS = 5000; // 5 seconds
private static final int MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS = 10 * 60 * 60 * 1000; // 10 hours
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final DatasourceDao datasourceDao;
Expand Down Expand Up @@ -86,10 +89,36 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable
geoIpDataDao.putGeoIpData(indexName, header, reader.iterator(), renewLock);
}

waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS);
Instant endTime = Instant.now();
updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime);
}

/**
* We wait until all shards are ready to serve search requests before updating datasource metadata to
* point to a new index so that there won't be latency degradation during GeoIP data update
*
* @param indexName the indexName
*/
@VisibleForTesting
protected void waitUntilAllShardsStarted(final String indexName, final int timeout) {
Instant start = Instant.now();
try {
while (Instant.now().toEpochMilli() - start.toEpochMilli() < timeout) {
if (clusterService.state().routingTable().allShards(indexName).stream().allMatch(shard -> shard.started())) {
return;
}
Thread.sleep(SLEEP_TIME_IN_MILLIS);
}
throw new OpenSearchException(
"index[{}] replication did not complete after {} millis",
MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS
);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Return header fields of geo data with given url of a manifest file
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase {
protected Ip2GeoLockService ip2GeoLockService;
@Mock
protected Ip2GeoProcessorDao ip2GeoProcessorDao;
@Mock
protected RoutingTable routingTable;
protected IngestMetadata ingestMetadata;
protected NoOpNodeClient client;
protected VerifyingClient verifyingClient;
Expand All @@ -119,7 +121,7 @@ public void prepareIp2GeoTestCase() {
when(clusterService.state()).thenReturn(clusterState);
when(clusterState.metadata()).thenReturn(metadata);
when(clusterState.getMetadata()).thenReturn(metadata);
when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE);
when(clusterState.routingTable()).thenReturn(routingTable);
when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(OpenSearchExecutors.newDirectExecutorService());
when(ingestService.getClusterService()).thenReturn(clusterService);
when(threadPool.generic()).thenReturn(OpenSearchExecutors.newDirectExecutorService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.geospatial.ip2geo.jobscheduler;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
Expand All @@ -28,6 +29,7 @@
import org.apache.commons.csv.CSVParser;
import org.junit.Before;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -134,6 +136,9 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() {

File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile());
when(geoIpDataDao.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180));
ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.started()).thenReturn(true);
when(routingTable.allShards(anyString())).thenReturn(Arrays.asList(shardRouting));

Datasource datasource = new Datasource();
datasource.setState(DatasourceState.AVAILABLE);
Expand All @@ -158,6 +163,34 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() {
verify(geoIpDataDao).putGeoIpData(eq(datasource.currentIndexName()), isA(String[].class), any(Iterator.class), any(Runnable.class));
}

public void testWaitUntilAllShardsStarted_whenTimedOut_thenThrowException() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.started()).thenReturn(false);
when(routingTable.allShards(indexName)).thenReturn(Arrays.asList(shardRouting));

// Run
Exception e = expectThrows(OpenSearchException.class, () -> datasourceUpdateService.waitUntilAllShardsStarted(indexName, 10));

// Verify
assertTrue(e.getMessage().contains("did not complete"));
}

@SneakyThrows
public void testWaitUntilAllShardsStarted_whenInterrupted_thenThrowException() {
String indexName = GeospatialTestHelper.randomLowerCaseString();
ShardRouting shardRouting = mock(ShardRouting.class);
when(shardRouting.started()).thenReturn(false);
when(routingTable.allShards(indexName)).thenReturn(Arrays.asList(shardRouting));

// Run
Thread.currentThread().interrupt();
Exception e = expectThrows(RuntimeException.class, () -> datasourceUpdateService.waitUntilAllShardsStarted(indexName, 10));

// Verify
assertEquals(InterruptedException.class, e.getCause().getClass());
}

@SneakyThrows
public void testGetHeaderFields_whenValidInput_thenReturnCorrectValue() {
File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
Expand Down

0 comments on commit 90f3b7c

Please sign in to comment.