diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc index 3b5846f575c38..a22d9c10bec20 100644 --- a/cpp/src/arrow/filesystem/s3fs.cc +++ b/cpp/src/arrow/filesystem/s3fs.cc @@ -2571,100 +2571,137 @@ Result> S3FileSystem::OpenAppendStream( namespace { -std::mutex aws_init_lock; -Aws::SDKOptions aws_options; -std::atomic aws_initialized(false); +struct AwsInstance : public ::arrow::internal::Executor::Resource { + AwsInstance() : is_initialized_(false), is_finalized_(false) {} + ~AwsInstance() { Finalize(/*from_destructor=*/true); } + + // Returns true iff the instance was newly initialized with `options` + Result EnsureInitialized(const S3GlobalOptions& options) { + bool expected = false; + if (is_finalized_.load()) { + return Status::Invalid("Attempt to initialize S3 after it has been finalized"); + } + if (is_initialized_.compare_exchange_strong(expected, true)) { + DoInitialize(options); + return true; + } + return false; + } -Status DoInitializeS3(const S3GlobalOptions& options) { - Aws::Utils::Logging::LogLevel aws_log_level; + bool IsInitialized() { return !is_finalized_ && is_initialized_; } + + void Finalize(bool from_destructor = false) { + bool expected = true; + is_finalized_.store(true); + if (is_initialized_.compare_exchange_strong(expected, false)) { + if (from_destructor) { + ARROW_LOG(WARNING) + << " arrow::fs::FinalizeS3 was not called even though S3 was initialized. " + "This could lead to a segmentation fault at exit"; + RegionResolver::ResetDefaultInstance(); + Aws::ShutdownAPI(aws_options_); + } + } + } + + private: + void DoInitialize(const S3GlobalOptions& options) { + Aws::Utils::Logging::LogLevel aws_log_level; #define LOG_LEVEL_CASE(level_name) \ case S3LogLevel::level_name: \ aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \ break; - switch (options.log_level) { - LOG_LEVEL_CASE(Fatal) - LOG_LEVEL_CASE(Error) - LOG_LEVEL_CASE(Warn) - LOG_LEVEL_CASE(Info) - LOG_LEVEL_CASE(Debug) - LOG_LEVEL_CASE(Trace) - default: - aws_log_level = Aws::Utils::Logging::LogLevel::Off; - } + switch (options.log_level) { + LOG_LEVEL_CASE(Fatal) + LOG_LEVEL_CASE(Error) + LOG_LEVEL_CASE(Warn) + LOG_LEVEL_CASE(Info) + LOG_LEVEL_CASE(Debug) + LOG_LEVEL_CASE(Trace) + default: + aws_log_level = Aws::Utils::Logging::LogLevel::Off; + } #undef LOG_LEVEL_CASE #ifdef ARROW_S3_HAS_CRT - aws_options.ioOptions.clientBootstrap_create_fn = - [ev_threads = options.num_event_loop_threads]() { - // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65 - Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads); - Aws::Crt::Io::DefaultHostResolver default_host_resolver( - event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30); - auto client_bootstrap = Aws::MakeShared( - "Aws_Init_Cleanup", event_loop_group, default_host_resolver); - client_bootstrap->EnableBlockingShutdown(); - return client_bootstrap; - }; + aws_options_.ioOptions.clientBootstrap_create_fn = + [ev_threads = options.num_event_loop_threads]() { + // https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65 + Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads); + Aws::Crt::Io::DefaultHostResolver default_host_resolver( + event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30); + auto client_bootstrap = Aws::MakeShared( + "Aws_Init_Cleanup", event_loop_group, default_host_resolver); + client_bootstrap->EnableBlockingShutdown(); + return client_bootstrap; + }; #endif - - aws_options.loggingOptions.logLevel = aws_log_level; - // By default the AWS SDK logs to files, log to console instead - aws_options.loggingOptions.logger_create_fn = [] { - return std::make_shared( - aws_options.loggingOptions.logLevel); - }; + aws_options_.loggingOptions.logLevel = aws_log_level; + // By default the AWS SDK logs to files, log to console instead + aws_options_.loggingOptions.logger_create_fn = [this] { + return std::make_shared( + aws_options_.loggingOptions.logLevel); + }; #if (defined(AWS_SDK_VERSION_MAJOR) && \ (AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \ (AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272))) - // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends. - // This configuration options is only available with AWS SDK 1.9.272 and later. - aws_options.httpOptions.compliantRfc3986Encoding = true; + // ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends. + // This configuration options is only available with AWS SDK 1.9.272 and later. + aws_options_.httpOptions.compliantRfc3986Encoding = true; #endif - Aws::InitAPI(aws_options); - aws_initialized.store(true); - return Status::OK(); + Aws::InitAPI(aws_options_); + } + + Aws::SDKOptions aws_options_; + std::atomic is_initialized_; + std::atomic is_finalized_; +}; + +std::shared_ptr CreateAwsInstance() { + auto instance = std::make_shared(); + // Don't let S3 be shutdown until all Arrow threads are done using it + arrow::internal::GetCpuThreadPool()->KeepAlive(instance); + io::internal::GetIOThreadPool()->KeepAlive(instance); + return instance; } -Status DoFinalizeS3() { - RegionResolver::ResetDefaultInstance(); - Aws::ShutdownAPI(aws_options); - aws_initialized.store(false); - return Status::OK(); +AwsInstance& GetAwsInstance() { + static auto instance = CreateAwsInstance(); + return *instance; +} + +Result EnsureAwsInstanceInitialized(const S3GlobalOptions& options) { + return GetAwsInstance().EnsureInitialized(options); } } // namespace Status InitializeS3(const S3GlobalOptions& options) { - std::lock_guard lock(aws_init_lock); - return DoInitializeS3(options); -} - -Status EnsureS3Initialized() { - std::lock_guard lock(aws_init_lock); - if (!aws_initialized.load()) { - S3GlobalOptions options{S3LogLevel::Fatal}; - return DoInitializeS3(options); + ARROW_ASSIGN_OR_RAISE(bool successfully_initialized, + EnsureAwsInstanceInitialized(options)); + if (!successfully_initialized) { + return Status::Invalid( + "S3 was already initialized. It is safe to use but the options passed in this " + "call have been ignored."); } return Status::OK(); } -Status FinalizeS3() { - std::lock_guard lock(aws_init_lock); - return DoFinalizeS3(); +Status EnsureS3Initialized() { + return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status(); } -Status EnsureS3Finalized() { - std::lock_guard lock(aws_init_lock); - if (aws_initialized.load()) { - return DoFinalizeS3(); - } +Status FinalizeS3() { + GetAwsInstance().Finalize(); return Status::OK(); } -bool IsS3Initialized() { return aws_initialized.load(); } +Status EnsureS3Finalized() { return FinalizeS3(); } + +bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); } // ----------------------------------------------------------------------- // Top-level utility functions diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h index 2be16f869d6f0..2bccecafe8e91 100644 --- a/cpp/src/arrow/filesystem/s3fs.h +++ b/cpp/src/arrow/filesystem/s3fs.h @@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions { /// Initialize the S3 APIs. It is required to call this function at least once /// before using S3FileSystem. +/// +/// Once this function is called you MUST call FinalizeS3 before the end of the +/// application in order to avoid a segmentation fault at shutdown. ARROW_EXPORT Status InitializeS3(const S3GlobalOptions& options); diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc b/cpp/src/arrow/filesystem/s3fs_test.cc index 6295705785580..38df84bdede76 100644 --- a/cpp/src/arrow/filesystem/s3fs_test.cc +++ b/cpp/src/arrow/filesystem/s3fs_test.cc @@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin { Status connect_status; int retries = kNumServerRetries; do { - InitServerAndClient(); + ASSERT_OK(InitServerAndClient()); connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets()); } while (!connect_status.ok() && --retries > 0); ASSERT_OK(connect_status); @@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin { } protected: - void InitServerAndClient() { - ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer()); + Status InitServerAndClient() { + ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer()); client_config_.reset(new Aws::Client::ClientConfiguration()); client_config_->endpointOverride = ToAwsString(minio_->connect_string()); client_config_->scheme = Aws::Http::Scheme::HTTP; @@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin { new Aws::S3::S3Client(credentials_, *client_config_, Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing)); + return Status::OK(); } // How many times to try launching a server in a row before decreeing failure diff --git a/python/pyarrow/fs.py b/python/pyarrow/fs.py index e8e53225fb8e9..567bea8ac05e8 100644 --- a/python/pyarrow/fs.py +++ b/python/pyarrow/fs.py @@ -59,6 +59,8 @@ _not_imported.append("S3FileSystem") else: ensure_s3_initialized() + import atexit + atexit.register(finalize_s3) def __getattr__(name): diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index a3c860a51c8f9..76c420e21fac6 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -106,6 +106,15 @@ supported_dplyr_methods <- list( explain = NULL ) +# This should be run at session exit and must be called +# to avoid a segmentation fault at shutdown +finalize_s3 <- function(env) { + FinalizeS3() +} + +# Helper environment to register the exit hook +s3_finalizer <- new.env(parent = emptyenv()) + #' @importFrom vctrs s3_register vec_size vec_cast vec_unique .onLoad <- function(...) { # Make sure C++ knows on which thread it is safe to call the R API @@ -147,6 +156,11 @@ supported_dplyr_methods <- list( # Register extension types that we use internally reregister_extension_type(vctrs_extension_type(vctrs::unspecified())) + # Registers a callback to run at session exit + # This can't be done in .onUnload or .onDetach because those hooks are + # not guaranteed to run (e.g. they only run if the user unloads arrow) + reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE) + invisible() } diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index b3dd3a96018a5..a8e8f5b8af3bf 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1352,6 +1352,10 @@ fs___S3FileSystem__region <- function(fs) { .Call(`_arrow_fs___S3FileSystem__region`, fs) } +FinalizeS3 <- function() { + invisible(.Call(`_arrow_FinalizeS3`)) +} + fs___GcsFileSystem__Make <- function(anonymous, options) { .Call(`_arrow_fs___GcsFileSystem__Make`, anonymous, options) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fb333e0c070c3..55c59f4b388e8 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3489,6 +3489,14 @@ extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){ } #endif +// filesystem.cpp +void FinalizeS3(); +extern "C" SEXP _arrow_FinalizeS3(){ +BEGIN_CPP11 + FinalizeS3(); + return R_NilValue; +END_CPP11 +} // filesystem.cpp #if defined(ARROW_R_WITH_GCS) std::shared_ptr fs___GcsFileSystem__Make(bool anonymous, cpp11::list options); @@ -5828,6 +5836,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_fs___CopyFiles", (DL_FUNC) &_arrow_fs___CopyFiles, 6}, { "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 17}, { "_arrow_fs___S3FileSystem__region", (DL_FUNC) &_arrow_fs___S3FileSystem__region, 1}, + { "_arrow_FinalizeS3", (DL_FUNC) &_arrow_FinalizeS3, 0}, { "_arrow_fs___GcsFileSystem__Make", (DL_FUNC) &_arrow_fs___GcsFileSystem__Make, 2}, { "_arrow_fs___GcsFileSystem__options", (DL_FUNC) &_arrow_fs___GcsFileSystem__options, 1}, { "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2}, diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index cd795c0f80afb..4388d111b4a64 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr& f #endif +// [[arrow::export]] +void FinalizeS3() { +#if defined(ARROW_R_WITH_S3) + StopIfNotOk(fs::FinalizeS3()); +#endif +} + #if defined(ARROW_R_WITH_GCS) #include