diff --git a/src/dist/block_service/fds/fds_service.cpp b/src/dist/block_service/fds/fds_service.cpp index 7aa9f1ef24..6c4dfd28ea 100644 --- a/src/dist/block_service/fds/fds_service.cpp +++ b/src/dist/block_service/fds/fds_service.cpp @@ -17,6 +17,10 @@ #include #include #include +#include +#include +#include +#include namespace dsn { namespace dist { @@ -95,10 +99,46 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_FDS_SERVICE) DEFINE_TASK_CODE(LPC_FDS_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_FDS_SERVICE) const std::string fds_service::FILE_LENGTH_CUSTOM_KEY = "x-xiaomi-meta-content-length"; -const std::string fds_service::FILE_LENGTH_KEY = "content-length"; const std::string fds_service::FILE_MD5_KEY = "content-md5"; -fds_service::fds_service() {} +fds_service::fds_service() +{ + const int BYTE_TO_BIT = 8; + + /// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25] + /// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25]. + /// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration, + /// we must set max_sst_file_size to 4 * write_buffer_size * [0.75, 1.25], which is too big. + /// So in this implementation, we don't take BULK_LOAD scenario into consideration. + uint64_t target_file_size = + dsn_config_get_value_uint64("pegasus.server", + "rocksdb_target_file_size_base", + 64 * 1024 * 1024, + "rocksdb options.target_file_size_base"); + uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server", + "rocksdb_write_buffer_size", + 64 * 1024 * 1024, + "rocksdb options.write_buffer_size"); + uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)(1.25 * write_buffer_size)); + + uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)"); + /// For write operation, we can't send a file in batches. Because putContent interface of fds + /// will overwrite what was sent before for the same file. 
So we must send a file as a whole. + /// If file size > burst size, the file will be rejected by the token bucket. + /// Here we set burst_size = max_sst_file_size + 3MB, a little greater than max_sst_file_size + uint32_t burst_size = + std::max(2 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6); + _write_token_bucket.reset( + new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); + + uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64( + "replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)"); + burst_size = 2 * read_rate_limit * 1e6 / BYTE_TO_BIT; + _read_token_bucket.reset( + new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size)); +} + fds_service::~fds_service() {} /** @@ -231,6 +271,7 @@ dsn::task_ptr fds_service::list_dir(const ls_request &req, return t; } +// TODO(zhaoliwei) refactor this code, because the same code exists in get_file_meta() dsn::task_ptr fds_service::create_file(const create_file_request &req, dsn::task_code code, const create_file_callback &cb, @@ -484,82 +525,146 @@ fds_file_object::fds_file_object(fds_service *s, fds_file_object::~fds_file_object() {} -dsn::error_code fds_file_object::get_content(uint64_t pos, - int64_t length, - /*out*/ std::ostream &os, - /*out*/ uint64_t &transfered_bytes) +error_code fds_file_object::get_file_meta() { - dsn::error_code err; + galaxy::fds::GalaxyFDSClient *c = _service->get_client(); + try { + auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata(); + + // get file length + auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY); + dassert_f(iter != meta.end(), + "can't find {} in object({})'s metadata", + fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(), + _fds_path.c_str()); + bool valid = dsn::buf2uint64(iter->second, _size); + dassert_f(valid, "error to get file size"); + + // get md5 key + iter = meta.find(fds_service::FILE_MD5_KEY); + dassert_f(iter != meta.end(), + "can't find {} in 
object({})'s metadata", + fds_service::FILE_MD5_KEY.c_str(), + _fds_path.c_str()); + _md5sum = iter->second; + + _has_meta_synced = true; + return ERR_OK; + } catch (const galaxy::fds::GalaxyFDSClientException &ex) { + if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { + return ERR_OBJECT_NOT_FOUND; + } else { + derror_f("fds getObjectMetadata failed: parameter({}), code({}), msg({})", + _name.c_str(), + ex.code(), + ex.what()); + return ERR_FS_INTERNAL; + } + } +} + +error_code fds_file_object::get_content_in_batches(uint64_t start, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes) +{ + // the max batch size is 1MB + const uint64_t BATCH_MAX = 1e6; + error_code err = ERR_OK; transfered_bytes = 0; + // get file meta if it is not synced + if (!_has_meta_synced) { + err = get_file_meta(); + if (ERR_OK != err) { + return err; + } + } + + // if length = -1, it means we should transfer the whole file + uint64_t to_transfer_bytes = (length == -1 ? _size : length); + + uint64_t pos = start; + uint64_t once_transfered_bytes = 0; + while (pos < start + to_transfer_bytes) { + uint64_t batch_len = std::min(BATCH_MAX, start + to_transfer_bytes - pos); + // get tokens from token bucket + _service->_read_token_bucket->consumeWithBorrowAndWait(batch_len); + + err = get_content(pos, batch_len, os, once_transfered_bytes); + transfered_bytes += once_transfered_bytes; + if (err != ERR_OK || once_transfered_bytes < batch_len) { + return err; + } + pos += batch_len; + } + + return ERR_OK; +} + +error_code fds_file_object::get_content(uint64_t pos, + uint64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes) +{ + error_code err = ERR_OK; + transfered_bytes = 0; while (true) { - if (_has_meta_synced) { - // if we have download enough or we have reach the end - if ((length != -1 && (int64_t)transfered_bytes >= length) || - transfered_bytes + pos >= _size) { - return dsn::ERR_OK; - } + // if we have download enough or we 
have reach the end + if (transfered_bytes >= length || transfered_bytes + pos >= _size) { + return ERR_OK; } try { galaxy::fds::GalaxyFDSClient *c = _service->get_client(); std::shared_ptr obj; - if (length == -1) - obj = c->getObject(_service->get_bucket_name(), _fds_path, pos + transfered_bytes); - else - obj = c->getObject(_service->get_bucket_name(), - _fds_path, - pos + transfered_bytes, - length - transfered_bytes); + obj = c->getObject(_service->get_bucket_name(), + _fds_path, + pos + transfered_bytes, + length - transfered_bytes); dinfo("get object from fds succeed, remote_file(%s)", _fds_path.c_str()); - if (!_has_meta_synced) { - const std::map &meta = obj->objectMetadata().metadata(); - auto iter = meta.find(fds_service::FILE_MD5_KEY); - if (iter != meta.end()) { - _md5sum = iter->second; - iter = meta.find(fds_service::FILE_LENGTH_KEY); - dassert(iter != meta.end(), - "%s: can't get %s in getObject %s", - _name.c_str(), - fds_service::FILE_LENGTH_KEY.c_str(), - _fds_path.c_str()); - _size = atoll(iter->second.c_str()); - _has_meta_synced = true; - } - } std::istream &is = obj->objectContent(); transfered_bytes += utils::copy_stream(is, os, PIECE_SIZE); - err = dsn::ERR_OK; + err = ERR_OK; } catch (const galaxy::fds::GalaxyFDSClientException &ex) { derror("fds getObject error: remote_file(%s), code(%d), msg(%s)", file_name().c_str(), ex.code(), ex.what()); - if (!_has_meta_synced && ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { + if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) { _has_meta_synced = true; _md5sum = ""; _size = 0; - err = dsn::ERR_OBJECT_NOT_FOUND; + err = ERR_OBJECT_NOT_FOUND; } else { - err = dsn::ERR_FS_INTERNAL; + err = ERR_FS_INTERNAL; } } FDS_EXCEPTION_HANDLE(err, "getObject", file_name().c_str()) - if (err != dsn::ERR_OK) { + if (err != ERR_OK) { return err; } } - - return err; } -dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, - uint64_t &transfered_bytes) +error_code 
fds_file_object::put_content(/*in-out*/ std::istream &is, + int64_t to_transfer_bytes, + uint64_t &transfered_bytes) { - dsn::error_code err = dsn::ERR_OK; + error_code err = ERR_OK; transfered_bytes = 0; galaxy::fds::GalaxyFDSClient *c = _service->get_client(); + + // get tokens from token bucket + if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) { + ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by " + "token bucket", + to_transfer_bytes, + _service->_write_token_bucket->burst()); + return ERR_BUSY; + } + try { c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata()); } catch (const galaxy::fds::GalaxyFDSClientException &ex) { @@ -571,7 +676,7 @@ dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is, } FDS_EXCEPTION_HANDLE(err, "putObject", file_name().c_str()) - if (err != dsn::ERR_OK) { + if (err != ERR_OK) { return err; } @@ -621,7 +726,7 @@ dsn::task_ptr fds_file_object::write(const write_request &req, write_response resp; std::istringstream is; is.str(std::string(req.buffer.data(), req.buffer.length())); - resp.err = put_content(is, resp.written_size); + resp.err = put_content(is, req.buffer.length(), resp.written_size); t->enqueue_with(resp); release_ref(); @@ -643,6 +748,10 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req, add_ref(); auto upload_background = [this, req, t]() { const std::string &local_file = req.input_local_name; + // get file size + int64_t file_sz = 0; + dsn::utils::filesystem::file_size(local_file, file_sz); + upload_response resp; // TODO: we can cache the whole file in buffer, then upload the buffer rather than the // ifstream, because if ifstream read file beyond 60s, fds-server will reset the session, @@ -658,7 +767,7 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req, ptr); resp.err = dsn::ERR_FILE_OPERATION_FAILED; } else { - resp.err = put_content(is, resp.uploaded_size); + 
resp.err = put_content(is, file_sz, resp.uploaded_size); is.close(); } @@ -691,7 +800,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req, read_response resp; std::ostringstream os; uint64_t transferd_size; - resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size); + resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size); if (os.tellp() > 0) { std::string *output = new std::string(); *output = os.str(); @@ -743,7 +852,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req, auto download_background = [this, req, handle, t]() { download_response resp; uint64_t transfered_size; - resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size); + resp.err = + get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size); resp.downloaded_size = 0; if (handle->tellp() != -1) resp.downloaded_size = handle->tellp(); @@ -755,6 +865,6 @@ dsn::task_ptr fds_file_object::download(const download_request &req, dsn::tasking::enqueue(LPC_FDS_CALL, nullptr, download_background); return t; } -} -} -} +} // namespace block_service +} // namespace dist +} // namespace dsn diff --git a/src/dist/block_service/fds/fds_service.h b/src/dist/block_service/fds/fds_service.h index ea4773f9ff..0e763d0155 100644 --- a/src/dist/block_service/fds/fds_service.h +++ b/src/dist/block_service/fds/fds_service.h @@ -3,6 +3,13 @@ #include +namespace folly { +template +class BasicTokenBucket; + +using TokenBucket = BasicTokenBucket; +} + namespace galaxy { namespace fds { class GalaxyFDSClient; @@ -64,6 +71,10 @@ class fds_service : public block_filesystem private: std::shared_ptr _client; std::string _bucket_name; + std::unique_ptr _read_token_bucket; + std::unique_ptr _write_token_bucket; + + friend class fds_file_object; }; class fds_file_object : public block_file @@ -101,11 +112,19 @@ class fds_file_object : public block_file dsn::task_tracker *tracker) override; private: - 
dsn::error_code get_content(uint64_t pos, - int64_t length, - /*out*/ std::ostream &os, - /*out*/ uint64_t &transfered_bytes); - dsn::error_code put_content(/*in-out*/ std::istream &is, /*out*/ uint64_t &transfered_bytes); + error_code get_content_in_batches(uint64_t start, + int64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes); + error_code get_content(uint64_t pos, + uint64_t length, + /*out*/ std::ostream &os, + /*out*/ uint64_t &transfered_bytes); + error_code put_content(/*in-out*/ std::istream &is, + /*in*/ int64_t to_transfer_bytes, + /*out*/ uint64_t &transfered_bytes); + error_code get_file_meta(); + fds_service *_service; std::string _fds_path; std::string _md5sum;