Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asyncio for windows #13262

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,11 @@ Options DBTestBase::GetOptions(
"NewRandomAccessFile:O_DIRECT");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"NewWritableFile:O_DIRECT");
#elif defined(OS_WIN)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"NewRandomAccessFile:FILE_FLAG_NO_BUFFERING");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
"NewWritableFile:FILE_FLAG_NO_BUFFERING");
#endif
// kMustFreeHeapAllocations -> indicates ASAN build
if (kMustFreeHeapAllocations && !options_override.full_block_cache) {
Expand Down
4 changes: 4 additions & 0 deletions env/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
for (uint32_t attempt = 0; attempt < 20; attempt++) {
// Random Read
Random rnd(301 + attempt);
#ifndef OS_WIN // Windows currently does not read remaining data.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 0) {
Expand All @@ -1421,6 +1422,7 @@ TEST_P(EnvPosixTestWithParam, MultiRead) {
}
}
});
#endif

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::unique_ptr<RandomAccessFile> file;
Expand Down Expand Up @@ -1478,6 +1480,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
// too long, we can modify the io uring depth with SyncPoint here.
const int num_reads = rnd.Uniform(512) + 1;

#ifndef OS_WIN // Windows currently does not read remaining data.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"UpdateResults::io_uring_result", [&](void* arg) {
if (attempt > 5) {
Expand All @@ -1494,6 +1497,7 @@ TEST_F(EnvPosixTest, MultiReadNonAlignedLargeNum) {
}
}
});
#endif
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

// Generate (offset, len) pairs
Expand Down
2 changes: 2 additions & 0 deletions file/prefetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1938,6 +1938,7 @@ TEST_P(PrefetchTest, AvoidBlockCacheLookupTwice) {
Close();
}

#ifndef OS_WIN
TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
Expand Down Expand Up @@ -2038,6 +2039,7 @@ TEST_P(PrefetchTest, DBIterAsyncIONoIOUring) {

enable_io_uring = true;
}
#endif

class PrefetchTest1 : public DBTestBase,
public ::testing::WithParamInterface<bool> {
Expand Down
117 changes: 116 additions & 1 deletion port/win/env_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "strsafe.h"
#include "test_util/sync_point.h"
#include "util/string_util.h"

// Undefine the functions windows might use (again)...
Expand Down Expand Up @@ -245,6 +246,8 @@ IOStatus WinFileSystem::NewSequentialFile(

if (options.use_direct_reads && !options.use_mmap_reads) {
fileFlags |= FILE_FLAG_NO_BUFFERING;
TEST_SYNC_POINT_CALLBACK("NewSequentialFile:FILE_FLAG_NO_BUFFERING",
&fileFlags);
}

{
Expand Down Expand Up @@ -278,10 +281,19 @@ IOStatus WinFileSystem::NewRandomAccessFile(

if (options.use_direct_reads && !options.use_mmap_reads) {
fileFlags |= FILE_FLAG_NO_BUFFERING;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:FILE_FLAG_NO_BUFFERING",
&fileFlags);
} else {
fileFlags |= FILE_FLAG_RANDOM_ACCESS;
}

// Open in async mode which makes Windows allow more parallelism even
// if we need to do sync I/O on top of it. This is directly related to
// PositionedReadInternal() implementation
if (!options.use_mmap_reads) {
fileFlags |= FILE_FLAG_OVERLAPPED;
}

/// Shared access is necessary for corruption test to pass
// almost all tests would work with a possible exception of fault_injection
HANDLE hFile = 0;
Expand Down Expand Up @@ -350,7 +362,8 @@ IOStatus WinFileSystem::NewRandomAccessFile(
fileGuard.release();
}
} else {
result->reset(new WinRandomAccessFile(fname, hFile, page_size_, options));
result->reset(
new WinRandomAccessFileAsyncIo(fname, hFile, page_size_, options));
fileGuard.release();
}
return s;
Expand All @@ -370,6 +383,8 @@ IOStatus WinFileSystem::OpenWritableFile(

if (local_options.use_direct_writes && !local_options.use_mmap_writes) {
fileFlags = FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
TEST_SYNC_POINT_CALLBACK("NewWritableFile:FILE_FLAG_NO_BUFFERING",
&fileFlags);
}

// Desired access. We are want to write only here but if we want to memory
Expand Down Expand Up @@ -469,6 +484,8 @@ IOStatus WinFileSystem::NewRandomRWFile(const std::string& fname,

if (options.use_direct_reads && options.use_direct_writes) {
file_flags |= FILE_FLAG_NO_BUFFERING;
TEST_SYNC_POINT_CALLBACK("NewRandomRWFile:FILE_FLAG_NO_BUFFERING",
&file_flags);
}

/// Shared access is necessary for corruption test to pass
Expand Down Expand Up @@ -1170,6 +1187,104 @@ FileOptions WinFileSystem::OptimizeForManifestRead(
return optimized;
}

void WinFileSystem::SupportedOps(int64_t& supported_ops) {
supported_ops = 0;
supported_ops |= (1 << FSSupportedOps::kAsyncIO);
}

IOStatus WinFileSystem::Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) {
for (size_t i = 0; i < io_handles.size(); i++) {
auto win_handle = static_cast<Win_IOHandle*>(io_handles[i]);

if (win_handle->is_finished) {
continue;
}

FSReadRequest req;
req.scratch = win_handle->scratch;
req.offset = win_handle->offset;
req.len = win_handle->len;
DWORD bytes_read = 0;
if (FALSE != GetOverlappedResult(win_handle->file_data->GetFileHandle(),
&win_handle->overlapped, &bytes_read,
TRUE)) {
TEST_SYNC_POINT_CALLBACK(
"UpdateResults::io_uring_result",
&bytes_read); // For testing compatibility with io_uring only
req.result = Slice(req.scratch, bytes_read);
req.status = IOStatus::OK();
} else {
auto err = GetLastError();
if (ERROR_HANDLE_EOF != err) {
req.result = Slice(req.scratch, 0);
req.status = IOErrorFromWindowsError(
"GetOverlappedResult failed: " + win_handle->file_data->GetName(),
err);
} else { // EOF
req.result = Slice(req.scratch, 0);
req.status = IOStatus::OK();
}
}

win_handle->is_finished = true;
win_handle->cb(req, win_handle->cb_arg);

if (win_handle->overlapped.hEvent != NULL) {
CloseHandle(win_handle->overlapped.hEvent);
win_handle->overlapped.hEvent = NULL;
}
}
return IOStatus::OK();
}

IOStatus WinFileSystem::AbortIO(std::vector<void*>& io_handles) {
for (size_t i = 0; i < io_handles.size(); i++) {
auto win_handle = static_cast<Win_IOHandle*>(io_handles[i]);
if (win_handle->is_finished) {
continue;
}

if (win_handle->overlapped.hEvent == NULL) {
return IOStatus::InvalidArgument(
"AbortIO: Overlapped event is NULL for: " +
win_handle->file_data->GetName());
}

if (FALSE == CancelIoEx(win_handle->file_data->GetFileHandle(),
&win_handle->overlapped)) {
auto err = GetLastError();
// If this CancelIoEx cannot find a request to cancel, GetLastError
// returns ERROR_NOT_FOUND
if (err != ERROR_NOT_FOUND) {
return IOErrorFromWindowsError(
"CancelIoEx failed: " + win_handle->file_data->GetName(), err);
}
} else {
// CancelIoEx succeeded
// Wait for the operation to complete
DWORD bytes_read = 0;
if (!GetOverlappedResult(win_handle->file_data->GetFileHandle(),
&win_handle->overlapped, &bytes_read, TRUE)) {
if (GetLastError() != ERROR_OPERATION_ABORTED) {
return IOErrorFromWindowsError(
"AbortIO failed: " + win_handle->file_data->GetName(),
GetLastError());
}
}
}

win_handle->is_finished = true;
FSReadRequest req;
req.status = IOStatus::Aborted();
win_handle->cb(req, win_handle->cb_arg);

CloseHandle(win_handle->overlapped.hEvent);
win_handle->overlapped.hEvent = NULL;
}
return IOStatus::OK();
}

// Returns true iff the named directory exists and is a directory.
bool WinFileSystem::DirExists(const std::string& dname) {
WIN32_FILE_ATTRIBUTE_DATA attrs;
Expand Down
5 changes: 4 additions & 1 deletion port/win/env_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ class WinFileSystem : public FileSystem {
const FileOptions& file_options) const override;
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override;
void SupportedOps(int64_t& supported_ops) override { supported_ops = 0; }
virtual void SupportedOps(int64_t& supported_ops) override;
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t min_completions) override;
virtual IOStatus AbortIO(std::vector<void*>& io_handles) override;

protected:
static uint64_t FileTimeToUnixTime(const FILETIME& ftTime);
Expand Down
Loading
Loading