From fa62416cc31bd132481179fbd3d5721d0b3fb520 Mon Sep 17 00:00:00 2001
From: Isaiah Norton
Date: Tue, 23 Apr 2019 15:41:16 -0400
Subject: [PATCH] Add support for single-step write to S3, instead of multi-part

* Primarily for compatibility with GCS, where the S3 compatibility mode does
  not support S3 multi-part uploads. Controlled by the new config parameter
  `vfs.s3.use_multipart_upload`, default `true`.
* Add test for S3 direct write
* Add note to HISTORY
---
 HISTORY.md                               |   2 +
 doc/source/tutorials/working-with-s3.rst |  25 ++++
 test/CMakeLists.txt                      |   1 +
 test/src/unit-capi-config.cc             |  14 ++
 test/src/unit-cppapi-config.cc           |   2 +-
 test/src/unit-s3-no-multipart.cc         | 170 +++++++++++++++++++++++
 tiledb/sm/c_api/tiledb.h                 |   4 +
 tiledb/sm/cpp_api/config.h               |   4 +
 tiledb/sm/filesystem/s3.cc               | 103 +++++++++++---
 tiledb/sm/filesystem/s3.h                |  22 ++-
 tiledb/sm/misc/constants.cc              |   3 +
 tiledb/sm/misc/constants.h               |   3 +
 tiledb/sm/query/writer.cc                |   3 +-
 tiledb/sm/storage_manager/config.cc      |  23 +++
 tiledb/sm/storage_manager/config.h       |   9 ++
 15 files changed, 362 insertions(+), 26 deletions(-)
 create mode 100644 test/src/unit-s3-no-multipart.cc

diff --git a/HISTORY.md b/HISTORY.md
index 234ffb065aed..1bf323570db5 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -12,6 +12,8 @@
 * The tile MBR in the in-memory fragment metadata are organized into an R-Tree, speeding up tile overlap operations during subarray reads.
 * Improved encryption key validation process when opening already open arrays. Fixes issue with indefinite growing of the URI to encryption key mapping in `StorageManager` (the mapping is no longer needed).
 * Improved dense write performance in some benchmarks. [#1229](https://github.com/TileDB-Inc/TileDB/pull/1229)
+* Support for direct writes without using the S3 multi-part API. Allows writing to
+  Google Cloud Storage in S3 compatibility mode. [#1219](https://github.com/TileDB-Inc/TileDB/pull/1219)
 
 ## Bug fixes
 
diff --git a/doc/source/tutorials/working-with-s3.rst b/doc/source/tutorials/working-with-s3.rst
index 51369a49583a..8d3b0fbbcce4 100644
--- a/doc/source/tutorials/working-with-s3.rst
+++ b/doc/source/tutorials/working-with-s3.rst
@@ -92,6 +92,31 @@ Now you are ready to start writing TileDB programs! When creating a TileDB conte
    However, we suggest to always check whether the default values are the desired ones for your application.
 
+GCS
+---
+
+TileDB has experimental support for `Google Cloud Storage S3 compatibility mode
+<https://cloud.google.com/storage/docs/interoperability>`_. GCS S3 compatibility
+does not support multipart uploads; therefore, we have added a configuration
+option to disable multipart uploads and use single ``PutObject`` requests
+instead. Uploads in this mode may be slower than comparable operations using
+AWS S3. Currently, ``vfs.s3.multipart_part_size`` should be set to a value
+larger than the expected file size, up to available memory or backend limits
+(the maximum object size for GCS is 5 TB).
+
+
+.. table:: TileDB GCS config settings
+   :widths: auto
+
+   ==================================== ==================================
+   **Parameter**                        **Value**
+   ------------------------------------ ----------------------------------
+   ``"vfs.s3.region"``                  ``"(region must be configured!)"``
+   ``"vfs.s3.use_multipart_upload"``    ``"false"``
+   ``"vfs.s3.multipart_part_size"``     ``5000000000000``
+   ``"vfs.s3.max_parallel_ops"``        ``1``
+   ==================================== ==================================
+
 minio
 -----
 
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 4abd5d049dca..92d22fda9f1d 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -82,6 +82,7 @@ set(TILEDB_TEST_SOURCES
   src/unit-lru_cache.cc
   src/unit-rtree.cc
   src/unit-s3.cc
+  src/unit-s3-no-multipart.cc
   src/unit-status.cc
   src/unit-tbb.cc
   src/unit-threadpool.cc
diff --git a/test/src/unit-capi-config.cc b/test/src/unit-capi-config.cc
index 38da5616a497..86b6950e97f0 100644
--- a/test/src/unit-capi-config.cc
+++ b/test/src/unit-capi-config.cc
@@ -225,6 +225,7 @@ void check_save_to_file() {
   ss << "vfs.s3.region us-east-1\n";
   ss << "vfs.s3.request_timeout_ms 3000\n";
   ss << "vfs.s3.scheme https\n";
+  ss << "vfs.s3.use_multipart_upload true\n";
   ss << "vfs.s3.use_virtual_addressing true\n";
 
   std::ifstream ifs("test_config.txt");
@@ -398,6 +399,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
   all_param_values["vfs.s3.aws_secret_access_key"] = "";
   all_param_values["vfs.s3.endpoint_override"] = "";
   all_param_values["vfs.s3.use_virtual_addressing"] = "true";
+  all_param_values["vfs.s3.use_multipart_upload"] = "true";
   all_param_values["vfs.s3.max_parallel_ops"] =
       std::to_string(std::thread::hardware_concurrency());
   all_param_values["vfs.s3.multipart_part_size"] = "5242880";
@@ -428,6 +430,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
   vfs_param_values["s3.aws_secret_access_key"] = "";
   vfs_param_values["s3.endpoint_override"] = "";
   vfs_param_values["s3.use_virtual_addressing"] = "true";
+  vfs_param_values["s3.use_multipart_upload"] = "true";
   vfs_param_values["s3.max_parallel_ops"] =
       std::to_string(std::thread::hardware_concurrency());
   vfs_param_values["s3.multipart_part_size"] = "5242880";
@@ -451,6 +454,7 @@ TEST_CASE("C API: Test config iter", "[capi], [config]") {
   s3_param_values["aws_secret_access_key"] = "";
   s3_param_values["endpoint_override"] = "";
   s3_param_values["use_virtual_addressing"] = "true";
+  s3_param_values["use_multipart_upload"] = "true";
   s3_param_values["max_parallel_ops"] =
       std::to_string(std::thread::hardware_concurrency());
   s3_param_values["multipart_part_size"] = "5242880";
@@ -576,5 +580,15 @@ TEST_CASE(
   rc = tiledb_config_set(config, "vfs.s3.use_virtual_addressing", "False", &err);
   CHECK(rc == TILEDB_OK);
+
+  rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "TRUE", &err);
+  CHECK(rc == TILEDB_OK);
+  rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "True", &err);
+  CHECK(rc == TILEDB_OK);
+  rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "FALSE", &err);
+  CHECK(rc == TILEDB_OK);
+  rc = tiledb_config_set(config, "vfs.s3.use_multipart_upload", "False", &err);
+  CHECK(rc == TILEDB_OK);
+
   tiledb_config_free(&config);
 }
diff --git a/test/src/unit-cppapi-config.cc b/test/src/unit-cppapi-config.cc
index 165ef671b29d..4de08be33fdb 100644
--- a/test/src/unit-cppapi-config.cc
+++ b/test/src/unit-cppapi-config.cc
@@ -50,5 +50,5 @@ TEST_CASE("C++ API: Config iterator", "[cppapi], [cppapi-config]") {
     names.push_back(it->first);
   }
   // Check number of VFS params in default config object.
-  CHECK(names.size() == 25);
+  CHECK(names.size() == 26);
 }
diff --git a/test/src/unit-s3-no-multipart.cc b/test/src/unit-s3-no-multipart.cc
new file mode 100644
index 000000000000..bc111e41bfac
--- /dev/null
+++ b/test/src/unit-s3-no-multipart.cc
@@ -0,0 +1,170 @@
+/**
+ * @file unit-s3-no-multipart.cc
+ *
+ * @section LICENSE
+ *
+ * The MIT License
+ *
+ * @copyright Copyright (c) 2017-2019 TileDB, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ * @section DESCRIPTION
+ *
+ * Tests for S3 filesystem functions with the multipart upload API disabled.
+ */
+
+#ifdef HAVE_S3
+
+#include "catch.hpp"
+#include "tiledb/sm/filesystem/s3.h"
+#include "tiledb/sm/global_state/unit_test_config.h"
+#include "tiledb/sm/misc/thread_pool.h"
+#include "tiledb/sm/misc/utils.h"
+#include "tiledb/sm/storage_manager/config.h"
+
+#include <sstream>
+#include <thread>
+
+using namespace tiledb::sm;
+
+struct S3DirectFx {
+  const std::string S3_PREFIX = "s3://";
+  const tiledb::sm::URI S3_BUCKET =
+      tiledb::sm::URI(S3_PREFIX + random_bucket_name("tiledb") + "/");
+  const std::string TEST_DIR = S3_BUCKET.to_string() + "tiledb_test_dir/";
+  tiledb::sm::S3 s3_;
+  ThreadPool thread_pool_;
+
+  S3DirectFx();
+  ~S3DirectFx();
+
+  static std::string random_bucket_name(const std::string& prefix);
+};
+
+S3DirectFx::S3DirectFx() {
+  // Connect
+  Config::S3Params s3_config;
+#ifndef TILEDB_TESTS_AWS_S3_CONFIG
+  s3_config.endpoint_override_ = "localhost:9999";
+  s3_config.scheme_ = "http";
+  s3_config.use_virtual_addressing_ = false;
+#endif
+  s3_config.use_multipart_upload_ = false;
+  REQUIRE(thread_pool_.init(2).ok());
+  REQUIRE(s3_.init(s3_config, &thread_pool_).ok());
+
+  // Create bucket
+  if (s3_.is_bucket(S3_BUCKET))
+    REQUIRE(s3_.remove_bucket(S3_BUCKET).ok());
+
+  REQUIRE(!s3_.is_bucket(S3_BUCKET));
+  REQUIRE(s3_.create_bucket(S3_BUCKET).ok());
+
+  // Check if bucket is empty
+  bool is_empty;
+  REQUIRE(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
+  CHECK(is_empty);
+}
+
+S3DirectFx::~S3DirectFx() {
+  // Empty bucket
+  bool is_empty;
+  CHECK(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
+  if (!is_empty) {
+    CHECK(s3_.empty_bucket(S3_BUCKET).ok());
+    CHECK(s3_.is_empty_bucket(S3_BUCKET, &is_empty).ok());
+    CHECK(is_empty);
+  }
+
+  // Delete bucket
+  CHECK(s3_.remove_bucket(S3_BUCKET).ok());
+}
+
+TEST_CASE_METHOD(
+    S3DirectFx,
+    "Test S3 filesystem, file I/O with multipart API disabled",
+    "[s3]") {
+  // Prepare buffers
+  uint64_t buffer_size = 5 * 1024 * 1024;
+  auto write_buffer = new char[buffer_size];
+  for (uint64_t i = 0; i < buffer_size; i++)
+    write_buffer[i] = (char)('a' + (i % 26));
+  uint64_t buffer_size_small = 1024 * 1024;
+  auto write_buffer_small = new char[buffer_size_small];
+  for (uint64_t i = 0; i < buffer_size_small; i++)
+    write_buffer_small[i] = (char)('a' + (i % 26));
+
+  // Write to two files
+  auto largefile = TEST_DIR + "largefile";
+  CHECK(s3_.write(URI(largefile), write_buffer, buffer_size).ok());
+  CHECK(s3_.write(URI(largefile), write_buffer_small, buffer_size_small).ok());
+  auto smallfile = TEST_DIR + "smallfile";
+  CHECK(s3_.write(URI(smallfile), write_buffer_small, buffer_size_small).ok());
+
+  // Before flushing, the files do not exist
+  CHECK(!s3_.is_object(URI(largefile)));
+  CHECK(!s3_.is_object(URI(smallfile)));
+
+  // Flush the files
+  CHECK(s3_.flush_object(URI(largefile)).ok());
+  CHECK(s3_.flush_object(URI(smallfile)).ok());
+
+  // After flushing, the files exist
+  CHECK(s3_.is_object(URI(largefile)));
+  CHECK(s3_.is_object(URI(smallfile)));
+
+  // Get file sizes
+  uint64_t nbytes = 0;
+  CHECK(s3_.object_size(URI(largefile), &nbytes).ok());
+  CHECK(nbytes == (buffer_size + buffer_size_small));
+  CHECK(s3_.object_size(URI(smallfile), &nbytes).ok());
+  CHECK(nbytes == buffer_size_small);
+
+  // Read from the beginning
+  auto read_buffer = new char[26];
+  CHECK(s3_.read(URI(largefile), 0, read_buffer, 26).ok());
+  bool allok = true;
+  for (int i = 0; i < 26; i++) {
+    if (read_buffer[i] != static_cast<char>('a' + i)) {
+      allok = false;
+      break;
+    }
+  }
+  CHECK(allok);
+
+  // Read from a different offset
+  CHECK(s3_.read(URI(largefile), 11, read_buffer, 26).ok());
+  allok = true;
+  for (int i = 0; i < 26; i++) {
+    if (read_buffer[i] != static_cast<char>('a' + (i + 11) % 26)) {
+      allok = false;
+      break;
+    }
+  }
+  CHECK(allok);
+}
+
+std::string S3DirectFx::random_bucket_name(const std::string& prefix) {
+  std::stringstream ss;
+  ss << prefix << "-" << std::this_thread::get_id() << "-"
+     << tiledb::sm::utils::time::timestamp_now_ms();
+  return ss.str();
+}
+#endif
diff --git a/tiledb/sm/c_api/tiledb.h b/tiledb/sm/c_api/tiledb.h
index df6db6d8824d..8ab638f53ff2 100644
--- a/tiledb/sm/c_api/tiledb.h
+++ b/tiledb/sm/c_api/tiledb.h
@@ -586,6 +586,10 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config);
 *      The S3 use of virtual addressing (`true` or `false`), if S3 is
 *      enabled. <br>
 *      **Default**: true
+ * - `vfs.s3.use_multipart_upload` <br>
+ *      The S3 use of multi-part upload requests (`true` or `false`), if S3 is
+ *      enabled. <br>
+ *      **Default**: true
 * - `vfs.s3.max_parallel_ops` <br>
 *      The maximum number of S3 backend parallel operations. <br>
 *      **Default**: `vfs.num_threads`
diff --git a/tiledb/sm/cpp_api/config.h b/tiledb/sm/cpp_api/config.h
index 6937058bf2f6..86f54fab72d4 100644
--- a/tiledb/sm/cpp_api/config.h
+++ b/tiledb/sm/cpp_api/config.h
@@ -346,6 +346,10 @@ class Config {
 *      The S3 use of virtual addressing (`true` or `false`), if S3 is
 *      enabled. <br>
 *      **Default**: true
+ * - `vfs.s3.use_multipart_upload` <br>
+ *      The S3 use of multi-part upload requests (`true` or `false`), if S3 is
+ *      enabled. <br>
+ *      **Default**: true
 * - `vfs.s3.max_parallel_ops` <br>
 *      The maximum number of S3 backend parallel operations. <br>
 *      **Default**: `vfs.num_threads`
diff --git a/tiledb/sm/filesystem/s3.cc b/tiledb/sm/filesystem/s3.cc
index 629ac27f1c6c..67a50dc125e5 100644
--- a/tiledb/sm/filesystem/s3.cc
+++ b/tiledb/sm/filesystem/s3.cc
@@ -89,7 +89,8 @@ S3::S3()
     , max_parallel_ops_(1)
     , multipart_part_size_(0)
     , vfs_thread_pool_(nullptr)
-    , use_virtual_addressing_(false) {
+    , use_virtual_addressing_(false)
+    , use_multipart_upload_(true) {
 }
 
 S3::~S3() {
@@ -120,6 +121,7 @@ Status S3::init(
   file_buffer_size_ = multipart_part_size_ * max_parallel_ops_;
   region_ = s3_config.region_;
   use_virtual_addressing_ = s3_config.use_virtual_addressing_;
+  use_multipart_upload_ = s3_config.use_multipart_upload_;
 
   client_config_ = std::unique_ptr<Aws::Client::ClientConfiguration>(
       new Aws::Client::ClientConfiguration);
@@ -280,7 +282,9 @@ Status S3::empty_bucket(const URI& bucket) const {
 
 Status S3::flush_object(const URI& uri) {
   RETURN_NOT_OK(init_client());
-
+  if (!use_multipart_upload_) {
+    return flush_direct(uri);
+  }
   if (!uri.is_s3()) {
     return LOG_STATUS(Status::S3Error(
         std::string("URI is not an S3 URI: " + uri.to_string())));
@@ -683,26 +687,30 @@ Status S3::write(const URI& uri, const void* buffer, uint64_t length) {
   RETURN_NOT_OK(fill_file_buffer(buff, buffer, length, &nbytes_filled));
 
   // Flush file buffer
-  if (buff->size() == file_buffer_size_)
-    RETURN_NOT_OK(flush_file_buffer(uri, buff, is_last_part));
-
-  // Write chunks
-  uint64_t new_length = length - nbytes_filled;
-  uint64_t offset = nbytes_filled;
-  while (new_length > 0) {
-    if (new_length >= file_buffer_size_) {
-      RETURN_NOT_OK(write_multipart(
-          uri, (char*)buffer + offset, file_buffer_size_, is_last_part));
-      offset += file_buffer_size_;
-      new_length -= file_buffer_size_;
-    } else {
-      RETURN_NOT_OK(fill_file_buffer(
-          buff, (char*)buffer + offset, new_length, &nbytes_filled));
-      offset += nbytes_filled;
-      new_length -= nbytes_filled;
+  // Multipart writes flush whenever the buffered bytes exceed
+  // file_buffer_size_; direct writes only append to the buffer here and
+  // upload it later, in flush_object().
+  if (use_multipart_upload_) {
+    if (buff->size() == file_buffer_size_)
+      RETURN_NOT_OK(flush_file_buffer(uri, buff, is_last_part));
+
+    uint64_t new_length = length - nbytes_filled;
+    uint64_t offset = nbytes_filled;
+    // Write chunks
+    while (new_length > 0) {
+      if (new_length >= file_buffer_size_) {
+        RETURN_NOT_OK(write_multipart(
+            uri, (char*)buffer + offset, file_buffer_size_, is_last_part));
+        offset += file_buffer_size_;
+        new_length -= file_buffer_size_;
+      } else {
+        RETURN_NOT_OK(fill_file_buffer(
+            buff, (char*)buffer + offset, new_length, &nbytes_filled));
+        offset += nbytes_filled;
+        new_length -= nbytes_filled;
+      }
     }
+    assert(offset == length);
   }
-  assert(offset == length);
 
   return Status::Ok();
 }
@@ -791,7 +799,6 @@ std::string S3::remove_front_slash(const std::string& path) const {
 
 Status S3::flush_file_buffer(const URI& uri, Buffer* buff, bool last_part) {
   RETURN_NOT_OK(init_client());
-
   if (buff->size() > 0) {
     const Status st =
         write_multipart(uri, buff->data(), buff->size(), last_part);
@@ -912,6 +919,60 @@ bool S3::wait_for_bucket_to_be_created(const URI& bucket_uri) const {
   return false;
 }
 
+Status S3::flush_direct(const URI& uri) {
+  // STATS_FUNC_IN(vfs_s3_write_direct);
+
+  RETURN_NOT_OK(init_client());
+
+  // Get file buffer
+  auto buff = (Buffer*)nullptr;
+  RETURN_NOT_OK(get_file_buffer(uri, &buff));
+
+  const Aws::Http::URI aws_uri(uri.c_str());
+  const std::string uri_path(aws_uri.GetPath().c_str());
+
+  Aws::S3::Model::PutObjectRequest put_object_request;
+
+  auto stream = std::shared_ptr<Aws::IOStream>(
+      new boost::interprocess::bufferstream((char*)buff->data(), buff->size()));
+
+  put_object_request.SetBody(stream);
+  put_object_request.SetContentLength(buff->size());
+
+  // We only want to hash once, and must do it after setting the body
+  auto md5_hash =
+      Aws::Utils::HashingUtils::CalculateMD5(*put_object_request.GetBody());
+
+  put_object_request.SetContentMD5(
+      Aws::Utils::HashingUtils::Base64Encode(md5_hash));
+  put_object_request.SetContentType("application/octet-stream");
+  put_object_request.SetBucket(aws_uri.GetAuthority());
+  put_object_request.SetKey(aws_uri.GetPath());
+
+  auto put_object_outcome = client_->PutObject(put_object_request);
+  if (!put_object_outcome.IsSuccess()) {
+    return LOG_STATUS(Status::S3Error(
+        std::string("Cannot write object '") + uri.c_str() + "' " +
+        outcome_error_message(put_object_outcome)));
+  }
+
+  // Verify the MD5 hash of the result; note that the ETag is hex-encoded,
+  // not base64
+  Aws::StringStream md5_hex;
+  md5_hex << "\"" << Aws::Utils::HashingUtils::HexEncode(md5_hash) << "\"";
+  if (md5_hex.str() != put_object_outcome.GetResult().GetETag()) {
+    return LOG_STATUS(
+        Status::S3Error("Object uploaded successfully, but MD5 hash does not "
+                        "match result from server!"));
+  }
+
+  wait_for_object_to_propagate(
+      put_object_request.GetBucket(), put_object_request.GetKey());
+
+  return Status::Ok();
+  // STATS_FUNC_OUT(vfs_s3_write_direct);
+}
+
 Status S3::write_multipart(
     const URI& uri, const void* buffer, uint64_t length, bool last_part) {
   STATS_FUNC_IN(vfs_s3_write_multipart);
diff --git a/tiledb/sm/filesystem/s3.h b/tiledb/sm/filesystem/s3.h
index fbf94a81a0f1..00493943e390 100644
--- a/tiledb/sm/filesystem/s3.h
+++ b/tiledb/sm/filesystem/s3.h
@@ -123,7 +123,7 @@ class S3 {
   Status empty_bucket(const URI& bucket) const;
 
   /**
-   * Flushes an object to S3, finalizing the multpart upload.
+   * Flushes an object to S3, finalizing the multipart upload.
    *
    * @param uri The URI of the object to be flushed.
    * @return Status
@@ -427,6 +427,9 @@ class S3 {
   /** Whether or not to use virtual addressing. */
   bool use_virtual_addressing_;
 
+  /** Whether or not to use multipart upload. */
+  bool use_multipart_upload_;
+
   /* ********************************* */
   /*         PRIVATE METHODS           */
   /* ********************************* */
@@ -557,10 +560,25 @@ class S3 {
       const URI& uri,
       Buffer* const buff);
 
+  /**
+   * Writes the contents of the internal file buffer for the given URI to S3
+   * by issuing a single PutObject request. If the object does not exist, it
+   * is created; if it exists, it is overwritten.
+   *
+   * @param uri The URI of the S3 object to be written.
+   * @return Status
+   */
+  Status flush_direct(const URI& uri);
+
   /**
    * Writes the input buffer to a file by issuing one or more multipart upload
    * requests. If the file does not exist, then it is created. If the file
-   * exists then it is appended to.
+   * exists, then it is appended to. This function uploads chunks of an
+   * in-progress write each time the parallelization buffer size is exceeded
+   * (calculated as the product of the ``multipart_part_size`` and
+   * ``max_parallel_ops`` configuration options).
    *
    * @param uri The URI of the S3 file to be written to.
    * @param buffer The input buffer.
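
Usage illustration (not part of the diff): with the new parameter wired through
`Config` and `S3`, a client opts into the single-request write path via the
existing C++ API. This is a minimal sketch; the region, endpoint, and the idea
of targeting GCS are assumptions for the example, not values taken from this
patch.

  #include <tiledb/tiledb>  // TileDB C++ API

  int main() {
    tiledb::Config config;
    // Placeholder values; match these to your GCS project settings.
    config["vfs.s3.region"] = "us-east1";
    config["vfs.s3.endpoint_override"] = "storage.googleapis.com";
    // Disable the multipart API and fall back to single PutObject requests.
    config["vfs.s3.use_multipart_upload"] = "false";
    // In this mode the write buffer must hold an entire object, so size it
    // above the largest object you expect to write.
    config["vfs.s3.multipart_part_size"] = "5000000000000";
    config["vfs.s3.max_parallel_ops"] = "1";

    tiledb::Context ctx(config);
    // ... create and write arrays through ctx as usual ...
    return 0;
  }

Note that `multipart_part_size * max_parallel_ops` still determines the
internal buffer size, which is why the GCS table above pins
`vfs.s3.max_parallel_ops` to 1.
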
diff --git a/tiledb/sm/misc/constants.cc b/tiledb/sm/misc/constants.cc
index 9e8d1c3ae78e..7c7da845224f 100644
--- a/tiledb/sm/misc/constants.cc
+++ b/tiledb/sm/misc/constants.cc
@@ -476,6 +476,9 @@ const std::string s3_allocation_tag = "TileDB";
 /** Use virtual addressing (false for minio, true for AWS S3). */
 const bool s3_use_virtual_addressing = true;
 
+/** Use multipart upload (false for GCS, true for AWS S3). */
+const bool s3_use_multipart_upload = true;
+
 /** Connect timeout in milliseconds. */
 const long s3_connect_timeout_ms = 3000;
 
diff --git a/tiledb/sm/misc/constants.h b/tiledb/sm/misc/constants.h
index e83d7f9eeb4d..f7d67969dc23 100644
--- a/tiledb/sm/misc/constants.h
+++ b/tiledb/sm/misc/constants.h
@@ -458,6 +458,9 @@ extern const std::string s3_allocation_tag;
 /** Use virtual addressing (false for minio, true for AWS S3). */
 extern const bool s3_use_virtual_addressing;
 
+/** Use multipart upload (false for GCS, true for AWS S3). */
+extern const bool s3_use_multipart_upload;
+
 /** Connect timeout in milliseconds. */
 extern const long s3_connect_timeout_ms;
 
diff --git a/tiledb/sm/query/writer.cc b/tiledb/sm/query/writer.cc
index 595fdd050a54..fbd489b7cafd 100644
--- a/tiledb/sm/query/writer.cc
+++ b/tiledb/sm/query/writer.cc
@@ -1035,8 +1035,7 @@ Status Writer::finalize_global_write_state() {
     global_write_state_.reset(nullptr);
     std::stringstream ss;
     ss << "Failed to finalize global write state; Number "
-       << "of cells written ("
-       << cells_written
+       << "of cells written (" << cells_written
        << ") is different from the number of cells expected ("
        << array_schema_->domain()->cell_num((T*)subarray_)
        << ") for the query subarray";
diff --git a/tiledb/sm/storage_manager/config.cc b/tiledb/sm/storage_manager/config.cc
index d18feda1a756..643b243df55c 100644
--- a/tiledb/sm/storage_manager/config.cc
+++ b/tiledb/sm/storage_manager/config.cc
@@ -222,6 +222,8 @@ Status Config::set(const std::string& param, const std::string& value) {
     RETURN_NOT_OK(set_vfs_s3_endpoint_override(value));
   } else if (param == "vfs.s3.use_virtual_addressing") {
     RETURN_NOT_OK(set_vfs_s3_use_virtual_addressing(value));
+  } else if (param == "vfs.s3.use_multipart_upload") {
+    RETURN_NOT_OK(set_vfs_s3_use_multipart_upload(value));
   } else if (param == "vfs.s3.max_parallel_ops") {
     RETURN_NOT_OK(set_vfs_s3_max_parallel_ops(value));
   } else if (param == "vfs.s3.multipart_part_size") {
@@ -435,6 +437,13 @@ Status Config::unset(const std::string& param) {
         "false");
     param_values_["vfs.s3.use_virtual_addressing"] = value.str();
     value.str(std::string());
+  } else if (param == "vfs.s3.use_multipart_upload") {
+    vfs_params_.s3_params_.use_multipart_upload_ =
+        constants::s3_use_multipart_upload;
+    value
+        << ((vfs_params_.s3_params_.use_multipart_upload_) ? "true" : "false");
+    param_values_["vfs.s3.use_multipart_upload"] = value.str();
+    value.str(std::string());
   } else if (param == "vfs.s3.max_parallel_ops") {
     vfs_params_.s3_params_.max_parallel_ops_ = constants::s3_max_parallel_ops;
     value << vfs_params_.s3_params_.max_parallel_ops_;
@@ -639,6 +648,10 @@ void Config::set_default_param_values() {
   param_values_["vfs.s3.use_virtual_addressing"] = value.str();
   value.str(std::string());
 
+  value << ((vfs_params_.s3_params_.use_multipart_upload_) ? "true" : "false");
"true" : "false"); + param_values_["vfs.s3.use_multipart_upload"] = value.str(); + value.str(std::string()); + value << vfs_params_.s3_params_.max_parallel_ops_; param_values_["vfs.s3.max_parallel_ops"] = value.str(); value.str(std::string()); @@ -939,6 +952,16 @@ Status Config::set_vfs_s3_use_virtual_addressing(const std::string& value) { return Status::Ok(); } +Status Config::set_vfs_s3_use_multipart_upload(const std::string& value) { + bool v = false; + if (!parse_bool(value, &v).ok()) { + return LOG_STATUS(Status::ConfigError( + "Cannot set parameter; Invalid S3 multipart mode value")); + } + vfs_params_.s3_params_.use_multipart_upload_ = v; + return Status::Ok(); +} + Status Config::set_vfs_s3_max_parallel_ops(const std::string& value) { uint64_t v; RETURN_NOT_OK(utils::parse::convert(value, &v)); diff --git a/tiledb/sm/storage_manager/config.h b/tiledb/sm/storage_manager/config.h index 2706e24e4078..8f831b3d876d 100644 --- a/tiledb/sm/storage_manager/config.h +++ b/tiledb/sm/storage_manager/config.h @@ -106,6 +106,7 @@ class Config { std::string scheme_; std::string endpoint_override_; bool use_virtual_addressing_; + bool use_multipart_upload_; uint64_t max_parallel_ops_; uint64_t multipart_part_size_; long connect_timeout_ms_; @@ -125,6 +126,7 @@ class Config { scheme_ = constants::s3_scheme; endpoint_override_ = constants::s3_endpoint_override; use_virtual_addressing_ = constants::s3_use_virtual_addressing; + use_multipart_upload_ = constants::s3_use_multipart_upload; max_parallel_ops_ = constants::s3_max_parallel_ops; multipart_part_size_ = constants::s3_multipart_part_size; connect_timeout_ms_ = constants::s3_connect_timeout_ms; @@ -333,6 +335,10 @@ class Config { * The S3 use of virtual addressing (`true` or `false`), if S3 is * enabled.
   *      **Default**: true
+  * - `vfs.s3.use_multipart_upload` <br>
+  *      The S3 use of multipart upload (`true` or `false`), if S3 is
+  *      enabled. <br>
+  *      **Default**: true
   * - `vfs.s3.max_parallel_ops` <br>
   *      The maximum number of S3 backend parallel operations. <br>
   *      **Default**: `vfs.num_threads`
@@ -526,6 +532,9 @@ class Config {
   /** Sets the S3 virtual addressing. */
   Status set_vfs_s3_use_virtual_addressing(const std::string& value);
 
+  /** Sets the S3 multipart upload mode. */
+  Status set_vfs_s3_use_multipart_upload(const std::string& value);
+
   /** Sets the maximum number of parallel S3 operations. */
   Status set_vfs_s3_max_parallel_ops(const std::string& value);
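
End-to-end illustration (not part of the diff): the direct write path can also
be exercised from the C++ API through the VFS layer. In this sketch the bucket
URI and file contents are made-up values; closing the file buffer triggers
`flush_object()`, which issues the single `PutObject` request when
`vfs.s3.use_multipart_upload` is `false`.

  #include <tiledb/tiledb>

  #include <ostream>
  #include <string>

  int main() {
    tiledb::Config config;
    config["vfs.s3.use_multipart_upload"] = "false";
    tiledb::Context ctx(config);
    tiledb::VFS vfs(ctx);

    // Placeholder bucket; it must already exist (e.g., vfs.create_bucket()).
    std::string uri = "s3://my-bucket/tiledb_test/hello.txt";

    // Writes accumulate in the in-memory file buffer ...
    tiledb::VFS::filebuf fbuf(vfs);
    fbuf.open(uri, std::ios::out);
    std::ostream os(&fbuf);
    os << "written with a single PutObject request";

    // ... and are uploaded in one request when the file is closed.
    fbuf.close();
    return 0;
  }
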