Skip to content

Commit

Permalink
Changed s3 finalization to happen after arrow threads finished
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Feb 4, 2023
1 parent ecb1f2e commit 50294cb
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 51 deletions.
104 changes: 56 additions & 48 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2568,84 +2568,92 @@ namespace {

std::mutex aws_init_lock;
Aws::SDKOptions aws_options;
std::atomic<bool> aws_initialized(false);
bool aws_initialized = false;

Status DoInitializeS3(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;
struct AwsInstance : public ::arrow::internal::Executor::Resource {
AwsInstance(const S3GlobalOptions& options) {
Aws::Utils::Logging::LogLevel aws_log_level;

#define LOG_LEVEL_CASE(level_name) \
case S3LogLevel::level_name: \
aws_log_level = Aws::Utils::Logging::LogLevel::level_name; \
break;

switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}
switch (options.log_level) {
LOG_LEVEL_CASE(Fatal)
LOG_LEVEL_CASE(Error)
LOG_LEVEL_CASE(Warn)
LOG_LEVEL_CASE(Info)
LOG_LEVEL_CASE(Debug)
LOG_LEVEL_CASE(Trace)
default:
aws_log_level = Aws::Utils::Logging::LogLevel::Off;
}

#undef LOG_LEVEL_CASE

aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options.loggingOptions.logLevel);
};
aws_options.loggingOptions.logLevel = aws_log_level;
// By default the AWS SDK logs to files, log to console instead
aws_options.loggingOptions.logger_create_fn = [] {
return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(
aws_options.loggingOptions.logLevel);
};
#if (defined(AWS_SDK_VERSION_MAJOR) && \
(AWS_SDK_VERSION_MAJOR > 1 || AWS_SDK_VERSION_MINOR > 9 || \
(AWS_SDK_VERSION_MINOR == 9 && AWS_SDK_VERSION_PATCH >= 272)))
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options.httpOptions.compliantRfc3986Encoding = true;
// ARROW-18290: escape all special chars for compatibility with non-AWS S3 backends.
// This configuration options is only available with AWS SDK 1.9.272 and later.
aws_options.httpOptions.compliantRfc3986Encoding = true;
#endif
Aws::InitAPI(aws_options);
aws_initialized.store(true);
return Status::OK();
Aws::InitAPI(aws_options);
}

~AwsInstance() {
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options);
}
};

std::shared_ptr<AwsInstance> CreateAwsInstance(const S3GlobalOptions& options) {
aws_initialized = true;
auto instance = std::make_shared<AwsInstance>(options);
// Don't let S3 be shutdown until all Arrow threads are done using it
arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}

Status DoFinalizeS3() {
RegionResolver::ResetDefaultInstance();
Aws::ShutdownAPI(aws_options);
aws_initialized.store(false);
return Status::OK();
std::shared_ptr<AwsInstance>* GetAwsInstance(const S3GlobalOptions& options) {
static auto instance = CreateAwsInstance(options);
return &instance;
}

} // namespace

Status InitializeS3(const S3GlobalOptions& options) {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoInitializeS3(options);
}

Status EnsureS3Initialized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (!aws_initialized.load()) {
S3GlobalOptions options{S3LogLevel::Fatal};
return DoInitializeS3(options);
std::lock_guard lock(aws_init_lock);
if (!(*GetAwsInstance(options))) {
return Status::Invalid("S3 has already been initialized and finalized");
}
return Status::OK();
}

Status FinalizeS3() {
std::lock_guard<std::mutex> lock(aws_init_lock);
return DoFinalizeS3();
}
Status EnsureS3Initialized() { return InitializeS3({S3LogLevel::Fatal}); }

Status EnsureS3Finalized() {
std::lock_guard<std::mutex> lock(aws_init_lock);
if (aws_initialized.load()) {
return DoFinalizeS3();
Status FinalizeS3() {
std::lock_guard lock(aws_init_lock);
if (aws_initialized) {
GetAwsInstance({})->reset();
}
return Status::OK();
}

bool IsS3Initialized() { return aws_initialized.load(); }
Status EnsureS3Finalized() { return FinalizeS3(); }

bool IsS3Initialized() {
std::lock_guard lock(aws_init_lock);
return aws_initialized;
}

// -----------------------------------------------------------------------
// Top-level utility functions
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/s3fs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class S3TestMixin : public AwsTestMixin {
Status connect_status;
int retries = kNumServerRetries;
do {
InitServerAndClient();
ASSERT_OK(InitServerAndClient());
connect_status = OutcomeToStatus("ListBuckets", client_->ListBuckets());
} while (!connect_status.ok() && --retries > 0);
ASSERT_OK(connect_status);
Expand All @@ -194,8 +194,8 @@ class S3TestMixin : public AwsTestMixin {
void TearDown() override { AwsTestMixin::TearDown(); }

protected:
void InitServerAndClient() {
ASSERT_OK_AND_ASSIGN(minio_, GetMinioEnv()->GetOneServer());
Status InitServerAndClient() {
ARROW_ASSIGN_OR_RAISE(minio_, GetMinioEnv()->GetOneServer());
client_config_.reset(new Aws::Client::ClientConfiguration());
client_config_->endpointOverride = ToAwsString(minio_->connect_string());
client_config_->scheme = Aws::Http::Scheme::HTTP;
Expand All @@ -207,6 +207,7 @@ class S3TestMixin : public AwsTestMixin {
new Aws::S3::S3Client(credentials_, *client_config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
use_virtual_addressing));
return Status::OK();
}

// How many times to try launching a server in a row before decreeing failure
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
_not_imported.append("S3FileSystem")
else:
initialize_s3()
import atexit
atexit.register(finalize_s3)


def __getattr__(name):
Expand Down

0 comments on commit 50294cb

Please sign in to comment.