Skip to content

Commit

Permalink
Fix hang in MultiRead with O_DIRECT and io_uring (#10368)
Browse files Browse the repository at this point in the history
Summary:
Fix a bug in MultiRead with O_DIRECT and io_uring: when a read hits EOF and bytes_read = 0, a wrong check caused the request to be added to the incomplete list, where it got stuck in an infinite loop because every retry again returned bytes_read = 0. The bug was introduced by PR #10197, which has not yet been included in any release branch.

Pull Request resolved: #10368

Test Plan: Added new unit test

Reviewed By: siying

Differential Revision: D37885184

Pulled By: akankshamahajan15

fbshipit-source-id: 35b36a44b696d29b2f6f25301aa1b19547b4e03b
  • Loading branch information
akankshamahajan15 authored and siying committed Jul 18, 2022
1 parent dcc3eb3 commit 70ba7cb
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 9 deletions.
75 changes: 74 additions & 1 deletion env/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1401,7 +1401,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
}
});

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::unique_ptr<RandomAccessFile> file;
std::vector<ReadRequest> reqs(3);
std::vector<std::unique_ptr<char, Deleter>> data;
Expand Down Expand Up @@ -1522,6 +1522,79 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
}
}

#ifndef ROCKSDB_LITE
// Regression test for the MultiRead hang described in the accompanying
// change: with direct reads enabled, a request whose aligned range extends
// past the end of the file eventually gets a 0-byte (EOF) completion, and
// MultiRead must return instead of retrying that request forever.
//
// The file is 4095 bytes long and the two requests together cover
// [0, 4096), so the second one ends one byte beyond the file even before
// the ranges are widened to the required buffer alignment.
TEST_F(EnvPosixTest, NonAlignedDirectIOMultiReadBeyondFileSize) {
  constexpr size_t kFileSize = 4095;
  constexpr size_t kNumReads = 2;

  EnvOptions soptions;
  soptions.use_direct_reads = true;
  soptions.use_direct_writes = false;
  const std::string fname = test::PerThreadDBPath(env_, "testfile");

  // Create the file and fill it with kFileSize bytes of 'b'.
  {
    std::unique_ptr<WritableFile> wfile;
    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
    auto data_ptr = NewAligned(kFileSize, 'b');
    Slice data_b(data_ptr.get(), kFileSize);
    ASSERT_OK(wfile->PositionedAppend(data_b, 0U));
    ASSERT_OK(wfile->Close());
  }

#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
    !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
  if (soptions.use_direct_reads) {
    // Clear the O_DIRECT bit in the value handed to the sync point
    // (presumably the open(2) flags -- confirm against the sync point site),
    // so the test does not depend on the underlying file system accepting
    // O_DIRECT.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "NewRandomAccessFile:O_DIRECT", [&](void* arg) {
          int* val = static_cast<int*>(arg);
          *val &= ~O_DIRECT;
        });
  }
#endif
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::unique_ptr<RandomAccessFile> file;
  ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
  const size_t alignment = file->GetRequiredBufferAlignment();

  std::vector<ReadRequest> reqs(kNumReads);
  ASSERT_EQ(kNumReads, reqs.size());

  // Owns the aligned scratch buffers that the requests point into.
  std::vector<std::unique_ptr<char, Deleter>> data;
  data.reserve(kNumReads);

  // Unaligned logical ranges: [0, 2047) and [2047, 4096).
  const std::vector<size_t> offsets = {0, 2047};
  const std::vector<size_t> lens = {2047, 4096 - 2047};

  for (size_t i = 0; i < kNumReads; ++i) {
    // Widen each logical range to alignment boundaries: round the start
    // down and the end up.
    reqs[i].offset =
        static_cast<uint64_t>(TruncateToPageBoundary(alignment, offsets[i]));
    reqs[i].len = Roundup(offsets[i] + lens[i], alignment) - reqs[i].offset;

    const size_t new_capacity = Roundup(reqs[i].len, alignment);
    data.emplace_back(NewAligned(new_capacity, 0));
    reqs[i].scratch = data.back().get();
  }

  // Query the data. Before the fix this call never returned.
  ASSERT_OK(file->MultiRead(reqs.data(), reqs.size()));

  // Validate results: reading up to or past EOF is not an error.
  for (size_t i = 0; i < kNumReads; ++i) {
    ASSERT_OK(reqs[i].status);
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  // Drop the O_DIRECT-stripping callback so it does not stay registered for
  // tests that run after this one in the same process.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // ROCKSDB_LITE

#if defined(ROCKSDB_IOURING_PRESENT)
void GenerateFilesAndRequest(Env* env, const std::string& fname,
std::vector<ReadRequest>* ret_reqs,
Expand Down
19 changes: 11 additions & 8 deletions env/io_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -750,14 +750,17 @@ IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs,
bytes_read, read_again);
int32_t res = cqe->res;
if (res >= 0) {
if (bytes_read == 0 && read_again) {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
if (bytes_read == 0) {
if (read_again) {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
// else It means EOF so no need to do anything.
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
Expand Down

0 comments on commit 70ba7cb

Please sign in to comment.