Skip to content
This repository has been archived by the owner on Sep 21, 2021. It is now read-only.

Commit

Permalink
feature(k8s): copy files from pods using shared folder instead of api…
Browse files Browse the repository at this point in the history
… calls

fixes: #445
  • Loading branch information
alfonso-presa authored and diemol committed Dec 11, 2019
1 parent 714f87c commit 9b8854b
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package de.zalando.ep.zalenium.container;

import java.io.InputStream;
import de.zalando.ep.zalenium.streams.InputStreamGroupIterator;

import java.net.URL;
import java.util.Map;

Expand All @@ -10,7 +11,7 @@ public interface ContainerClient {

ContainerClientRegistration registerNode(String zaleniumContainerName, URL remoteHost);

InputStream copyFiles(String containerId, String folderName);
InputStreamGroupIterator copyFiles(String containerId, String folderName);

void stopContainer(String containerId);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package de.zalando.ep.zalenium.container;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -20,6 +19,9 @@
import com.spotify.docker.client.DockerClient;
import com.spotify.docker.client.LogStream;
import com.spotify.docker.client.messages.PortBinding;
import de.zalando.ep.zalenium.streams.InputStreamGroupIterator;
import de.zalando.ep.zalenium.streams.TarInputStreamGroupWrapper;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -214,9 +216,9 @@ private String getContainerId(String containerName) {
}
}

public InputStream copyFiles(String containerId, String folderName) {
public InputStreamGroupIterator copyFiles(String containerId, String folderName) {
try {
return dockerClient.archiveContainer(containerId, folderName);
return new TarInputStreamGroupWrapper(new TarArchiveInputStream(dockerClient.archiveContainer(containerId, folderName)));
} catch (DockerException | InterruptedException e) {
logger.warn(nodeId + " Something happened while copying the folder " + folderName + ", " +
"most of the time it is an issue while closing the input/output stream, which is usually OK.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import de.zalando.ep.zalenium.container.ContainerClient;
import de.zalando.ep.zalenium.container.ContainerClientRegistration;
import de.zalando.ep.zalenium.container.ContainerCreationStatus;
import de.zalando.ep.zalenium.streams.InputStreamGroupIterator;
import de.zalando.ep.zalenium.streams.MapInputStreamAdapter;
import de.zalando.ep.zalenium.streams.TarInputStreamGroupWrapper;
import de.zalando.ep.zalenium.util.Environment;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
Expand All @@ -24,21 +27,20 @@
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import okhttp3.Response;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.File;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -70,6 +72,7 @@ public class KubernetesContainerClient implements ContainerClient {
private Map<String, String> appLabelMap;

private Map<VolumeMount, Volume> mountedSharedFoldersMap = new HashMap<>();
private VolumeMount nodeSharedArtifactsMount;
private List<HostAlias> hostAliases = new ArrayList<>();
private Map<String, String> nodeSelector = new HashMap<>();
private List<Toleration> tolerations = new ArrayList<>();
Expand Down Expand Up @@ -216,7 +219,12 @@ private void discoverFolderMounts() {
volumes.stream()
.filter(volume -> validMount.getName().equalsIgnoreCase(volume.getName()))
.findFirst()
.ifPresent(volume -> mountedSharedFoldersMap.put(validMount, volume));
.ifPresent(volume -> {
if(nodeSharedArtifactsMount == null) {
nodeSharedArtifactsMount = validMount;
}
mountedSharedFoldersMap.put(validMount, volume);
});
}
}
}
Expand Down Expand Up @@ -258,10 +266,44 @@ public void setNodeId(String nodeId) {
* Unfortunately due to the fact that any error handling happens on another thread, if the tar command fails the
* InputStream will simply be empty and it will close. It won't propagate an Exception to the reader of the
* InputStream.
* @return
*/
@Override
public InputStream copyFiles(String containerId, String folderName) {
public InputStreamGroupIterator copyFiles(String containerId, String folderName) {
if(nodeSharedArtifactsMount != null) {
return copyFilesFromSharedVolume(containerId, folderName);
} else {
return copyFilesThroughCommands(containerId, folderName);
}
}

private InputStreamGroupIterator copyFilesFromSharedVolume(String containerId, String folderName) {
Map<String, File> streams = new HashMap<>();

Optional<String> oWorkDir = client.pods().withName(containerId).get()
.getSpec().getContainers().get(0).getEnv()
.stream()
.filter(env -> env.getName().equals("SHARED_DIR"))
.map(env -> env.getValue())
.findFirst();

if(!oWorkDir.isPresent()) {
throw new RuntimeException("SHARED_DIR not present in pod" + containerId);
}
String workDir = oWorkDir.get();

File dir = new File(workDir + folderName);
File[] directoryListing = dir.listFiles();
for(File f : directoryListing) {
if(f.getName().endsWith(".log") || f.getName().endsWith(".mp4")) {
streams.put(f.getName(), f);
}
}

return new MapInputStreamAdapter(streams);
}

private InputStreamGroupIterator copyFilesThroughCommands(String containerId, String folderName) {
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
String[] command = new String[] { "tar", "-C", folderName, "-c", "." };
CopyFilesExecListener listener = new CopyFilesExecListener(stderr, command, containerId);
Expand All @@ -276,7 +318,7 @@ public InputStream copyFiles(String containerId, String folderName) {
// Let's wait until it is connected before proceeding.
listener.waitForInputStreamToConnect();

return exec.getOutput();
return new TarInputStreamGroupWrapper(new TarArchiveInputStream(exec.getOutput()));
}

@Override
Expand Down Expand Up @@ -352,6 +394,22 @@ public ContainerCreationStatus createContainer(String zaleniumContainerName, Str
.map(e -> new EnvVar(e.getKey(), e.getValue(), null))
.collect(Collectors.toList());

if(nodeSharedArtifactsMount != null) {
String workDir = nodeSharedArtifactsMount.getMountPath() + "/" + UUID.randomUUID().toString();
flattenedEnvVars.add(new EnvVar("SHARED_DIR", workDir, null));
flattenedEnvVars.add(new EnvVar("VIDEOS_DIR", workDir + "/videos", null));
flattenedEnvVars.add(new EnvVar("LOGS_DIR", workDir + "/var/log/cont", null));
if (!Files.exists(Paths.get(workDir))) {
try {
Files.createDirectories(Paths.get(workDir));
Files.createDirectories(Paths.get(workDir + "/videos"));
Files.createDirectories(Paths.get(workDir + "/var/log/cont"));
} catch (IOException e) {
logger.error("Error creating folder {}", workDir, e);
}
}
}

Map<String, String> podSelector = new HashMap<>();

PodConfiguration config = new PodConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import de.zalando.ep.zalenium.container.ContainerClientRegistration;
import de.zalando.ep.zalenium.container.ContainerCreationStatus;
import de.zalando.ep.zalenium.proxy.DockeredSeleniumStarter;
import de.zalando.ep.zalenium.streams.InputStreamGroupIterator;
import de.zalando.ep.zalenium.util.Environment;
import de.zalando.ep.zalenium.util.GoogleAnalyticsApi;
import de.zalando.ep.zalenium.util.ZaleniumConfiguration;
Expand All @@ -20,7 +21,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
Expand Down Expand Up @@ -124,7 +124,7 @@ private String getContainerId(String containerName) {
}
}

public InputStream copyFiles(String containerId, String folderName) {
public InputStreamGroupIterator copyFiles(String containerId, String folderName) {
// TODO: Implement behaviour
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import de.zalando.ep.zalenium.streams.InputStreamDescriptor;
import de.zalando.ep.zalenium.streams.InputStreamGroupIterator;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -645,21 +647,18 @@ void copyVideos(final String containerId) {
}
String currentName = configureThreadName();
boolean videoWasCopied = false;
TarArchiveInputStream tarStream = new TarArchiveInputStream(containerClient.copyFiles(containerId, "/videos/"));
InputStreamGroupIterator tarStream = containerClient.copyFiles(containerId, "/videos/");
try {
TarArchiveEntry entry;
while ((entry = tarStream.getNextTarEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
String fileExtension = entry.getName().substring(entry.getName().lastIndexOf('.'));
InputStreamDescriptor entry;
while ((entry = tarStream.next()) != null) {
String fileExtension = entry.name().substring(entry.name().lastIndexOf('.'));
testInformation.setFileExtension(fileExtension);
Path videoFile = Paths.get(String.format("%s/%s", testInformation.getVideoFolderPath(),
testInformation.getFileName()));
if (!Files.exists(videoFile.getParent())) {
Files.createDirectories(videoFile.getParent());
}
Files.copy(tarStream, videoFile);
Files.copy(entry.get(), videoFile);
CommonProxyUtilities.setFilePermissions(videoFile);
videoWasCopied = true;
LOGGER.debug("Video file copied to: {}/{}", testInformation.getVideoFolderPath(), testInformation.getFileName());
Expand Down Expand Up @@ -693,21 +692,18 @@ void copyLogs(final String containerId) {
return;
}
String currentName = configureThreadName();
TarArchiveInputStream tarStream = new TarArchiveInputStream(containerClient.copyFiles(containerId, "/var/log/cont/"));
InputStreamGroupIterator tarStream = containerClient.copyFiles(containerId, "/var/log/cont/");
try {
TarArchiveEntry entry;
while ((entry = tarStream.getNextTarEntry()) != null) {
if (entry.isDirectory()) {
continue;
}
InputStreamDescriptor entry;
while ((entry = tarStream.next()) != null) {
if (!Files.exists(Paths.get(testInformation.getLogsFolderPath()))) {
Path directories = Files.createDirectories(Paths.get(testInformation.getLogsFolderPath()));
CommonProxyUtilities.setFilePermissions(directories);
CommonProxyUtilities.setFilePermissions(directories.getParent());
}
String fileName = entry.getName().replace("cont/", "");
String fileName = entry.name().replace("cont/", "");
Path logFile = Paths.get(String.format("%s/%s", testInformation.getLogsFolderPath(), fileName));
Files.copy(tarStream, logFile);
Files.copy(entry.get(), logFile);
CommonProxyUtilities.setFilePermissions(logFile);
}
LOGGER.debug("Logs copied to: {}", testInformation.getLogsFolderPath());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package de.zalando.ep.zalenium.streams;

import java.io.InputStream;

public final class DefaultInputStreamDescriptor implements InputStreamDescriptor{
private final InputStream is;
private final String name;

public DefaultInputStreamDescriptor(InputStream is, String name) {
this.is = is;
this.name = name;
}

@Override
public InputStream get() {
return is;
}

@Override
public String name() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.zalando.ep.zalenium.streams;

import java.io.InputStream;

public interface InputStreamDescriptor {

InputStream get();
String name();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package de.zalando.ep.zalenium.streams;

import java.io.IOException;

public interface InputStreamGroupIterator {

InputStreamDescriptor next() throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package de.zalando.ep.zalenium.streams;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.io.File;
import java.util.Map;

public class MapInputStreamAdapter implements InputStreamGroupIterator{

Iterator<Map.Entry<String, File>> streams;

public MapInputStreamAdapter(Map<String, File> files) {
streams = files.entrySet().stream()
.iterator();
}

@Override
public InputStreamDescriptor next() throws IOException {
return streams.hasNext() ? getNextDescriptor() : null;
}

private InputStreamDescriptor getNextDescriptor() throws IOException {
Map.Entry<String, File> entry = streams.next();
return new DefaultInputStreamDescriptor(new FileInputStream(entry.getValue()), entry.getKey());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package de.zalando.ep.zalenium.streams;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;

import java.io.IOException;

public class TarInputStreamGroupWrapper implements InputStreamGroupIterator{

private final TarArchiveInputStream stream;
private TarArchiveEntry currentTarEntry;
private boolean started = false;

public TarInputStreamGroupWrapper(TarArchiveInputStream stream) {
if(stream == null) {
throw new RuntimeException("Stream cannot be null");
}

this.stream = stream;
}

@Override
public InputStreamDescriptor next() throws IOException {
while(!started || (currentTarEntry != null && currentTarEntry.isDirectory())) {
started = true;
currentTarEntry = stream.getNextTarEntry();
}
return currentTarEntry == null ? null : new DefaultInputStreamDescriptor(stream, currentTarEntry.getName());
}


}

0 comments on commit 9b8854b

Please sign in to comment.