diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 53a8f75e0e..ce6c447176 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -64,8 +64,7 @@ MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_WRITE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_TEST_AGENT_READ, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_AIO(LPC_AIO_IMMEDIATE_CALLBACK, TASK_PRIORITY_COMMON) -MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_HIGH) -MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_HIGH) +MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_COMMON, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH) MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON) @@ -183,6 +182,7 @@ MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH) +MAKE_EVENT_CODE_AIO(LPC_WRITE_REPLICATION_LOG_PRIVATE, TASK_PRIORITY_COMMON) #undef CURRENT_THREAD_POOL #define CURRENT_THREAD_POOL THREAD_POOL_SLOG diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h index 2d182aa417..c2be9a46c4 100644 --- a/src/aio/aio_provider.h +++ b/src/aio/aio_provider.h @@ -61,6 +61,8 @@ class aio_provider virtual error_code close(dsn_handle_t fh) = 0; virtual error_code flush(dsn_handle_t fh) = 0; + virtual error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0; + virtual error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) = 0; // Submits the aio_task to the underlying disk-io executor. // This task may not be executed immediately, call `aio_task::wait` @@ -69,9 +71,7 @@ class aio_provider virtual aio_context *prepare_aio_context(aio_task *) = 0; -protected: - DSN_API void - complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0); + void complete_io(aio_task *aio, error_code err, uint32_t bytes, int delay_milliseconds = 0); private: disk_engine *_engine; diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h index 199b44d562..8889b6add1 100644 --- a/src/aio/disk_engine.h +++ b/src/aio/disk_engine.h @@ -28,6 +28,7 @@ #include "aio_provider.h" +#include #include #include diff --git a/src/aio/native_linux_aio_provider.cpp b/src/aio/native_linux_aio_provider.cpp index c799f7fc9d..d08f36860b 100644 --- a/src/aio/native_linux_aio_provider.cpp +++ b/src/aio/native_linux_aio_provider.cpp @@ -26,36 +26,13 @@ #include "native_linux_aio_provider.h" -#include -#include +#include namespace dsn { -native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk) -{ - memset(&_ctx, 0, sizeof(_ctx)); - auto ret = io_setup(128, &_ctx); // 128 concurrent events - dassert(ret == 0, "io_setup error, ret = %d", ret); - - _is_running = true; - _worker = std::thread([this]() { - task::set_tls_dsn_context(node(), nullptr); - get_event(); - }); -} - -native_linux_aio_provider::~native_linux_aio_provider() -{ - if (!_is_running) { - return; - } - _is_running = false; - - auto ret = io_destroy(_ctx); - dassert(ret == 0, "io_destroy error, ret = %d", ret); +native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_provider(disk) {} - _worker.join(); -} +native_linux_aio_provider::~native_linux_aio_provider() {} dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, int pmode) { @@ -86,144 +63,75 @@ error_code native_linux_aio_provider::flush(dsn_handle_t fh) } } -aio_context *native_linux_aio_provider::prepare_aio_context(aio_task *tsk) +error_code native_linux_aio_provider::write(const aio_context &aio_ctx, + /*out*/ uint32_t *processed_bytes) { - return new linux_disk_aio_context(tsk); + ssize_t ret = pwrite(static_cast((ssize_t)aio_ctx.file), + aio_ctx.buffer, + aio_ctx.buffer_size, + aio_ctx.file_offset); + if (ret < 0) { + return ERR_FILE_OPERATION_FAILED; + } + *processed_bytes = static_cast(ret); + return ERR_OK; } -void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { aio_internal(aio_tsk, true); } - -void native_linux_aio_provider::get_event() +error_code native_linux_aio_provider::read(const aio_context &aio_ctx, + /*out*/ uint32_t *processed_bytes) { - struct io_event events[1]; - int ret; - - task::set_tls_dsn_context(node(), nullptr); - - const char *name = ::dsn::tools::get_service_node_name(node()); - char buffer[128]; - sprintf(buffer, "%s.aio", name); - task_worker::set_name(buffer); - - while (true) { - if (dsn_unlikely(!_is_running.load(std::memory_order_relaxed))) { - break; - } - ret = io_getevents(_ctx, 1, 1, events, NULL); - if (ret > 0) // should be 1 - { - dassert(ret == 1, "io_getevents returns %d", ret); - struct iocb *io = events[0].obj; - complete_aio(io, static_cast(events[0].res), static_cast(events[0].res2)); - } else { - // on error it returns a negated error number (the negative of one of the values listed - // in ERRORS - dwarn("io_getevents returns %d, you probably want to try on another machine:-(", ret); - } + ssize_t ret = pread(static_cast((ssize_t)aio_ctx.file), + aio_ctx.buffer, + aio_ctx.buffer_size, + aio_ctx.file_offset); + if (ret < 0) { + return ERR_FILE_OPERATION_FAILED; + } + if (ret == 0) { + return ERR_HANDLE_EOF; } + *processed_bytes = static_cast(ret); + return ERR_OK; } -void native_linux_aio_provider::complete_aio(struct iocb *io, int bytes, int err) +void native_linux_aio_provider::submit_aio_task(aio_task *aio_tsk) { - linux_disk_aio_context *aio = CONTAINING_RECORD(io, linux_disk_aio_context, cb); - error_code ec; - if (err != 0) { - derror("aio error, err = %s", strerror(err)); - ec = ERR_FILE_OPERATION_FAILED; - } else { - ec = bytes > 0 ? ERR_OK : ERR_HANDLE_EOF; - } - - if (!aio->evt) { - aio_task *aio_ptr(aio->tsk); - aio->this_->complete_io(aio_ptr, ec, bytes); - } else { - aio->err = ec; - aio->bytes = bytes; - aio->evt->notify(); - } + tasking::enqueue(aio_tsk->code(), + aio_tsk->tracker(), + [=]() { aio_internal(aio_tsk, true); }, + aio_tsk->hash()); } error_code native_linux_aio_provider::aio_internal(aio_task *aio_tsk, bool async, /*out*/ uint32_t *pbytes /*= nullptr*/) { - struct iocb *cbs[1]; - linux_disk_aio_context *aio; - int ret; - - aio = (linux_disk_aio_context *)aio_tsk->get_aio_context(); - - memset(&aio->cb, 0, sizeof(aio->cb)); - - aio->this_ = this; - - switch (aio->type) { + aio_context *aio_ctx = aio_tsk->get_aio_context(); + error_code err = ERR_UNKNOWN; + uint32_t processed_bytes = 0; + switch (aio_ctx->type) { case AIO_Read: - io_prep_pread(&aio->cb, - static_cast((ssize_t)aio->file), - aio->buffer, - aio->buffer_size, - aio->file_offset); + err = read(*aio_ctx, &processed_bytes); break; case AIO_Write: - if (aio->buffer) { - io_prep_pwrite(&aio->cb, - static_cast((ssize_t)aio->file), - aio->buffer, - aio->buffer_size, - aio->file_offset); - } else { - int iovcnt = aio->write_buffer_vec->size(); - struct iovec *iov = (struct iovec *)alloca(sizeof(struct iovec) * iovcnt); - for (int i = 0; i < iovcnt; i++) { - const dsn_file_buffer_t &buf = aio->write_buffer_vec->at(i); - iov[i].iov_base = buf.buffer; - iov[i].iov_len = buf.size; - } - io_prep_pwritev( - &aio->cb, static_cast((ssize_t)aio->file), iov, iovcnt, aio->file_offset); - } + err = write(*aio_ctx, &processed_bytes); break; default: - derror("unknown aio type %u", static_cast(aio->type)); + return err; } - if (!async) { - aio->evt = new utils::notify_event(); - aio->err = ERR_OK; - aio->bytes = 0; + if (pbytes) { + *pbytes = processed_bytes; } - cbs[0] = &aio->cb; - ret = io_submit(_ctx, 1, cbs); - - if (ret != 1) { - if (ret < 0) - derror("io_submit error, ret = %d", ret); - else - derror("could not sumbit IOs, ret = %d", ret); - - if (async) { - complete_io(aio_tsk, ERR_FILE_OPERATION_FAILED, 0); - } else { - delete aio->evt; - aio->evt = nullptr; - } - return ERR_FILE_OPERATION_FAILED; + if (async) { + complete_io(aio_tsk, err, processed_bytes); } else { - if (async) { - return ERR_IO_PENDING; - } else { - aio->evt->wait(); - delete aio->evt; - aio->evt = nullptr; - if (pbytes != nullptr) { - *pbytes = aio->bytes; - } - return aio->err; - } + utils::notify_event notify; + notify.notify(); } + + return err; } } // namespace dsn diff --git a/src/aio/native_linux_aio_provider.h b/src/aio/native_linux_aio_provider.h index e8600dcc74..2c5abcd6f5 100644 --- a/src/aio/native_linux_aio_provider.h +++ b/src/aio/native_linux_aio_provider.h @@ -28,16 +28,6 @@ #include "aio_provider.h" -#include -#include -#include -#include /* for perror() */ -#include /* for __NR_* definitions */ -#include -#include /* O_RDWR */ -#include /* memset() */ -#include /* uint64_t */ - namespace dsn { class native_linux_aio_provider : public aio_provider @@ -49,34 +39,14 @@ class native_linux_aio_provider : public aio_provider dsn_handle_t open(const char *file_name, int flag, int pmode) override; error_code close(dsn_handle_t fh) override; error_code flush(dsn_handle_t fh) override; - void submit_aio_task(aio_task *aio) override; - aio_context *prepare_aio_context(aio_task *tsk) override; - - class linux_disk_aio_context : public aio_context - { - public: - struct iocb cb; - aio_task *tsk; - native_linux_aio_provider *this_; - utils::notify_event *evt; - error_code err; - uint32_t bytes; + error_code write(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override; + error_code read(const aio_context &aio_ctx, /*out*/ uint32_t *processed_bytes) override; - explicit linux_disk_aio_context(aio_task *tsk_) - : tsk(tsk_), this_(nullptr), evt(nullptr), err(ERR_UNKNOWN), bytes(0) - { - } - }; + void submit_aio_task(aio_task *aio) override; + aio_context *prepare_aio_context(aio_task *tsk) override { return new aio_context; } protected: error_code aio_internal(aio_task *aio, bool async, /*out*/ uint32_t *pbytes = nullptr); - void complete_aio(struct iocb *io, int bytes, int err); - void get_event(); - -private: - io_context_t _ctx; - std::atomic _is_running{false}; - std::thread _worker; }; } // namespace dsn diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp index c76e980e11..6520a26b36 100644 --- a/src/nfs/nfs_client_impl.cpp +++ b/src/nfs/nfs_client_impl.cpp @@ -212,7 +212,7 @@ void nfs_client_impl::end_get_file_size(::dsn::error_code err, _copy_requests_low.push(std::move(copy_requests)); } - continue_copy(); + tasking::enqueue(LPC_NFS_COPY_FILE, nullptr, [this]() { continue_copy(); }, 0); } void nfs_client_impl::continue_copy() @@ -268,6 +268,7 @@ void nfs_client_impl::continue_copy() zauto_lock l(req->lock); const user_request_ptr &ureq = req->file_ctx->user_req; if (req->is_valid) { + // todo(jiashuo1) use non-block api `consumeWithBorrowNonBlocking` or `consume` _copy_token_bucket->consumeWithBorrowAndWait(req->size); copy_request copy_req; diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 13c1b9c2c9..298444c8c3 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -224,7 +224,6 @@ void replica::execute_mutation(mutation_ptr &mu) name(), mu->name(), static_cast(mu->client_requests.size())); - ADD_POINT(mu->tracer); error_code err = ERR_OK; decree d = mu->data.header.decree; @@ -242,6 +241,7 @@ void replica::execute_mutation(mutation_ptr &mu) } break; case partition_status::PS_PRIMARY: { + ADD_POINT(mu->tracer); check_state_completeness(); dassert(_app->last_committed_decree() + 1 == d, "app commit: %" PRId64 ", mutation decree: %" PRId64 "", @@ -306,6 +306,7 @@ void replica::execute_mutation(mutation_ptr &mu) } if (status() == partition_status::PS_PRIMARY) { + ADD_CUSTOM_POINT(mu->tracer, "completed"); mutation_ptr next = _primary_states.write_queue.check_possible_work( static_cast(_prepare_list->max_decree() - d)); @@ -315,7 +316,6 @@ void replica::execute_mutation(mutation_ptr &mu) } // update table level latency perf-counters for primary partition - ADD_CUSTOM_POINT(mu->tracer, "completed"); if (partition_status::PS_PRIMARY == status()) { uint64_t now_ns = dsn_now_ns(); for (auto update : mu->data.updates) { diff --git a/src/replica/replication_app_base.cpp b/src/replica/replication_app_base.cpp index 4d15e73262..98c4e3d6d2 100644 --- a/src/replica/replication_app_base.cpp +++ b/src/replica/replication_app_base.cpp @@ -473,7 +473,9 @@ ::dsn::error_code replication_app_base::apply_mutation(const mutation *mu) (int)mu->client_requests.size()); dassert(mu->data.updates.size() > 0, ""); - ADD_POINT(mu->tracer); + if (_replica->status() == partition_status::PS_PRIMARY) { + ADD_POINT(mu->tracer); + } bool has_ingestion_request = false; int request_count = static_cast(mu->client_requests.size()); diff --git a/src/replica/test/log_file_test.cpp b/src/replica/test/log_file_test.cpp index 84b9555be3..d016d114e2 100644 --- a/src/replica/test/log_file_test.cpp +++ b/src/replica/test/log_file_test.cpp @@ -14,6 +14,7 @@ class log_file_test : public replica_test_base public: void SetUp() override { + utils::filesystem::remove_path(_log_dir); utils::filesystem::create_directory(_log_dir); _logf = log_file::create_write(_log_dir.c_str(), 1, _start_offset); }