-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C++][S3] Crash on exit #36346
Comments
I agree that S3 shutdown should happen earlier than on final exit (otherwise an explicit shutdown wouldn't be needed at all). I am unsure about the proposed solution, though. |
A potential solution is to:
But I don't know if that would work reliably in a multi-threaded context. |
@westonpace Do you have any idea for this problem? |
@pitrou and I had some discussion of this ticket. As @pitrou mentioned, we are currently calling finalize too late. We can't wait for the thread pools to shutdown because that happens at program exit and by then it is too late to call AWS shutdown.
This approach sounds like the simplest solution. |
How about just using diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index c3a6eb0ea..2971c4314 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -398,12 +398,19 @@ namespace {
Status CheckS3Initialized() {
if (!IsS3Initialized()) {
return Status::Invalid(
- "S3 subsystem not initialized; please call InitializeS3() "
+ "S3 subsystem is not initialized; please call InitializeS3() "
"before carrying out any S3-related operation");
}
return Status::OK();
}
+Status CheckS3Finalized() {
+ if (IsS3Finalized()) {
+ return Status::Invalid("S3 subsystem is finalized");
+ }
+ return Status::OK();
+}
+
// XXX Sanitize paths by removing leading slash?
struct S3Path {
@@ -1008,6 +1015,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
content_length_(size) {}
Status Init() {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first Read() call.
if (content_length_ != kNoSize) {
@@ -1099,6 +1108,8 @@ class ObjectInputFile final : public io::RandomAccessFile {
return 0;
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Read the desired range of bytes
ARROW_ASSIGN_OR_RAISE(S3Model::GetObjectResult result,
GetObjectRange(client_.get(), path_, position, nbytes, out));
@@ -1182,6 +1193,8 @@ class ObjectOutputStream final : public io::OutputStream {
}
Status Init() {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
@@ -1217,6 +1230,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::AbortMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
@@ -1245,6 +1260,8 @@ class ObjectOutputStream final : public io::OutputStream {
Future<> CloseAsync() override {
if (closed_) return Status::OK();
+ RETURN_NOT_OK(CheckS3Finalized());
+
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
@@ -1307,6 +1324,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::Invalid("Operation on closed stream");
}
+ RETURN_NOT_OK(CheckS3Finalized());
+
const int8_t* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
@@ -1359,6 +1378,7 @@ class ObjectOutputStream final : public io::OutputStream {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
+ RETURN_NOT_OK(CheckS3Finalized());
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return upload_state_->pending_parts_completed;
@@ -1367,6 +1387,7 @@ class ObjectOutputStream final : public io::OutputStream {
// Upload-related helpers
Status CommitCurrentPart() {
+ RETURN_NOT_OK(CheckS3Finalized());
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
@@ -1379,6 +1400,8 @@ class ObjectOutputStream final : public io::OutputStream {
Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
@@ -1574,6 +1597,8 @@ struct TreeWalker : public std::enable_shared_from_this<TreeWalker> {
S3Model::ListObjectsV2Request req;
Status operator()(const Result<S3Model::ListObjectsV2Outcome>& result) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Serialize calls to operation-specific handlers
if (!walker->ok()) {
// Early exit: avoid executing handlers if DoWalk() returned
@@ -1692,6 +1717,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Tests to see if a bucket exists
Result<bool> BucketExists(const std::string& bucket) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::HeadBucketRequest req;
req.SetBucket(ToAwsString(bucket));
@@ -1709,6 +1736,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Create a bucket. Successful if bucket already exists.
Status CreateBucket(const std::string& bucket) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// Check bucket exists first.
{
S3Model::HeadBucketRequest req;
@@ -1753,6 +1782,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Create an object with empty contents. Successful if object already exists.
Status CreateEmptyObject(const std::string& bucket, const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
@@ -1768,6 +1799,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Status DeleteObject(const std::string& bucket, const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::DeleteObjectRequest req;
req.SetBucket(ToAwsString(bucket));
req.SetKey(ToAwsString(key));
@@ -1777,6 +1809,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Status CopyObject(const S3Path& src_path, const S3Path& dest_path) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::CopyObjectRequest req;
req.SetBucket(ToAwsString(dest_path.bucket));
req.SetKey(ToAwsString(dest_path.key));
@@ -1799,6 +1832,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
Result<bool> IsEmptyDirectory(
const std::string& bucket, const std::string& key,
const S3Model::HeadObjectOutcome* previous_outcome = nullptr) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
if (previous_outcome) {
// Fetch the backend from the previous error
DCHECK(!previous_outcome->IsSuccess());
@@ -1850,6 +1885,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Result<bool> IsNonEmptyDirectory(const S3Path& path) {
+ RETURN_NOT_OK(CheckS3Finalized());
S3Model::ListObjectsV2Request req;
req.SetBucket(ToAwsString(path.bucket));
req.SetPrefix(ToAwsString(path.key) + kSep);
@@ -1939,6 +1975,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Workhorse for GetFileInfo(FileSelector...)
Status Walk(const FileSelector& select, const std::string& bucket,
const std::string& key, std::vector<FileInfo>* out) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
FileInfoCollector collector(bucket, key, select);
auto handle_error = [&](const AWSError<S3Errors>& error) -> Status {
@@ -2027,6 +2065,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
};
Future<std::shared_ptr<WalkResult>> WalkForDeleteDirAsync(const std::string& bucket,
const std::string& key) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
auto state = std::make_shared<WalkResult>();
auto handle_results = [state](const std::string& prefix,
@@ -2064,6 +2104,8 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
// Delete multiple objects at once
Future<> DeleteObjectsAsync(const std::string& bucket,
const std::vector<std::string>& keys) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
struct DeleteCallback {
const std::string bucket;
@@ -2156,6 +2198,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
static Result<std::vector<std::string>> ProcessListBuckets(
const Aws::S3::Model::ListBucketsOutcome& outcome) {
+ RETURN_NOT_OK(CheckS3Finalized());
if (!outcome.IsSuccess()) {
return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets",
outcome.GetError());
@@ -2169,11 +2212,13 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
}
Result<std::vector<std::string>> ListBuckets() {
+ RETURN_NOT_OK(CheckS3Finalized());
auto outcome = client_->ListBuckets();
return ProcessListBuckets(outcome);
}
Future<std::vector<std::string>> ListBucketsAsync(io::IOContext ctx) {
+ RETURN_NOT_OK(CheckS3Finalized());
auto self = shared_from_this();
return DeferNotOk(SubmitIO(ctx, [self]() { return self->client_->ListBuckets(); }))
// TODO(ARROW-12655) Change to Then(Impl::ProcessListBuckets)
@@ -2187,6 +2232,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
auto ptr = std::make_shared<ObjectInputFile>(client_, fs->io_context(), path);
RETURN_NOT_OK(ptr->Init());
@@ -2205,6 +2251,7 @@ class S3FileSystem::Impl : public std::enable_shared_from_this<S3FileSystem::Imp
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path()));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
auto ptr =
std::make_shared<ObjectInputFile>(client_, fs->io_context(), path, info.size());
@@ -2223,6 +2270,7 @@ S3FileSystem::~S3FileSystem() {}
Result<std::shared_ptr<S3FileSystem>> S3FileSystem::Make(
const S3Options& options, const io::IOContext& io_context) {
RETURN_NOT_OK(CheckS3Initialized());
+ RETURN_NOT_OK(CheckS3Finalized());
std::shared_ptr<S3FileSystem> ptr(new S3FileSystem(options, io_context));
RETURN_NOT_OK(ptr->impl_->Init());
@@ -2250,6 +2298,8 @@ S3Options S3FileSystem::options() const { return impl_->options(); }
std::string S3FileSystem::region() const { return impl_->region(); }
Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
FileInfo info;
info.set_path(s);
@@ -2313,6 +2363,8 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
}
Result<FileInfoVector> S3FileSystem::GetFileInfo(const FileSelector& select) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto base_path, S3Path::FromString(select.base_dir));
FileInfoVector results;
@@ -2383,6 +2435,8 @@ FileInfoGenerator S3FileSystem::GetFileInfoGenerator(const FileSelector& select)
}
Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.key.empty()) {
@@ -2426,6 +2480,8 @@ Status S3FileSystem::CreateDir(const std::string& s, bool recursive) {
}
Status S3FileSystem::DeleteDir(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
@@ -2455,6 +2511,8 @@ Status S3FileSystem::DeleteDirContents(const std::string& s, bool missing_dir_ok
}
Future<> S3FileSystem::DeleteDirContentsAsync(const std::string& s, bool missing_dir_ok) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
if (path.empty()) {
@@ -2480,6 +2538,8 @@ Status S3FileSystem::DeleteRootDirContents() {
}
Status S3FileSystem::DeleteFile(const std::string& s) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
@@ -2506,6 +2566,8 @@ Status S3FileSystem::DeleteFile(const std::string& s) {
}
Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
// XXX We don't implement moving directories as it would be too expensive:
// one must copy all directory contents one by one (including object data),
// then delete the original contents.
@@ -2525,6 +2587,8 @@ Status S3FileSystem::Move(const std::string& src, const std::string& dest) {
}
Status S3FileSystem::CopyFile(const std::string& src, const std::string& dest) {
+ RETURN_NOT_OK(CheckS3Finalized());
+
ARROW_ASSIGN_OR_RAISE(auto src_path, S3Path::FromString(src));
RETURN_NOT_OK(ValidateFilePath(src_path));
ARROW_ASSIGN_OR_RAISE(auto dest_path, S3Path::FromString(dest));
@@ -2562,6 +2626,8 @@ Result<std::shared_ptr<io::OutputStream>> S3FileSystem::OpenOutputStream(
ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s));
RETURN_NOT_OK(ValidateFilePath(path));
+ RETURN_NOT_OK(CheckS3Finalized());
+
auto ptr = std::make_shared<ObjectOutputStream>(impl_->client_, io_context(), path,
impl_->options(), metadata);
RETURN_NOT_OK(ptr->Init());
@@ -2600,6 +2666,8 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
bool IsInitialized() { return !is_finalized_ && is_initialized_; }
+ bool IsFinalized() { return is_finalized_; }
+
void Finalize(bool from_destructor = false) {
bool expected = true;
is_finalized_.store(true);
@@ -2608,9 +2676,9 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
ARROW_LOG(WARNING)
<< " arrow::fs::FinalizeS3 was not called even though S3 was initialized. "
"This could lead to a segmentation fault at exit";
- RegionResolver::ResetDefaultInstance();
- Aws::ShutdownAPI(aws_options_);
}
+ RegionResolver::ResetDefaultInstance();
+ Aws::ShutdownAPI(aws_options_);
}
}
@@ -2672,9 +2740,6 @@ struct AwsInstance : public ::arrow::internal::Executor::Resource {
std::shared_ptr<AwsInstance> CreateAwsInstance() {
auto instance = std::make_shared<AwsInstance>();
- // Don't let S3 be shutdown until all Arrow threads are done using it
- arrow::internal::GetCpuThreadPool()->KeepAlive(instance);
- io::internal::GetIOThreadPool()->KeepAlive(instance);
return instance;
}
@@ -2713,6 +2778,8 @@ Status EnsureS3Finalized() { return FinalizeS3(); }
bool IsS3Initialized() { return GetAwsInstance().IsInitialized(); }
+bool IsS3Finalized() { return GetAwsInstance().IsFinalized(); }
+
// -----------------------------------------------------------------------
// Top-level utility functions
|
…inalize All S3-related operations fail after we call arrow::fs::FinalizeS3().
#36437 is for the |
### Rationale for this change It is required to call `FinalizeS3` at process end to cleanly shut down resources used by the AWS SDK before it's too late. However, once `FinalizeS3` has been called, another problem appears: no AWS SDK API can be called safely anymore. Even object destructors can be dangerous. We therefore need a way to prevent unsafe use of the AWS SDK APIs, regardless of how the application issues filesystem calls. ### What changes are included in this PR? Shield all uses of the internal `S3Client` class behind a safe RAII facility `S3ClientLock`. Obtaining an `S3ClientLock` ensures that S3 finalization has not been called yet _and_ that it will not be called until the `S3ClientLock` goes out of scope. ### Are these changes tested? Yes, some Python tests exercise calling S3 APIs after explicit finalization. ### Are there any user-facing changes? Not for correctly written application code, ideally. * Closes: #36346 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
### Rationale for this change It is required to call `FinalizeS3` at process end to cleanly shut down resources used by the AWS SDK before it's too late. However, once `FinalizeS3` has been called, another problem appears: no AWS SDK API can be called safely anymore. Even object destructors can be dangerous. We therefore need a way to prevent unsafe use of the AWS SDK APIs, regardless of how the application issues filesystem calls. ### What changes are included in this PR? Shield all uses of the internal `S3Client` class behind a safe RAII facility `S3ClientLock`. Obtaining an `S3ClientLock` ensures that S3 finalization has not been called yet _and_ that it will not be called until the `S3ClientLock` goes out of scope. ### Are these changes tested? Yes, some Python tests exercise calling S3 APIs after explicit finalization. ### Are there any user-facing changes? Not for correctly written application code, ideally. * Closes: apache#36346 Authored-by: Antoine Pitrou <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
Describe the bug, including details regarding any error messages, version, and platform.
arrow::FinalizeS3()
doesn't call both of RegionResolver::ResetDefaultInstance()
and Aws::ShutdownAPI()
because of #33858. This may cause a crash on exit in the "SubTreeFileSystem$create() with URI" R test:
arrow/r/tests/testthat/test-filesystem.R
Lines 154 to 164 in 0344a2c
For example, it doesn't happen on the current main but it does happen on #36230:
https://github.com/apache/arrow/actions/runs/5384835055/jobs/9793825156?pr=36230#step:6:33597
I could reproduce this by running only the test (I commented out all other tests). And here is the backtrace for the case:
is here: https://github.com/aws/aws-sdk-cpp/blob/1fb97256a2ae7211a741fda0033ef1e18d29e2f0/aws-cpp-sdk-core/source/http/curl/CurlHandleContainer.cpp#L27
And
AWS_LOGSTREAM_INFO
is here: https://github.com/aws/aws-sdk-cpp/blob/1fb97256a2ae7211a741fda0033ef1e18d29e2f0/aws-cpp-sdk-core/include/aws/core/utils/logging/LogMacros.h#L159-L168 It seems that
Aws::Utils::Logging::GetLogSystem()
returns a destroyed object in this context. Note that this is called in exit()
(#33 0x00007fd53f209a60 in exit () from /usr/lib/x86_64-linux-gnu/libc.so.6
(#33 0x00007fd53f209a60 in exit () from /usr/lib/x86_64-linux-gnu/libc.so.6
), so the object destruction order will be undefined. Can we call
RegionResolver::ResetDefaultInstance()
and Aws::ShutdownAPI()
from arrow::FinalizeS3()
again? For example: I can avoid the crash in my environment with this patch.
@westonpace What do you think about this problem?
Component(s)
C++
The text was updated successfully, but these errors were encountered: