Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix: support retry when io write incompletely #818

Merged
merged 20 commits into from
Apr 26, 2021
17 changes: 15 additions & 2 deletions include/dsn/utility/fail_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

#include <dsn/utility/string_view.h>

/// The only entry to define a fail point.
/// When a fail point is defined, it's referenced via the name.
/// The only entry to define a fail point with the `return()` function: the lambda function
/// must return a non-void type. When a fail point is defined, it's referenced via the name.
#define FAIL_POINT_INJECT_F(name, lambda) \
do { \
if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \
Expand All @@ -35,6 +35,19 @@
} \
} while (0)

/// Defines a fail point, referenced via `name`, whose `lambda` takes no arguments; whatever
/// the lambda returns is discarded, so it should return void.
///
/// When fail points are disabled (`::dsn::fail::_S_FAIL_POINT_ENABLED` is false) the macro
/// does nothing. Otherwise it evaluates the fail point through `::dsn::fail::eval(name)` and
/// invokes `lambda()` only if that evaluation yields nullptr.
/// NOTE(review): presumably `eval` yields nullptr for a fail point configured with `off()`
/// (which is how the tests configure it) -- confirm against `::dsn::fail::eval`.
#define FAIL_POINT_INJECT_OFF_F(name, lambda) \
do { \
if (dsn_likely(!::dsn::fail::_S_FAIL_POINT_ENABLED)) \
break; \
auto __Func = lambda; \
auto __Res = ::dsn::fail::eval(name); \
if (__Res == nullptr) { \
__Func(); \
} \
} while (0)

namespace dsn {
namespace fail {

Expand Down
48 changes: 39 additions & 9 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#include <dsn/tool-api/async_calls.h>
#include <dsn/c/api_utilities.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/fail_point.h>

namespace dsn {

Expand Down Expand Up @@ -68,15 +70,43 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
/// Writes `aio_ctx.buffer_size` bytes from `aio_ctx.buffer` to `aio_ctx.file`, starting at
/// `aio_ctx.file_offset`, using `pwrite`.
///
/// A single `pwrite` call may be interrupted (EINTR) or may write fewer bytes than requested,
/// so the call is repeated for the remaining part of the buffer until everything has been
/// written or a non-retryable error occurs.
///
/// \param aio_ctx          describes the file, the source buffer, its size and the file offset.
/// \param processed_bytes  [out] total number of bytes written; assigned only on success.
/// \return ERR_OK once the whole buffer has been written, or ERR_FILE_OPERATION_FAILED when
///         `pwrite` fails with an errno other than EINTR.
error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
                                            /*out*/ uint32_t *processed_bytes)
{
    dsn::error_code resp = ERR_OK;
    uint32_t buffer_offset = 0;
    do {
        // `ret` is the number of bytes written by this call, or -1 on failure. It has to be a
        // signed type: stored in an unsigned integer, -1 would wrap to a huge positive value,
        // the `ret < 0` check below could never fire, and a failed write would be reported
        // as ERR_OK with a bogus `processed_bytes`.
        ssize_t ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
                             (char *)aio_ctx.buffer + buffer_offset,
                             aio_ctx.buffer_size - buffer_offset,
                             aio_ctx.file_offset + buffer_offset);
        if (dsn_unlikely(ret < 0)) {
            if (errno == EINTR) {
                // Interrupted before anything was written: retry the same range.
                dwarn_f("write failed with errno={} and will retry it.", strerror(errno));
                continue;
            }
            resp = ERR_FILE_OPERATION_FAILED;
            derror_f("write failed with errno={}, return {}.", strerror(errno), resp);
            return resp;
        }

        // mock the `ret` to reproduce the `write incomplete` case in the first write
        FAIL_POINT_INJECT_OFF_F("aio_pwrite_incomplete", [&]() -> void {
            // Only shrink a write that actually wrote something, so the mocked value can
            // never become negative.
            if (dsn_unlikely(buffer_offset == 0 && ret > 0)) {
                --ret;
            }
        });

        // NOTE(review): if `pwrite` ever returns 0 for a non-empty request, this loop makes no
        // progress and retries forever -- confirm whether that case needs explicit handling.
        buffer_offset += static_cast<uint32_t>(ret);
        if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) {
            dwarn_f("write incomplete, request_size={}, total_write_size={}, this_write_size={}, "
                    "and will retry it.",
                    aio_ctx.buffer_size,
                    buffer_offset,
                    ret);
        }
    } while (dsn_unlikely(buffer_offset < aio_ctx.buffer_size));

    *processed_bytes = buffer_offset;
    return resp;
}

error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
Expand Down
10 changes: 10 additions & 0 deletions src/aio/test/aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <dsn/tool-api/async_calls.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/fail_point.h>

#include <gtest/gtest.h>

Expand All @@ -37,6 +38,8 @@ DEFINE_TASK_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER

TEST(core, aio)
{
fail::setup();
fail::cfg("aio_pwrite_incomplete", "off()");
const char *buffer = "hello, world";
int len = (int)strlen(buffer);

Expand Down Expand Up @@ -122,6 +125,7 @@ TEST(core, aio)
}

err = file::close(fp);
fail::teardown();
EXPECT_TRUE(err == ERR_OK);

utils::filesystem::remove_path("tmp");
Expand All @@ -143,6 +147,9 @@ TEST(core, aio_share)

TEST(core, operation_failed)
{
fail::setup();
fail::cfg("aio_pwrite_incomplete", "off()");

auto fp = file::open("tmp_test_file", O_WRONLY, 0600);
EXPECT_TRUE(fp == nullptr);

Expand Down Expand Up @@ -171,16 +178,19 @@ TEST(core, operation_failed)
t = ::dsn::file::read(fp2, buffer, 512, 0, LPC_AIO_TEST, nullptr, io_callback, 0);
t->wait();
EXPECT_TRUE(*err == ERR_OK && *count == strlen(str));
EXPECT_TRUE(strncmp(buffer, str, 10) == 0);

t = ::dsn::file::read(fp2, buffer, 5, 0, LPC_AIO_TEST, nullptr, io_callback, 0);
t->wait();
EXPECT_TRUE(*err == ERR_OK && *count == 5);
EXPECT_TRUE(strncmp(buffer, str, 5) == 0);

t = ::dsn::file::read(fp2, buffer, 512, 100, LPC_AIO_TEST, nullptr, io_callback, 0);
t->wait();
ddebug("error code: %s", err->to_string());
file::close(fp);
file::close(fp2);
fail::teardown();

EXPECT_TRUE(utils::filesystem::remove_path("tmp_test_file"));
}
Expand Down