This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Ordering Service Batch Support (#1586)
Signed-off-by: Akvinikym <[email protected]>
Akvinikym authored and l4l committed Jul 25, 2018
1 parent c529f0a commit 6fa98b1
Showing 6 changed files with 167 additions and 61 deletions.
11 changes: 5 additions & 6 deletions irohad/network/ordering_service_transport.hpp
@@ -19,7 +19,7 @@

#include <memory>
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/transaction.hpp"
#include "interfaces/iroha_internal/transaction_batch.hpp"

namespace iroha {
namespace network {
@@ -31,12 +31,11 @@ namespace iroha {
class OrderingServiceNotification {
public:
/**
* Callback on receiving transaction
* @param transaction - transaction object itself
* Callback on receiving transaction(s)
* @param batch object, in which transaction(s) are packed
*/
virtual void onTransaction(
std::shared_ptr<shared_model::interface::Transaction>
transaction) = 0;
virtual void onBatch(
shared_model::interface::TransactionBatch &&batch) = 0;

virtual ~OrderingServiceNotification() = default;
};
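A minimal sketch (not part of the commit) of what a subscriber now implements against the updated interface: a single onBatch callback that takes the whole TransactionBatch by rvalue reference, replacing the per-transaction onTransaction callback. The class name and include paths below are illustrative assumptions.

#include <memory>
#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "network/ordering_service_transport.hpp"

namespace example {
  // Hypothetical subscriber used only to show the new callback shape.
  class LoggingOrderingSubscriber
      : public iroha::network::OrderingServiceNotification {
   public:
    void onBatch(shared_model::interface::TransactionBatch &&batch) override {
      // A batch groups one or more transactions that must be ordered together;
      // transactions() exposes them as shared pointers.
      for (const auto &tx : batch.transactions()) {
        (void)tx;  // e.g. inspect tx->hash() here
      }
    }
  };
}  // namespace example

Passing the batch by rvalue reference lets the ordering service take ownership of it without copying the underlying transactions.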
47 changes: 32 additions & 15 deletions irohad/ordering/impl/ordering_service_impl.cpp
@@ -4,8 +4,10 @@
*/

#include "ordering/impl/ordering_service_impl.hpp"

#include <algorithm>
#include <iterator>

#include "ametsuchi/ordering_service_persistent_state.hpp"
#include "ametsuchi/peer_query.hpp"
#include "backend/protobuf/proposal.hpp"
@@ -25,8 +27,9 @@ namespace iroha {
bool is_async)
: wsv_(wsv),
max_size_(max_size),
current_size_(0),
transport_(transport),
persistent_state_(persistent_state) {
persistent_state_(persistent_state){
log_ = logger::log("OrderingServiceImpl");

// restore state of ordering service from persistent storage
@@ -43,8 +46,8 @@ namespace iroha {
switch (v) {
case ProposalEvent::kTimerEvent:
return not queue_.empty();
case ProposalEvent::kTransactionEvent:
return queue_.unsafe_size() >= max_size_;
case ProposalEvent::kBatchEvent:
return current_size_.load() >= max_size_;
default:
BOOST_ASSERT_MSG(false, "Unknown value");
}
@@ -64,29 +67,43 @@ namespace iroha {
}
}

void OrderingServiceImpl::onTransaction(
std::shared_ptr<shared_model::interface::Transaction> transaction) {
queue_.push(transaction);
log_->info("Queue size is {}", queue_.unsafe_size());
void OrderingServiceImpl::onBatch(
shared_model::interface::TransactionBatch &&batch) {
std::shared_lock<std::shared_timed_mutex> batch_prop_lock(
batch_prop_mutex_);

current_size_.fetch_add(batch.transactions().size());
queue_.push(std::make_unique<shared_model::interface::TransactionBatch>(
std::move(batch)));
log_->info("Queue size is {}", current_size_.load());

// on_next calls should not be concurrent
std::lock_guard<std::mutex> lk(mutex_);
transactions_.get_subscriber().on_next(ProposalEvent::kTransactionEvent);
batch_prop_lock.unlock();

std::lock_guard<std::mutex> event_lock(event_mutex_);
transactions_.get_subscriber().on_next(ProposalEvent::kBatchEvent);
}

void OrderingServiceImpl::generateProposal() {
std::lock_guard<std::shared_timed_mutex> lock(batch_prop_mutex_);

// TODO 05/03/2018 andrei IR-1046 Server-side shared model object
// factories with move semantics
iroha::protocol::Proposal proto_proposal;
proto_proposal.set_height(proposal_height_++);
proto_proposal.set_created_time(iroha::time::now());
log_->info("Start proposal generation");
for (std::shared_ptr<shared_model::interface::Transaction> tx;
for (std::unique_ptr<shared_model::interface::TransactionBatch> batch;
static_cast<size_t>(proto_proposal.transactions_size()) < max_size_
and queue_.try_pop(tx);) {
*proto_proposal.add_transactions() =
std::move(static_cast<shared_model::proto::Transaction *>(tx.get())
->getTransport());
and queue_.try_pop(batch);) {
std::for_each(
batch->transactions().begin(),
batch->transactions().end(),
[this, &proto_proposal](auto &tx) {
*proto_proposal.add_transactions() =
std::static_pointer_cast<shared_model::proto::Transaction>(tx)
->getTransport();
current_size_--;
});
}

auto proposal = std::make_unique<shared_model::proto::Proposal>(
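The concurrency scheme introduced above amounts to a reader/writer split: every onBatch call holds batch_prop_mutex_ in shared mode while it enqueues the batch and bumps the atomic counter, generateProposal holds it exclusively while cutting a proposal from the queue, and a separate event_mutex_ serializes the on_next calls. Below is a standalone sketch of that locking pattern with simplified types; the names are illustrative, and a std::queue guarded by a plain mutex stands in for tbb::concurrent_queue.

#include <atomic>
#include <cstddef>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <vector>

class BatchBuffer {
 public:
  // Producers (the onBatch analogue) take the mutex in shared mode: several
  // threads may enqueue and bump the counter concurrently.
  void add(std::vector<int> batch) {
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    size_ += batch.size();
    std::lock_guard<std::mutex> q(queue_mutex_);  // stands in for the TBB queue
    queue_.push(std::move(batch));
  }

  // The consumer (the generateProposal analogue) takes the mutex exclusively,
  // so no batch can slip in while the proposal is being assembled.
  std::vector<int> drain(std::size_t max) {
    std::lock_guard<std::shared_timed_mutex> lock(mutex_);
    std::vector<int> proposal;
    std::lock_guard<std::mutex> q(queue_mutex_);
    while (!queue_.empty() && proposal.size() < max) {
      for (int tx : queue_.front()) {
        proposal.push_back(tx);
        --size_;
      }
      queue_.pop();
    }
    return proposal;
  }

 private:
  std::shared_timed_mutex mutex_;
  std::mutex queue_mutex_;
  std::queue<std::vector<int>> queue_;
  std::atomic_size_t size_{0};
};

As in the changed code above, a whole batch is appended even if it pushes the proposal past the limit, because the size check happens before each batch is popped, not per transaction.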
30 changes: 19 additions & 11 deletions irohad/ordering/impl/ordering_service_impl.hpp
@@ -6,9 +6,10 @@
#ifndef IROHA_ORDERING_SERVICE_IMPL_HPP
#define IROHA_ORDERING_SERVICE_IMPL_HPP

#include <tbb/concurrent_queue.h>
#include <memory>
#include <mutex>
#include <shared_mutex>

#include <tbb/concurrent_queue.h>
#include <rxcpp/rx.hpp>

#include "logger/logger.hpp"
@@ -52,12 +53,11 @@ namespace iroha {
bool is_async = true);

/**
* Process transaction received from network
* Enqueues transaction and publishes corresponding event
* @param transaction
* Process transaction(s) received from network
* Enqueues transactions and publishes corresponding event
* @param batch, in which transactions are packed
*/
void onTransaction(std::shared_ptr<shared_model::interface::Transaction>
transaction) override;
void onBatch(shared_model::interface::TransactionBatch &&batch) override;

~OrderingServiceImpl() override;

@@ -73,7 +73,7 @@ namespace iroha {
/**
* Events for queue check strategy
*/
enum class ProposalEvent { kTransactionEvent, kTimerEvent };
enum class ProposalEvent { kBatchEvent, kTimerEvent };

/**
* Collect transactions from queue
@@ -84,14 +84,19 @@ namespace iroha {
std::shared_ptr<ametsuchi::PeerQuery> wsv_;

tbb::concurrent_queue<
std::shared_ptr<shared_model::interface::Transaction>>
std::unique_ptr<shared_model::interface::TransactionBatch>>
queue_;

/**
* max number of txs in proposal
*/
const size_t max_size_;

/**
* current number of transactions in a queue
*/
std::atomic_ulong current_size_;

std::shared_ptr<network::OrderingServiceTransport> transport_;

/**
@@ -115,9 +120,12 @@ namespace iroha {
rxcpp::composite_subscription handle_;

/**
* Mutex for incoming transactions
* Variables for concurrency
*/
std::mutex mutex_;
/// mutex for both batch and proposal generation
std::shared_timed_mutex batch_prop_mutex_;
/// mutex for events activating
std::mutex event_mutex_;

logger::Logger log_;
};
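The new members pair a concurrent queue of whole batches with an atomic transaction counter: the proposal cap (max_size_) is still expressed in transactions, so current_size_ tracks the transaction total instead of the batch count that queue_.unsafe_size() would report. A minimal sketch of that pairing, assuming TBB is available; FakeBatch is a hypothetical stand-in for TransactionBatch.

#include <tbb/concurrent_queue.h>

#include <atomic>
#include <memory>
#include <vector>

// Hypothetical stand-in for shared_model::interface::TransactionBatch.
struct FakeBatch {
  std::vector<int> txs;
};

int main() {
  tbb::concurrent_queue<std::unique_ptr<FakeBatch>> queue;
  std::atomic_ulong current_size{0};

  // Producer side: count the batch's transactions, then move the batch in.
  auto batch = std::make_unique<FakeBatch>(FakeBatch{{1, 2, 3}});
  current_size.fetch_add(batch->txs.size());
  queue.push(std::move(batch));

  // Consumer side: try_pop drains batch by batch; the counter goes down per
  // transaction, so it stays comparable against a transaction-based limit.
  std::unique_ptr<FakeBatch> popped;
  while (queue.try_pop(popped)) {
    current_size.fetch_sub(popped->txs.size());
  }
  return 0;
}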
59 changes: 56 additions & 3 deletions irohad/ordering/impl/ordering_service_transport_grpc.cpp
@@ -18,7 +18,9 @@

#include "backend/protobuf/transaction.hpp"
#include "builders/protobuf/proposal.hpp"
#include "interfaces/common_objects/transaction_sequence_common.hpp"
#include "network/impl/grpc_channel_builder.hpp"
#include "validators/default_validator.hpp"

using namespace iroha::ordering;

@@ -35,14 +37,65 @@ grpc::Status OrderingServiceTransportGrpc::onTransaction(
if (subscriber_.expired()) {
log_->error("No subscriber");
} else {
subscriber_.lock()->onTransaction(
std::make_shared<shared_model::proto::Transaction>(
iroha::protocol::Transaction(*request)));
auto batch_result =
shared_model::interface::TransactionBatch::createTransactionBatch<
shared_model::validation::DefaultTransactionValidator>(
std::make_shared<shared_model::proto::Transaction>(
iroha::protocol::Transaction(*request)));
batch_result.match(
[this](iroha::expected::Value<shared_model::interface::TransactionBatch>
&batch) {
subscriber_.lock()->onBatch(std::move(batch.value));
},
[this](const iroha::expected::Error<std::string> &error) {
log_->error(
"Could not create batch from received single transaction: {}",
error.error);
});
}

return ::grpc::Status::OK;
}

grpc::Status OrderingServiceTransportGrpc::onBatch(
::grpc::ServerContext *context,
const protocol::TxList *request,
::google::protobuf::Empty *response) {
log_->info("OrderingServiceTransportGrpc::onBatch");
if (subscriber_.expired()) {
log_->error("No subscriber");
} else {
auto txs =
std::vector<std::shared_ptr<shared_model::interface::Transaction>>(
request->transactions_size());
std::transform(
std::begin(request->transactions()),
std::end(request->transactions()),
std::begin(txs),
[](const auto &tx) {
return std::make_shared<shared_model::proto::Transaction>(tx);
});

auto batch_result =
shared_model::interface::TransactionBatch::createTransactionBatch(
txs,
shared_model::validation::SignedTransactionsCollectionValidator<
shared_model::validation::DefaultTransactionValidator,
shared_model::validation::BatchOrderValidator>());
batch_result.match(
[this](iroha::expected::Value<shared_model::interface::TransactionBatch>
&batch) {
subscriber_.lock()->onBatch(std::move(batch.value));
},
[this](const iroha::expected::Error<std::string> &error) {
log_->error(
"Could not create batch from received transaction list: {}",
error.error);
});
}
return ::grpc::Status::OK;
}

void OrderingServiceTransportGrpc::publishProposal(
std::unique_ptr<shared_model::interface::Proposal> proposal,
const std::vector<std::string> &peers) {
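Both handlers above use the same value/error matching idiom on the result of createTransactionBatch: forward the batch to the subscriber on success, log and drop the request when validation fails. The sketch below illustrates that idiom in isolation; this Result type is a simplified stand-in, not the real iroha::expected API.

#include <iostream>
#include <string>
#include <variant>
#include <vector>

// Toy result type with the two-callback match used in the handlers above.
template <typename V, typename E>
struct Result {
  std::variant<V, E> data;

  template <typename OnValue, typename OnError>
  void match(OnValue on_value, OnError on_error) {
    if (auto *value = std::get_if<V>(&data)) {
      on_value(*value);
    } else {
      on_error(std::get<E>(data));
    }
  }
};

// Toy batch factory: fails on an empty transaction list, roughly the way
// createTransactionBatch fails stateless validation.
Result<std::vector<int>, std::string> makeBatch(std::vector<int> txs) {
  if (txs.empty()) {
    return {std::string{"no transactions in batch"}};
  }
  return {std::move(txs)};
}

int main() {
  makeBatch({1, 2, 3}).match(
      [](const std::vector<int> &batch) {
        std::cout << "forwarding batch of " << batch.size() << " transactions\n";
      },
      [](const std::string &error) {
        std::cout << "could not create batch: " << error << "\n";
      });
  return 0;
}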
4 changes: 4 additions & 0 deletions irohad/ordering/impl/ordering_service_transport_grpc.hpp
@@ -46,6 +46,10 @@ namespace iroha {
const protocol::Transaction *request,
::google::protobuf::Empty *response) override;

grpc::Status onBatch(::grpc::ServerContext *context,
const protocol::TxList *request,
::google::protobuf::Empty *response) override;

~OrderingServiceTransportGrpc() = default;

private:
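With this declaration the transport exposes a second unary RPC that carries a whole TxList in one call, instead of one transaction per onTransaction request. Below is a hypothetical client-side sketch: the generated stub class, header name, and endpoint address are assumptions; only the TxList/Empty message shapes and the onBatch method name come from this diff.

#include <grpc++/grpc++.h>
#include <google/protobuf/empty.pb.h>

#include <memory>
#include <vector>

#include "ordering.grpc.pb.h"  // assumed name of the generated header

// Send a set of transactions to the ordering service as a single batch.
int SendBatch(const std::vector<iroha::protocol::Transaction> &txs) {
  auto channel = grpc::CreateChannel("localhost:50051",
                                     grpc::InsecureChannelCredentials());
  // Assumed generated stub; the proto service name may differ in the repository.
  auto stub =
      iroha::ordering::proto::OrderingServiceTransportGrpc::NewStub(channel);

  iroha::protocol::TxList batch;
  for (const auto &tx : txs) {
    *batch.add_transactions() = tx;  // all transactions travel in one RPC
  }

  grpc::ClientContext context;
  google::protobuf::Empty reply;
  grpc::Status status = stub->onBatch(&context, batch, &reply);
  return status.ok() ? 0 : 1;
}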
