
Add support for bwc for testclusters and convert full cluster restart #45374

Merged (12 commits) on Aug 16, 2019
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.gradle.testclusters;

import org.elasticsearch.gradle.ElasticsearchDistribution;
import org.elasticsearch.gradle.FileSupplier;
import org.elasticsearch.gradle.PropertyNormalization;
import org.elasticsearch.gradle.ReaperService;
@@ -59,24 +58,23 @@ public class ElasticsearchCluster implements TestClusterConfiguration, Named {
private final String clusterName;
private final NamedDomainObjectContainer<ElasticsearchNode> nodes;
private final File workingDirBase;
private final Function<Integer, ElasticsearchDistribution> distributionFactory;
private final LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>();
private final Project project;
private final ReaperService reaper;
private int nodeIndex = 0;

public ElasticsearchCluster(String path, String clusterName, Project project, ReaperService reaper,
Function<Integer, ElasticsearchDistribution> distributionFactory, File workingDirBase) {
public ElasticsearchCluster(String path, String clusterName, Project project,
ReaperService reaper, File workingDirBase) {
Contributor:

I think we should try and keep the distribution factory pattern here rather than have the ElasticsearchNode directly call into the DistributionContainer.

Contributor Author:

Could you expand on the advantages you see here? To me it seemed like indirection that makes things harder to understand. We have preferred explicit coupling elsewhere.

Contributor:

We should avoid this kind of coupling when at all possible. That's the reason for this factory pattern here initially. I think it's actually easier to understand with that factory logic living in the plugin. It removes some noise from the ElasticsearchNode class which is already getting quite complex and should realistically be broken up at some point.
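As an editorial aside, the factory pattern the reviewer is defending can be sketched as follows. The names here are illustrative stand-ins, not the actual Elasticsearch Gradle classes: the point is only that the cluster receives a `Function<Integer, Distribution>` from the plugin and never reaches into the distribution container itself.

```java
import java.util.function.Function;

// Hypothetical sketch of the distribution-factory pattern under discussion.
// Resolution logic (container lookups, naming, caching) stays in the plugin;
// the cluster only asks the factory for node i's distribution.
public class FactorySketch {
    static class Distribution {
        final String name;
        Distribution(String name) { this.name = name; }
    }

    private final Function<Integer, Distribution> distributionFactory;

    FactorySketch(Function<Integer, Distribution> distributionFactory) {
        this.distributionFactory = distributionFactory;
    }

    Distribution distributionFor(int nodeIndex) {
        // No coupling to any container type: the factory hides it.
        return distributionFactory.apply(nodeIndex);
    }

    public static void main(String[] args) {
        // The "plugin" supplies the factory; the "cluster" just applies it.
        FactorySketch cluster = new FactorySketch(i -> new Distribution("distro-for-node-" + i));
        System.out.println(cluster.distributionFor(0).name); // distro-for-node-0
    }
}
```

The trade-off debated in this thread is exactly this indirection: the factory keeps `ElasticsearchNode` free of container knowledge, at the cost of one extra hop when reading the code.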

this.path = path;
this.clusterName = clusterName;
this.project = project;
this.reaper = reaper;
this.distributionFactory = distributionFactory;
this.workingDirBase = workingDirBase;
this.nodes = project.container(ElasticsearchNode.class);
this.nodes.add(
new ElasticsearchNode(
path, clusterName + "-0",
project, reaper, workingDirBase, distributionFactory.apply(0)
project, reaper, workingDirBase
)
);
// configure the cluster name eagerly so nodes know about it
@@ -100,7 +98,7 @@ public void setNumberOfNodes(int numberOfNodes) {

for (int i = nodes.size() ; i < numberOfNodes; i++) {
this.nodes.add(new ElasticsearchNode(
path, clusterName + "-" + i, project, reaper, workingDirBase, distributionFactory.apply(i)
path, clusterName + "-" + i, project, reaper, workingDirBase
));
}
}
@@ -126,6 +124,11 @@ public void setVersion(String version) {
nodes.all(each -> each.setVersion(version));
}

@Override
public void setVersion(List<String> version) {
nodes.all(each -> each.setVersion(version));
}

@Override
public void setTestDistribution(TestDistribution distribution) {
nodes.all(each -> each.setTestDistribution(distribution));
@@ -249,8 +252,8 @@ public void start() {
if (nodes.stream().map(ElasticsearchNode::getName).anyMatch( name -> name == null)) {
nodeNames = null;
} else {
nodeNames = nodes.stream().map(ElasticsearchNode::getName).collect(Collectors.joining(","));
};
nodeNames = nodes.stream().map(ElasticsearchNode::getName).map(this::safeName).collect(Collectors.joining(","));
}
for (ElasticsearchNode node : nodes) {
if (nodeNames != null) {
// Can only configure master nodes if we have node names defined
@@ -269,6 +272,19 @@ public void restart() {
nodes.forEach(ElasticsearchNode::restart);
}

@Override
public void goToNextVersion() {
nodes.all(ElasticsearchNode::goToNextVersion);
}

public void nextNodeToNextVersion() {
if (nodeIndex + 1 > nodes.size()) {
throw new TestClustersException("Ran out of nodes to take to the next version");
}
nodes.getByName(clusterName + "-" + nodeIndex).goToNextVersion();
nodeIndex += 1;
}

@Override
public void extraConfigFile(String destination, File from) {
nodes.all(node -> node.extraConfigFile(destination, from));
@@ -363,7 +379,6 @@ private void addWaitForClusterHealth() {
nodes.size()
);
if (httpSslEnabled) {

getFirstNode().configureHttpWait(wait);
}
List<Map<String, String>> credentials = getFirstNode().getCredentials();
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.gradle.testclusters;

import org.elasticsearch.gradle.DistributionDownloadPlugin;
import org.elasticsearch.gradle.ElasticsearchDistribution;
import org.elasticsearch.gradle.FileSupplier;
import org.elasticsearch.gradle.LazyPropertyList;
@@ -31,6 +32,7 @@
import org.elasticsearch.gradle.http.WaitForHttpResource;
import org.gradle.api.Action;
import org.gradle.api.Named;
import org.gradle.api.NamedDomainObjectContainer;
import org.gradle.api.Project;
import org.gradle.api.file.FileCollection;
import org.gradle.api.logging.Logger;
@@ -135,23 +137,23 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private final Path esStdoutFile;
private final Path esStderrFile;
private final Path tmpDir;
private final Path distroDir;

private String version;
private int currentDistro = 0;
private TestDistribution testDistribution;
private ElasticsearchDistribution distribution;
private List<ElasticsearchDistribution> distributions = new ArrayList<>();
private File javaHome;
private volatile Process esProcess;
private Function<String, String> nameCustomization = Function.identity();
private boolean isWorkingDirConfigured = false;

ElasticsearchNode(String path, String name, Project project, ReaperService reaper, File workingDirBase,
ElasticsearchDistribution distribution) {
ElasticsearchNode(String path, String name, Project project, ReaperService reaper, File workingDirBase) {
this.path = path;
this.name = name;
this.project = project;
this.reaper = reaper;
this.workingDir = workingDirBase.toPath().resolve(safeName(name)).toAbsolutePath();
this.distribution = distribution;
workingDir = workingDirBase.toPath().resolve(safeName(name)).toAbsolutePath();
distroDir = workingDir.resolve("distro");
confPathRepo = workingDir.resolve("repo");
configFile = workingDir.resolve("config/elasticsearch.yml");
confPathData = workingDir.resolve("data");
@@ -164,7 +166,6 @@ public class ElasticsearchNode implements TestClusterConfiguration {
waitConditions.put("ports files", this::checkPortsFilesExistWithDelay);

setTestDistribution(TestDistribution.INTEG_TEST);
setVersion(VersionProperties.getElasticsearch());
}

public String getName() {
@@ -173,15 +174,29 @@ public String getName() {

@Internal
public Version getVersion() {
return distribution.getVersion();
return distributions.get(currentDistro).getVersion();
}

@Override
public void setVersion(String version) {
requireNonNull(version, "null version passed when configuring test cluster `" + this + "`");
setVersion(Collections.singletonList(version));
}

@Override
public void setVersion(List<String> versions) {
Member:

We really don't have a need to go through multiple versions. The upgrade tests start with one version, and go to another. This seems far more complicated to me than we had before, where each step of the upgrade tests was a different cluster. IIRC the tricky part there was hooking up finalizers to shut down certain nodes between phases of the tests, but given that testclusters has far better control over when nodes start and stop, why couldn't we stick with that pattern?

Contributor Author (@alpar-t, Aug 12, 2019):

I'm completely open to suggestions here; that's why I stopped after converting one project. Here's my reasoning for going down this path.

The reason we can't just call setVersion from a doFirst is that all versions need to be known at configuration time.

I wanted the framework itself to support the upgrade, to reduce repetition in the projects that implement bwc testing.
The alternative would be to have multiple cluster definitions and set the data paths (which we would have to allow; we don't currently).
For rolling restart we would have to come up with a way to tell testclusters not to start the cluster for the task, since the task would want fine-grained control and start the upgraded cluster node by node. I really dislike having to add that.

I would also prefer that we don't give node-level control to build scripts, as it would make implementing the tests more imperative, and declarative tests are easier to understand.

Another use case that was brought up and plays into this is testing with evolving configurations in a mixed cluster, e.g. starting with some nodes that have ML disabled, running some tests, enabling it, and running some more.
In addition, we already have some tests that alter the configuration of the cluster (e.g. enabling security) by calling the same configuration DSL and restarting the cluster, but I'm not happy with them.

The initial thinking behind the freeze() call was that all configuration happens at Gradle's configuration time, so the versions and all other config should be set then and frozen during execution.

setVersion(List) is probably not the best way to express this, but I really think we should keep the paradigm of configuring both the initial and all future states of the cluster, and then have a DSL to move between those states.
I see setVersion(List) as a shortcut to get there: it lets us start running bwc with --parallel and get rid of the timeout-related test failures in CI before we refactor to separate configuration from the implementation of the actions. We could then have something like named states to move between, so instead of setVersions it would look more like configuring multiple clusters with configuration inheritance, except they would be different states of the same cluster rather than separate clusters.
E.g.:

testClusters {
    "bwcTestv${bwcVersion}" {
           version  = bwcVersion
           alternativeConfigurations {
               upgraded {
                   version = project.version
              }
           }
    }
}

upgradeTest.doFirst {
 testClusters."bwcTestv${bwcVersion}".alternativeConfiguration("upgraded")
}

We can then also add a way to optionally configure a subset of nodes differently within this DSL. Something like:

 testClusters {
       numberOfNodes = 3
       setting "foo", "false"
       nodes(1/3) {
             setting "foo", "true"
       }
 }

Contributor Author:

We discussed that we will merge this PR as-is and discuss the DSL in a follow-up.

requireNonNull(versions, "null version list passed when configuring test cluster `" + this + "`");
checkFrozen();
this.version = version;
this.distribution.setVersion(version);
for (String version : versions) {
String distroName = "testclusters" + path.replace(":", "-") + "-" + this.name + "-" + version + "-";
NamedDomainObjectContainer<ElasticsearchDistribution> container = DistributionDownloadPlugin.getContainer(project);
if (container.findByName(distroName) == null){
container.create(distroName);
}
ElasticsearchDistribution distro = container.getByName(distroName);
distro.setVersion(version);
distributions.add(distro);
}
}

@Internal
@@ -191,23 +206,25 @@ public TestDistribution getTestDistribution() {

// package private just so test clusters plugin can access to wire up task dependencies
@Internal
ElasticsearchDistribution getDistribution() {
return distribution;
List<ElasticsearchDistribution> getDistributions() {
return distributions;
}

@Override
public void setTestDistribution(TestDistribution testDistribution) {
requireNonNull(testDistribution, "null distribution passed when configuring test cluster `" + this + "`");
checkFrozen();
this.testDistribution = testDistribution;
if (testDistribution == TestDistribution.INTEG_TEST) {
this.distribution.setType(ElasticsearchDistribution.Type.INTEG_TEST_ZIP);
} else {
this.distribution.setType(ElasticsearchDistribution.Type.ARCHIVE);
if (testDistribution == TestDistribution.DEFAULT) {
this.distribution.setFlavor(ElasticsearchDistribution.Flavor.DEFAULT);
for (ElasticsearchDistribution distribution : distributions) {
if (testDistribution == TestDistribution.INTEG_TEST) {
distribution.setType(ElasticsearchDistribution.Type.INTEG_TEST_ZIP);
} else {
this.distribution.setFlavor(ElasticsearchDistribution.Flavor.OSS);
distribution.setType(ElasticsearchDistribution.Type.ARCHIVE);
if (testDistribution == TestDistribution.DEFAULT) {
distribution.setFlavor(ElasticsearchDistribution.Flavor.DEFAULT);
} else {
distribution.setFlavor(ElasticsearchDistribution.Flavor.OSS);
}
}
}
}
@@ -317,9 +334,11 @@ public Path getConfigDir() {

@Override
public void freeze() {
requireNonNull(distribution, "null distribution passed when configuring test cluster `" + this + "`");
requireNonNull(getVersion(), "null version passed when configuring test cluster `" + this + "`");
requireNonNull(distributions, "null distribution passed when configuring test cluster `" + this + "`");
requireNonNull(javaHome, "null javaHome passed when configuring test cluster `" + this + "`");
if (distributions.isEmpty()) {
setVersion(VersionProperties.getElasticsearch());
}
LOGGER.info("Locking configuration of `{}`", this);
configurationFrozen.set(true);
}
@@ -361,10 +380,13 @@ public synchronized void start() {
try {
if (isWorkingDirConfigured == false) {
logToProcessStdout("Configuring working directory: " + workingDir);
// Only configure working dir once so we don't lose data on restarts
// make sure we always start fresh
if (Files.exists(workingDir)) {
project.delete(workingDir);
}
isWorkingDirConfigured = true;
createWorkingDir(getExtractedDistributionDir());
}
createWorkingDir(getExtractedDistributionDir());
} catch (IOException e) {
throw new UncheckedIOException("Failed to create working directory for " + this, e);
}
@@ -446,6 +468,18 @@ public void restart() {
start();
}

@Override
public void goToNextVersion() {
if (currentDistro + 1 >= distributions.size()) {
throw new TestClustersException("Ran out of versions to go to for " + this);
}
LOGGER.info("Switch version from {} to {} for {}",
getVersion(), distributions.get(currentDistro + 1).getVersion(), this
);
currentDistro += 1;
restart();
}

private boolean isSettingMissingOrTrue(String name) {
return Boolean.valueOf(settings.getOrDefault(name, "false").toString());
}
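The goToNextVersion stepping in the hunk above reduces to a bounds-checked index over a pre-configured version list. A minimal standalone sketch (simplified, illustrative names; a real node restarts on the new distribution when the index advances):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the version-stepping logic: all versions are known
// up front (configuration time), and an index walks through them; stepping
// past the end is an error, mirroring the TestClustersException above.
public class VersionStepSketch {
    private final List<String> versions;
    private int currentDistro = 0;

    VersionStepSketch(List<String> versions) { this.versions = versions; }

    String current() { return versions.get(currentDistro); }

    void goToNextVersion() {
        if (currentDistro + 1 >= versions.size()) {
            throw new IllegalStateException("Ran out of versions to go to");
        }
        currentDistro += 1; // a real node would restart on the new distribution here
    }

    public static void main(String[] args) {
        VersionStepSketch node = new VersionStepSketch(Arrays.asList("6.8.2", "7.4.0"));
        node.goToNextVersion();
        System.out.println(node.current()); // 7.4.0
    }
}
```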
@@ -475,7 +509,7 @@ private void installModules() {
logToProcessStdout("Installing " + modules.size() + "modules");
for (File module : modules) {
Path destination = workingDir.resolve("modules").resolve(module.getName().replace(".zip", "")
.replace("-" + version, ""));
.replace("-" + getVersion(), ""));

// only install modules that are not already bundled with the integ-test distribution
if (Files.exists(destination) == false) {
@@ -492,7 +526,7 @@ private void installModules() {
}
}
} else {
LOGGER.info("Not installing " + modules.size() + "(s) since the " + distribution + " distribution already " +
LOGGER.info("Not installing " + modules.size() + "(s) since the " + distributions + " distribution already " +
"has them");
}
}
@@ -533,16 +567,16 @@ public void user(Map<String, String> userSpec) {

private void runElaticsearchBinScriptWithInput(String input, String tool, String... args) {
if (
Files.exists(workingDir.resolve("bin").resolve(tool)) == false &&
Files.exists(workingDir.resolve("bin").resolve(tool + ".bat")) == false
Files.exists(distroDir.resolve("bin").resolve(tool)) == false &&
Files.exists(distroDir.resolve("bin").resolve(tool + ".bat")) == false
) {
throw new TestClustersException("Can't run bin script: `" + tool + "` does not exist. " +
"Is this the distribution you expect it to be ?");
}
try (InputStream byteArrayInputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))) {
LoggedExec.exec(project, spec -> {
spec.setEnvironment(getESEnvironment());
spec.workingDir(workingDir);
spec.workingDir(distroDir);
spec.executable(
OS.conditionalString()
.onUnix(() -> "./bin/" + tool)
@@ -620,8 +654,8 @@ private void startElasticsearchProcess() {
final ProcessBuilder processBuilder = new ProcessBuilder();

List<String> command = OS.<List<String>>conditional()
.onUnix(() -> Arrays.asList("./bin/elasticsearch"))
.onWindows(() -> Arrays.asList("cmd", "/c", "bin\\elasticsearch.bat"))
.onUnix(() -> Arrays.asList(distroDir.getFileName().resolve("./bin/elasticsearch").toString()))
.onWindows(() -> Arrays.asList("cmd", "/c", distroDir.getFileName().resolve("bin\\elasticsearch.bat").toString()))
.supply();
processBuilder.command(command);
processBuilder.directory(workingDir.toFile());
@@ -821,7 +855,7 @@ private void waitForProcessToExit(ProcessHandle processHandle) {
}

private void createWorkingDir(Path distroExtractDir) throws IOException {
syncWithLinks(distroExtractDir, workingDir);
syncWithLinks(distroExtractDir, distroDir);
Files.createDirectories(configFile.getParent());
Files.createDirectories(confPathRepo);
Files.createDirectories(confPathData);
@@ -844,7 +878,14 @@ private void syncWithLinks(Path sourceRoot, Path destinationRoot) {

try (Stream<Path> stream = Files.walk(sourceRoot)) {
stream.forEach(source -> {
Path destination = destinationRoot.resolve(sourceRoot.relativize(source));
Path relativeDestination = sourceRoot.relativize(source);
if (relativeDestination.getNameCount() <= 1) {
return;
}
// Throw away the first name as the archives have everything in a single top level folder we are not interested in
relativeDestination = relativeDestination.subpath(1, relativeDestination.getNameCount());

Path destination = destinationRoot.resolve(relativeDestination);
if (Files.isDirectory(source)) {
try {
Files.createDirectories(destination);
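The relativize/subpath logic in the syncWithLinks hunk above strips the single top-level folder that the archives extract into (e.g. an `elasticsearch-<version>/` directory) before syncing into distroDir. A standalone sketch of just that step, with illustrative paths:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of the top-level-folder stripping done in syncWithLinks.
public class SubpathSketch {
    static Path stripTopLevel(Path sourceRoot, Path source) {
        Path relative = sourceRoot.relativize(source);
        if (relative.getNameCount() <= 1) {
            return null; // the top-level folder itself: nothing to sync
        }
        // Drop the leading archive folder component, keep the rest.
        return relative.subpath(1, relative.getNameCount());
    }

    public static void main(String[] args) {
        Path root = Paths.get("extract");
        Path file = Paths.get("extract/elasticsearch-7.4.0/bin/elasticsearch");
        System.out.println(stripTopLevel(root, file)); // bin/elasticsearch (on Unix)
    }
}
```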
@@ -920,9 +961,6 @@ private void createConfiguration() {
.forEach(defaultConfig::remove);

try {
// We create hard links for the distribution, so we need to remove the config file before writing it
// to prevent the changes to reflect across all copies.
Files.delete(configFile);
Files.write(
configFile,
Stream.concat(
@@ -931,8 +969,17 @@
)
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.joining("\n"))
.getBytes(StandardCharsets.UTF_8)
.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
);
Path jvmOptions = configFile.getParent().resolve("jvm.options");
Contributor:

I'm confused as to why this is necessary. Why are we cherry-picking files here?

Contributor Author:

We could recursively copy the contents of the directory, but these are the relevant files.

Contributor:

I guess I don't understand why we need to explicitly copy these files from the distribution dir to the working directory?

Contributor Author:

We are setting up a new config dir, outside of the distro dir.
These only get copied if they don't exist, so for bwc tests they will use the old versions even after upgrade.

Contributor Author:

I'm replacing this with walking the config dir so we don't hard-code files.

Path log4j = configFile.getParent().resolve("log4j2.properties");
if (Files.exists(jvmOptions) == false) {
Files.copy(distroDir.resolve("config/jvm.options"), jvmOptions);
}
if (Files.exists(log4j) == false) {
Files.copy(distroDir.resolve("config/log4j2.properties"), log4j);
}
} catch (IOException e) {
throw new UncheckedIOException("Could not write config file: " + configFile, e);
}
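The StandardOpenOption pair added in this hunk (`TRUNCATE_EXISTING`, `CREATE`) is what makes the earlier explicit `Files.delete(configFile)` unnecessary: the file is created when absent and rewritten in place on restarts. A minimal sketch under those assumptions, with an illustrative file name:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the Files.write open-option behavior used above: the second
// write replaces the file's contents rather than appending or failing.
public class WriteSketch {
    public static void main(String[] args) throws IOException {
        Path config = Files.createTempDirectory("cfg").resolve("elasticsearch.yml");
        Files.write(config, "cluster.name: one\n".getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
        Files.write(config, "cluster.name: two\n".getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE);
        // The second write truncated first, so only one line remains.
        System.out.println(Files.readAllLines(config).get(0)); // cluster.name: two
    }
}
```

Note that on a hard-linked config file (the situation before this PR moved config out of the distro dir), truncating in place would still mutate the shared inode, which is why the old code deleted the file first.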
@@ -972,7 +1019,7 @@ private List<String> readPortsFile(Path file) throws IOException {
}

private Path getExtractedDistributionDir() {
return Paths.get(distribution.getExtracted().toString()).resolve("elasticsearch-" + version);
return Paths.get(distributions.get(currentDistro).getExtracted().toString());
}

private List<File> getInstalledFileSet(Action<? super PatternFilterable> filter) {