Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15054: [C++] Change s3 finalization to happen after arrow threads finished, add pyarrow exit hook #33858

Merged
merged 9 commits into from
Apr 15, 2023
161 changes: 99 additions & 62 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2571,100 +2571,137 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenAppendStream(

namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);
struct AwsInstance : public ::arrow::internal::Executor::Resource {
AwsInstance() : is_initialized_(false), is_finalized_(false) {}
~AwsInstance() { Finalize(/*from_destructor=*/true); }

// Returns true iff the instance was newly initialized with `options`
Result<bool> EnsureInitialized(const S3GlobalOptions& options) {
bool expected = false;
if (is_finalized_.load()) {
return Status::Invalid("Attempt to initialize S3 after it has been finalized");
}
if (is_initialized_.compare_exchange_strong(expected, true)) {
DoInitialize(options);
return true;
}
return false;
}

Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;
bool IsInitialized() { return !is_finalized_ && is_initialized_; }

void Finalize(bool from_destructor = false) {
bool expected = true;
is_finalized_.store(true);
if (is_initialized_.compare_exchange_strong(expected, false)) {
if (from_destructor) {
ARROW_LOG(WARNING)
<< " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
"This could lead to a segmentation fault at exit";
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options_);
}
}
}

private:
void DoInitialize(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name) \
case S3LogLevel::level_name: \
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
break;

switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}
switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}
Comment on lines +2616 to +2625
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the new tab an expected change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The initialize and finalize methods were file-scoped methods previously. Now they have become the constructor/destructor of the AwsInstance class. Since they are in a class they get some indentation.


#undef LOG_LEVEL_CASE

#ifdef ARROW_S3_HAS_CRT
aws_options.ioOptions.clientBootstrap_create_fn =
[ev_threads = options.num_event_loop_threads]() {
// https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
Aws::Crt::Io::DefaultHostResolver default_host_resolver(
event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
"Aws_Init_Cleanup", event_loop_group, default_host_resolver);
client_bootstrap->EnableBlockingShutdown();
return client_bootstrap;
};
aws_options_.ioOptions.clientBootstrap_create_fn =
[ev_threads = options.num_event_loop_threads]() {
// https://github.com/aws/aws-sdk-cpp/blob/1.11.15/src/aws-cpp-sdk-core/source/Aws.cpp#L65
Aws::Crt::Io::EventLoopGroup event_loop_group(ev_threads);
Aws::Crt::Io::DefaultHostResolver default_host_resolver(
event_loop_group, /*maxHosts=*/8, /*maxTTL=*/30);
auto client_bootstrap = Aws::MakeShared<Aws::Crt::Io::ClientBootstrap>(
"Aws_Init_Cleanup", event_loop_group, default_host_resolver);
client_bootstrap->EnableBlockingShutdown();
return client_bootstrap;
};
#endif

aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options.loggingOptions.logLevel);
};
aws_options_.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options_.loggingOptions.logger_create_fn = [this] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options_.loggingOptions.logLevel);
};
#if (defined(AWS_SDK_VERSION_MAJOR) && \
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options.httpOptions.compliantRfc3986Encoding = true;
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options_.httpOptions.compliantRfc3986Encoding = true;
#endif
Aws::InitAPI(aws_options);
aws_initialized.store(true);
return Status::OK();
Aws::InitAPI(aws_options_);
}

Aws::SDKOptions aws_options_;
std::atomic<bool> is_initialized_;
std::atomic<bool> is_finalized_;
};

std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
// Don't let S3 be shutdown until all Arrow threads are done using it
arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}

Status DoFinalizeS3() {
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
AwsInstance& GetAwsInstance() {
static auto instance = CreateAwsInstance();
return *instance;
}

Result<bool> EnsureAwsInstanceInitialized(const S3GlobalOptions& options) {
return GetAwsInstance().EnsureInitialized(options);
}

} // namespace

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}

Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
ARROW_ASSIGN_OR_RAISE(bool successfully_initialized,
EnsureAwsInstanceInitialized(options));
if (!successfully_initialized) {
return Status::Invalid(
"S3 was already initialized. It is safe to use but the options passed in this "
"call have been ignored.");
}
return Status::OK();
}

Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoFinalizeS3();
Status EnsureS3Initialized() {
return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
}

Status EnsureS3Finalized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (aws_initialized.load()) {
return DoFinalizeS3();
}
Status FinalizeS3() {
GetAwsInstance().Finalize();
return Status::OK();
}

bool IsS3Initialized() { return aws_initialized.load(); }
Status EnsureS3Finalized() { return FinalizeS3(); }

bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }

// -----------------------------------------------------------------------
// Top-level utility functions
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ struct ARROW_EXPORT S3GlobalOptions {

/// Initialize the S3 APIs. It is required to call this function at least once
/// before using S3FileSystem.
///
/// Once this function is called you MUST call FinalizeS3 before the end of the
/// application in order to avoid a segmentation fault at shutdown.
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin {
Status connect_status;
int retries = kNumServerRetries;
do {
InitServerAndClient();
ASSERT_OK(InitServerAndClient());
connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets());
} while (!connect_status.ok() && --retries > 0);
ASSERT_OK(connect_status);
Expand All @@ -198,8 +198,8 @@ class S3TestMixin : public AwsTestMixin {
}

protected:
void InitServerAndClient() {
ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer());
Status InitServerAndClient() {
ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer());
client_config_.reset(new Aws::Client::ClientConfiguration());
client_config_->endpointOverride = ToAwsString(minio_->connect_string());
client_config_->scheme = Aws::Http::Scheme::HTTP;
Expand All @@ -211,6 +211,7 @@ class S3TestMixin : public AwsTestMixin {
new Aws::S3::S3Client(credentials_, *client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing));
return Status::OK();
}

// How many times to try launching a server in a row before decreeing failure
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
_not_imported.append("S3FileSystem")
else:
ensure_s3_initialized()
import atexit
atexit.register(finalize_s3)


def __getattr__(name):
Expand Down
14 changes: 14 additions & 0 deletions r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ supported_dplyr_methods <- list(
explain = NULL
)

# This should be run at session exit and must be called
# to avoid a segmentation fault at shutdown
finalize_s3 <- function(env) {
FinalizeS3()
}

# Helper environment to register the exit hook
s3_finalizer <- new.env(parent = emptyenv())

#' @importFrom vctrs s3_register vec_size vec_cast vec_unique
.onLoad <- function(...) {
# Make sure C++ knows on which thread it is safe to call the R API
Expand Down Expand Up @@ -147,6 +156,11 @@ supported_dplyr_methods <- list(
# Register extension types that we use internally
reregister_extension_type(vctrs_extension_type(vctrs::unspecified()))

# Registers a callback to run at session exit
# This can't be done in .onUnload or .onDetach because those hooks are
# not guaranteed to run (e.g. they only run if the user unloads arrow)
reg.finalizer(s3_finalizer, finalize_s3, onexit = TRUE)

invisible()
}

Expand Down
4 changes: 4 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions r/src/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& f

#endif

// [[arrow::export]]
void FinalizeS3() {
#if defined(ARROW_R_WITH_S3)
StopIfNotOk(fs::FinalizeS3());
#endif
}

#if defined(ARROW_R_WITH_GCS)

#include <arrow/filesystem/gcsfs.h>
Expand Down