diff --git a/cli/src/main/java/fr/pilato/elasticsearch/crawler/fs/cli/FsCrawler.java b/cli/src/main/java/fr/pilato/elasticsearch/crawler/fs/cli/FsCrawler.java index 0149add78..ff4c84f18 100644 --- a/cli/src/main/java/fr/pilato/elasticsearch/crawler/fs/cli/FsCrawler.java +++ b/cli/src/main/java/fr/pilato/elasticsearch/crawler/fs/cli/FsCrawler.java @@ -1,4 +1,4 @@ -package fr.pilato.elasticsearch.crawler.fs.cli;/* +/* * Licensed to David Pilato (the "Author") under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,8 @@ * under the License. */ +package fr.pilato.elasticsearch.crawler.fs.cli; + import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import fr.pilato.elasticsearch.crawler.fs.FsCrawlerImpl; diff --git a/cli/src/test/resources/legacy/2_0/david.json b/cli/src/test/resources/legacy/2_0/david.json index e1ef836fa..da610f038 100644 --- a/cli/src/test/resources/legacy/2_0/david.json +++ b/cli/src/test/resources/legacy/2_0/david.json @@ -25,4 +25,4 @@ "bulk_size" : 100, "flush_interval" : "5s" } -} \ No newline at end of file +} diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerImpl.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerImpl.java index 244cae993..103f78813 100644 --- a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerImpl.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsCrawlerImpl.java @@ -41,8 +41,6 @@ public class FsCrawlerImpl { @Deprecated public static final String INDEX_TYPE_FOLDER = "folder"; - @Deprecated - public static final String INDEX_TYPE_DOC = "doc"; private static final Logger logger = LogManager.getLogger(FsCrawlerImpl.class); diff --git a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParser.java b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParser.java index 4b53cd282..7d51bda0b 100644 --- 
a/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParser.java +++ b/core/src/main/java/fr/pilato/elasticsearch/crawler/fs/FsParser.java @@ -81,6 +81,12 @@ public abstract class FsParser implements Runnable { private final Integer loop; private final MessageDigest messageDigest; + /** + * This is a temporary value we need to support both v5 and newer versions. + * V5 does not allow a type named _doc but V6 recommends using it. + */ + private final String typeName; + private ScanStatistic stats; private final AtomicInteger runNumber = new AtomicInteger(0); private static final Object semaphore = new Object(); @@ -105,6 +111,8 @@ public FsParser(FsSettings fsSettings, Path config, ElasticsearchClientManager e } else { messageDigest = null; } + + typeName = esClientManager.client().getDefaultTypeName(); } protected abstract FileAbstractor buildFileAbstractor(); @@ -558,13 +566,13 @@ private void removeEsDirectoryRecursively(final String path) throws Exception { * Add to bulk an IndexRequest in JSon format */ void esIndex(BulkProcessor bulkProcessor, String index, String id, String json, String pipeline) { - logger.debug("Indexing {}/doc/{}?pipeline={}", index, id, pipeline); + logger.debug("Indexing {}/{}/{}?pipeline={}", index, typeName, id, pipeline); logger.trace("JSon indexed : {}", json); if (!closed) { - bulkProcessor.add(new IndexRequest(index, "doc", id).source(json, XContentType.JSON).setPipeline(pipeline)); + bulkProcessor.add(new IndexRequest(index, typeName, id).source(json, XContentType.JSON).setPipeline(pipeline)); } else { - logger.warn("trying to add new file while closing crawler. Document [{}]/[doc]/[{}] has been ignored", index, id); + logger.warn("trying to add new file while closing crawler. 
Document [{}]/[{}]/[{}] has been ignored", index, typeName, id); } } @@ -572,11 +580,11 @@ void esIndex(BulkProcessor bulkProcessor, String index, String id, String json, * Add to bulk a DeleteRequest */ void esDelete(String index, String id) { - logger.debug("Deleting {}/doc/{}", index, id); + logger.debug("Deleting {}/{}/{}", index, typeName, id); if (!closed) { - esClientManager.bulkProcessorDoc().add(new DeleteRequest(index, "doc", id)); + esClientManager.bulkProcessorDoc().add(new DeleteRequest(index, typeName, id)); } else { - logger.warn("trying to remove a file while closing crawler. Document [{}]/[doc]/[{}] has been ignored", index, id); + logger.warn("trying to remove a file while closing crawler. Document [{}]/[{}]/[{}] has been ignored", index, typeName, id); } } diff --git a/docs/source/admin/fs/elasticsearch.rst b/docs/source/admin/fs/elasticsearch.rst index a7e927c30..7113c2f29 100644 --- a/docs/source/admin/fs/elasticsearch.rst +++ b/docs/source/admin/fs/elasticsearch.rst @@ -107,6 +107,11 @@ Or fall back to the command line: - ``6/_settings.json``: for elasticsearch 6.x series document index settings - ``6/_settings_folder.json``: for elasticsearch 6.x series folder index settings +.. note:: + + For versions before 6.x series, the type of the document is ``doc``. + From 6.x, the type of the document is ``_doc``. + Creating your own mapping (analyzers) """"""""""""""""""""""""""""""""""""" @@ -138,7 +143,7 @@ The following example uses a ``french`` analyzer to index the } }, "mappings": { - "doc": { + "_doc": { "properties" : { "attachment" : { "type" : "binary", diff --git a/docs/source/admin/fs/local-fs.rst b/docs/source/admin/fs/local-fs.rst index 494d4c69b..82c96fcc5 100644 --- a/docs/source/admin/fs/local-fs.rst +++ b/docs/source/admin/fs/local-fs.rst @@ -647,7 +647,7 @@ JSon document. This field is not indexed. Default mapping for .. 
code:: json { - "doc" : { + "_doc" : { "properties" : { "attachment" : { "type" : "binary", diff --git a/docs/source/admin/fs/rest.rst b/docs/source/admin/fs/rest.rst index d43594da2..f7de2ed19 100644 --- a/docs/source/admin/fs/rest.rst +++ b/docs/source/admin/fs/rest.rst @@ -98,7 +98,7 @@ You will get back your document as it has been stored by elasticsearch: { "_index" : "fscrawler-rest-tests_doc", - "_type" : "doc", + "_type" : "_doc", "_id" : "dd18bf3a8ea2a3e53e2661c7fb53534", "_version" : 1, "found" : true, diff --git a/docs/source/installation.rst b/docs/source/installation.rst index 584540dac..7c7f1ac40 100644 --- a/docs/source/installation.rst +++ b/docs/source/installation.rst @@ -148,7 +148,7 @@ to upgrade elasticsearch. This procedure only applies if you did not set previously ``elasticsearch.type`` setting (default value was ``doc``). If you did, then you also need to reindex the existing documents to the default -``doc`` type as per elasticsearch 6.0: +``_doc`` type as per elasticsearch 6.x (or ``doc`` for 5.x series): :: @@ -161,7 +161,7 @@ then you also need to reindex the existing documents to the default }, "dest": { "index": "job_name", - "type": "doc" + "type": "_doc" } } # Remove old type data from job_name index @@ -252,3 +252,6 @@ Then restore old data: The default mapping changed for FSCrawler for ``meta.raw.*`` fields. Might be better to reindex your data. +- For new indices, FSCrawler now uses ``_doc`` as the default type name for clusters +  running elasticsearch 6.x or later. 
+ diff --git a/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java b/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java index f52848c27..1e821d46c 100644 --- a/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java +++ b/elasticsearch-client/src/main/java/fr/pilato/elasticsearch/crawler/fs/client/ElasticsearchClient.java @@ -59,9 +59,27 @@ public class ElasticsearchClient extends RestHighLevelClient { private static final Logger logger = LogManager.getLogger(ElasticsearchClient.class); private boolean INGEST_SUPPORT = true; + /** + * Type name for Elasticsearch versions < 6.0 + * @deprecated Will be removed with Elasticsearch V8 + */ + @Deprecated + private static final String INDEX_TYPE_DOC_V5 = "doc"; + /** + * Type name for Elasticsearch versions >= 6.0 + * @deprecated Will be removed with Elasticsearch V8 + */ + @Deprecated + private static final String INDEX_TYPE_DOC = "_doc"; + /** + * Type name to use. It depends on elasticsearch version. 
+ * @deprecated Will be removed with Elasticsearch V8 + */ + @Deprecated + private String defaultTypeName = INDEX_TYPE_DOC; private Version VERSION = null; - public ElasticsearchClient(RestClientBuilder client) throws IOException { + public ElasticsearchClient(RestClientBuilder client) { super(client); } @@ -280,6 +298,14 @@ public void setElasticsearchBehavior() throws IOException { INGEST_SUPPORT = false; logger.debug("Using elasticsearch < 5, so we can't use ingest node feature"); } + + // With elasticsearch 6.x, we can use _doc as the default type name + if (VERSION.onOrAfter(Version.V_6_0_0)) { + logger.debug("Using elasticsearch >= 6, so we can use {} as the default type name", defaultTypeName); + } else { + defaultTypeName = INDEX_TYPE_DOC_V5; + logger.debug("Using elasticsearch < 6, so we use {} as the default type name", defaultTypeName); + } } } @@ -291,6 +317,10 @@ public Version getVersion() { return VERSION; } + public String getDefaultTypeName() { + return defaultTypeName; + } + public static Node decodeCloudId(String cloudId) { // 1. Ignore anything before `:`. 
String id = cloudId.substring(cloudId.indexOf(':')+1); diff --git a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/AbstractITCase.java b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/AbstractITCase.java index 6798a9612..af51367f2 100644 --- a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/AbstractITCase.java +++ b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/AbstractITCase.java @@ -118,6 +118,7 @@ public abstract class AbstractITCase extends AbstractFSCrawlerTestCase { private static ElasticsearchContainer container; private static RestClient esRestClient; + static String typeName; @BeforeClass public static void createFsCrawlerJobDir() throws IOException { @@ -279,6 +280,8 @@ public static void startElasticsearchRestClient() throws IOException { // We set what will be elasticsearch behavior as it depends on the cluster version elasticsearchClient.setElasticsearchBehavior(); + + typeName = elasticsearchClient.getDefaultTypeName(); } @BeforeClass diff --git a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestFilenameAsIdIT.java b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestFilenameAsIdIT.java index 5720baff6..ec4beb3a5 100644 --- a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestFilenameAsIdIT.java +++ b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestFilenameAsIdIT.java @@ -47,7 +47,7 @@ public void test_filename_as_id() throws Exception { assertThat("Document should exists with [roottxtfile.txt] id...", awaitBusy(() -> { try { - return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "roottxtfile.txt")); + return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "roottxtfile.txt")); } 
catch (IOException e) { return false; } @@ -70,14 +70,14 @@ public void test_remove_deleted_with_filename_as_id() throws Exception { assertThat("Document should exists with [id1.txt] id...", awaitBusy(() -> { try { - return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "id1.txt")); + return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "id1.txt")); } catch (IOException e) { return false; } }), equalTo(true)); assertThat("Document should exists with [id2.txt] id...", awaitBusy(() -> { try { - return elasticsearchClient.exists(new GetRequest(getCrawlerName(), "doc", "id2.txt")); + return elasticsearchClient.exists(new GetRequest(getCrawlerName(), typeName, "id2.txt")); } catch (IOException e) { return false; } diff --git a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRawIT.java b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRawIT.java index bb90af954..15a53f5d5 100644 --- a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRawIT.java +++ b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRawIT.java @@ -69,10 +69,10 @@ public void test_mapping() throws Exception { // This will cause an Elasticsearch Exception as the String is not a Date // If the mapping is incorrect - elasticsearchClient.index(new IndexRequest(getCrawlerName(), "doc", "1") + elasticsearchClient.index(new IndexRequest(getCrawlerName(), typeName, "1") .source(json1, XContentType.JSON) ); - elasticsearchClient.index(new IndexRequest(getCrawlerName(), "doc", "2") + elasticsearchClient.index(new IndexRequest(getCrawlerName(), typeName, "2") .source(json2, XContentType.JSON) ); } diff --git a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRemoveDeletedIT.java 
b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRemoveDeletedIT.java index aa78f57cc..99cd6ecf1 100644 --- a/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRemoveDeletedIT.java +++ b/integration-tests/src/test/java/fr/pilato/elasticsearch/crawler/fs/test/integration/FsCrawlerTestRemoveDeletedIT.java @@ -236,7 +236,7 @@ private void checkDocVersions(SearchResponse response, long maxVersion) { for (SearchHit hit : response.getHits().getHits()) { // Read the document. This is needed since 5.0 as search does not return the _version field try { - GetResponse getHit = elasticsearchClient.get(new GetRequest(hit.getIndex(), "doc", hit.getId())); + GetResponse getHit = elasticsearchClient.get(new GetRequest(hit.getIndex(), typeName, hit.getId())); assertThat(getHit.getVersion(), lessThanOrEqualTo(maxVersion)); } catch (IOException e) { fail("We got an IOException: " + e.getMessage()); diff --git a/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/RestServer.java b/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/RestServer.java index 2e3fc1b9b..d301f9240 100644 --- a/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/RestServer.java +++ b/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/RestServer.java @@ -49,7 +49,7 @@ public static void start(FsSettings settings, ElasticsearchClientManager elastic final ResourceConfig rc = new ResourceConfig() .registerInstances( new ServerStatusApi(elasticsearchClientManager.client(), settings), - new UploadApi(settings, elasticsearchClientManager.bulkProcessorDoc())) + new UploadApi(settings, elasticsearchClientManager)) .register(MultiPartFeature.class) .register(RestJsonProvider.class) .register(JacksonFeature.class) diff --git a/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/UploadApi.java b/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/UploadApi.java index 
037b83dfb..f0d06d60e 100644 --- a/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/UploadApi.java +++ b/rest/src/main/java/fr/pilato/elasticsearch/crawler/fs/rest/UploadApi.java @@ -22,12 +22,12 @@ import fr.pilato.elasticsearch.crawler.fs.beans.Doc; import fr.pilato.elasticsearch.crawler.fs.beans.DocParser; +import fr.pilato.elasticsearch.crawler.fs.client.ElasticsearchClientManager; import fr.pilato.elasticsearch.crawler.fs.framework.SignTool; import fr.pilato.elasticsearch.crawler.fs.settings.Elasticsearch; import fr.pilato.elasticsearch.crawler.fs.settings.FsSettings; import fr.pilato.elasticsearch.crawler.fs.tika.TikaDocParser; import org.apache.commons.io.FilenameUtils; -import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.xcontent.XContentType; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -51,14 +51,14 @@ @Path("/_upload") public class UploadApi extends RestApi { - private final BulkProcessor bulkProcessor; + private final ElasticsearchClientManager elasticsearchClientManager; private final FsSettings settings; private final MessageDigest messageDigest; private static final TimeBasedUUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator(); - UploadApi(FsSettings settings, BulkProcessor bulkProcessor) { + UploadApi(FsSettings settings, ElasticsearchClientManager elasticsearchClientManager) { this.settings = settings; - this.bulkProcessor = bulkProcessor; + this.elasticsearchClientManager = elasticsearchClientManager; // Create MessageDigest instance try { messageDigest = settings.getFs().getChecksum() == null ? 
@@ -110,9 +110,13 @@ public UploadResponse post( logger.debug("Simulate mode is on, so we skip sending document [{}] to elasticsearch.", filename); } else { logger.debug("Sending document [{}] to elasticsearch.", filename); - bulkProcessor.add(new org.elasticsearch.action.index.IndexRequest(settings.getElasticsearch().getIndex(), "doc", id) - .setPipeline(settings.getElasticsearch().getPipeline()) - .source(DocParser.toJson(doc), XContentType.JSON)); + elasticsearchClientManager.bulkProcessorDoc().add( + new org.elasticsearch.action.index.IndexRequest( + settings.getElasticsearch().getIndex(), + elasticsearchClientManager.client().getDefaultTypeName(), + id) + .setPipeline(settings.getElasticsearch().getPipeline()) + .source(DocParser.toJson(doc), XContentType.JSON)); // Elasticsearch entity coordinates (we use the first node address) Elasticsearch.Node node = settings.getElasticsearch().getNodes().get(0); if (node.getCloudId() != null) { @@ -121,7 +125,7 @@ public UploadResponse post( url = buildUrl( node.getScheme().toLowerCase(), node.getHost(), node.getPort()) + "/" + settings.getElasticsearch().getIndex() + "/" + - "doc" + "/" + + elasticsearchClientManager.client().getDefaultTypeName() + "/" + id; } diff --git a/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings.json b/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings.json index 7c7768d7f..72265216c 100644 --- a/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings.json +++ b/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings.json @@ -15,7 +15,7 @@ } }, "mappings": { - "doc": { + "_doc": { "dynamic_templates": [ { "raw_as_text": { diff --git a/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings_folder.json b/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings_folder.json index 506b51caa..089844e69 100644 --- 
a/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings_folder.json +++ b/settings/src/main/resources/fr/pilato/elasticsearch/crawler/fs/_default/6/_settings_folder.json @@ -14,7 +14,7 @@ } }, "mappings": { - "doc": { + "_doc": { "properties" : { "real" : { "type" : "keyword", diff --git a/settings/src/test/java/fr/pilato/elasticsearch/crawler/fs/settings/FsMappingTest.java b/settings/src/test/java/fr/pilato/elasticsearch/crawler/fs/settings/FsMappingTest.java index b0096173f..6fce8636f 100644 --- a/settings/src/test/java/fr/pilato/elasticsearch/crawler/fs/settings/FsMappingTest.java +++ b/settings/src/test/java/fr/pilato/elasticsearch/crawler/fs/settings/FsMappingTest.java @@ -600,7 +600,7 @@ public void fsSettingsForDocVersion6() throws Exception { " }\n" + " },\n" + " \"mappings\": {\n" + - " \"doc\": {\n" + + " \"_doc\": {\n" + " \"dynamic_templates\": [\n" + " {\n" + " \"raw_as_text\": {\n" + @@ -806,7 +806,7 @@ public void fsSettingsForFolderVersion6() throws Exception { " }\n" + " },\n" + " \"mappings\": {\n" + - " \"doc\": {\n" + + " \"_doc\": {\n" + " \"properties\" : {\n" + " \"real\" : {\n" + " \"type\" : \"keyword\",\n" +