diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java index f15a0e11..291e1087 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -18,7 +18,6 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; -import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; @@ -123,17 +122,16 @@ public void onResponse(final IndexResponse indexResponse) { @Override public void onFailure(final Exception e) { if (e instanceof VersionConflictEngineException) { - log.info("Datasource already exists {}", request.getDatasourceName(), e); - listener.onFailure(new ResourceAlreadyExistsException("Datasource already exists")); + listener.onFailure( + new ResourceAlreadyExistsException("datasource [{}] already exists", request.getDatasourceName()) + ); } else { - log.error("Failed to create a datasource {}", request.getDatasourceName(), e); - listener.onFailure(new OpenSearchException("Failed to create a datasource")); + listener.onFailure(e); } } }); } catch (Exception e) { - log.error("Error occurred while creating datasource {}", request.getDatasourceName(), e); - listener.onFailure(new OpenSearchException("Failed to create a datasource")); + listener.onFailure(e); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java index 4067758d..0c65382f 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java @@ -9,6 +9,7 @@ package org.opensearch.geospatial.ip2geo.common; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; import java.net.URL; import java.nio.CharBuffer; @@ -19,7 +20,6 @@ import lombok.Getter; import lombok.Setter; -import org.opensearch.OpenSearchException; import org.opensearch.SpecialPermission; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.xcontent.json.JsonXContent; @@ -112,10 +112,9 @@ public static class Builder { * * @param url url to downloads a manifest file * @return DatasourceManifest representing the manifest file - * @throws Exception exception */ @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") - public static DatasourceManifest build(final URL url) throws Exception { + public static DatasourceManifest build(final URL url) { SpecialPermission.check(); return AccessController.doPrivileged((PrivilegedAction) () -> { try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()))) { @@ -128,8 +127,8 @@ public static DatasourceManifest build(final URL url) throws Exception { charBuffer.toString() ); return PARSER.parse(parser, null); - } catch (Exception e) { - throw new OpenSearchException("Failed to build manifest", e); + } catch (IOException e) { + throw new RuntimeException(e); } }); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java index ec99287a..e7bd7366 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java @@ -35,7 +35,6 @@ import org.opensearch.SpecialPermission; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; @@ -70,14 +69,13 @@ public class GeoIpDataHelper { * @param client client * @param indexName index name * @param timeout timeout - * @throws IOException io exception */ public static void createIndexIfNotExists( final ClusterService clusterService, final Client client, final String indexName, final TimeValue timeout - ) throws IOException { + ) { if (clusterService.state().metadata().hasIndex(indexName) == true) { log.info("Index {} already exist", indexName); return; @@ -86,7 +84,7 @@ public static void createIndexIfNotExists( indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2()); indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2()); final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping()); - CreateIndexResponse response = client.admin().indices().create(createIndexRequest).actionGet(timeout); + client.admin().indices().create(createIndexRequest).actionGet(timeout); } /** @@ -111,8 +109,8 @@ private static String getIndexMapping() { return reader.lines().collect(Collectors.joining()); } } - } catch (Exception e) { - throw new IllegalArgumentException("Ip2Geo datasource mapping cannot be read correctly."); + } catch (IOException e) { + throw new RuntimeException(e); } } @@ -140,8 +138,11 @@ public static CSVParser getDatabaseReader(final DatasourceManifest manifest) { } catch (IOException e) { throw new RuntimeException(e); } - log.error("ZIP file {} does not have database file {}", manifest.getUrl(), manifest.getDbName()); - throw new RuntimeException("ZIP file does not have database file"); + throw new OpenSearchException( + "database file [{}] does not exist in the zip file [{}]", + manifest.getDbName(), + manifest.getUrl() + ); }); } @@ -332,25 +333,20 @@ public static void putGeoData( ) { BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); while (iterator.hasNext()) { - try { - CSVRecord record = iterator.next(); - String document = createDocument(fields, record.values()); - IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON); - bulkRequest.add(request); - if (!iterator.hasNext() || bulkRequest.requests().size() == bulkSize) { - BulkResponse response = client.bulk(bulkRequest).actionGet(timeout); - if (response.hasFailures()) { - log.error( - "Error occurred while ingesting GeoIP data in {} with an error {}", - indexName, - response.buildFailureMessage() - ); - throw new OpenSearchException("Error occurred while ingesting GeoIP data"); - } - bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + CSVRecord record = iterator.next(); + String document = createDocument(fields, record.values()); + IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON); + bulkRequest.add(request); + if (!iterator.hasNext() || bulkRequest.requests().size() == bulkSize) { + BulkResponse response = client.bulk(bulkRequest).actionGet(timeout); + if (response.hasFailures()) { + throw new OpenSearchException( + "error occurred while ingesting GeoIP data in {} with an error {}", + indexName, + response.buildFailureMessage() + ); } - } catch (Exception e) { - throw e; + bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); } } client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java index ca2aaf35..c408ff3b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -20,6 +20,7 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.Client; @@ -95,13 +96,13 @@ public void initialize(final ClusterService clusterService, final ThreadPool thr @Override public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionContext context) { if (initialized == false) { - throw new AssertionError("This instance is not initialized"); + throw new AssertionError("this instance is not initialized"); } log.info("Update job started for a datasource[{}]", jobParameter.getName()); if (jobParameter instanceof Datasource == false) { throw new IllegalStateException( - "Job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName() + "job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName() ); } @@ -125,28 +126,29 @@ private Runnable updateDatasourceRunner(final ScheduledJobParameter jobParameter if (lock == null) { return; } - Datasource datasource = DatasourceHelper.getDatasource(client, jobParameter.getName(), timeout); - if (datasource == null) { - log.info("Datasource[{}] is already deleted", jobParameter.getName()); - } try { - deleteUnusedIndices(datasource); - updateDatasource(datasource); - deleteUnusedIndices(datasource); - } catch (Exception e) { - log.error("Failed to update datasource for {}", datasource.getId(), e); - datasource.getUpdateStats().setLastFailedAt(Instant.now()); - DatasourceHelper.updateDatasource(client, datasource, timeout); + Datasource datasource = DatasourceHelper.getDatasource(client, jobParameter.getName(), timeout); + if (datasource == null) { + log.info("Datasource[{}] is already deleted", jobParameter.getName()); + return; + } + + try { + deleteUnusedIndices(datasource); + updateDatasource(datasource); + deleteUnusedIndices(datasource); + } catch (Exception e) { + log.error("Failed to update datasource for {}", datasource.getId(), e); + datasource.getUpdateStats().setLastFailedAt(Instant.now()); + DatasourceHelper.updateDatasource(client, datasource, timeout); + } } finally { lockService.release( lock, - ActionListener.wrap( - released -> { log.info("Released lock for job {}", datasource.getId()); }, - exception -> { throw new IllegalStateException("Failed to release lock."); } - ) + ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock); }) ); } - }, exception -> { throw new IllegalStateException("Failed to acquire lock."); })); + }, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName()); })); } }; @@ -180,10 +182,10 @@ private void deleteUnusedIndices(final Datasource parameter) { .isAcknowledged()) { deletedIndices.add(index); } else { - log.error("Failed to delete an index {}", index); + log.error("Failed to delete an index [{}]", index); } } catch (Exception e) { - log.error("Failed to delete an index {}", index, e); + log.error("Failed to delete an index [{}]", index, e); } } if (!deletedIndices.isEmpty()) { @@ -230,15 +232,21 @@ private void updateDatasource(final Datasource jobParameter) throws Exception { Iterator iter = reader.iterator(); fields = iter.next().values(); if (!jobParameter.getDatabase().getFields().equals(Arrays.asList(fields))) { - log.error("The previous fields and new fields does not match."); - log.error("Previous: {}, New: {}", jobParameter.getDatabase().getFields().toString(), Arrays.asList(fields).toString()); - throw new IllegalStateException("Fields does not match between old and new"); + throw new OpenSearchException( + "fields does not match between old [{}] and new [{}]", + jobParameter.getDatabase().getFields().toString(), + Arrays.asList(fields).toString() + ); } GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout); } Instant endTime = Instant.now(); - jobParameter.setDatabase(manifest, fields); + jobParameter.getDatabase().setProvider(manifest.getProvider()); + jobParameter.getDatabase().setMd5Hash(manifest.getMd5Hash()); + jobParameter.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); + jobParameter.getDatabase().setValidForInDays(manifest.getValidForInDays()); + jobParameter.getDatabase().setFields(Arrays.asList(fields)); jobParameter.getUpdateStats().setLastSucceededAt(endTime); jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); DatasourceHelper.updateDatasource(client, jobParameter, timeout); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 0da9cc07..0d6ddd8c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -24,7 +24,6 @@ import lombok.extern.log4j.Log4j2; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -173,8 +172,7 @@ private void executeInternal( @Override public void onResponse(final Datasource datasource) { if (datasource == null) { - log.error("Datasource[{}] does not exist", datasourceName); - handler.accept(null, new IllegalStateException("Datasource does not exist")); + handler.accept(null, new IllegalStateException("datasource does not exist")); return; } @@ -196,16 +194,14 @@ public void onResponse(final Map stringObjectMap) { @Override public void onFailure(final Exception e) { - log.error("Error while retrieving geo data from datasource[{}] for a given ip[{}]", datasourceName, ip, e); - handler.accept(null, new OpenSearchException("Failed to geo data")); + handler.accept(null, e); } }); } @Override public void onFailure(final Exception e) { - log.error("Failed to get datasource[{}]", datasourceName, e); - handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName)); + handler.accept(null, e); } }); } @@ -235,8 +231,7 @@ private void executeInternal( @Override public void onResponse(final Datasource datasource) { if (datasource == null) { - log.error("Datasource[{}] does not exist", datasourceName); - handler.accept(null, new IllegalStateException("Datasource does not exist")); + handler.accept(null, new IllegalStateException("datasource does not exist")); return; } @@ -291,8 +286,7 @@ public void onResponse(final Object obj) { @Override public void onFailure(final Exception e) { - log.error("Error while retrieving geo data from datasource[{}] for a given ip[{}]", datasourceName, ipList, e); - handler.accept(null, new OpenSearchException("Failed to geo data")); + handler.accept(null, e); } } ); @@ -300,8 +294,7 @@ public void onFailure(final Exception e) { @Override public void onFailure(final Exception e) { - log.error("Failed to get datasource[{}]", datasourceName, e); - handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName)); + handler.accept(null, e); } }); }