Skip to content

Commit

Permalink
first commit for epoch filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Leonid Chernin <[email protected]>
  • Loading branch information
Leonid Chernin committed Nov 25, 2024
1 parent 5790400 commit 82f0ef1
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 23 deletions.
51 changes: 45 additions & 6 deletions src/mon/NVMeofGwMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ int NVMeofGwMap::cfg_add_gw(
const NvmeGwId &gw_id, const NvmeGroupKey& group_key)
{
std::set<NvmeAnaGrpId> allocated;
auto gw_epoch_it = Gw_epoch.find(group_key);
if (gw_epoch_it == Gw_epoch.end())
{
Gw_epoch[group_key].epoch = 0;
dout(4) << "Allocated first gw_epoch : group_key " << group_key << " epoch " << Gw_epoch[group_key].epoch << dendl;
}
for (auto& itr: created_gws[group_key]) {
allocated.insert(itr.second.ana_grp_id);
if (itr.first == gw_id) {
Expand Down Expand Up @@ -188,8 +194,10 @@ int NVMeofGwMap::do_erase_gw_id(const NvmeGwId &gw_id,
fsm_timers.erase(group_key);

created_gws[group_key].erase(gw_id);
if (created_gws[group_key].size() == 0)
if (created_gws[group_key].size() == 0) {
created_gws.erase(group_key);
Gw_epoch.erase(group_key);
}
return 0;
}

Expand Down Expand Up @@ -219,6 +227,20 @@ int NVMeofGwMap::do_delete_gw(
return -EINVAL;
}

// Called when a gateway reports that it has performed its full startup.
//
// gw_id           - id of the gateway; only used for the log line here.
// group_key       - group whose epoch is advanced via increment_gw_epoch().
// propose_pending - out flag; unconditionally set to true by this call.
void NVMeofGwMap::gw_performed_startup(
  const NvmeGwId &gw_id,
  const NvmeGroupKey& group_key,
  bool &propose_pending)
{
  dout(4) << "GW performed the full startup " << gw_id << dendl;

  // Raise the flag first, then advance the group's epoch.
  propose_pending = true;
  increment_gw_epoch(group_key);
}

// Advance the epoch stored in Gw_epoch for group_key by one and log the
// resulting value.
//
// Gw_epoch is indexed with operator[], so a group that has no entry yet gets
// a default-constructed one before the increment is applied.
void NVMeofGwMap::increment_gw_epoch(const NvmeGroupKey& group_key)
{
  auto& group_epoch = Gw_epoch[group_key];
  ++group_epoch.epoch;
  dout(4) << "incremented epoch of " << group_key << " "
          << group_epoch.epoch << dendl;
}

int NVMeofGwMap::get_num_namespaces(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key, const BeaconSubsystems& subs)
{
Expand Down Expand Up @@ -271,7 +293,10 @@ int NVMeofGwMap::process_gw_map_gw_no_subsys_no_listeners(
gw_id, group_key, state_itr.second,state_itr.first, propose_pending);
}
propose_pending = true; // map should reflect that gw becames Created
if (propose_pending) validate_gw_map(group_key);
if (propose_pending) {
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}
} else {
dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map "
<< gw_id << dendl;
Expand All @@ -297,7 +322,10 @@ int NVMeofGwMap::process_gw_map_gw_down(
state_itr.second = gw_states_per_group_t::GW_STANDBY_STATE;
}
propose_pending = true; // map should reflect that gw becames Unavailable
if (propose_pending) validate_gw_map(group_key);
if (propose_pending) {
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}
} else {
dout(1) << __FUNCTION__ << "ERROR GW-id was not found in the map "
<< gw_id << dendl;
Expand Down Expand Up @@ -336,7 +364,10 @@ void NVMeofGwMap::process_gw_map_ka(
state_itr.first, last_osd_epoch, propose_pending);
}
}
if (propose_pending) validate_gw_map(group_key);
if (propose_pending) {
validate_gw_map(group_key);

This comment has been minimized.

Copy link
@oritwas

oritwas Nov 25, 2024

Member

I see a few validate_gw_map and then increment_gw_epoch.
Do you think you can make it a function or do the increment in the validate function?

increment_gw_epoch(group_key);
}
}

void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
Expand Down Expand Up @@ -385,6 +416,7 @@ void NVMeofGwMap::handle_abandoned_ana_groups(bool& propose)
}
if (propose) {
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}
}
}
Expand Down Expand Up @@ -608,6 +640,7 @@ void NVMeofGwMap::fsm_handle_gw_fast_reboot(const NvmeGwId &gw_id,
}
}
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}

void NVMeofGwMap::fsm_handle_gw_alive(
Expand Down Expand Up @@ -804,7 +837,10 @@ void NVMeofGwMap::fsm_handle_gw_delete(
<< "for GW " << gw_id << dendl;
}
}
if (map_modified) validate_gw_map(group_key);
if (map_modified) {
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}
}

void NVMeofGwMap::fsm_handle_to_expired(
Expand Down Expand Up @@ -870,7 +906,10 @@ void NVMeofGwMap::fsm_handle_to_expired(
fbp_gw_state.set_unavailable_state();
map_modified = true;
}
if (map_modified) validate_gw_map(group_key);
if (map_modified) {
validate_gw_map(group_key);
increment_gw_epoch(group_key);
}
}

struct CMonRequestProposal : public Context {
Expand Down
7 changes: 7 additions & 0 deletions src/mon/NVMeofGwMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class NVMeofGwMap

// map that handles timers started by all Gateway FSMs
std::map<NvmeGroupKey, NvmeGwTimers> fsm_timers;
std::map<NvmeGroupKey, GwEpoch> Gw_epoch;
// epoch for synchronization of GWs that belong to the same Group & Pool

void to_gmap(std::map<NvmeGroupKey, NvmeGwMonClientStates>& Gmap) const;
void track_deleting_gws(const NvmeGroupKey& group_key,
Expand All @@ -70,6 +72,8 @@ class NVMeofGwMap
NvmeAnaGrpId anagrpid, uint8_t value);
void handle_gw_performing_fast_reboot(const NvmeGwId &gw_id,
const NvmeGroupKey& group_key, bool &map_modified);
void gw_performed_startup (const NvmeGwId &gw_id,

This comment has been minimized.

Copy link
@oritwas

oritwas Nov 25, 2024

Member

Can we make it inline?

const NvmeGroupKey& group_key, bool &propose_pending);
private:
int do_delete_gw(const NvmeGwId &gw_id, const NvmeGroupKey& group_key);
int do_erase_gw_id(const NvmeGwId &gw_id,
Expand Down Expand Up @@ -118,6 +122,7 @@ class NVMeofGwMap
NvmeAnaGrpId anagrpid);
void validate_gw_map(
const NvmeGroupKey& group_key);
void increment_gw_epoch (const NvmeGroupKey& group_key);

This comment has been minimized.

Copy link
@oritwas

oritwas Nov 25, 2024

Member

Let's make it inline


public:
int blocklist_gw(
Expand All @@ -131,6 +136,7 @@ class NVMeofGwMap

encode(created_gws, bl, features); //Encode created GWs
encode(fsm_timers, bl, features);
encode(Gw_epoch, bl);
ENCODE_FINISH(bl);
}

Expand All @@ -141,6 +147,7 @@ class NVMeofGwMap

decode(created_gws, bl);
decode(fsm_timers, bl);
decode(Gw_epoch, bl);
DECODE_FINISH(bl);
}

Expand Down
51 changes: 36 additions & 15 deletions src/mon/NVMeofGwMon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,18 +211,28 @@ void NVMeofGwMon::update_from_paxos(bool *need_bootstrap)

void NVMeofGwMon::check_sub(Subscription *sub)
{
dout(10) << "sub->next , map-epoch " << sub->next
<< " " << map.epoch << dendl;
if (sub->next <= map.epoch)
{
dout(10) << "Sending map to subscriber " << sub->session->con
<< " " << sub->session->con->get_peer_addr() << dendl;
sub->session->con->send_message2(make_message<MNVMeofGwMap>(map));

if (sub->onetime) {
mon.session_map.remove_sub(sub);
} else {
sub->next = map.epoch + 1;
// dout(10) << "sub->next , map-epoch " << sub->next << " " << map.epoch << dendl;
for (const auto& created_map_pair: map.created_gws) {
const auto& group_key = created_map_pair.first;
const NvmeGwMonStates& gw_created_map = created_map_pair.second;
for (const auto& gw_created_pair: gw_created_map) {
const auto& gw_id = gw_created_pair.first;
if ( //(gw_created_pair.second.availability == GW_AVAILABILITY_E::GW_AVAILABLE) &&
(gw_created_pair.second.addr_vect == entity_addrvec_t(sub->session->con->get_peer_addr() ) )
)
{
dout(10) << "found gw-vect " << gw_created_pair.second.addr_vect << " GW " << gw_id << " group-key " << group_key << dendl;
dout(10) << "sub->next(epoch) " << sub->next << " map.Gw_epoch " << map.Gw_epoch[group_key].epoch << dendl;
if (sub->next <= map.Gw_epoch[group_key].epoch){
dout(4) << "Send unicast map to GW "<< gw_id << dendl;
NVMeofGwMap unicast_map;
unicast_map.created_gws[group_key][gw_id] = map.created_gws[group_key][gw_id];
// respond with a map slice correspondent to the same GW
unicast_map.epoch = map.epoch;
sub->session->con->send_message2(make_message<MNVMeofGwMap>(unicast_map));
sub->next = map.Gw_epoch[group_key].epoch + 1;
}
}
}
}
}
Expand All @@ -236,6 +246,7 @@ void NVMeofGwMon::check_subs(bool t)
return;
}
for (auto sub : *(mon.session_map.subs[type])) {
dout(10) << " dump subscriber peer_addr : " << sub->session->con->get_peer_addr() << dendl;
check_sub(sub);
}
}
Expand Down Expand Up @@ -335,6 +346,7 @@ bool NVMeofGwMon::preprocess_command(MonOpRequestRef op)
}
}
f->dump_unsigned("num gws", map.created_gws[group_key].size());
f->dump_unsigned("GW-epoch", map.Gw_epoch[group_key].epoch);
if (map.created_gws[group_key].size() == 0) {
f->close_section();
f->flush(rdata);
Expand Down Expand Up @@ -526,7 +538,7 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
<< " GW : " << m->get_gw_id()
<< " osdmap_epoch " << m->get_last_osd_epoch()
<< " subsystems " << m->get_subsystems() << dendl;

ConnectionRef con = op->get_connection();
NvmeGwId gw_id = m->get_gw_id();
NvmeGroupKey group_key = std::make_pair(m->get_gw_pool(), m->get_gw_group());
gw_availability_t avail = m->get_availability();
Expand Down Expand Up @@ -560,12 +572,15 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
pending_map.handle_gw_performing_fast_reboot(gw_id, group_key, propose);
LastBeacon lb = {gw_id, group_key};
last_beacon[lb] = now; //Update last beacon

} else if (
pending_map.created_gws[group_key][gw_id].performed_full_startup ==
false) {
pending_map.created_gws[group_key][gw_id].performed_full_startup = true;
propose = true;
}
pending_map.gw_performed_startup(gw_id, group_key, propose);
pending_map.created_gws[group_key][gw_id].addr_vect =
entity_addrvec_t(con->get_peer_addr());
goto set_propose;
}
// gw already created
Expand All @@ -590,7 +605,13 @@ bool NVMeofGwMon::prepare_beacon(MonOpRequestRef op)
}
}
}

if ( pending_map.created_gws[group_key][gw_id].addr_vect !=
entity_addrvec_t(con->get_peer_addr()) )
{
dout(4) << "Warning entity addr changed for GW client" << gw_id
<< " was " << pending_map.created_gws[group_key][gw_id].addr_vect
<< " now " << entity_addrvec_t(con->get_peer_addr()) << dendl;
}
// At this stage the gw has to be in the Created_gws
if (gw == group_gws.end()) {
dout(4) << "GW that does not appear in the map sends beacon, ignore "
Expand Down
44 changes: 42 additions & 2 deletions src/mon/NVMeofGwSerialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ inline std::ostream& operator<<(std::ostream& os, const NvmeGwMonStates value) {

inline std::ostream& operator<<(std::ostream& os, const NVMeofGwMap value) {
os << "NVMeofGwMap [ Created_gws: ";
for (auto& group_gws: value.created_gws) {
for (auto& group_gws: value.Gw_epoch) {
os << "\n" << MODULE_PREFFIX << "{ " << group_gws.first
<< " } -> { " << group_gws.second << " }";
<< " } -> GW epoch: " << group_gws.second.epoch << " }";
}
os << "]";
return os;
Expand Down Expand Up @@ -492,6 +492,7 @@ inline void encode(const NvmeGwMonStates& gws, ceph::bufferlist &bl,
}
}
encode(gw.second.nonce_map, bl, features);
gw.second.addr_vect.encode(bl, CEPH_FEATURES_ALL);
}
ENCODE_FINISH(bl);
}
Expand Down Expand Up @@ -573,6 +574,7 @@ inline void decode(
}
}
decode(gw_created.nonce_map, bl);
gw_created.addr_vect.decode(bl);
gws[gw_name] = gw_created;
}
if (struct_v == 1) { //Fix allocations of states and blocklist_data
Expand All @@ -590,6 +592,44 @@ inline void decode(
DECODE_FINISH(bl);
}

inline void encode( const Gw_Epoch& gw_epoch, ceph::bufferlist &bl)
{
encode(gw_epoch.epoch, bl);
}

// Serialize the per-group epoch map into bl inside an
// ENCODE_START(1, 1) / ENCODE_FINISH envelope.
//
// Layout written between the markers: a uint32_t entry count, then for each
// entry the two components of the group key (pool, group) followed by the
// entry's GwEpoch.
inline void encode(
  const std::map<NvmeGroupKey, GwEpoch>& gw_epoch, ceph::bufferlist &bl)
{
  ENCODE_START(1, 1, bl);
  encode(static_cast<uint32_t>(gw_epoch.size()), bl); // number of groups
  for (const auto& entry : gw_epoch) {
    const auto& key = entry.first;
    encode(key.first, bl);   // pool
    encode(key.second, bl);  // group
    encode(entry.second, bl);
  }
  ENCODE_FINISH(bl);
}

inline void decode(Gw_Epoch& gw_epoch, ceph::buffer::list::const_iterator &bl)
{
decode(gw_epoch.epoch, bl);
}

inline void decode(std::map<NvmeGroupKey, GwEpoch>& gw_epoch, ceph::buffer::list::const_iterator &bl) {
gw_epoch.clear();
uint32_t ngroups;
DECODE_START(1, bl);
decode(ngroups, bl);
for(uint32_t i = 0; i<ngroups; i++){
std::string pool, group;
decode(pool, bl);
decode(group, bl);
GwEpoch gepoch;
decode(gepoch, bl);
gw_epoch[std::make_pair(pool, group)] = gepoch;
}
DECODE_FINISH(bl);
}

inline void encode(
const std::map<NvmeGroupKey, NvmeGwMonStates>& created_gws,
ceph::bufferlist &bl, uint64_t features) {
Expand Down
9 changes: 9 additions & 0 deletions src/mon/NVMeofGwTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ struct NvmeGwMonState {
// state machine states per ANA group
SmState sm_state;
BlocklistData blocklist_data;
//ceph entity address allocated for the GW-client that represents this GW-id
entity_addrvec_t addr_vect;

NvmeGwMonState(): ana_grp_id(REDUNDANT_GW_ANA_GROUP_ID) {}

Expand Down Expand Up @@ -226,6 +228,13 @@ struct NvmeGwTimerState {
NvmeGwTimerState() {};
};

typedef struct Gw_Epoch {
epoch_t epoch;
Gw_Epoch(epoch_t epoch) : epoch(epoch){
};
Gw_Epoch():Gw_Epoch(0) {};
}GwEpoch;

using NvmeGwMonClientStates = std::map<NvmeGwId, NvmeGwClientState>;
using NvmeGwTimers = std::map<NvmeGwId, NvmeGwTimerState>;
using NvmeGwMonStates = std::map<NvmeGwId, NvmeGwMonState>;
Expand Down

0 comments on commit 82f0ef1

Please sign in to comment.