Skip to content

Commit

Permalink
Fix / CI improvements for cloud storage (#469)
Browse files Browse the repository at this point in the history
* Update Java read / write test suite to avoid file name collisions

* Update Java read-only test suite to avoid file name collisions

* Update Java storage operations test suite to avoid file name collisions

* Do not create per-test storage instances for local storage tests

* Avoid storage path collisions in Python file storage test suite

* Do not create separate storage instances per test for local file storage tests in Python

* Do not create separate storage instance per test for GCP Java tests

* Do not create separate storage instance per test for GCP Python tests

* Do not create separate storage instance per test for GCP Python tests (FS SPEC)

* Fix setup methods in Java CI for GCP storage tests

* Do not create separate storage instance per test for AWS and Azure Python tests

* Close arrow allocator after local storage tests (closing will check for leaks)

* Fix one test case in the file storage read/write suite

* Do not create separate storage instance per test for AWS Java tests

* Do not create separate storage instance per test for Azure Java tests

* Fix setup for Azure and AWS CI storage tests in Java

* Fix setup for Azure read-only CI storage tests in Java
  • Loading branch information
martin-traverse authored Nov 5, 2024
1 parent c524a74 commit 24eb36e
Show file tree
Hide file tree
Showing 21 changed files with 758 additions and 772 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ public abstract class StorageReadOnlyTestSuite {
@Test
void testMkdir_readOnly() throws Exception {

var dir = roStorage.mkdir("test_dir", false, dataContext);
var dir = roStorage.mkdir("testMkdir_readOnly", false, dataContext);
waitFor(TEST_TIMEOUT, dir);
Assertions.assertThrows(EStorageAccess.class, () -> getResultOf(dir));

var dirPresent = roStorage.exists("test_dir", dataContext);
var dirPresent = roStorage.exists("testMkdir_readOnly", dataContext);
waitFor(TEST_TIMEOUT, dirPresent);
Assertions.assertFalse(getResultOf(dirPresent));
}
Expand All @@ -63,11 +63,11 @@ void testRm_readOnly() throws Exception {
// The file is created independently of the storage, because the storage is assumed to be read only
// and the file cannot be created in it.

var prepare = makeSmallFile("test_file.txt", rwStorage, dataContext);
var prepare = makeSmallFile("testRm_readOnly.txt", rwStorage, dataContext);
waitFor(TEST_TIMEOUT, prepare);
getResultOf(prepare);

var created = roStorage.exists("test_file.txt", dataContext);
var created = roStorage.exists("testRm_readOnly.txt", dataContext);
waitFor(TEST_TIMEOUT, created);
Assertions.assertTrue(getResultOf(created));

Expand All @@ -77,7 +77,7 @@ void testRm_readOnly() throws Exception {

// File should not be gone

var exists = roStorage.exists("test_file.txt", dataContext);
var exists = roStorage.exists("testRm_readOnly.txt", dataContext);
waitFor(TEST_TIMEOUT, exists);
Assertions.assertTrue(getResultOf(exists));
}
Expand All @@ -89,29 +89,29 @@ void testRmdir_readOnly() throws Exception {
// The dir is created independently of the storage, because the storage is assumed to be read only
// and the dir cannot be created in it.

var prepare = rwStorage.mkdir("test_dir", false, dataContext);
var prepare = rwStorage.mkdir("testRmdir_readOnly", false, dataContext);
waitFor(TEST_TIMEOUT, prepare);
getResultOf(prepare);

var created = roStorage.exists("test_dir", dataContext);
var created = roStorage.exists("testRmdir_readOnly", dataContext);
waitFor(TEST_TIMEOUT, created);
Assertions.assertTrue(getResultOf(created));

var rmdir = roStorage.rmdir("test_dir", dataContext);
var rmdir = roStorage.rmdir("testRmdir_readOnly", dataContext);
waitFor(TEST_TIMEOUT, rmdir);
Assertions.assertThrows(EStorageAccess.class, () -> getResultOf(rmdir));

// Dir should not be gone

var exists = roStorage.exists("test_dir", dataContext);
var exists = roStorage.exists("testRmdir_readOnly", dataContext);
waitFor(TEST_TIMEOUT, exists);
Assertions.assertTrue(getResultOf(exists));
}

@Test
void testWriter_readOnly() {

var storagePath = "any_file.txt";
var storagePath = "testWriter_readOnly.txt";

var writeSignal = new CompletableFuture<Long>();
var writer = roStorage.writer(storagePath, writeSignal, dataContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public abstract class StorageReadWriteTestSuite {
@Test
void roundTrip_basic() throws Exception {

var storagePath = "haiku.txt";
var storagePath = "roundTrip_basic.txt";

var haiku =
"The data goes in;\n" +
Expand Down Expand Up @@ -109,7 +109,7 @@ void roundTrip_long_path() throws Exception {
@Test
void roundTrip_large() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "roundTrip_large.dat";

var bytes = new byte[10 * 1024 * 1024]; // One 10 M chunk

Expand All @@ -124,7 +124,7 @@ void roundTrip_large() throws Exception {
@Test
void roundTrip_heterogeneous() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "roundTrip_heterogeneous.dat";

var bytes = List.of( // Selection of different size chunks
new byte[3],
Expand All @@ -145,7 +145,7 @@ void roundTrip_heterogeneous() throws Exception {
@Test
void roundTrip_empty() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "roundTrip_empty.dat";
var emptyBytes = new byte[0];

StorageReadWriteTestSuite.roundTripTest(
Expand Down Expand Up @@ -230,7 +230,7 @@ void testWrite_missingDir() throws Exception {
// Basic test without creating the parent dir first
// Should be automatically created by the writer

var storagePath = "parent_dir/haiku.txt";
var storagePath = "testWrite_missingDir/haiku.txt";

var haiku =
"The data goes in;\n" +
Expand All @@ -248,7 +248,7 @@ void testWrite_fileAlreadyExists() throws Exception {
// Writing a file always overwrites any existing content
// This is in line with cloud bucket semantics

var storagePath = "some_file.txt";
var storagePath = "testWrite_fileAlreadyExists.txt";

var prepare = makeSmallFile(storagePath, storage, dataContext);
waitFor(TEST_TIMEOUT, prepare);
Expand All @@ -270,7 +270,7 @@ void testWrite_dirAlreadyExists() throws Exception {
// File storage should not allow a file to be written if a dir exists with the same name
// TRAC prohibits this even though it is allowed in pure bucket semantics

var storagePath = "some_file.txt";
var storagePath = "testWrite_dirAlreadyExists.txt";

var prepare = storage.mkdir(storagePath, false, dataContext);
waitFor(TEST_TIMEOUT, prepare);
Expand All @@ -280,32 +280,27 @@ void testWrite_dirAlreadyExists() throws Exception {
var exists1Result = getResultOf(exists1);
Assertions.assertTrue(exists1Result);

var content = Bytes.copyToBuffer(
"Some content".getBytes(StandardCharsets.UTF_8),
dataContext.arrowAllocator());
var content = "Some content".getBytes(StandardCharsets.UTF_8);
var contentStream = Stream
.of(content)
.map(chunk -> Bytes.copyToBuffer(chunk, dataContext.arrowAllocator()));

var contentStream = Flows.publish(Stream.of(content));
var writeSignal = new CompletableFuture<Long>();
var writer = storage.writer(storagePath, writeSignal, dataContext);
contentStream.subscribe(writer);

waitFor(TEST_TIMEOUT, writeSignal);

var exists = storage.exists(storagePath, dataContext);
waitFor(TEST_TIMEOUT, exists);
Assertions.assertThrows(EStorageRequest.class, () -> {

Assertions.assertTrue(getResultOf(exists));
Flows.publish(contentStream).subscribe(writer);
waitFor(TEST_TIMEOUT, writeSignal);
getResultOf(writeSignal);
});

var contentStream2 = Flows.publish(Stream.of(content));
var writeSignal2 = new CompletableFuture<Long>();
var writer2 = storage.writer(storagePath, writeSignal2, dataContext);
// Dir should still exist after write failure

Assertions.assertThrows(EStorageRequest.class, () -> {
var stat = storage.stat(storagePath, dataContext);
waitFor(TEST_TIMEOUT, stat);

contentStream2.subscribe(writer2);
waitFor(TEST_TIMEOUT, writeSignal2);
getResultOf(writeSignal2);
});
Assertions.assertEquals(FileType.DIRECTORY, getResultOf(stat).fileType);
}

@Test
Expand Down Expand Up @@ -346,7 +341,7 @@ void testWrite_storageRoot() {
@Test
void testWrite_outsideRoot() {

var storagePath = "../any_file.txt";
var storagePath = "../testWrite_outsideRoot.txt";

var writeSignal = new CompletableFuture<Long>();

Expand All @@ -357,7 +352,7 @@ void testWrite_outsideRoot() {
@Test
void testRead_missing() {

var storagePath = "missing_file.txt";
var storagePath = "testRead_missing.txt";

var reader = storage.reader(storagePath, dataContext);
var result = new ArrayList<ArrowBuf>();
Expand Down Expand Up @@ -409,7 +404,7 @@ void testRead_storageRoot() {
@Test
void testRead_outsideRoot() {

var storagePath = "../some_file.txt";
var storagePath = "../testRead_outsideRoot.txt";

Assertions.assertThrows(EValidationGap.class, () ->
storage.reader(storagePath, dataContext));
Expand Down Expand Up @@ -445,7 +440,7 @@ void testWrite_chunksReleased() throws Exception {
// Writer takes ownership of chunks when it receives them
// This test makes sure they are being released after they have been written

var storagePath = "test_file.dat";
var storagePath = "testWrite_chunksReleased.dat";

var bytes = List.of(
new byte[3],
Expand Down Expand Up @@ -482,16 +477,16 @@ void testWrite_subscribeNever() throws Exception {

// Set up a dir in storage

var mkdir = storage.mkdir("some_dir", false, dataContext);
var mkdir = storage.mkdir("testWrite_subscribeNever", false, dataContext);
waitFor(TEST_TIMEOUT, mkdir);

var dirExists = storage.exists("some_dir", dataContext);
var dirExists = storage.exists("testWrite_subscribeNever", dataContext);
waitFor(TEST_TIMEOUT, dirExists);
Assertions.assertTrue(getResultOf(dirExists));

// Prepare a writer to write a file inside the new dir

var storagePath = "some_dir/some_file.txt";
var storagePath = "testWrite_subscribeNever/some_file.txt";
var writeSignal = new CompletableFuture<Long>();
storage.writer(storagePath, writeSignal, dataContext);

Expand All @@ -507,7 +502,7 @@ void testWrite_subscribeNever() throws Exception {
@Test
void testWrite_subscribeTwice() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "testWrite_subscribeTwice.dat";

var bytes = new byte[10000];
new Random().nextBytes(bytes);
Expand Down Expand Up @@ -543,7 +538,7 @@ void testWrite_subscribeTwice() throws Exception {
@Test
void testWrite_errorImmediately() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "testWrite_errorImmediately.dat";

var writerSignal = new CompletableFuture<Long>();
var writer = storage.writer(storagePath, writerSignal, dataContext);
Expand Down Expand Up @@ -590,7 +585,7 @@ void testWrite_errorImmediately() throws Exception {
@Test
void testWrite_errorAfterChunk() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "testWrite_errorAfterChunk.dat";

var bytes0 = new byte[10000];
new Random().nextBytes(bytes0);
Expand Down Expand Up @@ -646,7 +641,7 @@ void testWrite_errorAfterChunk() throws Exception {
@Test
void testWrite_errorThenRetry() throws Exception {

var storagePath = "test_file.dat";
var storagePath = "testWrite_errorThenRetry.dat";
var dataSize = 10000;

var bytes = new byte[dataSize];
Expand Down Expand Up @@ -704,7 +699,7 @@ void testRead_requestUpfront() {

try {

var storagePath = "some_file.dat";
var storagePath = "testRead_requestUpfront.dat";

// Create a file big enough that it needs many chunks to read

Expand Down Expand Up @@ -770,7 +765,7 @@ void testRead_requestUpfront() {
@Test
void testRead_subscribeLate() throws Exception {

var storagePath = "some_file.txt";
var storagePath = "testRead_subscribeLate.txt";
var writeSignal = makeSmallFile(storagePath, storage, dataContext);
waitFor(TEST_TIMEOUT, writeSignal);

Expand Down Expand Up @@ -800,7 +795,7 @@ void testRead_subscribeLate() throws Exception {
@Test
void testRead_subscribeTwice() throws Exception {

var storagePath = "some_file.txt";
var storagePath = "testRead_subscribeTwice.txt";
var writeSignal = makeSmallFile(storagePath, storage, dataContext);
waitFor(TEST_TIMEOUT, writeSignal);

Expand Down Expand Up @@ -847,7 +842,7 @@ void testRead_subscribeTwice() throws Exception {
@Test
void testRead_cancelImmediately() {

var storagePath = "some_file.txt";
var storagePath = "testRead_cancelImmediately.txt";
var writeSignal = makeSmallFile(storagePath, storage, dataContext);
waitFor(TEST_TIMEOUT, writeSignal);

Expand Down Expand Up @@ -882,7 +877,7 @@ void testRead_cancelImmediately() {
@Test
void testRead_cancelAndRetry() throws Exception {

var storagePath = "some_file.dat";
var storagePath = "testRead_cancelAndRetry.dat";

// Create a file big enough that it can't be read in a single chunk

Expand Down Expand Up @@ -949,7 +944,7 @@ void testRead_cancelAndRetry() throws Exception {
@Test
void testRead_cancelAndDelete() throws Exception {

var storagePath = "some_file.dat";
var storagePath = "testRead_cancelAndDelete.dat";

// Create a file big enough that it can't be read in a single chunk

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.finos.tracdap.common.storage.local;

import org.apache.arrow.memory.BufferAllocator;
import org.finos.tracdap.common.data.DataContext;
import org.finos.tracdap.common.storage.IStorageManager;
import org.finos.tracdap.common.storage.StorageOperationsTestSuite;
Expand All @@ -24,6 +25,8 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.arrow.memory.RootAllocator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -34,17 +37,39 @@
public class LocalStorageOperationsTest extends StorageOperationsTestSuite {

@TempDir
Path storageDir;
static Path storageDir;

static BufferAllocator allocator;

static LocalFileStorage storageInstance;
static DataContext contextInstance;

@BeforeAll
static void setupStorage() {

@BeforeEach
void setupStorage() {

var storageProps = new Properties();
storageProps.put(IStorageManager.PROP_STORAGE_KEY, "TEST_STORAGE");
storageProps.put(LocalFileStorage.CONFIG_ROOT_PATH, storageDir.toString());
storage = new LocalFileStorage("TEST_STORAGE", storageProps);
storageInstance = new LocalFileStorage("TEST_STORAGE", storageProps);

allocator = new RootAllocator();

var elExecutor = new DefaultEventExecutor(new DefaultThreadFactory("t-events"));
dataContext = new DataContext(elExecutor, new RootAllocator());
contextInstance = new DataContext(elExecutor, allocator);
}

@BeforeEach
void useStorageInstance() {

storage = storageInstance;
dataContext = contextInstance;
}

@AfterAll
static void tearDownStorage() {

storageInstance.close();
allocator.close();
}
}
Loading

0 comments on commit 24eb36e

Please sign in to comment.