
Add support for single-step write to S3, instead of multi-part
* Primarily for compatibility with GCS, whose S3 compatibility mode
does not support S3 multi-part uploads. Controlled by the new
config parameter `vfs.s3.use_multipart_upload`, default `true`.

* Add test for S3 direct write

* Add note to HISTORY
ihnorton committed Apr 30, 2019
1 parent 51284e4 commit fa62416
Showing 15 changed files with 362 additions and 26 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -12,6 +12,8 @@
* The tile MBRs in the in-memory fragment metadata are organized into an R-Tree, speeding up tile overlap operations during subarray reads.
* Improved encryption key validation process when opening already open arrays. Fixes issue with indefinite growing of the URI to encryption key mapping in `StorageManager` (the mapping is no longer needed).
* Improved dense write performance in some benchmarks. [#1229](https://github.com/TileDB-Inc/TileDB/pull/1229)
* Support for direct writes without using the S3 multi-part API. Allows writing to
Google Cloud Storage in S3 compatibility mode. [#1219](https://github.com/TileDB-Inc/TileDB/pull/1219)

## Bug fixes

25 changes: 25 additions & 0 deletions doc/source/tutorials/working-with-s3.rst
@@ -92,6 +92,31 @@ Now you are ready to start writing TileDB programs! When creating a TileDB conte
However, we suggest always checking whether the default values are the desired ones
for your application.

GCS
---

TileDB has experimental support for `Google Cloud Storage S3 compatibility mode
<https://cloud.google.com/storage/docs/interoperability>`_. GCS S3 compatibility
`does not support multipart uploads <https://cloud.google.com/storage/docs/migrating#methods-comparison>`_;
therefore, we have added a configuration option to disable multipart uploads and
use single ``PutObject`` requests instead. Uploads in this mode may be slower
than comparable operations against AWS S3. Currently, ``vfs.s3.multipart_part_size``
should be set to a value larger than the expected file size, up to available
memory or backend limits (the maximum object size for GCS is 5 TB). The
required settings are listed in the table below, followed by a configuration sketch.


.. table:: TileDB GCS config settings
   :widths: auto

   ====================================  ====================================
   **Parameter**                         **Value**
   ====================================  ====================================
   ``"vfs.s3.region"``                   ``"(region must be configured!)"``
   ``"vfs.s3.use_multipart_upload"``     ``"false"``
   ``"vfs.s3.multipart_part_size"``      ``5000000000000``
   ``"vfs.s3.max_parallel_ops"``         ``1``
   ====================================  ====================================
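
The following sketch (illustrative only, not part of this changeset; the
endpoint value is an assumption to verify against the GCS interoperability
documentation) applies these settings through the TileDB C++ API:

.. code-block:: cpp

   #include <tiledb/tiledb>

   int main() {
     tiledb::Config config;
     // Assumed GCS S3-compatibility endpoint; verify before use.
     config["vfs.s3.endpoint_override"] = "storage.googleapis.com";
     config["vfs.s3.region"] = "us-east-1";  // placeholder: region must be configured!
     // GCS does not support multi-part uploads, so fall back to
     // single PutObject requests.
     config["vfs.s3.use_multipart_upload"] = "false";
     // With multi-part disabled, the part size must exceed the total
     // object size (GCS objects may be up to 5 TB).
     config["vfs.s3.multipart_part_size"] = "5000000000000";
     config["vfs.s3.max_parallel_ops"] = "1";

     tiledb::Context ctx(config);
     // ... read and write arrays under s3:// URIs as usual ...
     return 0;
   }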

minio
-----

1 change: 1 addition & 0 deletions test/CMakeLists.txt
@@ -82,6 +82,7 @@ set(TILEDB_TEST_SOURCES
src/unit-lru_cache.cc
src/unit-rtree.cc
src/unit-s3.cc
src/unit-s3-no-multipart.cc
src/unit-status.cc
src/unit-tbb.cc
src/unit-threadpool.cc
14 changes: 14 additions & 0 deletions test/src/unit-capi-config.cc
@@ -225,6 +225,7 @@ void check_save_to_file() {
ss << "vfs.s3.region us-east-1\n";
ss << "vfs.s3.request_timeout_ms 3000\n";
ss << "vfs.s3.scheme https\n";
ss << "vfs.s3.use_multipart_upload true\n";
ss << "vfs.s3.use_virtual_addressing true\n";

std::ifstream ifs("test_config.txt");
@@ -398,6 +399,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
all_param_values["vfs.s3.aws_secret_access_key"] = "";
all_param_values["vfs.s3.endpoint_override"] = "";
all_param_values["vfs.s3.use_virtual_addressing"] = "true";
all_param_values["vfs.s3.use_multipart_upload"] = "true";
all_param_values["vfs.s3.max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
all_param_values["vfs.s3.multipart_part_size"] = "5242880";
@@ -428,6 +430,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
vfs_param_values["s3.aws_secret_access_key"] = "";
vfs_param_values["s3.endpoint_override"] = "";
vfs_param_values["s3.use_virtual_addressing"] = "true";
vfs_param_values["s3.use_multipart_upload"] = "true";
vfs_param_values["s3.max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
vfs_param_values["s3.multipart_part_size"] = "5242880";
@@ -451,6 +454,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
s3_param_values["aws_secret_access_key"] = "";
s3_param_values["endpoint_override"] = "";
s3_param_values["use_virtual_addressing"] = "true";
s3_param_values["use_multipart_upload"] = "true";
s3_param_values["max_parallel_ops"] =
std::to_string(std::thread::hardware_concurrency());
s3_param_values["multipart_part_size"] = "5242880";
@@ -576,5 +580,15 @@ TEST_CASE(
rc =
tiledb_config_set(config, "vfs.s3.use_virtual_addressing", "False", &err);
CHECK(rc == TILEDB_OK);

rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "TRUE", &err);
CHECK(rc == TILEDB_OK);
rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "True", &err);
CHECK(rc == TILEDB_OK);
rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "FALSE", &err);
CHECK(rc == TILEDB_OK);
rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "False", &err);
CHECK(rc == TILEDB_OK);

tiledb_config_free(&config);
}
2 changes: 1 addition & 1 deletion test/src/unit-cppapi-config.cc
@@ -50,5 +50,5 @@ TEST_CASE("C++ API: Config iterator", "[cppapi], [cppapi-config]") {
names.push_back(it->first);
}
// Check number of VFS params in default config object.
CHECK(names.size() == 25);
CHECK(names.size() == 26);
}
170 changes: 170 additions & 0 deletions test/src/unit-s3-no-multipart.cc
@@ -0,0 +1,170 @@
/**
* @file unit-s3-no-multipart.cc
*
* @section LICENSE
*
* The MIT License
*
* @copyright Copyright (c) 2017-2019 TileDB, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* @section DESCRIPTION
*
* Tests for S3 API filesystem functions with multipart uploads disabled.
*/

#ifdef HAVE_S3

#include "catch.hpp"
#include "tiledb/sm/filesystem/s3.h"
#include "tiledb/sm/global_state/unit_test_config.h"
#include "tiledb/sm/misc/thread_pool.h"
#include "tiledb/sm/misc/utils.h"
#include "tiledb/sm/storage_manager/config.h"

#include <fstream>
#include <thread>

using namespace tiledb::sm;

struct S3DirectFx {
const std::string S3_PREFIX = "s3://";
const tiledb::sm::URI S3_BUCKET =
tiledb::sm::URI(S3_PREFIX + random_bucket_name("tiledb") + "/");
const std::string TEST_DIR = S3_BUCKET.to_string() + "tiledb_test_dir/";
tiledb::sm::S3 s3_;
ThreadPool thread_pool_;

S3DirectFx();
~S3DirectFx();

static std::string random_bucket_name(const std::string& prefix);
};

S3DirectFx::S3DirectFx() {
// Connect
Config::S3Params s3_config;
#ifndef TILEDB_TESTS_AWS_S3_CONFIG
s3_config.endpoint_override_ = "localhost:9999";
s3_config.scheme_ = "http";
s3_config.use_virtual_addressing_ = false;
#endif
s3_config.use_multipart_upload_ = false;
REQUIRE(thread_pool_.init(2).ok());
REQUIRE(s3_.init(s3_config, &thread_pool_).ok());

// Create bucket
if (s3_.is_bucket(S3_BUCKET))
REQUIRE(s3_.remove_bucket(S3_BUCKET).ok());

REQUIRE(!s3_.is_bucket(S3_BUCKET));
REQUIRE(s3_.create_bucket(S3_BUCKET).ok());

// Check if bucket is empty
bool is_empty;
REQUIRE(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
CHECK(is_empty);
}

S3DirectFx::~S3DirectFx() {
// Empty bucket
bool is_empty;
CHECK(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
if (!is_empty) {
CHECK(s3_.empty_bucket(S3_BUCKET).ok());
CHECK(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
CHECK(is_empty);
}

// Delete bucket
CHECK(s3_.remove_bucket(S3_BUCKET).ok());
}

TEST_CASE_METHOD(
S3DirectFx,
"Test S3 filesystem, file I/O with multipart API disabled",
"[s3]") {
// Prepare buffers
uint64_t buffer_size = 5 * 1024 * 1024;
auto write_buffer = new char[buffer_size];
for (uint64_t i = 0; i < buffer_size; i++)
write_buffer[i] = (char)('a' + (i % 26));
uint64_t buffer_size_small = 1024 * 1024;
auto write_buffer_small = new char[buffer_size_small];
for (uint64_t i = 0; i < buffer_size_small; i++)
write_buffer_small[i] = (char)('a' + (i % 26));

// Write to two files
auto largefile = TEST_DIR + "largefile";
CHECK(s3_.write(URI(largefile), write_buffer, buffer_size).ok());
CHECK(s3_.write(URI(largefile), write_buffer_small, buffer_size_small).ok());
auto smallfile = TEST_DIR + "smallfile";
CHECK(s3_.write(URI(smallfile), write_buffer_small, buffer_size_small).ok());

// Before flushing, the files do not exist
CHECK(!s3_.is_object(URI(largefile)));
CHECK(!s3_.is_object(URI(smallfile)));

// Flush the files
CHECK(s3_.flush_object(URI(largefile)).ok());
CHECK(s3_.flush_object(URI(smallfile)).ok());

// After flushing, the files exist
CHECK(s3_.is_object(URI(largefile)));
CHECK(s3_.is_object(URI(smallfile)));

// Get file sizes
uint64_t nbytes = 0;
CHECK(s3_.object_size(URI(largefile), &nbytes).ok());
CHECK(nbytes == (buffer_size + buffer_size_small));
CHECK(s3_.object_size(URI(smallfile), &nbytes).ok());
CHECK(nbytes == buffer_size_small);

// Read from the beginning
auto read_buffer = new char[26];
CHECK(s3_.read(URI(largefile), 0, read_buffer, 26).ok());
bool allok = true;
for (int i = 0; i < 26; i++) {
if (read_buffer[i] != static_cast<char>('a' + i)) {
allok = false;
break;
}
}
CHECK(allok);

// Read from a different offset
CHECK(s3_.read(URI(largefile), 11, read_buffer, 26).ok());
allok = true;
for (int i = 0; i < 26; i++) {
if (read_buffer[i] != static_cast<char>('a' + (i + 11) % 26)) {
allok = false;
break;
}
}
CHECK(allok);
// Clean up heap-allocated buffers
delete[] write_buffer;
delete[] write_buffer_small;
delete[] read_buffer;
}

std::string S3DirectFx::random_bucket_name(const std::string& prefix) {
std::stringstream ss;
ss << prefix << "-" << std::this_thread::get_id() << "-"
<< tiledb::sm::utils::time::timestamp_now_ms();
return ss.str();
}
#endif
4 changes: 4 additions & 0 deletions tiledb/sm/c_api/tiledb.h
@@ -586,6 +586,10 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config);
* The S3 use of virtual addressing (`true` or `false`), if S3 is
* enabled. <br>
* **Default**: true
* - `vfs.s3.use_multipart_upload` <br>
* The S3 use of multi-part upload requests (`true` or `false`), if S3 is
* enabled. <br>
* **Default**: true
* - `vfs.s3.max_parallel_ops` <br>
* The maximum number of S3 backend parallel operations. <br>
* **Default**: `vfs.num_threads`
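
A minimal sketch (not part of this diff; error checks elided for brevity)
showing the new parameter applied through the C API, in the same style as the
additions to unit-capi-config.cc above:

  #include <tiledb/tiledb.h>

  int main() {
    tiledb_error_t* err = NULL;
    tiledb_config_t* config = NULL;
    tiledb_config_alloc(&config, &err);

    // Fall back to single PutObject requests, e.g. for GCS compatibility.
    tiledb_config_set(config, "vfs.s3.use_multipart_upload", "false", &err);

    tiledb_ctx_t* ctx = NULL;
    tiledb_ctx_alloc(config, &ctx);
    // ... use ctx for array I/O ...
    tiledb_ctx_free(&ctx);
    tiledb_config_free(&config);
    return 0;
  }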
4 changes: 4 additions & 0 deletions tiledb/sm/cpp_api/config.h
@@ -346,6 +346,10 @@ class Config {
* The S3 use of virtual addressing (`true` or `false`), if S3 is
* enabled. <br>
* **Default**: true
* - `vfs.s3.use_multipart_upload` <br>
* The S3 use of multi-part upload requests (`true` or `false`), if S3 is
* enabled. <br>
* **Default**: true
* - `vfs.s3.max_parallel_ops` <br>
* The maximum number of S3 backend parallel operations. <br>
* **Default**: `vfs.num_threads`