diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 2827ea9a85..4ba91b9a30 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -10,7 +10,9 @@ set(DUPLICATION_SRC ) set(BACKUP_SRC backup/replica_backup_manager.cpp - backup/cold_backup_context.cpp) + backup/cold_backup_context.cpp + backup/replica_backup_server.cpp +) set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp) diff --git a/src/replica/backup/replica_backup_manager.cpp b/src/replica/backup/replica_backup_manager.cpp index 599af6454f..871a01722a 100644 --- a/src/replica/backup/replica_backup_manager.cpp +++ b/src/replica/backup/replica_backup_manager.cpp @@ -4,6 +4,7 @@ #include "replica_backup_manager.h" #include "cold_backup_context.h" +#include "replica/replica.h" #include #include diff --git a/src/replica/backup/replica_backup_manager.h b/src/replica/backup/replica_backup_manager.h index fca6c94c29..82b30bebf0 100644 --- a/src/replica/backup/replica_backup_manager.h +++ b/src/replica/backup/replica_backup_manager.h @@ -4,11 +4,13 @@ #pragma once -#include "replica/replica.h" +#include +#include namespace dsn { namespace replication { +class replica; class replica_backup_manager : replica_base { public: diff --git a/src/replica/backup/replica_backup_server.cpp b/src/replica/backup/replica_backup_server.cpp new file mode 100644 index 0000000000..32b5f0f09e --- /dev/null +++ b/src/replica/backup/replica_backup_server.cpp @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "replica_backup_server.h" +#include "replica_backup_manager.h" +#include "replica/replica.h" +#include "replica/replica_stub.h" + +namespace dsn { +namespace replication { + +replica_backup_server::replica_backup_server(const replica_stub *rs) : _stub(rs) +{ + dsn_rpc_register_handler(RPC_COLD_BACKUP, "cold_backup", [this](message_ex *msg) { + on_cold_backup(backup_rpc::auto_reply(msg)); + }); + dsn_rpc_register_handler(RPC_CLEAR_COLD_BACKUP, "clear_cold_backup", [this](message_ex *msg) { + backup_clear_request clear_req; + unmarshall(msg, clear_req); + on_clear_cold_backup(clear_req); + }); +} + +void replica_backup_server::on_cold_backup(backup_rpc rpc) +{ + const backup_request &request = rpc.request(); + backup_response &response = rpc.response(); + + ddebug("received cold backup request: backup{%s.%s.%" PRId64 "}", + request.pid.to_string(), + request.policy.policy_name.c_str(), + request.backup_id); + response.pid = request.pid; + response.policy_name = request.policy.policy_name; + response.backup_id = request.backup_id; + + if (_stub->options().cold_backup_root.empty()) { + derror("backup{%s.%s.%" PRId64 + "}: cold_backup_root is empty, response ERR_OPERATION_DISABLED", + request.pid.to_string(), + request.policy.policy_name.c_str(), + request.backup_id); + response.err = ERR_OPERATION_DISABLED; + return; + } + + replica_ptr rep = _stub->get_replica(request.pid); + if (rep != nullptr) { + rep->on_cold_backup(request, response); + } else { + derror("backup{%s.%s.%" PRId64 "}: replica not found, response ERR_OBJECT_NOT_FOUND", + request.pid.to_string(), + request.policy.policy_name.c_str(), + request.backup_id); + response.err = ERR_OBJECT_NOT_FOUND; + } +} + +void replica_backup_server::on_clear_cold_backup(const backup_clear_request &request) +{ + ddebug_f("receive clear cold backup request: backup({}.{})", + request.pid.to_string(), + request.policy_name.c_str()); + + replica_ptr rep = _stub->get_replica(request.pid); + if (rep != nullptr) { + rep->get_backup_manager()->on_clear_cold_backup(request); + } +} + +} // namespace replication +} // namespace dsn diff --git a/src/replica/backup/replica_backup_server.h b/src/replica/backup/replica_backup_server.h new file mode 100644 index 0000000000..477f7e329d --- /dev/null +++ b/src/replica/backup/replica_backup_server.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace dsn { +namespace replication { + +class replica_stub; + +typedef rpc_holder backup_rpc; + +// A server distributes the cold-backup task to the targeted replica. +class replica_backup_server +{ +public: + explicit replica_backup_server(const replica_stub *rs); + +private: + void on_cold_backup(backup_rpc rpc); + + void on_clear_cold_backup(const backup_clear_request &request); + +private: + const replica_stub *_stub; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 22d3ccd35d..431fa4c4af 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -39,7 +39,7 @@ #include "mutation.h" #include "bulk_load/replica_bulk_loader.h" #include "duplication/duplication_sync_timer.h" -#include "backup/replica_backup_manager.h" +#include "backup/replica_backup_server.h" #include #include @@ -758,6 +758,8 @@ void replica_stub::initialize_start() _duplication_sync_timer->start(); } + _backup_server = dsn::make_unique(this); + // init liveness monitor dassert(NS_Disconnected == _state, ""); if (_options.fd_disabled == false) { @@ -806,7 +808,7 @@ dsn::error_code replica_stub::on_kill_replica(gpid id) } } -replica_ptr replica_stub::get_replica(gpid id) +replica_ptr replica_stub::get_replica(gpid id) const { zauto_read_lock l(_replicas_lock); auto it = _replicas.find(id); @@ -1050,53 +1052,6 @@ void replica_stub::on_query_app_info(query_app_info_rpc rpc) } } -void replica_stub::on_cold_backup(backup_rpc rpc) -{ - const backup_request &request = rpc.request(); - backup_response &response = rpc.response(); - - ddebug("received cold backup request: backup{%s.%s.%" PRId64 "}", - request.pid.to_string(), - request.policy.policy_name.c_str(), - request.backup_id); - response.pid = request.pid; - response.policy_name = request.policy.policy_name; - response.backup_id = request.backup_id; - - if (_options.cold_backup_root.empty()) { - derror("backup{%s.%s.%" PRId64 - "}: cold_backup_root is empty, response ERR_OPERATION_DISABLED", - request.pid.to_string(), - request.policy.policy_name.c_str(), - request.backup_id); - response.err = ERR_OPERATION_DISABLED; - return; - } - - replica_ptr rep = get_replica(request.pid); - if (rep != nullptr) { - rep->on_cold_backup(request, response); - } else { - derror("backup{%s.%s.%" PRId64 "}: replica not found, response ERR_OBJECT_NOT_FOUND", - request.pid.to_string(), - request.policy.policy_name.c_str(), - request.backup_id); - response.err = ERR_OBJECT_NOT_FOUND; - } -} - -void replica_stub::on_clear_cold_backup(const backup_clear_request &request) -{ - ddebug_f("receive clear cold backup request: backup({}.{})", - request.pid.to_string(), - request.policy_name.c_str()); - - replica_ptr rep = get_replica(request.pid); - if (rep != nullptr) { - rep->get_backup_manager()->on_clear_cold_backup(request); - } -} - void replica_stub::on_prepare(dsn::message_ex *request) { gpid id; @@ -2136,10 +2091,6 @@ void replica_stub::open_service() RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info); register_rpc_handler_with_rpc_holder( RPC_QUERY_APP_INFO, "query_app_info", &replica_stub::on_query_app_info); - register_rpc_handler_with_rpc_holder( - RPC_COLD_BACKUP, "cold_backup", &replica_stub::on_cold_backup); - register_rpc_handler( - RPC_CLEAR_COLD_BACKUP, "clear_cold_backup", &replica_stub::on_clear_cold_backup); register_rpc_handler_with_rpc_holder(RPC_SPLIT_NOTIFY_CATCH_UP, "child_notify_catch_up", &replica_stub::on_notify_primary_split_catch_up); diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 74c5c93dae..4ef0f378e3 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -54,7 +54,6 @@ typedef rpc_holder quer typedef rpc_holder copy_checkpoint_rpc; typedef rpc_holder query_disk_info_rpc; typedef rpc_holder query_app_info_rpc; -typedef rpc_holder backup_rpc; typedef rpc_holder notify_catch_up_rpc; typedef rpc_holder group_bulk_load_rpc; @@ -74,6 +73,7 @@ typedef dsn::ref_ptr replica_stub_ptr; class duplication_sync_timer; class replica_bulk_loader; +class replica_backup_server; class replica_stub : public serverlet, public ref_counter { public: @@ -106,8 +106,6 @@ class replica_stub : public serverlet, public ref_counter void on_query_replica_info(query_replica_info_rpc rpc); void on_query_disk_info(query_disk_info_rpc rpc); void on_query_app_info(query_app_info_rpc rpc); - void on_cold_backup(backup_rpc rpc); - void on_clear_cold_backup(const backup_clear_request &request); void on_bulk_load(bulk_load_rpc rpc); // @@ -152,8 +150,9 @@ class replica_stub : public serverlet, public ref_counter // // common routines for inquiry // - replica_ptr get_replica(gpid id); + replica_ptr get_replica(gpid id) const; replication_options &options() { return _options; } + const replication_options &options() const { return _options; } bool is_connected() const { return NS_Connected == _state; } virtual rpc_address get_meta_server_address() const { return _failure_detector->get_servers(); } rpc_address primary_address() const { return _primary_address; } @@ -328,6 +327,7 @@ class replica_stub : public serverlet, public ref_counter ::dsn::task_ptr _mem_release_timer_task; std::unique_ptr _duplication_sync_timer; + std::unique_ptr _backup_server; // command_handlers dsn_handle_t _kill_partition_command;