Skip to content

Commit

Permalink
Move Rubix initialization tests to HDFS module
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Dec 9, 2023
1 parent 36f0aa5 commit e9416da
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 159 deletions.
140 changes: 140 additions & 0 deletions lib/trino-hdfs/src/test/java/io/trino/hdfs/TestCachingSetup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.hdfs;

import com.google.common.collect.ImmutableMap;
import com.qubole.rubix.core.CachingFileSystem;
import io.airlift.testing.TempFile;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.filesystem.hdfs.HdfsFileSystemManager;
import io.trino.testing.TestingNodeManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.parallel.Execution;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

@TestInstance(PER_CLASS)
@Execution(SAME_THREAD)
class TestCachingSetup
{
@BeforeEach
@AfterEach
public void deinitializeRubix()
{
// revert static Rubix initialization done by other tests
CachingFileSystem.deinitialize();
}

@Test
public void testS3SecurityMappingAndHiveCachingMutuallyExclusive(@TempDir Path tempDirectory)
throws IOException
{
try (TempFile mappingConfig = new TempFile()) {
assertThatThrownBy(() -> createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.s3.security-mapping.config-file", mappingConfig.path().toString())
.put("hive.cache.enabled", "true")
.put("hive.cache.location", tempDirectory.toString())
.buildOrThrow()))
.hasMessageContaining("S3 security mapping is not compatible with Hive caching");
}
}

@Test
public void testGcsAccessTokenAndHiveCachingMutuallyExclusive(@TempDir Path tempDirectory)
{
assertThatThrownBy(() -> createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.gcs.use-access-token", "true")
.put("hive.cache.enabled", "true")
.put("hive.cache.location", tempDirectory.toString())
.buildOrThrow()))
.hasMessageContaining("Use of GCS access token is not compatible with Hive caching");
}

@Test
public void testHdfsImpersonationAndHiveCachingMutuallyExclusive(@TempDir Path tempDirectory)
{
assertThatThrownBy(() -> createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.hdfs.impersonation.enabled", "true")
.put("hive.cache.enabled", "true")
.put("hive.cache.location", tempDirectory.toString())
.buildOrThrow()))
.hasMessageContaining("HDFS impersonation is not compatible with Hive caching");
}

@Test
public void testRubixCache(@TempDir Path tempDirectory)
{
createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.cache.location", tempDirectory.toString())
.buildOrThrow());
}

@Test
public void testRubixCacheWithNonExistingCacheDirectory()
{
assertThatThrownBy(() -> createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.cache.start-server-on-coordinator", "true")
.put("hive.cache.location", "/tmp/non/existing/directory")
.buildOrThrow()))
.hasMessageContaining("None of the cache parent directories exists");

assertThatThrownBy(() -> createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.cache.start-server-on-coordinator", "true")
.buildOrThrow()))
.hasMessageContaining("caching directories were not provided");

// cache directories should not be required when cache is not explicitly started on coordinator
createFileSystemManager(
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.buildOrThrow());
}

private static void createFileSystemManager(Map<String, String> config)
{
HdfsFileSystemManager manager = new HdfsFileSystemManager(
ImmutableMap.<String, String>builder()
.putAll(config)
.put("boostrap.quiet", "true")
.buildOrThrow(),
true,
true,
true,
"test",
new TestingNodeManager(),
OpenTelemetry.noop());
manager.configure();
manager.create();
manager.stop();
}
}
6 changes: 0 additions & 6 deletions plugin/trino-hive-hadoop2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.qubole.rubix</groupId>
<artifactId>rubix-presto-shaded</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,66 +14,26 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableMap;
import com.qubole.rubix.core.CachingFileSystem;
import io.trino.spi.Plugin;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.testing.TestingConnectorContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static com.google.common.collect.MoreCollectors.onlyElement;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Streams.stream;
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.createTempDirectory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

@TestInstance(PER_CLASS)
@Execution(SAME_THREAD) // see @BeforeEach
public class TestHivePlugin
{
private Path tempDirectory;

@BeforeAll
public void setup()
throws IOException
{
tempDirectory = createTempDirectory(getClass().getSimpleName());
}

@AfterAll
public void tearDown()
throws IOException
{
deleteRecursively(tempDirectory, ALLOW_INSECURE);
}

@AfterEach
@BeforeEach
public void deinitializeRubix()
{
// revert static rubix initialization done by other tests
CachingFileSystem.deinitialize();
}

@Test
public void testCreateConnector()
{
Expand Down Expand Up @@ -141,44 +101,6 @@ public void testGlueMetastore()
.hasMessageContaining("Error: Configuration property 'hive.metastore.uri' was not used");
}

@Test
public void testS3SecurityMappingAndHiveCachingMutuallyExclusive()
throws IOException
{
Path mappingConfig = Files.createTempFile(null, null);
ConnectorFactory connectorFactory = getHiveConnectorFactory();

assertThatThrownBy(() -> connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.s3.security-mapping.config-file", mappingConfig.toString())
.put("hive.cache.enabled", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("hive.cache.location", tempDirectory.toString())
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext()))
.hasMessageContaining("S3 security mapping is not compatible with Hive caching");
}

@Test
public void testGcsAccessTokenAndHiveCachingMutuallyExclusive()
{
ConnectorFactory connectorFactory = getHiveConnectorFactory();

assertThatThrownBy(() -> connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.gcs.use-access-token", "true")
.put("hive.cache.enabled", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("hive.cache.location", tempDirectory.toString())
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext()))
.hasMessageContaining("Use of GCS access token is not compatible with Hive caching");
}

@Test
public void testImmutablePartitionsAndInsertOverwriteMutuallyExclusive()
{
Expand Down Expand Up @@ -236,81 +158,6 @@ private Object getDefaultValueInsertExistingPartitionsBehavior(Connector connect
.getDefaultValue();
}

@Test
public void testHdfsImpersonationAndHiveCachingMutuallyExclusive()
{
ConnectorFactory connectorFactory = getHiveConnectorFactory();

assertThatThrownBy(() -> connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.hdfs.impersonation.enabled", "true")
.put("hive.cache.enabled", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("hive.cache.location", tempDirectory.toString())
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext()))
.hasMessageContaining("HDFS impersonation is not compatible with Hive caching");
}

@Test
public void testRubixCache()
{
ConnectorFactory connectorFactory = getHiveConnectorFactory();

connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("hive.cache.location", tempDirectory.toString())
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext())
.shutdown();
}

@Test
public void testRubixCacheWithNonExistingCacheDirectory()
{
ConnectorFactory connectorFactory = getHiveConnectorFactory();

assertThatThrownBy(() -> connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.cache.start-server-on-coordinator", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("hive.cache.location", "/tmp/non/existing/directory")
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext()))
.hasMessageContaining("None of the cache parent directories exists");

assertThatThrownBy(() -> connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.cache.start-server-on-coordinator", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext()))
.hasMessageContaining("caching directories were not provided");

// cache directories should not be required when cache is not explicitly started on coordinator
connectorFactory.create(
"test",
ImmutableMap.<String, String>builder()
.put("hive.cache.enabled", "true")
.put("hive.metastore.uri", "thrift://foo:1234")
.put("bootstrap.quiet", "true")
.buildOrThrow(),
new TestingConnectorContext())
.shutdown();
}

@Test
public void testAllowAllAccessControl()
{
Expand Down

0 comments on commit e9416da

Please sign in to comment.