Skip to content

Commit

Permalink
Fix synchronization assert
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Lebedev <[email protected]>
  • Loading branch information
lebdron authored and kuvaldini committed Jun 15, 2021
1 parent 5f48eeb commit bfae15d
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 140 deletions.
27 changes: 14 additions & 13 deletions irohad/consensus/gate_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#ifndef CONSENSUS_GATE_OBJECT_HPP
#define CONSENSUS_GATE_OBJECT_HPP

#include <boost/variant.hpp>
#include <variant>

#include "ametsuchi/ledger_state.hpp"
#include "consensus/round.hpp"
#include "cryptography/hash.hpp"
Expand Down Expand Up @@ -87,21 +88,21 @@ namespace iroha {
using Synchronizable::Synchronizable;
};

using GateObject = boost::variant<PairValid,
VoteOther,
ProposalReject,
BlockReject,
AgreementOnNone,
Future>;
using GateObject = std::variant<PairValid,
VoteOther,
ProposalReject,
BlockReject,
AgreementOnNone,
Future>;

} // namespace consensus
} // namespace iroha

extern template class boost::variant<iroha::consensus::PairValid,
iroha::consensus::VoteOther,
iroha::consensus::ProposalReject,
iroha::consensus::BlockReject,
iroha::consensus::AgreementOnNone,
iroha::consensus::Future>;
extern template class std::variant<iroha::consensus::PairValid,
iroha::consensus::VoteOther,
iroha::consensus::ProposalReject,
iroha::consensus::BlockReject,
iroha::consensus::AgreementOnNone,
iroha::consensus::Future>;

#endif // CONSENSUS_GATE_OBJECT_HPP
10 changes: 2 additions & 8 deletions irohad/consensus/impl/gate_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,5 @@

using GateObject = iroha::consensus::GateObject;

template GateObject::~variant();
template GateObject::variant(GateObject &&) noexcept;
template GateObject::variant(const GateObject &);
template void GateObject::destroy_content() noexcept;
template int GateObject::which() const noexcept;
template void GateObject::indicate_which(int) noexcept;
template bool GateObject::using_backup() const noexcept;
template GateObject::convert_copy_into::convert_copy_into(void *) noexcept;
template std::size_t GateObject::index() const noexcept;
template bool GateObject::valueless_by_exception() const noexcept;
13 changes: 7 additions & 6 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -721,9 +721,6 @@ namespace {
case SynchronizationOutcomeType::kNothing:
log->info(R"(~~~~~~~~~| EMPTY (-_-)zzz |~~~~~~~~~ )");
break;
case SynchronizationOutcomeType::kError:
log->info(R"(~~~~~~~~~| ERROR :-?????? |~~~~~~~~~ )");
break;
}
}
} // namespace
Expand Down Expand Up @@ -955,9 +952,13 @@ Irohad::RunResult Irohad::run() {
maybe_log->info("~~~~~~~~~| PROPOSAL ^_^ |~~~~~~~~~ ");
consensus_gate_objects.get_subscriber().on_next(object);
auto event = maybe_synchronizer->processOutcome(std::move(object));
maybe_subscription->notify(EventTypes::kOnSynchronization, event);
printSynchronizationEvent(maybe_log, event);
maybe_ordering_init->processSynchronizationEvent(std::move(event));
if (not event) {
return;
}
maybe_subscription->notify(EventTypes::kOnSynchronization,
SynchronizationEvent(*event));
printSynchronizationEvent(maybe_log, *event);
maybe_ordering_init->processSynchronizationEvent(std::move(*event));
}
});

Expand Down
157 changes: 78 additions & 79 deletions irohad/synchronizer/impl/synchronizer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,47 +36,51 @@ namespace iroha {
block_loader_(std::move(block_loader)),
log_(std::move(log)) {}

SynchronizationEvent SynchronizerImpl::processOutcome(
std::optional<SynchronizationEvent> SynchronizerImpl::processOutcome(
consensus::GateObject object) {
log_->info("processing consensus outcome");

auto process_reject = [this](auto outcome_type, const auto &msg) {
auto process_reject =
[this](auto outcome_type,
const auto &msg) -> std::optional<SynchronizationEvent> {
assert(msg.ledger_state->top_block_info.height + 1
== msg.round.block_round);
return SynchronizationEvent{outcome_type, msg.round, msg.ledger_state};
};

return visit_in_place(
object,
[this](const consensus::PairValid &msg) {
assert(msg.ledger_state->top_block_info.height + 1
== msg.round.block_round);
return this->processNext(msg);
},
[this](const consensus::VoteOther &msg) {
assert(msg.ledger_state->top_block_info.height + 1
== msg.round.block_round);
return this->processDifferent(msg, msg.round.block_round);
},
[&](const consensus::ProposalReject &msg) {
return process_reject(SynchronizationOutcomeType::kReject, msg);
},
[&](const consensus::BlockReject &msg) {
return process_reject(SynchronizationOutcomeType::kReject, msg);
},
[&](const consensus::AgreementOnNone &msg) {
return process_reject(SynchronizationOutcomeType::kNothing, msg);
},
[this](const consensus::Future &msg) {
assert(msg.ledger_state->top_block_info.height + 1
< msg.round.block_round);
// we do not know the ledger state for round n, so we
// cannot claim that the bunch of votes we got is a
// commit certificate and hence we do not know if the
// block n is committed and cannot require its
// acquisition.
return this->processDifferent(msg, msg.round.block_round - 1);
});
return std::visit(
make_visitor(
[this](const consensus::PairValid &msg) {
assert(msg.ledger_state->top_block_info.height + 1
== msg.round.block_round);
return this->processNext(msg);
},
[this](const consensus::VoteOther &msg) {
assert(msg.ledger_state->top_block_info.height + 1
== msg.round.block_round);
return this->processDifferent(msg, msg.round.block_round);
},
[&](const consensus::ProposalReject &msg) {
return process_reject(SynchronizationOutcomeType::kReject, msg);
},
[&](const consensus::BlockReject &msg) {
return process_reject(SynchronizationOutcomeType::kReject, msg);
},
[&](const consensus::AgreementOnNone &msg) {
return process_reject(SynchronizationOutcomeType::kNothing,
msg);
},
[this](const consensus::Future &msg) {
assert(msg.ledger_state->top_block_info.height + 1
< msg.round.block_round);
// we do not know the ledger state for round n, so we
// cannot claim that the bunch of votes we got is a
// commit certificate and hence we do not know if the
// block n is committed and cannot require its
// acquisition.
return this->processDifferent(msg, msg.round.block_round - 1);
}),
object);
}

ametsuchi::CommitResult SynchronizerImpl::downloadAndCommitMissingBlocks(
Expand Down Expand Up @@ -152,46 +156,45 @@ namespace iroha {
return mutable_factory_->createMutableStorage(command_executor_);
}

SynchronizationEvent SynchronizerImpl::processNext(
std::optional<SynchronizationEvent> SynchronizerImpl::processNext(
const consensus::PairValid &msg) {
log_->info("at handleNext");
const auto notify =
[this,
&msg](std::shared_ptr<const iroha::LedgerState> &&ledger_state) {
return SynchronizationEvent{SynchronizationOutcomeType::kCommit,
msg.round,
std::move(ledger_state)};
};
if (mutable_factory_->preparedCommitEnabled()) {
auto result = mutable_factory_->commitPrepared(msg.block);
if (iroha::expected::hasValue(result)) {
return notify(std::move(result).assumeValue());
return SynchronizationEvent{SynchronizationOutcomeType::kCommit,
msg.round,
std::move(std::move(result).assumeValue())};
}
log_->error("Error committing prepared block: {}",
result.assumeError());
}
auto commit_result =
getStorage() | [&](auto &&storage) -> ametsuchi::CommitResult {
if (storage->apply(msg.block)) {
return mutable_factory_->commit(std::move(storage));
} else {
return "Block failed to apply.";
}
};
return std::move(commit_result)
.match(
[&notify](auto &&ledger_state) {
return notify(std::move(ledger_state.value));
},
[this, &msg](const auto &error) {
this->log_->error("Failed to commit: {}", error.error);
return SynchronizationEvent{SynchronizationOutcomeType::kError,
msg.round,
msg.ledger_state};
});
auto maybe_storage = getStorage();

if (expected::hasError(maybe_storage)) {
log_->error("Failed to commit: {}", maybe_storage.assumeError());
return std::nullopt;
}

if (not maybe_storage.assumeValue()->apply(msg.block)) {
log_->error("Block failed to apply.");
return std::nullopt;
}

auto maybe_result =
mutable_factory_->commit(std::move(maybe_storage.assumeValue()));

if (expected::hasError(maybe_result)) {
log_->error("Failed to commit: {}", maybe_result.assumeError());
return std::nullopt;
}

return SynchronizationEvent{SynchronizationOutcomeType::kCommit,
msg.round,
std::move(maybe_result.assumeValue())};
}

SynchronizationEvent SynchronizerImpl::processDifferent(
std::optional<SynchronizationEvent> SynchronizerImpl::processDifferent(
const consensus::Synchronizable &msg,
shared_model::interface::types::HeightType required_height) {
log_->info("at handleDifferent");
Expand All @@ -201,23 +204,19 @@ namespace iroha {
required_height,
msg.public_keys);

return commit_result.match(
[this, &msg](auto &value) {
auto &ledger_state = value.value;
assert(ledger_state);
const auto new_height = ledger_state->top_block_info.height;
return SynchronizationEvent{SynchronizationOutcomeType::kCommit,
new_height != msg.round.block_round
? consensus::Round{new_height, 0}
: msg.round,
std::move(ledger_state)};
},
[this, &msg](const auto &error) {
log_->error("Synchronization failed: {}", error.error);
return SynchronizationEvent{SynchronizationOutcomeType::kError,
msg.round,
msg.ledger_state};
});
if (expected::hasError(commit_result)) {
log_->error("Synchronization failed: {}", commit_result.assumeError());
return std::nullopt;
}

auto &ledger_state = commit_result.assumeValue();
assert(ledger_state);
const auto new_height = ledger_state->top_block_info.height;
return SynchronizationEvent{SynchronizationOutcomeType::kCommit,
new_height != msg.round.block_round
? consensus::Round{new_height, 0}
: msg.round,
std::move(ledger_state)};
}

} // namespace synchronizer
Expand Down
7 changes: 4 additions & 3 deletions irohad/synchronizer/impl/synchronizer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace iroha {
std::shared_ptr<network::BlockLoader> block_loader,
logger::LoggerPtr log);

SynchronizationEvent processOutcome(
std::optional<SynchronizationEvent> processOutcome(
consensus::GateObject object) override;

private:
Expand All @@ -53,14 +53,15 @@ namespace iroha {
const shared_model::interface::types::PublicKeyCollectionType
&public_keys);

SynchronizationEvent processNext(const consensus::PairValid &msg);
std::optional<SynchronizationEvent> processNext(
const consensus::PairValid &msg);

/**
* Performs synchronization on rejects
* @param msg - consensus gate message with a list of peers and a round
* @param required_height - minimal top block height to be downloaded
*/
SynchronizationEvent processDifferent(
std::optional<SynchronizationEvent> processDifferent(
const consensus::Synchronizable &msg,
shared_model::interface::types::HeightType required_height);

Expand Down
4 changes: 3 additions & 1 deletion irohad/synchronizer/synchronizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#ifndef IROHA_SYNCHRONIZER_HPP
#define IROHA_SYNCHRONIZER_HPP

#include <optional>

#include "consensus/gate_object.hpp"
#include "synchronizer/synchronizer_common.hpp"

Expand All @@ -19,7 +21,7 @@ namespace iroha {
/**
* Processing entry point for consensus outcome
*/
virtual SynchronizationEvent processOutcome(
virtual std::optional<SynchronizationEvent> processOutcome(
consensus::GateObject object) = 0;

virtual ~Synchronizer() = default;
Expand Down
1 change: 0 additions & 1 deletion irohad/synchronizer/synchronizer_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ namespace iroha {
kCommit,
kReject,
kNothing,
kError
};

/**
Expand Down
8 changes: 4 additions & 4 deletions test/module/irohad/consensus/yac/yac_gate_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ TEST_F(YacGateTest, YacGateSubscriptionTest) {

// verify that yac gate emit expected block
auto outcome = *gate->processOutcome(expected_commit);
auto block = boost::get<iroha::consensus::PairValid>(outcome).block;
auto block = std::get<iroha::consensus::PairValid>(outcome).block;
ASSERT_EQ(block, expected_block);

// verify that gate has put to cache block received from consensus
Expand Down Expand Up @@ -280,7 +280,7 @@ TEST_F(YacGateTest, DifferentCommit) {

// verify that yac gate emit expected block
auto outcome = *gate->processOutcome(expected_commit);
auto concrete_outcome = boost::get<iroha::consensus::VoteOther>(outcome);
auto concrete_outcome = std::get<iroha::consensus::VoteOther>(outcome);
auto public_keys = concrete_outcome.public_keys;
auto hash = concrete_outcome.hash;

Expand Down Expand Up @@ -319,7 +319,7 @@ TEST_F(YacGateTest, Future) {

// verify that yac gate emit expected block
auto outcome = *gate->processOutcome(FutureMessage{future_message});
auto concrete_outcome = boost::get<iroha::consensus::Future>(outcome);
auto concrete_outcome = std::get<iroha::consensus::Future>(outcome);

ASSERT_EQ(future_round, concrete_outcome.round);
}
Expand Down Expand Up @@ -387,7 +387,7 @@ class CommitFromTheFuture : public YacGateTest {
void validate() {
// verify that yac gate emit expected block
auto outcome = *gate->processOutcome(expected_commit);
auto concrete_outcome = boost::get<CommitType>(outcome);
auto concrete_outcome = std::get<CommitType>(outcome);

ASSERT_EQ(future_round, concrete_outcome.round);
}
Expand Down
Loading

0 comments on commit bfae15d

Please sign in to comment.