Skip to content

Commit

Permalink
Make it easier to use MinIO with Hadoop
Browse files Browse the repository at this point in the history
Migrate convenience methods from HiveMinioDataLake to Minio itself, so
that it's viable to use MinIO alone without Hadoop container.
  • Loading branch information
findepi committed Oct 7, 2022
1 parent 4ed05cc commit ae11dd6
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,26 @@
package io.trino.plugin.hive.containers;

import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.ClassPath;
import io.trino.testing.containers.Minio;
import io.trino.testing.minio.MinioClient;
import io.trino.util.AutoCloseableCloser;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.testcontainers.containers.Network;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.testing.containers.TestContainers.getPathFromClassPathResource;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Objects.requireNonNull;
import static java.util.regex.Matcher.quoteReplacement;
import static org.testcontainers.containers.Network.newNetwork;

public class HiveMinioDataLake
implements AutoCloseable
{
public static final String MINIO_ACCESS_KEY = "accesskey";
public static final String MINIO_SECRET_KEY = "secretkey";
@Deprecated
public static final String MINIO_ACCESS_KEY = Minio.MINIO_ACCESS_KEY;
@Deprecated
public static final String MINIO_SECRET_KEY = Minio.MINIO_SECRET_KEY;

private final String bucketName;
private final Minio minio;
Expand Down Expand Up @@ -88,7 +80,8 @@ public void start()
state = State.STARTING;
minio.start();
hiveHadoop.start();
minioClient = initMinioClient();
minioClient = closer.register(minio.createMinioClient());
minio.createBucket(bucketName);
state = State.STARTED;
}

Expand All @@ -107,23 +100,12 @@ public MinioClient getMinioClient()

public void copyResources(String resourcePath, String target)
{
try {
for (ClassPath.ResourceInfo resourceInfo : ClassPath.from(MinioClient.class.getClassLoader())
.getResources()) {
if (resourceInfo.getResourceName().startsWith(resourcePath)) {
String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(target));
writeFile(resourceInfo.asByteSource().read(), fileName);
}
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
minio.copyResources(resourcePath, bucketName, target);
}

public void writeFile(byte[] contents, String target)
{
getMinioClient().putObject(getBucketName(), contents, target);
minio.writeFile(contents, bucketName, target);
}

public List<String> listFiles(String targetDirectory)
Expand All @@ -146,9 +128,10 @@ public String getBucketName()
return bucketName;
}

@Deprecated
public String getMinioAddress()
{
return "http://" + getMinio().getMinioApiEndpoint();
return getMinio().getMinioAddress();
}

@Override
Expand All @@ -158,22 +141,6 @@ public void close()
stop();
}

private MinioClient initMinioClient()
{
MinioClient minioClient = new MinioClient(getMinioAddress(), MINIO_ACCESS_KEY, MINIO_SECRET_KEY);
closer.register(minioClient);

// use retry loop for minioClient.makeBucket as minio container tends to return "Server not initialized, please try again" error
// for some time after starting up
RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
.withMaxDuration(Duration.of(2, MINUTES))
.withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration
.withDelay(Duration.of(10, SECONDS));
Failsafe.with(retryPolicy).run(() -> minioClient.makeBucket(bucketName));

return minioClient;
}

private enum State
{
INITIAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,27 @@
package io.trino.testing.containers;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.common.reflect.ClassPath;
import io.airlift.log.Logger;
import io.trino.testing.minio.MinioClient;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.testcontainers.containers.Network;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.regex.Matcher.quoteReplacement;

public class Minio
extends BaseTestContainer
Expand All @@ -34,6 +47,10 @@ public class Minio
public static final int MINIO_API_PORT = 4566;
public static final int MINIO_CONSOLE_PORT = 4567;

// defaults
public static final String MINIO_ACCESS_KEY = "accesskey";
public static final String MINIO_SECRET_KEY = "secretkey";

public static Builder builder()
{
return new Builder();
Expand Down Expand Up @@ -82,11 +99,57 @@ public HostAndPort getMinioApiEndpoint()
return getMappedHostAndPortForExposedPort(MINIO_API_PORT);
}

public String getMinioAddress()
{
return "http://" + getMinioApiEndpoint();
}

public HostAndPort getMinioConsoleEndpoint()
{
return getMappedHostAndPortForExposedPort(MINIO_CONSOLE_PORT);
}

public void createBucket(String bucketName)
{
try (MinioClient minioClient = createMinioClient()) {
// use retry loop for minioClient.makeBucket as minio container tends to return "Server not initialized, please try again" error
// for some time after starting up
RetryPolicy<Object> retryPolicy = new RetryPolicy<>()
.withMaxDuration(Duration.of(2, MINUTES))
.withMaxAttempts(Integer.MAX_VALUE) // limited by MaxDuration
.withDelay(Duration.of(10, SECONDS));
Failsafe.with(retryPolicy).run(() -> minioClient.makeBucket(bucketName));
}
}

public void copyResources(String resourcePath, String bucketName, String target)
{
try (MinioClient minioClient = createMinioClient()) {
for (ClassPath.ResourceInfo resourceInfo : ClassPath.from(MinioClient.class.getClassLoader())
.getResources()) {
if (resourceInfo.getResourceName().startsWith(resourcePath)) {
String fileName = resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(resourcePath), quoteReplacement(target));
minioClient.putObject(bucketName, resourceInfo.asByteSource().read(), fileName);
}
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void writeFile(byte[] contents, String bucketName, String path)
{
try (MinioClient minioClient = createMinioClient()) {
minioClient.putObject(bucketName, contents, path);
}
}

public MinioClient createMinioClient()
{
return new MinioClient(getMinioAddress(), MINIO_ACCESS_KEY, MINIO_SECRET_KEY);
}

public static class Builder
extends BaseTestContainer.Builder<Minio.Builder, Minio>
{
Expand All @@ -98,6 +161,10 @@ private Builder()
ImmutableSet.of(
MINIO_API_PORT,
MINIO_CONSOLE_PORT);
this.envVars = ImmutableMap.<String, String>builder()
.put("MINIO_ACCESS_KEY", MINIO_ACCESS_KEY)
.put("MINIO_SECRET_KEY", MINIO_SECRET_KEY)
.buildOrThrow();
}

@Override
Expand Down

0 comments on commit ae11dd6

Please sign in to comment.