diff --git a/.ci/bwcVersions b/.ci/bwcVersions index 2ec391cd06b6c..d8bfe55ae3bfe 100644 --- a/.ci/bwcVersions +++ b/.ci/bwcVersions @@ -16,4 +16,5 @@ BWC_VERSION: - "7.5.2" - "7.5.3" - "7.6.0" + - "7.7.0" - "8.0.0" diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy index fc853410f746a..a5408304a3faf 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy @@ -426,7 +426,7 @@ class BuildPlugin implements Plugin { dependencyNode.appendNode('groupId', dependency.group) dependencyNode.appendNode('artifactId', dependency.getDependencyProject().convention.getPlugin(BasePluginConvention).archivesBaseName) dependencyNode.appendNode('version', dependency.version) - dependencyNode.appendNode('scope', 'runtime') + dependencyNode.appendNode('scope', 'compile') } } } diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/RestTestRunnerTask.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/RestTestRunnerTask.java index f4de829009de3..5cd88ea01a32a 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/RestTestRunnerTask.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/RestTestRunnerTask.java @@ -1,12 +1,22 @@ package org.elasticsearch.gradle.testclusters; +import org.elasticsearch.gradle.tool.Boilerplate; +import org.gradle.api.provider.Provider; +import org.gradle.api.services.internal.BuildServiceRegistryInternal; import org.gradle.api.tasks.CacheableTask; +import org.gradle.api.tasks.Internal; import org.gradle.api.tasks.Nested; import org.gradle.api.tasks.testing.Test; +import org.gradle.internal.resources.ResourceLock; +import org.gradle.internal.resources.SharedResource; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.List; + +import static org.elasticsearch.gradle.testclusters.TestClustersPlugin.THROTTLE_SERVICE_NAME; /** * Customized version of Gradle {@link Test} task which tracks a collection of {@link ElasticsearchCluster} as a task input. We must do this @@ -47,4 +57,19 @@ public Collection getClusters() { return clusters; } + @Override + @Internal + public List getSharedResources() { + List locks = new ArrayList<>(super.getSharedResources()); + BuildServiceRegistryInternal serviceRegistry = getServices().get(BuildServiceRegistryInternal.class); + Provider throttleProvider = Boilerplate.getBuildService(serviceRegistry, THROTTLE_SERVICE_NAME); + SharedResource resource = serviceRegistry.forService(throttleProvider); + + int nodeCount = clusters.stream().mapToInt(cluster -> cluster.getNodes().size()).sum(); + if (nodeCount > 0) { + locks.add(resource.getResourceLock(Math.min(nodeCount, resource.getMaxUsages()))); + } + + return Collections.unmodifiableList(locks); + } } diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java index e45119670d91f..fc6f81f33471f 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java @@ -21,6 +21,7 @@ import org.elasticsearch.gradle.DistributionDownloadPlugin; import org.elasticsearch.gradle.ReaperPlugin; import org.elasticsearch.gradle.ReaperService; +import org.elasticsearch.gradle.tool.Boilerplate; import org.gradle.api.NamedDomainObjectContainer; import org.gradle.api.Plugin; import org.gradle.api.Project; @@ -30,53 +31,50 @@ import org.gradle.api.invocation.Gradle; import org.gradle.api.logging.Logger; import org.gradle.api.logging.Logging; +import org.gradle.api.provider.Provider; import org.gradle.api.tasks.TaskState; import java.io.File; public class TestClustersPlugin implements Plugin { - private static final String LIST_TASK_NAME = "listTestClusters"; public static final String EXTENSION_NAME = "testClusters"; - private static final String REGISTRY_EXTENSION_NAME = "testClustersRegistry"; + public static final String THROTTLE_SERVICE_NAME = "testClustersThrottle"; + private static final String LIST_TASK_NAME = "listTestClusters"; + private static final String REGISTRY_SERVICE_NAME = "testClustersRegistry"; private static final Logger logger = Logging.getLogger(TestClustersPlugin.class); - private ReaperService reaper; - @Override public void apply(Project project) { project.getPlugins().apply(DistributionDownloadPlugin.class); - project.getRootProject().getPluginManager().apply(ReaperPlugin.class); - reaper = project.getRootProject().getExtensions().getByType(ReaperService.class); + + ReaperService reaper = project.getRootProject().getExtensions().getByType(ReaperService.class); // enable the DSL to describe clusters - NamedDomainObjectContainer container = createTestClustersContainerExtension(project); + NamedDomainObjectContainer container = createTestClustersContainerExtension(project, reaper); // provide a task to be able to list defined clusters. createListClustersTask(project, container); - if (project.getRootProject().getExtensions().findByName(REGISTRY_EXTENSION_NAME) == null) { - TestClustersRegistry registry = project.getRootProject() - .getExtensions() - .create(REGISTRY_EXTENSION_NAME, TestClustersRegistry.class); - - // When we know what tasks will run, we claim the clusters of those task to differentiate between clusters - // that are defined in the build script and the ones that will actually be used in this invocation of gradle - // we use this information to determine when the last task that required the cluster executed so that we can - // terminate the cluster right away and free up resources. - configureClaimClustersHook(project.getGradle(), registry); + // register cluster registry as a global build service + project.getGradle().getSharedServices().registerIfAbsent(REGISTRY_SERVICE_NAME, TestClustersRegistry.class, spec -> {}); - // Before each task, we determine if a cluster needs to be started for that task. - configureStartClustersHook(project.getGradle(), registry); + // register throttle so we only run at most max-workers/2 nodes concurrently + project.getGradle() + .getSharedServices() + .registerIfAbsent( + THROTTLE_SERVICE_NAME, + TestClustersThrottle.class, + spec -> spec.getMaxParallelUsages().set(project.getGradle().getStartParameter().getMaxWorkerCount() / 2) + ); - // After each task we determine if there are clusters that are no longer needed. - configureStopClustersHook(project.getGradle(), registry); - } + // register cluster hooks + project.getRootProject().getPluginManager().apply(TestClustersHookPlugin.class); } - private NamedDomainObjectContainer createTestClustersContainerExtension(Project project) { + private NamedDomainObjectContainer createTestClustersContainerExtension(Project project, ReaperService reaper) { // Create an extensions that allows describing clusters NamedDomainObjectContainer container = project.container( ElasticsearchCluster.class, @@ -95,52 +93,78 @@ private void createListClustersTask(Project project, NamedDomainObjectContainer< ); } - private static void configureClaimClustersHook(Gradle gradle, TestClustersRegistry registry) { - // Once we know all the tasks that need to execute, we claim all the clusters that belong to those and count the - // claims so we'll know when it's safe to stop them. - gradle.getTaskGraph().whenReady(taskExecutionGraph -> { - taskExecutionGraph.getAllTasks() - .stream() - .filter(task -> task instanceof TestClustersAware) - .map(task -> (TestClustersAware) task) - .flatMap(task -> task.getClusters().stream()) - .forEach(registry::claimCluster); - }); - } + static class TestClustersHookPlugin implements Plugin { + @Override + public void apply(Project project) { + if (project != project.getRootProject()) { + throw new IllegalStateException(this.getClass().getName() + " can only be applied to the root project."); + } + + Provider registryProvider = Boilerplate.getBuildService( + project.getGradle().getSharedServices(), + REGISTRY_SERVICE_NAME + ); + TestClustersRegistry registry = registryProvider.get(); + + // When we know what tasks will run, we claim the clusters of those task to differentiate between clusters + // that are defined in the build script and the ones that will actually be used in this invocation of gradle + // we use this information to determine when the last task that required the cluster executed so that we can + // terminate the cluster right away and free up resources. + configureClaimClustersHook(project.getGradle(), registry); + + // Before each task, we determine if a cluster needs to be started for that task. + configureStartClustersHook(project.getGradle(), registry); + + // After each task we determine if there are clusters that are no longer needed. + configureStopClustersHook(project.getGradle(), registry); + } - private static void configureStartClustersHook(Gradle gradle, TestClustersRegistry registry) { - gradle.addListener(new TaskActionListener() { - @Override - public void beforeActions(Task task) { - if (task instanceof TestClustersAware == false) { - return; + private static void configureClaimClustersHook(Gradle gradle, TestClustersRegistry registry) { + // Once we know all the tasks that need to execute, we claim all the clusters that belong to those and count the + // claims so we'll know when it's safe to stop them. + gradle.getTaskGraph().whenReady(taskExecutionGraph -> { + taskExecutionGraph.getAllTasks() + .stream() + .filter(task -> task instanceof TestClustersAware) + .map(task -> (TestClustersAware) task) + .flatMap(task -> task.getClusters().stream()) + .forEach(registry::claimCluster); + }); + } + + private static void configureStartClustersHook(Gradle gradle, TestClustersRegistry registry) { + gradle.addListener(new TaskActionListener() { + @Override + public void beforeActions(Task task) { + if (task instanceof TestClustersAware == false) { + return; + } + // we only start the cluster before the actions, so we'll not start it if the task is up-to-date + TestClustersAware awareTask = (TestClustersAware) task; + awareTask.beforeStart(); + awareTask.getClusters().forEach(registry::maybeStartCluster); } - // we only start the cluster before the actions, so we'll not start it if the task is up-to-date - TestClustersAware awareTask = (TestClustersAware) task; - awareTask.beforeStart(); - awareTask.getClusters().forEach(registry::maybeStartCluster); - } - @Override - public void afterActions(Task task) {} - }); - } + @Override + public void afterActions(Task task) {} + }); + } - private static void configureStopClustersHook(Gradle gradle, TestClustersRegistry registry) { - gradle.addListener(new TaskExecutionListener() { - @Override - public void afterExecute(Task task, TaskState state) { - if (task instanceof TestClustersAware == false) { - return; + private static void configureStopClustersHook(Gradle gradle, TestClustersRegistry registry) { + gradle.addListener(new TaskExecutionListener() { + @Override + public void afterExecute(Task task, TaskState state) { + if (task instanceof TestClustersAware == false) { + return; + } + // always unclaim the cluster, even if _this_ task is up-to-date, as others might not have been + // and caused the cluster to start. + ((TestClustersAware) task).getClusters().forEach(cluster -> registry.stopCluster(cluster, state.getFailure() != null)); } - // always unclaim the cluster, even if _this_ task is up-to-date, as others might not have been - // and caused the cluster to start. - ((TestClustersAware) task).getClusters().forEach(cluster -> registry.stopCluster(cluster, state.getFailure() != null)); - } - @Override - public void beforeExecute(Task task) {} - }); + @Override + public void beforeExecute(Task task) {} + }); + } } - } diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersRegistry.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersRegistry.java index d78aecc82185a..dff0a475eb445 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersRegistry.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersRegistry.java @@ -2,13 +2,15 @@ import org.gradle.api.logging.Logger; import org.gradle.api.logging.Logging; +import org.gradle.api.services.BuildService; +import org.gradle.api.services.BuildServiceParameters; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -public class TestClustersRegistry { +public abstract class TestClustersRegistry implements BuildService { private static final Logger logger = Logging.getLogger(TestClustersRegistry.class); private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure"; private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false")); diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersThrottle.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersThrottle.java new file mode 100644 index 0000000000000..4c04f939560b1 --- /dev/null +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersThrottle.java @@ -0,0 +1,6 @@ +package org.elasticsearch.gradle.testclusters; + +import org.gradle.api.services.BuildService; +import org.gradle.api.services.BuildServiceParameters; + +public abstract class TestClustersThrottle implements BuildService {} diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/tool/Boilerplate.java b/buildSrc/src/main/java/org/elasticsearch/gradle/tool/Boilerplate.java index 3652944e1647e..e10b4099b7d24 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/tool/Boilerplate.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/tool/Boilerplate.java @@ -19,12 +19,17 @@ package org.elasticsearch.gradle.tool; import org.gradle.api.Action; +import org.gradle.api.GradleException; import org.gradle.api.NamedDomainObjectContainer; import org.gradle.api.PolymorphicDomainObjectContainer; import org.gradle.api.Project; import org.gradle.api.Task; import org.gradle.api.UnknownTaskException; import org.gradle.api.plugins.JavaPluginConvention; +import org.gradle.api.provider.Provider; +import org.gradle.api.services.BuildService; +import org.gradle.api.services.BuildServiceRegistration; +import org.gradle.api.services.BuildServiceRegistry; import org.gradle.api.tasks.SourceSetContainer; import org.gradle.api.tasks.TaskContainer; import org.gradle.api.tasks.TaskProvider; @@ -102,4 +107,14 @@ public static TaskProvider findByName(TaskContainer tasks, String name) { return task; } + + @SuppressWarnings("unchecked") + public static > Provider getBuildService(BuildServiceRegistry registry, String name) { + BuildServiceRegistration registration = registry.getRegistrations().findByName(name); + if (registration == null) { + throw new GradleException("Unable to find build service with name '" + name + "'."); + } + + return (Provider) registration.getService(); + } } diff --git a/distribution/docker/build.gradle b/distribution/docker/build.gradle index 95755b1800630..9ef1cc3958451 100644 --- a/distribution/docker/build.gradle +++ b/distribution/docker/build.gradle @@ -208,6 +208,7 @@ subprojects { Project subProject -> def tarFile = "${parent.projectDir}/build/elasticsearch${oss ? '-oss' : ''}_test.${VersionProperties.elasticsearch}.docker.tar" final Task exportDockerImageTask = task(exportTaskName, type: LoggedExec) { + inputs.file("${parent.projectDir}/build/markers/${buildTaskName}.marker") executable 'docker' outputs.file(tarFile) args "save", diff --git a/distribution/docker/src/docker/Dockerfile b/distribution/docker/src/docker/Dockerfile index 41113d1d59694..bc206e9467a1b 100644 --- a/distribution/docker/src/docker/Dockerfile +++ b/distribution/docker/src/docker/Dockerfile @@ -35,6 +35,17 @@ RUN chmod 0775 config data logs COPY config/elasticsearch.yml config/log4j2.properties config/ RUN chmod 0660 config/elasticsearch.yml config/log4j2.properties +# `tini` is a tiny but valid init for containers. This is used to cleanly +# control how ES and any child processes are shut down. +# +# The tini GitHub page gives instructions for verifying the binary using +# gpg, but the keyservers are slow to return the key and this can fail the +# build. Instead, we check the binary against a checksum that we have +# computed. +ADD https://github.com/krallin/tini/releases/download/v0.18.0/tini /tini +COPY config/tini.sha512 /tini.sha512 +RUN sha512sum -c /tini.sha512 && chmod +x /tini + ################################################################################ # Build stage 1 (the actual elasticsearch image): # Copy elasticsearch from stage 0 @@ -45,6 +56,8 @@ FROM centos:7 ENV ELASTIC_CONTAINER true +COPY --from=builder /tini /tini + RUN for iter in {1..10}; do yum update --setopt=tsflags=nodocs -y && \ yum install --setopt=tsflags=nodocs -y nc shadow-utils zip unzip && \ yum clean all && exit_code=0 && break || exit_code=\$? && echo "yum error: retry \$iter in 10s" && sleep 10; done; \ @@ -65,14 +78,14 @@ RUN ln -sf /etc/pki/ca-trust/extracted/java/cacerts /usr/share/elasticsearch/jdk ENV PATH /usr/share/elasticsearch/bin:\$PATH -COPY --chown=1000:0 bin/docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh +COPY bin/docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh -# Openshift overrides USER and uses ones with randomly uid>1024 and gid=0 -# Allow ENTRYPOINT (and ES) to run even with a different user -RUN chgrp 0 /usr/local/bin/docker-entrypoint.sh && \ - chmod g=u /etc/passwd && \ +RUN chmod g=u /etc/passwd && \ chmod 0775 /usr/local/bin/docker-entrypoint.sh +# Ensure that there are no files with setuid or setgid, in order to mitigate "stackclash" attacks. +RUN find / -xdev -perm -4000 -exec chmod ug-s {} + + EXPOSE 9200 9300 LABEL org.label-schema.build-date="${build_date}" \ @@ -95,7 +108,9 @@ LABEL org.label-schema.build-date="${build_date}" \ org.opencontainers.image.vendor="Elastic" \ org.opencontainers.image.version="${version}" -ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] +USER elasticsearch:root + +ENTRYPOINT ["/tini", "--", "/usr/local/bin/docker-entrypoint.sh"] # Dummy overridable parameter parsed by entrypoint CMD ["eswrapper"] diff --git a/distribution/docker/src/docker/bin/docker-entrypoint.sh b/distribution/docker/src/docker/bin/docker-entrypoint.sh index 0366060257b2c..b46dd72de814c 100644 --- a/distribution/docker/src/docker/bin/docker-entrypoint.sh +++ b/distribution/docker/src/docker/bin/docker-entrypoint.sh @@ -4,38 +4,22 @@ set -e # Files created by Elasticsearch should always be group writable too umask 0002 -run_as_other_user_if_needed() { - if [[ "$(id -u)" == "0" ]]; then - # If running as root, drop to specified UID and run command - exec chroot --userspec=1000 / "${@}" - else - # Either we are running in Openshift with random uid and are a member of the root group - # or with a custom --user - exec "${@}" - fi -} - # Allow user specify custom CMD, maybe bin/elasticsearch itself # for example to directly specify `-E` style parameters for elasticsearch on k8s # or simply to run /bin/bash to check the image -if [[ "$1" != "eswrapper" ]]; then - if [[ "$(id -u)" == "0" && $(basename "$1") == "elasticsearch" ]]; then - # centos:7 chroot doesn't have the `--skip-chdir` option and - # changes our CWD. - # Rewrite CMD args to replace $1 with `elasticsearch` explicitly, - # so that we are backwards compatible with the docs - # from the previous Elasticsearch versions<6 - # and configuration option D: - # https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html#_d_override_the_image_8217_s_default_ulink_url_https_docs_docker_com_engine_reference_run_cmd_default_command_or_options_cmd_ulink - # Without this, user could specify `elasticsearch -E x.y=z` but - # `bin/elasticsearch -E x.y=z` would not work. - set -- "elasticsearch" "${@:2}" - # Use chroot to switch to UID 1000 - exec chroot --userspec=1000 / "$@" - else - # User probably wants to run something else, like /bin/bash, with another uid forced (Openshift?) - exec "$@" - fi +if [[ "$1" == "eswrapper" || $(basename "$1") == "elasticsearch" ]]; then + # Rewrite CMD args to remove the explicit command, + # so that we are backwards compatible with the docs + # from the previous Elasticsearch versions < 6 + # and configuration option: + # https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html#_d_override_the_image_8217_s_default_ulink_url_https_docs_docker_com_engine_reference_run_cmd_default_command_or_options_cmd_ulink + # Without this, user could specify `elasticsearch -E x.y=z` but + # `bin/elasticsearch -E x.y=z` would not work. In any case, + # we want to continue through this script, and not exec early. + set -- "${@:2}" +else + # Run whatever command the user wanted + exec "$@" fi # Allow environment variables to be set by creating a file with the @@ -56,18 +40,13 @@ if [[ -f bin/elasticsearch-users ]]; then # enabled, but we have no way of knowing which node we are yet. We'll just # honor the variable if it's present. if [[ -n "$ELASTIC_PASSWORD" ]]; then - [[ -f /usr/share/elasticsearch/config/elasticsearch.keystore ]] || (run_as_other_user_if_needed elasticsearch-keystore create) - if ! (run_as_other_user_if_needed elasticsearch-keystore list | grep -q '^bootstrap.password$'); then - (run_as_other_user_if_needed echo "$ELASTIC_PASSWORD" | elasticsearch-keystore add -x 'bootstrap.password') + [[ -f /usr/share/elasticsearch/config/elasticsearch.keystore ]] || (elasticsearch-keystore create) + if ! (elasticsearch-keystore list | grep -q '^bootstrap.password$'); then + (echo "$ELASTIC_PASSWORD" | elasticsearch-keystore add -x 'bootstrap.password') fi fi fi -if [[ "$(id -u)" == "0" ]]; then - # If requested and running as root, mutate the ownership of bind-mounts - if [[ -n "$TAKE_FILE_OWNERSHIP" ]]; then - chown -R 1000:0 /usr/share/elasticsearch/{data,logs} - fi -fi - -run_as_other_user_if_needed /usr/share/elasticsearch/bin/elasticsearch +# Signal forwarding and child reaping is handled by `tini`, which is the +# actual entrypoint of the container +exec /usr/share/elasticsearch/bin/elasticsearch diff --git a/distribution/docker/src/docker/config/tini.sha512 b/distribution/docker/src/docker/config/tini.sha512 new file mode 100644 index 0000000000000..fc2fc738d87e7 --- /dev/null +++ b/distribution/docker/src/docker/config/tini.sha512 @@ -0,0 +1 @@ +ffdb31563e34bca91a094f962544b9d31f5d138432f2d639a0856ff605b3a69f47e48191da42d6956ab62a1b24eafca1a95b299901257832225d26770354ce5e /tini diff --git a/distribution/src/bin/elasticsearch-env-from-file b/distribution/src/bin/elasticsearch-env-from-file index d2cca3d729951..c02732d9d2f14 100644 --- a/distribution/src/bin/elasticsearch-env-from-file +++ b/distribution/src/bin/elasticsearch-env-from-file @@ -20,19 +20,44 @@ for VAR_NAME_FILE in $(env | cut -f1 -d= | grep '_FILE$'); do fi if [[ ! -e "${!VAR_NAME_FILE}" ]]; then - echo "ERROR: File ${!VAR_NAME_FILE} from $VAR_NAME_FILE does not exist" >&2 + # Maybe the file doesn't exist, maybe we just can't read it due to file permissions. + # Check permissions on each part of the path + path='' + if ! echo "${!VAR_NAME_FILE}" | grep -q '^/'; then + path='.' + fi + + dirname "${!VAR_NAME_FILE}" | tr '/' '\n' | while read part; do + if [[ "$path" == "/" ]]; then + path="${path}${part}" + else + path="$path/$part" + fi + + if ! [[ -x "$path" ]]; then + echo "ERROR: Cannot read ${!VAR_NAME_FILE} from $VAR_NAME_FILE, due to lack of permissions on '$path'" 2>&1 + exit 1 + fi + done + + if ! [[ -r "${!VAR_NAME_FILE}" ]]; then + echo "ERROR: File ${!VAR_NAME_FILE} from $VAR_NAME_FILE is not readable." 2>&1 + else + echo "ERROR: File ${!VAR_NAME_FILE} from $VAR_NAME_FILE does not exist" >&2 + fi + exit 1 fi FILE_PERMS="$(stat -L -c '%a' ${!VAR_NAME_FILE})" if [[ "$FILE_PERMS" != "400" && "$FILE_PERMS" != "600" ]]; then - if [[ -h "${!VAR_NAME_FILE}" ]]; then - echo "ERROR: File $(readlink "${!VAR_NAME_FILE}") (target of symlink ${!VAR_NAME_FILE} from $VAR_NAME_FILE) must have file permissions 400 or 600, but actually has: $FILE_PERMS" >&2 - else - echo "ERROR: File ${!VAR_NAME_FILE} from $VAR_NAME_FILE must have file permissions 400 or 600, but actually has: $FILE_PERMS" >&2 - fi - exit 1 + if [[ -L "${!VAR_NAME_FILE}" ]]; then + echo "ERROR: File $(readlink "${!VAR_NAME_FILE}") (target of symlink ${!VAR_NAME_FILE} from $VAR_NAME_FILE) must have file permissions 400 or 600, but actually has: $FILE_PERMS" >&2 + else + echo "ERROR: File ${!VAR_NAME_FILE} from $VAR_NAME_FILE must have file permissions 400 or 600, but actually has: $FILE_PERMS" >&2 + fi + exit 1 fi echo "Setting $VAR_NAME from $VAR_NAME_FILE at ${!VAR_NAME_FILE}" >&2 @@ -43,4 +68,3 @@ for VAR_NAME_FILE in $(env | cut -f1 -d= | grep '_FILE$'); do unset "$VAR_NAME_FILE" fi done - diff --git a/docs/reference/setup/install/docker.asciidoc b/docs/reference/setup/install/docker.asciidoc index 537dec2904070..b50b10f18a8ca 100644 --- a/docs/reference/setup/install/docker.asciidoc +++ b/docs/reference/setup/install/docker.asciidoc @@ -87,9 +87,9 @@ endif::[] This sample Docker Compose file brings up a three-node {es} cluster. Node `es01` listens on `localhost:9200` and `es02` and `es03` talk to `es01` over a Docker network. -Please note that this configuration exposes port 9200 on all network interfaces, and given how -Docker manipulates `iptables` on Linux, this means that your {es} cluster is publically accessible, -potentially ignoring any firewall settings. If you don't want to expose port 9200 and instead use +Please note that this configuration exposes port 9200 on all network interfaces, and given how +Docker manipulates `iptables` on Linux, this means that your {es} cluster is publicly accessible, +potentially ignoring any firewall settings. If you don't want to expose port 9200 and instead use a reverse proxy, replace `9200:9200` with `127.0.0.1:9200:9200` in the docker-compose.yml file. {es} will then only be accessible from the host machine itself. @@ -221,12 +221,6 @@ chmod g+rwx esdatadir chgrp 0 esdatadir -------------------------------------------- -As a last resort, you can force the container to mutate the ownership of -any bind-mounts used for the <> through the -environment variable `TAKE_FILE_OWNERSHIP`. When you do this, they will be owned by -uid:gid `1000:0`, which provides the required read/write access to the {es} process. - - ===== Increase ulimits for nofile and nproc Increased ulimits for <> and <> diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java index 78bc8f3728209..0408290b039d9 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/ByteBufUtils.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.common.bytes.AbstractBytesReference; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -70,7 +71,8 @@ static ByteBuf toByteBuf(final BytesReference reference) { } static BytesReference toBytesReference(final ByteBuf buffer) { - return new ByteBufBytesReference(buffer, buffer.readableBytes()); + final int readableBytes = buffer.readableBytes(); + return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes); } private static class ByteBufBytesReference extends AbstractBytesReference { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index 736e89c32dd56..26205e3f36943 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -19,10 +19,8 @@ package org.elasticsearch.http.nio; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.handler.codec.ByteToMessageDecoder; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -158,32 +156,25 @@ public void close() throws IOException { private void handleRequest(Object msg) { final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; FullHttpRequest request = pipelinedRequest.getRequest(); - - final FullHttpRequest copiedRequest; + boolean success = false; + NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence()); try { - copiedRequest = new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); - } finally { - // As we have copied the buffer, we can release the request - request.release(); - } - NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence()); - - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause)); + } else { + transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); + } } else { - transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause); + transport.incomingRequest(httpRequest, nioHttpChannel); + } + success = true; + } finally { + if (success == false) { + request.release(); } - } else { - transport.incomingRequest(httpRequest, nioHttpChannel); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 8e17d37699cdd..442ae42f0b932 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.http.nio; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -28,7 +30,6 @@ import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class NioHttpRequest implements HttpRequest { @@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest { private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; + private final AtomicBoolean released; + private final boolean pooled; NioHttpRequest(FullHttpRequest request, int sequence) { + this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, + ByteBufUtils.toBytesReference(request.content())); + } + + private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + BytesReference content) { this.request = request; - headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - if (request.content().isReadable()) { - this.content = ByteBufUtils.toBytesReference(request.content()); - } else { - this.content = BytesArray.EMPTY; - } + this.headers = headers; + this.content = content; + this.pooled = pooled; + this.released = released; } @Override @@ -105,17 +113,32 @@ public String uri() { @Override public BytesReference content() { + assert released.get() == false; return content; } @Override public void release() { - // NioHttpRequest works from copied unpooled bytes no need to release anything + if (pooled && released.compareAndSet(false, true)) { + request.release(); + } } @Override public HttpRequest releaseAndCopy() { - return this; + assert released.get() == false; + if (pooled == false) { + return this; + } + try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); + return new NioHttpRequest( + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), + request.trailingHeaders()), + headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent)); + } finally { + release(); + } } @Override @@ -156,7 +179,8 @@ public HttpRequest removeHeader(String header) { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new NioHttpRequest(requestWithoutHeader, sequence); + return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, + pooled, content); } @Override diff --git a/qa/os/src/test/java/org/elasticsearch/packaging/test/DockerTests.java b/qa/os/src/test/java/org/elasticsearch/packaging/test/DockerTests.java index 42e4262e4b947..4d0ecb0c83355 100644 --- a/qa/os/src/test/java/org/elasticsearch/packaging/test/DockerTests.java +++ b/qa/os/src/test/java/org/elasticsearch/packaging/test/DockerTests.java @@ -651,21 +651,42 @@ public void test120DockerLogsIncludeElasticsearchLogs() throws Exception { } /** - * Check that the Java process running inside the container has the expect PID, UID and username. + * Check that the Java process running inside the container has the expected UID, GID and username. */ - public void test130JavaHasCorrectPidAndOwnership() { - final List processes = sh.run("ps -o pid,uid,user -C java").stdout.lines().skip(1).collect(Collectors.toList()); + public void test130JavaHasCorrectOwnership() { + final List processes = sh.run("ps -o uid,gid,user -C java").stdout.lines().skip(1).collect(Collectors.toList()); assertThat("Expected a single java process", processes, hasSize(1)); final String[] fields = processes.get(0).trim().split("\\s+"); assertThat(fields, arrayWithSize(3)); + assertThat("Incorrect UID", fields[0], equalTo("1000")); + assertThat("Incorrect GID", fields[1], equalTo("0")); + assertThat("Incorrect username", fields[2], equalTo("elasticsearch")); + } + + /** + * Check that the init process running inside the container has the expected PID, UID, GID and user. + * The PID is particularly important because PID 1 handles signal forwarding and child reaping. + */ + public void test131InitProcessHasCorrectPID() { + final List processes = sh.run("ps -o pid,uid,gid,user -p 1").stdout.lines().skip(1).collect(Collectors.toList()); + + assertThat("Expected a single process", processes, hasSize(1)); + + final String[] fields = processes.get(0).trim().split("\\s+"); + + assertThat(fields, arrayWithSize(4)); assertThat("Incorrect PID", fields[0], equalTo("1")); assertThat("Incorrect UID", fields[1], equalTo("1000")); - assertThat("Incorrect username", fields[2], equalTo("elasticsearch")); + assertThat("Incorrect GID", fields[2], equalTo("0")); + assertThat("Incorrect username", fields[3], equalTo("elasticsearch")); } + /** + * Check that Elasticsearch reports per-node cgroup information. + */ public void test140CgroupOsStatsAreAvailable() throws Exception { waitForElasticsearch(installation); diff --git a/qa/os/src/test/java/org/elasticsearch/packaging/util/Docker.java b/qa/os/src/test/java/org/elasticsearch/packaging/util/Docker.java index 8cbe7a7c6a987..6040225a05194 100644 --- a/qa/os/src/test/java/org/elasticsearch/packaging/util/Docker.java +++ b/qa/os/src/test/java/org/elasticsearch/packaging/util/Docker.java @@ -26,6 +26,7 @@ import org.apache.http.client.fluent.Request; import org.elasticsearch.common.CheckedRunnable; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.PosixFileAttributes; @@ -152,9 +153,19 @@ private static void executeDockerRun(Distribution distribution, Map // Bind-mount any volumes if (volumes != null) { - volumes.forEach((localPath, containerPath) -> args.add("--volume \"" + localPath + ":" + containerPath + "\"")); + volumes.forEach((localPath, containerPath) -> { + assertTrue(localPath + " doesn't exist", Files.exists(localPath)); + + if (Platforms.WINDOWS == false && System.getProperty("user.name").equals("root")) { + // The tests are running as root, but the process in the Docker container runs as `elasticsearch` (UID 1000), + // so we need to ensure that the container process is able to read the bind-mounted files. + sh.run("chown -R 1000:0 " + localPath); + } + args.add("--volume \"" + localPath + ":" + containerPath + "\""); + }); } + // Image name args.add(distribution.flavor.name + ":test"); final String command = String.join(" ", args); diff --git a/qa/wildfly/build.gradle b/qa/wildfly/build.gradle index 4c62d69f180a8..42338aed8b932 100644 --- a/qa/wildfly/build.gradle +++ b/qa/wildfly/build.gradle @@ -72,7 +72,7 @@ dependencies { compile "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:${versions.jackson}" compile "org.apache.logging.log4j:log4j-api:${versions.log4j}" compile "org.apache.logging.log4j:log4j-core:${versions.log4j}" - compile project(':client:rest-high-level') + compile project(path: ':client:rest-high-level', configuration: 'shadow') wildfly "org.jboss:wildfly:${wildflyVersion}@zip" testCompile project(':test:framework') } diff --git a/qa/wildfly/src/test/java/org/elasticsearch/wildfly/WildflyIT.java b/qa/wildfly/src/test/java/org/elasticsearch/wildfly/WildflyIT.java index 9ac33105ed43f..16e0227f270de 100644 --- a/qa/wildfly/src/test/java/org/elasticsearch/wildfly/WildflyIT.java +++ b/qa/wildfly/src/test/java/org/elasticsearch/wildfly/WildflyIT.java @@ -56,7 +56,6 @@ public class WildflyIT extends LuceneTestCase { private Logger logger = LogManager.getLogger(WildflyIT.class); - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45625") public void testRestClient() throws URISyntaxException, IOException { try (CloseableHttpClient client = HttpClientBuilder.create().build()) { final String str = String.format( diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 7ddecd8b2fdb3..8438fdda96dc5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta useShardGenerations); } + public Entry(Entry entry, State state, List indices, long repositoryStateId, + ImmutableOpenMap shards, boolean useShardGenerations, String failure) { + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards, + failure, entry.userMetadata, useShardGenerations); + } + public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index f8ea133c0ed63..fcf0c0fe4e61b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -65,6 +65,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.discovery.HandshakingTransportAddressConnector; import org.elasticsearch.discovery.PeerFinder; import org.elasticsearch.discovery.SeedHostsResolver; import org.elasticsearch.discovery.SettingsBasedSeedHostsProvider; @@ -466,7 +467,9 @@ public void apply(Settings value, Settings current, Settings previous) { TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING, ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING, ClusterBootstrapService.UNCONFIGURED_BOOTSTRAP_TIMEOUT_SETTING, - LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING); + LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, + HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, + HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING); static List> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index 2f111f003ed84..727729b021b4b 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -74,7 +74,8 @@ public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionL @Override protected void doRun() { - // TODO if transportService is already connected to this address then skip the handshaking + // We could skip this if the transportService were already connected to the given address, but the savings would be minimal + // so we open a new connection anyway. final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(), UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests @@ -99,17 +100,30 @@ protected void innerOnResponse(DiscoveryNode remoteNode) { IOUtils.closeWhileHandlingException(connection); if (remoteNode.equals(transportService.getLocalNode())) { - // TODO cache this result for some time? forever? listener.onFailure(new ConnectTransportException(remoteNode, "local node found")); } else if (remoteNode.isMasterNode() == false) { - // TODO cache this result for some time? listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found")); } else { - transportService.connectToNode(remoteNode, ActionListener.delegateFailure(listener, - (l, ignored) -> { - logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode); + transportService.connectToNode(remoteNode, new ActionListener<>() { + @Override + public void onResponse(Void ignored) { + logger.trace("[{}] completed full connection with [{}]", thisConnectionAttempt, remoteNode); listener.onResponse(remoteNode); - })); + } + + @Override + public void onFailure(Exception e) { + // we opened a connection and successfully performed a handshake, so we're definitely + // talking to a master-eligible node with a matching cluster name and a good version, but + // the attempt to open a full connection to its publish address failed; a common reason is + // that the remote node is listening on 0.0.0.0 but has made an inappropriate choice for its + // publish address. + logger.warn(new ParameterizedMessage( + "[{}] completed handshake with [{}] but followup connection failed", + thisConnectionAttempt, remoteNode), e); + listener.onFailure(e); + } + }); } } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index ea88c2aca0d17..06520d3036c31 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -54,7 +54,8 @@ public NoOpEngine(EngineConfig config) { super(config, null, null, true, Function.identity()); this.stats = new SegmentsStats(); Directory directory = store.directory(); - try (DirectoryReader reader = openDirectory(directory)) { + // Do not wrap soft-deletes reader when calculating segment stats as the wrapper might filter out fully deleted segments. + try (DirectoryReader reader = openDirectory(directory, false)) { for (LeafReaderContext ctx : reader.getContext().leaves()) { SegmentReader segmentReader = Lucene.segmentReader(ctx.reader()); fillSegmentStats(segmentReader, true, stats); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 9e8e07dc815a1..2f11ab8b35dbc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -516,7 +516,12 @@ public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes(); } - protected static DirectoryReader openDirectory(Directory dir) throws IOException { - return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(dir, OFF_HEAP_READER_ATTRIBUTES), Lucene.SOFT_DELETES_FIELD); + protected static DirectoryReader openDirectory(Directory directory, boolean wrapSoftDeletes) throws IOException { + final DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES); + if (wrapSoftDeletes) { + return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); + } else { + return reader; + } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 45967235e8086..9762beac4eff7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -107,7 +107,7 @@ *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • *
  • When cluster state is updated - * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the {@link #beginSnapshot} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#startNewSnapshots} method
  • @@ -268,90 +268,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot - final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { - final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null); - clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - - private SnapshotsInProgress.Entry newSnapshot = null; - - @Override - public ClusterState execute(ClusterState currentState) { - validate(repositoryName, snapshotName, currentState); - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); - } - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); - } - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshot here to be processed in clusterStateProcessed - List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); - logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - List snapshotIndices = repositoryData.resolveNewIndices(indices); - newSnapshot = new SnapshotsInProgress.Entry( - new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), request.partial(), - State.INIT, - snapshotIndices, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - null, - request.userMetadata(), - hasOldFormatSnapshots == false && - clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = new SnapshotsInProgress(newSnapshot); - } else { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); - } - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newSnapshot = null; + + private List indices; + + @Override + public ClusterState execute(ClusterState currentState) { + validate(repositoryName, snapshotName, currentState); + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().isEmpty() == false) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + } + // Store newSnapshot here to be processed in clusterStateProcessed + indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, + request.indicesOptions(), request.indices())); + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + newSnapshot = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), request.partial(), + State.INIT, + Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot + threadPool.absoluteTimeInMillis(), + RepositoryData.UNKNOWN_REPO_GEN, + null, + request.userMetadata(), false + ); + initializingSnapshots.add(newSnapshot.snapshot()); + snapshots = new SnapshotsInProgress(newSnapshot); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); - } - newSnapshot = null; - listener.onFailure(e); + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); } + newSnapshot = null; + listener.onFailure(e); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); - assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() { - @Override - public void onResponse(final Snapshot snapshot) { - initializingSnapshots.remove(snapshot); - listener.onResponse(snapshot); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + if (newSnapshot != null) { + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + assert indices != null; + beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<>() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } - @Override - public void onFailure(final Exception e) { - initializingSnapshots.remove(current); - listener.onFailure(e); - } - }); - } + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); } + } - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - }); - }, listener::onFailure); + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + }); } public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) { @@ -436,6 +431,7 @@ private static void validate(final String repositoryName, final String snapshotN private void beginSnapshot(final ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, + final List indices, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -460,13 +456,20 @@ protected void doRun() { throw new InvalidSnapshotNameException( repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); } + snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); - if (snapshot.indices().isEmpty()) { + final boolean hasOldFormatSnapshots = + hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null); + final boolean writeShardGenerations = hasOldFormatSnapshots == false && + clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + if (indices.isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot, clusterState.metaData()); + endSnapshot(new SnapshotsInProgress.Entry( + snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations, + null), clusterState.metaData()); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { @@ -486,8 +489,10 @@ public ClusterState execute(ClusterState currentState) { assert entry.shards().isEmpty(); hadAbortedInitializations = true; } else { + final List indexIds = repositoryData.resolveNewIndices(indices); // Replace the snapshot that was just initialized - ImmutableOpenMap shards = shards(currentState, entry, repositoryData); + ImmutableOpenMap shards = + shards(currentState, indexIds, writeShardGenerations, repositoryData); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -506,12 +511,13 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } - entries.add( - new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds, + repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString())); continue; } } - entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(), + shards, writeShardGenerations, null)); } } return ClusterState.builder(currentState) @@ -1493,17 +1499,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS /** * Calculates the list of shards that should be included into the current snapshot * - * @param clusterState cluster state - * @param snapshot SnapshotsInProgress Entry + * @param clusterState cluster state + * @param indices Indices to snapshot + * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot * @return list of shard to be included into current snapshot */ private static ImmutableOpenMap shards(ClusterState clusterState, - SnapshotsInProgress.Entry snapshot, + List indices, + boolean useShardGenerations, RepositoryData repositoryData) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - for (IndexId index : snapshot.indices()) { + for (IndexId index : indices) { final String indexName = index.getName(); final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; IndexMetaData indexMetaData = metaData.index(indexName); @@ -1516,7 +1524,7 @@ private static ImmutableOpenMap