Skip to content

Commit

Permalink
[block-150] consensus no sync (#22)
Browse files Browse the repository at this point in the history
* [bloc-150] consensus no sync

* Вынес slot_ms и blocks_per_slot в параметры конструктора

* Изменил логику шедулинга продюсеров
  • Loading branch information
justefg committed Apr 18, 2019
1 parent 48a3cfd commit cb4bad7
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 17 deletions.
12 changes: 10 additions & 2 deletions simulator/include/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,13 @@ class fork_db {
}

void insert(const fork_db_chain_type_ptr& chain) {
auto node = find(chain->base_block);
insert(*chain);
}

void insert(const fork_db_chain_type& chain) {
auto node = find(chain.base_block);
assert(node);
insert(node, chain->blocks);
insert(node, chain.blocks);
}

void insert(fork_db_node_ptr node, const vector<block_id_type>& blocks) {
Expand Down Expand Up @@ -95,6 +99,10 @@ class fork_db {
set_new_lib(node);
}

fork_db_node_ptr get_root() const {
return root;
}

private:
fork_db_node_ptr root;
size_t conf_number;
Expand Down
176 changes: 166 additions & 10 deletions simulator/include/simulator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@
#include <iostream>
#include <fstream>
#include <thread>
#include <numeric>
#include <chrono>
#include <fc/bitutil.hpp>

#include <database.hpp>

using namespace std;

ostream& operator<<(ostream& os, const block_id_type& block) {
os << block.str().substr(16, 4);
return os;
}

class Clock {
public:
Clock(): now_(0) {}
Expand Down Expand Up @@ -43,7 +53,7 @@ struct Task {
function<void(NodePtr)> cb;

bool operator<(const Task& m) const {
return at < m.at;
return at > m.at;
}
};

Expand All @@ -66,7 +76,7 @@ class Network {

template <typename T>
void bcast(const T&);

Network(Network&&) = default;
private:
uint32_t node_id;
TestRunner* runner;
Expand All @@ -76,13 +86,26 @@ class Network {
class Node {
public:
Node() = default;
explicit Node(int id, Network && net): id(id), net(net) {}
explicit Node(int id, Network && net, fork_db&& db): id(id), net(std::move(net)), db(std::move(db)) {}

template <typename T>
void send(uint32_t to, const T& msg) {
net.send(to, msg);
}

void apply_chain(const fork_db_chain_type& chain) {
stringstream ss;
ss << "[Node] #" << id << " ";
auto node_id = ss.str();
cout << node_id << "Received " << chain.blocks.size() << " blocks " << endl;
cout << node_id << "Ids [ " << chain.base_block << " -> ";
for (auto& block_id : chain.blocks) {
cout << block_id << ", ";
}
cout << "]" << endl;
db.insert(chain);
}

virtual void on_receive(void *) {
std::cout << "Received by " << id << std::endl;
}
Expand All @@ -96,20 +119,22 @@ class Node {
uint32_t id;
bool is_producer = true;
Network net;
fork_db db;
};


class TestRunner {
public:
TestRunner() = default;
explicit TestRunner(int instances) {
init_runner_data(instances);
explicit TestRunner(int instances, size_t slot_ms_ = 500, size_t blocks_per_slot_ = 1) :
slot_ms(slot_ms_), blocks_per_slot(blocks_per_slot_) {
init_runner_data(instances);
}

explicit TestRunner(const matrix_type& matrix) {
// TODO check that it's square matrix
init_runner_data(matrix.size());
delay_matrix = matrix;
count_dist_matrix();
}

void load_graph(const graph_type& graph) {
Expand All @@ -120,6 +145,7 @@ class TestRunner {
delay_matrix[i][j] = delay_matrix[j][i] = delay;
}
}
count_dist_matrix();
}

void load_graph_from_file(const char* filename) {
Expand All @@ -139,6 +165,7 @@ class TestRunner {
delay_matrix[from][to] = delay_matrix[to][from] = delay;
}
}
count_dist_matrix();
}

void load_matrix_from_file(const char* filename) {
Expand All @@ -152,21 +179,108 @@ class TestRunner {
in >> delay_matrix[i][j];
}
}
count_dist_matrix();
}

void load_matrix(const matrix_type& matrix) {
delay_matrix = matrix;
count_dist_matrix();
}

fork_db_chain_type create_blocks(NodePtr node) {
auto& db = node->db;
stringstream ss;
ss << "[Node] #" << node->id << " ";
auto node_id = ss.str();
cout << node_id << "Generating blocks node_id=" << " at " << tester_clock.now() << endl;
cout << node_id << "LIB " << db.last_irreversible_block_id() << endl;
auto head = db.get_master_head();
auto head_block_height = fc::endian_reverse_u32(head->block_id._hash[0]);
cout << node_id << "Head block height: " << head_block_height << endl;

cout << node_id << "Building on top of " << head->block_id << endl;
cout << node_id << "New blocks: ";

vector<block_id_type> blocks(blocks_per_slot);
for (int i = 0; i < blocks_per_slot; i++) {
auto block_height = head_block_height + i + 1;
blocks[i] = generate_block(block_height);
cout << blocks[i] << ", ";
}
db.insert(head, blocks);
cout << endl;
return fork_db_chain_type{head->block_id, blocks};
}

vector<int> get_ordering() {
vector<int> permutation(get_instances());
iota(permutation.begin(), permutation.end(), 0);
random_shuffle(permutation.begin(), permutation.end(), [](size_t n) { return rand() % n; });
return permutation;
}

void add_schedule_task(uint32_t at) {
Task task{RUNNER_ID, RUNNER_ID, at,
[&](NodePtr n) { schedule_producers(); }
};
add_task(std::move(task));
}

void schedule_producers() {
cout << "[TaskRunner] Scheduling PRODUCERS " << endl;
auto ordering = get_ordering();
auto now = tester_clock.now();
auto instances = get_instances();

for (int i = 0; i < instances; i++) {
int producer_id = ordering[i];
Task task;
task.at = now + i * slot_ms;
task.to = producer_id;
task.cb = [&](NodePtr node) {
auto chain = create_blocks(node);
relay_blocks(node, chain);
};
add_task(std::move(task));
}

schedule_time = now + instances * slot_ms;
add_schedule_task(schedule_time);
}

void relay_blocks(NodePtr node, const fork_db_chain_type& chain) {
uint32_t from = node->id;
for (uint32_t to = 0; to < get_instances(); to++) {
if (from != to && dist_matrix[from][to] != -1) {
Task task{from, to, tester_clock.now() + dist_matrix[from][to]};
task.cb = [chain=chain](NodePtr node) {
node->apply_chain(chain);
};
add_task(std::move(task));
}
}
}

template <typename TNode = Node>
void run() {
init_nodes<TNode>(get_instances());
init_connections();

schedule_producers();
while (!timeline.empty()) {
auto task = timeline.top();
cout << "[TaskRunner] " << "current_time=" << task.at << " schedule_time=" << schedule_time << endl;
timeline.pop();
tester_clock.set(task.at);
task.cb(nodes[task.to]);
if (task.to == RUNNER_ID) {
cout << "[TaskRunner] Executing task for " << "TaskRunner" << endl;
task.cb(nullptr);
} else {
cout << "[TaskRunner] Executing task for " << task.to << endl;
task.cb(nodes[task.to]);
}

this_thread::sleep_for(chrono::milliseconds(3000));
}
}

Expand All @@ -178,18 +292,35 @@ class TestRunner {
return delay_matrix;
}

const matrix_type& get_dist_matrix() const {
return dist_matrix;
}

void add_task(Task && task) {
timeline.push(task);
}

const block_id_type genesys_block;
const int DELAY_MS = 10;
const uint32_t RUNNER_ID = 10000000;

size_t slot_ms;
size_t blocks_per_slot;

private:
block_id_type generate_block(uint32_t block_height) {
auto block_id = digest_type::hash(fc::crypto::private_key::generate());
block_id._hash[0] = fc::endian_reverse_u32(block_height);
return block_id;
}

template <typename TNode>
void init_nodes(uint32_t count) {
nodes.clear();
for (auto i = 0; i < count; ++i) {
auto node = std::make_shared<TNode>(i, Network(i, this));
auto conf_number = blocks_per_slot * get_instances();
auto node = std::make_shared<TNode>(i, Network(i, this), fork_db(genesys_block, conf_number));
nodes.push_back(std::static_pointer_cast<Node>(node));

}
}

Expand All @@ -212,11 +343,36 @@ class TestRunner {
delay_matrix[i] = vector<int>(instances, -1);
delay_matrix[i][i] = 0;
}

dist_matrix = delay_matrix;
}

void count_dist_matrix() {
int n = get_instances();
dist_matrix = delay_matrix;

for (int k = 0; k < n; ++k) {
for (int i = 0; i < n; ++i) {
for (int j = 0; j < n; ++j) {
if (dist_matrix[i][k] != -1 && dist_matrix[k][j] != -1) {
auto new_dist = dist_matrix[i][k] + dist_matrix[k][j];
auto& cur_dist = dist_matrix[i][j];
if (cur_dist == -1) {
cur_dist = new_dist;
} else {
cur_dist = min(cur_dist, new_dist);
}
}
}
}
}
}

vector<NodePtr> nodes;
vector<vector<int> > delay_matrix;
matrix_type delay_matrix;
matrix_type dist_matrix;
priority_queue<Task> timeline;
uint32_t schedule_time = 0;
};


Expand Down
10 changes: 5 additions & 5 deletions simulator/tests/no_byzantine_nodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@
#include <iostream>

#include <simulator.hpp>
#include <cstdlib>
#include <ctime>
#include <random>

using namespace std;

using std::string;

const char *actualValTrue = "hello gtest";
const char *expectVal = "hello gtest";

TEST(A, B) {
srand(66);
TestRunner t(3);
vector<pair<int, int> > v0{{1, 2}, {2, 10}};
vector<pair<int, int> > v0{{1, 2}, {2, 5}};
graph_type g;
g.push_back(v0);
t.load_graph(g);
assert(t.get_delay_matrix()[0][1] == 2);
t.run<>();
}

0 comments on commit cb4bad7

Please sign in to comment.