From 9b8854b774f26db0076b50757378fb0d6325c1af Mon Sep 17 00:00:00 2001 From: Alfonso Presa Date: Thu, 5 Dec 2019 20:21:38 +0100 Subject: [PATCH] feature(k8s): copy files from pods using shared folder instead of api calls fixes: #445 --- .../zalenium/container/ContainerClient.java | 5 +- .../container/DockerContainerClient.java | 8 +- .../kubernetes/KubernetesContainerClient.java | 78 ++++++++++++++++--- .../container/swarm/SwarmContainerClient.java | 4 +- .../proxy/DockerSeleniumRemoteProxy.java | 28 +++---- .../streams/DefaultInputStreamDescriptor.java | 23 ++++++ .../streams/InputStreamDescriptor.java | 10 +++ .../streams/InputStreamGroupIterator.java | 9 +++ .../streams/MapInputStreamAdapter.java | 27 +++++++ .../streams/TarInputStreamGroupWrapper.java | 32 ++++++++ 10 files changed, 191 insertions(+), 33 deletions(-) create mode 100644 src/main/java/de/zalando/ep/zalenium/streams/DefaultInputStreamDescriptor.java create mode 100644 src/main/java/de/zalando/ep/zalenium/streams/InputStreamDescriptor.java create mode 100644 src/main/java/de/zalando/ep/zalenium/streams/InputStreamGroupIterator.java create mode 100644 src/main/java/de/zalando/ep/zalenium/streams/MapInputStreamAdapter.java create mode 100644 src/main/java/de/zalando/ep/zalenium/streams/TarInputStreamGroupWrapper.java diff --git a/src/main/java/de/zalando/ep/zalenium/container/ContainerClient.java b/src/main/java/de/zalando/ep/zalenium/container/ContainerClient.java index 7c0cb41930..2e68e5025f 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/ContainerClient.java +++ b/src/main/java/de/zalando/ep/zalenium/container/ContainerClient.java @@ -1,6 +1,7 @@ package de.zalando.ep.zalenium.container; -import java.io.InputStream; +import de.zalando.ep.zalenium.streams.InputStreamGroupIterator; + import java.net.URL; import java.util.Map; @@ -10,7 +11,7 @@ public interface ContainerClient { ContainerClientRegistration registerNode(String zaleniumContainerName, URL remoteHost); - InputStream copyFiles(String containerId, String folderName); + InputStreamGroupIterator copyFiles(String containerId, String folderName); void stopContainer(String containerId); diff --git a/src/main/java/de/zalando/ep/zalenium/container/DockerContainerClient.java b/src/main/java/de/zalando/ep/zalenium/container/DockerContainerClient.java index 4cc82afe2e..5b4c705d1a 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/DockerContainerClient.java +++ b/src/main/java/de/zalando/ep/zalenium/container/DockerContainerClient.java @@ -1,7 +1,6 @@ package de.zalando.ep.zalenium.container; import java.io.IOException; -import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -20,6 +19,9 @@ import com.spotify.docker.client.DockerClient; import com.spotify.docker.client.LogStream; import com.spotify.docker.client.messages.PortBinding; +import de.zalando.ep.zalenium.streams.InputStreamGroupIterator; +import de.zalando.ep.zalenium.streams.TarInputStreamGroupWrapper; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; @@ -214,9 +216,9 @@ private String getContainerId(String containerName) { } } - public InputStream copyFiles(String containerId, String folderName) { + public InputStreamGroupIterator copyFiles(String containerId, String folderName) { try { - return dockerClient.archiveContainer(containerId, folderName); + return new TarInputStreamGroupWrapper(new TarArchiveInputStream(dockerClient.archiveContainer(containerId, folderName))); } catch (DockerException | InterruptedException e) { logger.warn(nodeId + " Something happened while copying the folder " + folderName + ", " + "most of the time it is an issue while closing the input/output stream, which is usually OK.", e); diff --git a/src/main/java/de/zalando/ep/zalenium/container/kubernetes/KubernetesContainerClient.java b/src/main/java/de/zalando/ep/zalenium/container/kubernetes/KubernetesContainerClient.java index fbabb1b894..1cbf30ecb8 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/kubernetes/KubernetesContainerClient.java +++ b/src/main/java/de/zalando/ep/zalenium/container/kubernetes/KubernetesContainerClient.java @@ -3,6 +3,9 @@ import de.zalando.ep.zalenium.container.ContainerClient; import de.zalando.ep.zalenium.container.ContainerClientRegistration; import de.zalando.ep.zalenium.container.ContainerCreationStatus; +import de.zalando.ep.zalenium.streams.InputStreamGroupIterator; +import de.zalando.ep.zalenium.streams.MapInputStreamAdapter; +import de.zalando.ep.zalenium.streams.TarInputStreamGroupWrapper; import de.zalando.ep.zalenium.util.Environment; import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerStateTerminated; @@ -24,21 +27,20 @@ import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecWatch; import okhttp3.Response; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; -import java.io.InputStream; +import java.io.IOException; +import java.io.File; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -70,6 +72,7 @@ public class KubernetesContainerClient implements ContainerClient { private Map appLabelMap; private Map mountedSharedFoldersMap = new HashMap<>(); + private VolumeMount nodeSharedArtifactsMount; private List hostAliases = new ArrayList<>(); private Map nodeSelector = new HashMap<>(); private List tolerations = new ArrayList<>(); @@ -216,7 +219,12 @@ private void discoverFolderMounts() { volumes.stream() .filter(volume -> validMount.getName().equalsIgnoreCase(volume.getName())) .findFirst() - .ifPresent(volume -> mountedSharedFoldersMap.put(validMount, volume)); + .ifPresent(volume -> { + if(nodeSharedArtifactsMount == null) { + nodeSharedArtifactsMount = validMount; + } + mountedSharedFoldersMap.put(validMount, volume); + }); } } } @@ -258,10 +266,44 @@ public void setNodeId(String nodeId) { * Unfortunately due to the fact that any error handling happens on another thread, if the tar command fails the * InputStream will simply be empty and it will close. It won't propagate an Exception to the reader of the * InputStream. + * @return */ @Override - public InputStream copyFiles(String containerId, String folderName) { + public InputStreamGroupIterator copyFiles(String containerId, String folderName) { + if(nodeSharedArtifactsMount != null) { + return copyFilesFromSharedVolume(containerId, folderName); + } else { + return copyFilesThroughCommands(containerId, folderName); + } + } + + private InputStreamGroupIterator copyFilesFromSharedVolume(String containerId, String folderName) { + Map streams = new HashMap<>(); + + Optional oWorkDir = client.pods().withName(containerId).get() + .getSpec().getContainers().get(0).getEnv() + .stream() + .filter(env -> env.getName().equals("SHARED_DIR")) + .map(env -> env.getValue()) + .findFirst(); + + if(!oWorkDir.isPresent()) { + throw new RuntimeException("SHARED_DIR not present in pod" + containerId); + } + String workDir = oWorkDir.get(); + + File dir = new File(workDir + folderName); + File[] directoryListing = dir.listFiles(); + for(File f : directoryListing) { + if(f.getName().endsWith(".log") || f.getName().endsWith(".mp4")) { + streams.put(f.getName(), f); + } + } + + return new MapInputStreamAdapter(streams); + } + private InputStreamGroupIterator copyFilesThroughCommands(String containerId, String folderName) { ByteArrayOutputStream stderr = new ByteArrayOutputStream(); String[] command = new String[] { "tar", "-C", folderName, "-c", "." }; CopyFilesExecListener listener = new CopyFilesExecListener(stderr, command, containerId); @@ -276,7 +318,7 @@ public InputStream copyFiles(String containerId, String folderName) { // Let's wait until it is connected before proceeding. listener.waitForInputStreamToConnect(); - return exec.getOutput(); + return new TarInputStreamGroupWrapper(new TarArchiveInputStream(exec.getOutput())); } @Override @@ -352,6 +394,22 @@ public ContainerCreationStatus createContainer(String zaleniumContainerName, Str .map(e -> new EnvVar(e.getKey(), e.getValue(), null)) .collect(Collectors.toList()); + if(nodeSharedArtifactsMount != null) { + String workDir = nodeSharedArtifactsMount.getMountPath() + "/" + UUID.randomUUID().toString(); + flattenedEnvVars.add(new EnvVar("SHARED_DIR", workDir, null)); + flattenedEnvVars.add(new EnvVar("VIDEOS_DIR", workDir + "/videos", null)); + flattenedEnvVars.add(new EnvVar("LOGS_DIR", workDir + "/var/log/cont", null)); + if (!Files.exists(Paths.get(workDir))) { + try { + Files.createDirectories(Paths.get(workDir)); + Files.createDirectories(Paths.get(workDir + "/videos")); + Files.createDirectories(Paths.get(workDir + "/var/log/cont")); + } catch (IOException e) { + logger.error("Error creating folder {}", workDir, e); + } + } + } + Map podSelector = new HashMap<>(); PodConfiguration config = new PodConfiguration(); diff --git a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java index 19826a0bca..330faac5da 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java +++ b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java @@ -10,6 +10,7 @@ import de.zalando.ep.zalenium.container.ContainerClientRegistration; import de.zalando.ep.zalenium.container.ContainerCreationStatus; import de.zalando.ep.zalenium.proxy.DockeredSeleniumStarter; +import de.zalando.ep.zalenium.streams.InputStreamGroupIterator; import de.zalando.ep.zalenium.util.Environment; import de.zalando.ep.zalenium.util.GoogleAnalyticsApi; import de.zalando.ep.zalenium.util.ZaleniumConfiguration; @@ -20,7 +21,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.*; @@ -124,7 +124,7 @@ private String getContainerId(String containerName) { } } - public InputStream copyFiles(String containerId, String folderName) { + public InputStreamGroupIterator copyFiles(String containerId, String folderName) { // TODO: Implement behaviour return null; } diff --git a/src/main/java/de/zalando/ep/zalenium/proxy/DockerSeleniumRemoteProxy.java b/src/main/java/de/zalando/ep/zalenium/proxy/DockerSeleniumRemoteProxy.java index ac5617700e..61f22953d7 100644 --- a/src/main/java/de/zalando/ep/zalenium/proxy/DockerSeleniumRemoteProxy.java +++ b/src/main/java/de/zalando/ep/zalenium/proxy/DockerSeleniumRemoteProxy.java @@ -15,6 +15,8 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import de.zalando.ep.zalenium.streams.InputStreamDescriptor; +import de.zalando.ep.zalenium.streams.InputStreamGroupIterator; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.lang3.StringUtils; @@ -645,21 +647,18 @@ void copyVideos(final String containerId) { } String currentName = configureThreadName(); boolean videoWasCopied = false; - TarArchiveInputStream tarStream = new TarArchiveInputStream(containerClient.copyFiles(containerId, "/videos/")); + InputStreamGroupIterator tarStream = containerClient.copyFiles(containerId, "/videos/"); try { - TarArchiveEntry entry; - while ((entry = tarStream.getNextTarEntry()) != null) { - if (entry.isDirectory()) { - continue; - } - String fileExtension = entry.getName().substring(entry.getName().lastIndexOf('.')); + InputStreamDescriptor entry; + while ((entry = tarStream.next()) != null) { + String fileExtension = entry.name().substring(entry.name().lastIndexOf('.')); testInformation.setFileExtension(fileExtension); Path videoFile = Paths.get(String.format("%s/%s", testInformation.getVideoFolderPath(), testInformation.getFileName())); if (!Files.exists(videoFile.getParent())) { Files.createDirectories(videoFile.getParent()); } - Files.copy(tarStream, videoFile); + Files.copy(entry.get(), videoFile); CommonProxyUtilities.setFilePermissions(videoFile); videoWasCopied = true; LOGGER.debug("Video file copied to: {}/{}", testInformation.getVideoFolderPath(), testInformation.getFileName()); @@ -693,21 +692,18 @@ void copyLogs(final String containerId) { return; } String currentName = configureThreadName(); - TarArchiveInputStream tarStream = new TarArchiveInputStream(containerClient.copyFiles(containerId, "/var/log/cont/")); + InputStreamGroupIterator tarStream = containerClient.copyFiles(containerId, "/var/log/cont/"); try { - TarArchiveEntry entry; - while ((entry = tarStream.getNextTarEntry()) != null) { - if (entry.isDirectory()) { - continue; - } + InputStreamDescriptor entry; + while ((entry = tarStream.next()) != null) { if (!Files.exists(Paths.get(testInformation.getLogsFolderPath()))) { Path directories = Files.createDirectories(Paths.get(testInformation.getLogsFolderPath())); CommonProxyUtilities.setFilePermissions(directories); CommonProxyUtilities.setFilePermissions(directories.getParent()); } - String fileName = entry.getName().replace("cont/", ""); + String fileName = entry.name().replace("cont/", ""); Path logFile = Paths.get(String.format("%s/%s", testInformation.getLogsFolderPath(), fileName)); - Files.copy(tarStream, logFile); + Files.copy(entry.get(), logFile); CommonProxyUtilities.setFilePermissions(logFile); } LOGGER.debug("Logs copied to: {}", testInformation.getLogsFolderPath()); diff --git a/src/main/java/de/zalando/ep/zalenium/streams/DefaultInputStreamDescriptor.java b/src/main/java/de/zalando/ep/zalenium/streams/DefaultInputStreamDescriptor.java new file mode 100644 index 0000000000..2e984d4b0c --- /dev/null +++ b/src/main/java/de/zalando/ep/zalenium/streams/DefaultInputStreamDescriptor.java @@ -0,0 +1,23 @@ +package de.zalando.ep.zalenium.streams; + +import java.io.InputStream; + +public final class DefaultInputStreamDescriptor implements InputStreamDescriptor{ + private final InputStream is; + private final String name; + + public DefaultInputStreamDescriptor(InputStream is, String name) { + this.is = is; + this.name = name; + } + + @Override + public InputStream get() { + return is; + } + + @Override + public String name() { + return name; + } +} \ No newline at end of file diff --git a/src/main/java/de/zalando/ep/zalenium/streams/InputStreamDescriptor.java b/src/main/java/de/zalando/ep/zalenium/streams/InputStreamDescriptor.java new file mode 100644 index 0000000000..9e7b7875cb --- /dev/null +++ b/src/main/java/de/zalando/ep/zalenium/streams/InputStreamDescriptor.java @@ -0,0 +1,10 @@ +package de.zalando.ep.zalenium.streams; + +import java.io.InputStream; + +public interface InputStreamDescriptor { + + InputStream get(); + String name(); + +} \ No newline at end of file diff --git a/src/main/java/de/zalando/ep/zalenium/streams/InputStreamGroupIterator.java b/src/main/java/de/zalando/ep/zalenium/streams/InputStreamGroupIterator.java new file mode 100644 index 0000000000..039ffd467e --- /dev/null +++ b/src/main/java/de/zalando/ep/zalenium/streams/InputStreamGroupIterator.java @@ -0,0 +1,9 @@ +package de.zalando.ep.zalenium.streams; + +import java.io.IOException; + +public interface InputStreamGroupIterator { + + InputStreamDescriptor next() throws IOException; + +} \ No newline at end of file diff --git a/src/main/java/de/zalando/ep/zalenium/streams/MapInputStreamAdapter.java b/src/main/java/de/zalando/ep/zalenium/streams/MapInputStreamAdapter.java new file mode 100644 index 0000000000..93503d27de --- /dev/null +++ b/src/main/java/de/zalando/ep/zalenium/streams/MapInputStreamAdapter.java @@ -0,0 +1,27 @@ +package de.zalando.ep.zalenium.streams; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; +import java.io.File; +import java.util.Map; + +public class MapInputStreamAdapter implements InputStreamGroupIterator{ + + Iterator> streams; + + public MapInputStreamAdapter(Map files) { + streams = files.entrySet().stream() + .iterator(); + } + + @Override + public InputStreamDescriptor next() throws IOException { + return streams.hasNext() ? getNextDescriptor() : null; + } + + private InputStreamDescriptor getNextDescriptor() throws IOException { + Map.Entry entry = streams.next(); + return new DefaultInputStreamDescriptor(new FileInputStream(entry.getValue()), entry.getKey()); + } +} \ No newline at end of file diff --git a/src/main/java/de/zalando/ep/zalenium/streams/TarInputStreamGroupWrapper.java b/src/main/java/de/zalando/ep/zalenium/streams/TarInputStreamGroupWrapper.java new file mode 100644 index 0000000000..a8ba2e7749 --- /dev/null +++ b/src/main/java/de/zalando/ep/zalenium/streams/TarInputStreamGroupWrapper.java @@ -0,0 +1,32 @@ +package de.zalando.ep.zalenium.streams; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; + +import java.io.IOException; + +public class TarInputStreamGroupWrapper implements InputStreamGroupIterator{ + + private final TarArchiveInputStream stream; + private TarArchiveEntry currentTarEntry; + private boolean started = false; + + public TarInputStreamGroupWrapper(TarArchiveInputStream stream) { + if(stream == null) { + throw new RuntimeException("Stream cannot be null"); + } + + this.stream = stream; + } + + @Override + public InputStreamDescriptor next() throws IOException { + while(!started || (currentTarEntry != null && currentTarEntry.isDirectory())) { + started = true; + currentTarEntry = stream.getNextTarEntry(); + } + return currentTarEntry == null ? null : new DefaultInputStreamDescriptor(stream, currentTarEntry.getName()); + } + + +} \ No newline at end of file