Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

perf: implement thread-pool-based asynchronous IO #569

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/aio/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ class aio_provider

virtual aio_context *prepare_aio_context(aio_task *) = 0;

protected:
DSN_API void
complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);
void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0);

private:
disk_engine *_engine;
Expand Down
1 change: 1 addition & 0 deletions src/aio/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "aio_provider.h"

#include <dsn/tool_api.h>
#include <dsn/utility/synchronize.h>
#include <dsn/utility/work_queue.h>

Expand Down
130 changes: 130 additions & 0 deletions src/aio/io_event_loop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#pragma once

#include "aio_provider.h"
#include "disk_engine.h"

#include <dsn/utility/smart_pointers.h>
#include <fcntl.h>
#include <concurrentqueue/blockingconcurrentqueue.h>

namespace dsn {

// Writes the buffer described by `aio_ctx` to its file using pwrite(2).
//
// A single pwrite() is permitted to transfer fewer bytes than requested (a
// "short write"), so the call is repeated on the remaining range until the
// whole buffer has been written, the kernel reports no progress, or an error
// occurs.
//
// aio_ctx:         supplies the file handle, buffer, buffer size and file offset.
// processed_bytes: out; total number of bytes written. Only assigned when
//                  ERR_OK is returned.
// Returns ERR_OK on success, or ERR_FILE_OPERATION_FAILED as soon as pwrite()
// returns a negative value.
static error_code do_write(aio_context *aio_ctx, uint32_t *processed_bytes)
{
    const int fd = static_cast<int>((ssize_t)aio_ctx->file);
    const char *data = static_cast<const char *>(aio_ctx->buffer);
    const size_t total = static_cast<size_t>(aio_ctx->buffer_size);

    size_t written = 0;
    // do/while: pwrite() is issued at least once even for an empty buffer, so
    // an invalid descriptor is still reported as a failure.
    do {
        const ssize_t ret =
            pwrite(fd, data + written, total - written, aio_ctx->file_offset + written);
        if (ret < 0) {
            return ERR_FILE_OPERATION_FAILED;
        }
        if (ret == 0) {
            // No progress was made; stop rather than spin forever and let the
            // caller observe the short byte count.
            break;
        }
        written += static_cast<size_t>(ret);
    } while (written < total);

    *processed_bytes = static_cast<uint32_t>(written);
    return ERR_OK;
}

// Issues one positional read (pread) for the request described by `aio_ctx`.
//
// aio_ctx:         supplies the file handle, destination buffer, buffer size
//                  and file offset.
// processed_bytes: out; number of bytes read. Only assigned when ERR_OK is
//                  returned.
// Returns ERR_OK when at least one byte was read, ERR_HANDLE_EOF when pread()
// returned 0, and ERR_FILE_OPERATION_FAILED when it returned a negative value.
static error_code do_read(aio_context *aio_ctx, uint32_t *processed_bytes)
{
    const int fd = static_cast<int>((ssize_t)aio_ctx->file);
    const ssize_t nread = pread(fd, aio_ctx->buffer, aio_ctx->buffer_size, aio_ctx->file_offset);

    if (nread > 0) {
        *processed_bytes = static_cast<uint32_t>(nread);
        return ERR_OK;
    }
    if (nread == 0) {
        return ERR_HANDLE_EOF;
    }
    return ERR_FILE_OPERATION_FAILED;
}

// A single disk IO request bound to an aio_task.
//
// An event is created in one of two modes:
//  - async == true:  complete() hands the result to aio_provider::complete_io().
//  - async == false: complete() stores the result and signals a notify_event;
//                    the submitter calls wait() and then reads the result via
//                    get_error() / get_processed_bytes().
struct io_event_t
{
public:
    // tsk:      the task whose aio_context describes the IO to perform.
    // async:    when false, a notify_event is allocated so wait() can be used.
    // provider: receives complete_io() for asynchronous events.
    io_event_t(aio_task *tsk, bool async, aio_provider *provider) : _tsk(tsk), _provider(provider)
    {
        if (!async) {
            _notify = make_unique<utils::notify_event>();
        }
    }

    // Waits on the notify_event of a synchronous event; returns immediately
    // for an asynchronous event, which has no notify_event.
    void wait()
    {
        if (_notify) {
            _notify->wait();
        }
    }

    // Performs the read or write described by the task's aio_context and then
    // publishes the outcome: notify() for synchronous events, complete_io()
    // for asynchronous ones. Any other aio type leaves the error at ERR_UNKNOWN.
    void complete()
    {
        _processed_bytes = 0;
        auto aio_ctx = _tsk->get_aio_context();
        _err = ERR_UNKNOWN;
        if (aio_ctx->type == AIO_Read) {
            _err = do_read(aio_ctx, &_processed_bytes);
        } else if (aio_ctx->type == AIO_Write) {
            _err = do_write(aio_ctx, &_processed_bytes);
        } else {
            // TODO
        }
        if (_notify) {
            _notify->notify();
        } else {
            _provider->complete_io(_tsk, _err, _processed_bytes);
        }
    }

    // Result of the last complete(); ERR_UNKNOWN if complete() has not run yet.
    error_code get_error() const { return _err; }

    // Bytes transferred by the last complete(); 0 if complete() has not run yet.
    uint32_t get_processed_bytes() const { return _processed_bytes; }

private:
    // Only set for synchronous events (async == false).
    std::unique_ptr<utils::notify_event> _notify;
    aio_task *_tsk;
    aio_provider *_provider;

    // Initialised here so the getters never read an indeterminate value when
    // called before complete().
    error_code _err = ERR_UNKNOWN;
    uint32_t _processed_bytes = 0;
};

class io_event_loop_t
{
public:
explicit io_event_loop_t(service_node *node)
{
_is_running = true;
_thread = std::thread([this, node]() {
task::set_tls_dsn_context(node, nullptr);

std::shared_ptr<io_event_t> evt;
while (true) {
if (dsn_unlikely(!_is_running.load(std::memory_order_relaxed))) {
break;
}
if (!_evt_que.wait_dequeue_timed(evt, 200)) {
continue;
}
evt->complete();
}
});
}

~io_event_loop_t()
{
if (!_is_running) {
return;
}
_is_running = false;
_thread.join();
}

void enqueue(std::shared_ptr<io_event_t> evt) { _evt_que.enqueue(std::move(evt)); }

private:
// one queue per thread
moodycamel::BlockingConcurrentQueue<std::shared_ptr<io_event_t>> _evt_que;
std::thread _thread;
std::atomic<bool> _is_running{false};
};

} // namespace dsn
175 changes: 23 additions & 152 deletions src/aio/native_linux_aio_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,14 @@
*/

#include "native_linux_aio_provider.h"
#include "io_event_loop.h"

#include <fcntl.h>
#include <cstdlib>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication.codes.h>

namespace dsn {

native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk)
{
memset(&_ctx, 0, sizeof(_ctx));
auto ret = io_setup(128, &_ctx); // 128 concurrent events
dassert(ret == 0, "io_setup error, ret = %d", ret);

_is_running = true;
_worker = std::thread([this, disk]() {
task::set_tls_dsn_context(node(), nullptr);
get_event();
});
}

native_linux_aio_provider::~native_linux_aio_provider()
{
if (!_is_running) {
return;
}
_is_running = false;

auto ret = io_destroy(_ctx);
dassert(ret == 0, "io_destroy error, ret = %d", ret);

_worker.join();
}
native_linux_aio_provider::~native_linux_aio_provider() {}

dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode)
{
Expand Down Expand Up @@ -86,144 +63,38 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh)
}
}

aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk)
{
return new linux_disk_aio_context(tsk);
}

void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { aio_internal(aio_tsk, true); }

void native_linux_aio_provider::get_event()
{
struct io_event events[1];
int ret;

task::set_tls_dsn_context(node(), nullptr);

const char *name = ::dsn::tools::get_service_node_name(node());
char buffer[128];
sprintf(buffer, "%s.aio", name);
task_worker::set_name(buffer);

while (true) {
if (dsn_unlikely(!_is_running.load(std::memory_order_relaxed))) {
break;
}
ret = io_getevents(_ctx, 1, 1, events, NULL);
if (ret > 0) // should be 1
{
dassert(ret == 1, "io_getevents returns %d", ret);
struct iocb *io = events[0].obj;
complete_aio(io, static_cast<int>(events[0].res), static_cast<int>(events[0].res2));
} else {
// on error it returns a negated error number (the negative of one of the values listed
// in ERRORS
dwarn("io_getevents returns %d, you probably want to try on another machine:-(", ret);
}
}
}

void native_linux_aio_provider::complete_aio(struct iocb *io, int bytes, int err)
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk)
{
linux_disk_aio_context *aio = CONTAINING_RECORD(io, linux_disk_aio_context, cb);
error_code ec;
if (err != 0) {
derror("aio error, err = %s", strerror(err));
ec = ERR_FILE_OPERATION_FAILED;
} else {
ec = bytes > 0 ? ERR_OK : ERR_HANDLE_EOF;
for (int i = 0; i < 1; i++) {
_high_pri_workers.push_back(std::make_shared<io_event_loop_t>(disk->node()));
}

if (!aio->evt) {
aio_task *aio_ptr(aio->tsk);
aio->this_->complete_io(aio_ptr, ec, bytes);
} else {
aio->err = ec;
aio->bytes = bytes;
aio->evt->notify();
for (int i = 0; i < 4; i++) {
_comm_pri_workers.push_back(std::make_shared<io_event_loop_t>(disk->node()));
}
}

void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { aio_internal(aio_tsk, true); }

error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk,
bool async,
/*out*/ uint32_t *pbytes /*= nullptr*/)
{
struct iocb *cbs[1];
linux_disk_aio_context *aio;
int ret;

aio = (linux_disk_aio_context *)aio_tsk->get_aio_context();

memset(&aio->cb, 0, sizeof(aio->cb));

aio->this_ = this;

switch (aio->type) {
case AIO_Read:
io_prep_pread(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
break;
case AIO_Write:
if (aio->buffer) {
io_prep_pwrite(&aio->cb,
static_cast<int>((ssize_t)aio->file),
aio->buffer,
aio->buffer_size,
aio->file_offset);
} else {
int iovcnt = aio->write_buffer_vec->size();
struct iovec *iov = (struct iovec *)alloca(sizeof(struct iovec) * iovcnt);
for (int i = 0; i < iovcnt; i++) {
const dsn_file_buffer_t &buf = aio->write_buffer_vec->at(i);
iov[i].iov_base = buf.buffer;
iov[i].iov_len = buf.size;
}
io_prep_pwritev(
&aio->cb, static_cast<int>((ssize_t)aio->file), iov, iovcnt, aio->file_offset);
}
break;
default:
derror("unknown aio type %u", static_cast<int>(aio->type));
auto evt = std::make_shared<io_event_t>(aio_tsk, async, this);
task_spec *spec = task_spec::get(aio_tsk->code().code());
if (spec->priority == dsn_task_priority_t::TASK_PRIORITY_HIGH) {
_high_pri_workers[aio_tsk->hash() % _high_pri_workers.size()]->enqueue(evt);
} else {
_comm_pri_workers[aio_tsk->hash() % _comm_pri_workers.size()]->enqueue(evt);
}

if (!async) {
aio->evt = new utils::notify_event();
aio->err = ERR_OK;
aio->bytes = 0;
if (async) {
return ERR_IO_PENDING;
}

cbs[0] = &aio->cb;
ret = io_submit(_ctx, 1, cbs);

if (ret != 1) {
if (ret < 0)
derror("io_submit error, ret = %d", ret);
else
derror("could not sumbit IOs, ret = %d", ret);

if (async) {
complete_io(aio_tsk, ERR_FILE_OPERATION_FAILED, 0);
} else {
delete aio->evt;
aio->evt = nullptr;
}
return ERR_FILE_OPERATION_FAILED;
} else {
if (async) {
return ERR_IO_PENDING;
} else {
aio->evt->wait();
delete aio->evt;
aio->evt = nullptr;
if (pbytes != nullptr) {
*pbytes = aio->bytes;
}
return aio->err;
}
evt->wait();
if (pbytes) {
*pbytes = evt->get_processed_bytes();
}
return evt->get_error();
}

} // namespace dsn
Loading