Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No spam sub function #249

Merged
merged 16 commits into from
Mar 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 135 additions & 100 deletions libraries/app/database_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

#define GET_REQUIRED_FEES_MAX_RECURSION 4

typedef std::map< std::pair<graphene::chain::asset_id_type, graphene::chain::asset_id_type>, std::vector<fc::variant> > market_queue_type;

namespace graphene { namespace app {

class database_api_impl;
Expand All @@ -56,7 +58,7 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
fc::variants get_objects(const vector<object_id_type>& ids)const;

// Subscriptions
void set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter );
void set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create );
void set_pending_transaction_callback( std::function<void(const variant&)> cb );
void set_block_applied_callback( std::function<void(const variant& block_id)> cb );
void cancel_all_subscriptions();
Expand Down Expand Up @@ -160,22 +162,52 @@ class database_api_impl : public std::enable_shared_from_this<database_api_impl>
{
if( !_subscribe_callback )
return false;
return true;

return _subscribe_filter.contains( i );
}

void broadcast_updates( const vector<variant>& updates );
bool is_impacted_account( const flat_set<account_id_type>& accounts)
{
if( !_subscribed_accounts.size() || !accounts.size() )
return false;

return std::any_of(accounts.begin(), accounts.end(), [this](const account_id_type& account) {
return _subscribed_accounts.find(account) != _subscribed_accounts.end();
});
}

template<typename T>
void enqueue_if_subscribed_to_market(const object* obj, market_queue_type& queue, bool full_object=true)
{
const T* order = dynamic_cast<const T*>(obj);
FC_ASSERT( order != nullptr);

auto market = order->get_market();

auto sub = _market_subscriptions.find( market );
if( sub != _market_subscriptions.end() ) {
queue[market].emplace_back( full_object ? obj->to_variant() : fc::variant(obj->id) );
}
}

void broadcast_updates( const vector<variant>& updates );
void broadcast_market_updates( const market_queue_type& queue);
void handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object);

/** called every time a block is applied to report the objects that were changed */
void on_objects_changed(const vector<object_id_type>& ids);
void on_objects_removed(const vector<const object*>& objs);
void on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts);
void on_objects_removed(const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts);
void on_applied_block();

mutable fc::bloom_filter _subscribe_filter;
bool _notify_remove_create = false;
mutable fc::bloom_filter _subscribe_filter;
std::set<account_id_type> _subscribed_accounts;
std::function<void(const fc::variant&)> _subscribe_callback;
std::function<void(const fc::variant&)> _pending_trx_callback;
std::function<void(const fc::variant&)> _block_applied_callback;

boost::signals2::scoped_connection _new_connection;
boost::signals2::scoped_connection _change_connection;
boost::signals2::scoped_connection _removed_connection;
boost::signals2::scoped_connection _applied_block_connection;
Expand All @@ -198,11 +230,14 @@ database_api::~database_api() {}
database_api_impl::database_api_impl( graphene::chain::database& db ):_db(db)
{
wlog("creating database api ${x}", ("x",int64_t(this)) );
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids) {
on_objects_changed(ids);
_new_connection = _db.new_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_new(ids, impacted_accounts);
});
_change_connection = _db.changed_objects.connect([this](const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts) {
on_objects_changed(ids, impacted_accounts);
});
_removed_connection = _db.removed_objects.connect([this](const vector<const object*>& objs) {
on_objects_removed(objs);
_removed_connection = _db.removed_objects.connect([this](const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts) {
on_objects_removed(ids, objs, impacted_accounts);
});
_applied_block_connection = _db.applied_block.connect([this](const signed_block&){ on_applied_block(); });

Expand Down Expand Up @@ -258,24 +293,24 @@ fc::variants database_api_impl::get_objects(const vector<object_id_type>& ids)co
// //
//////////////////////////////////////////////////////////////////////

void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
void database_api::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
{
my->set_subscribe_callback( cb, clear_filter );
my->set_subscribe_callback( cb, notify_remove_create );
}

void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool clear_filter )
void database_api_impl::set_subscribe_callback( std::function<void(const variant&)> cb, bool notify_remove_create )
{
edump((clear_filter));
//edump((clear_filter));
_subscribe_callback = cb;
if( clear_filter || !cb )
{
static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/10000;
param.maximum_size = 1024*8*8*2;
param.compute_optimal_parameters();
_subscribe_filter = fc::bloom_filter(param);
}
_notify_remove_create = notify_remove_create;
_subscribed_accounts.clear();

static fc::bloom_parameters param;
param.projected_element_count = 10000;
param.false_positive_probability = 1.0/100;
param.maximum_size = 1024*8*8*2;
param.compute_optimal_parameters();
_subscribe_filter = fc::bloom_filter(param);
}

void database_api::set_pending_transaction_callback( std::function<void(const variant&)> cb )
Expand Down Expand Up @@ -568,7 +603,8 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const

if( subscribe )
{
ilog( "subscribe to ${id}", ("id",account->name) );
FC_ASSERT( std::distance(_subscribed_accounts.begin(), _subscribed_accounts.end()) < 100 );
_subscribed_accounts.insert( account->get_id() );
subscribe_to_item( account->id );
}

Expand Down Expand Up @@ -630,7 +666,7 @@ std::map<std::string, full_account> database_api_impl::get_full_accounts( const
[&acnt] (const call_order_object& call) {
acnt.call_orders.emplace_back(call);
});
// get assets issued by user
auto asset_range = _db.get_index_type<asset_index>().indices().get<by_issuer>().equal_range(account->id);
std::for_each(asset_range.first, asset_range.second,
Expand Down Expand Up @@ -1802,108 +1838,107 @@ vector<blinded_balance_object> database_api_impl::get_blinded_balances( const fl

void database_api_impl::broadcast_updates( const vector<variant>& updates )
{
if( updates.size() ) {
if( updates.size() && _subscribe_callback ) {
auto capture_this = shared_from_this();
fc::async([capture_this,updates](){
capture_this->_subscribe_callback( fc::variant(updates) );
if(capture_this->_subscribe_callback)
capture_this->_subscribe_callback( fc::variant(updates) );
});
}
}

void database_api_impl::on_objects_removed( const vector<const object*>& objs )
void database_api_impl::broadcast_market_updates( const market_queue_type& queue)
{
/// we need to ensure the database_api is not deleted for the life of the async operation
if( _subscribe_callback )
{
vector<variant> updates;
updates.reserve(objs.size());

for( auto obj : objs )
updates.emplace_back( obj->id );
broadcast_updates( updates );
}

if( _market_subscriptions.size() )
if( queue.size() )
{
map< pair<asset_id_type, asset_id_type>, vector<variant> > broadcast_queue;
for( const auto& obj : objs )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
if( order )
{
auto sub = _market_subscriptions.find( order->get_market() );
auto capture_this = shared_from_this();
fc::async([capture_this, this, queue](){
for( const auto& item : queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
broadcast_queue[order->get_market()].emplace_back( order->id );
}
}
if( broadcast_queue.size() )
{
auto capture_this = shared_from_this();
fc::async([capture_this,this,broadcast_queue](){
for( const auto& item : broadcast_queue )
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
}
});
}
sub->second( fc::variant(item.second ) );
}
});
}
}

void database_api_impl::on_objects_changed(const vector<object_id_type>& ids)
void database_api_impl::on_objects_removed( const vector<object_id_type>& ids, const vector<const object*>& objs, const flat_set<account_id_type>& impacted_accounts)
{
vector<variant> updates;
map< pair<asset_id_type, asset_id_type>, vector<variant> > market_broadcast_queue;
handle_object_changed(_notify_remove_create, false, ids, impacted_accounts,
[objs](object_id_type id) -> const object* {
auto it = std::find_if(
objs.begin(), objs.end(),
[id](const object* o) {return o != nullptr && o->id == id;});

for(auto id : ids)
{
const object* obj = nullptr;
if( _subscribe_callback )
{
obj = _db.find_object( id );
if( obj )
{
updates.emplace_back( obj->to_variant() );
}
else
{
updates.emplace_back(id); // send just the id to indicate removal
}
if (it != objs.end())
return *it;

return nullptr;
}
);
}

void database_api_impl::on_objects_new(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{
handle_object_changed(_notify_remove_create, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
}

void database_api_impl::on_objects_changed(const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts)
{
handle_object_changed(false, true, ids, impacted_accounts,
std::bind(&object_database::find_object, &_db, std::placeholders::_1)
);
}

if( _market_subscriptions.size() )
void database_api_impl::handle_object_changed(bool force_notify, bool full_object, const vector<object_id_type>& ids, const flat_set<account_id_type>& impacted_accounts, std::function<const object*(object_id_type id)> find_object)
{
if( _subscribe_callback )
{
vector<variant> updates;

for(auto id : ids)
{
if( !_subscribe_callback )
obj = _db.find_object( id );
if( obj )
if( force_notify || is_subscribed_to_item(id) || is_impacted_account(impacted_accounts) )
{
const limit_order_object* order = dynamic_cast<const limit_order_object*>(obj);
if( order )
if( full_object )
{
auto obj = find_object(id);
if( obj )
{
updates.emplace_back( obj->to_variant() );
}
}
else
{
auto sub = _market_subscriptions.find( order->get_market() );
if( sub != _market_subscriptions.end() )
market_broadcast_queue[order->get_market()].emplace_back( order->id );
updates.emplace_back( id );
}
}
}
}

auto capture_this = shared_from_this();
broadcast_updates(updates);
}

/// pushing the future back / popping the prior future if it is complete.
/// if a connection hangs then this could get backed up and result in
/// a failure to exit cleanly.
fc::async([capture_this,this,updates,market_broadcast_queue](){
if( _subscribe_callback ) _subscribe_callback( updates );
if( _market_subscriptions.size() )
{
market_queue_type broadcast_queue;

for( const auto& item : market_broadcast_queue )
for(auto id : ids)
{
auto sub = _market_subscriptions.find(item.first);
if( sub != _market_subscriptions.end() )
sub->second( fc::variant(item.second ) );
if( id.is<call_order_object>() )
{
enqueue_if_subscribed_to_market<call_order_object>( find_object(id), broadcast_queue, full_object );
}
else if( id.is<limit_order_object>() )
{
enqueue_if_subscribed_to_market<limit_order_object>( find_object(id), broadcast_queue, full_object );
}
}
});

broadcast_market_updates(broadcast_queue);
}
}

/** note: this method cannot yield because it is called in the middle of
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@
#include "db_market.cpp"
#include "db_update.cpp"
#include "db_witness_schedule.cpp"
#include "db_notify.cpp"
22 changes: 3 additions & 19 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <graphene/chain/block_summary_object.hpp>
#include <graphene/chain/global_property_object.hpp>
#include <graphene/chain/operation_history_object.hpp>

#include <graphene/chain/proposal_object.hpp>
#include <graphene/chain/transaction_object.hpp>
#include <graphene/chain/witness_object.hpp>
Expand Down Expand Up @@ -241,7 +242,7 @@ processed_transaction database::_push_transaction( const signed_transaction& trx
auto processed_trx = _apply_transaction( trx );
_pending_tx.push_back(processed_trx);

notify_changed_objects();
// notify_changed_objects();
// The transaction applied successfully. Merge its changes into the pending block session.
temp_session.merge();

Expand Down Expand Up @@ -544,24 +545,7 @@ void database::_apply_block( const signed_block& next_block )
notify_changed_objects();
} FC_CAPTURE_AND_RETHROW( (next_block.block_num()) ) }

void database::notify_changed_objects()
{ try {
if( _undo_db.enabled() )
{
const auto& head_undo = _undo_db.head();
vector<object_id_type> changed_ids; changed_ids.reserve(head_undo.old_values.size());
for( const auto& item : head_undo.old_values ) changed_ids.push_back(item.first);
for( const auto& item : head_undo.new_ids ) changed_ids.push_back(item);
vector<const object*> removed;
removed.reserve( head_undo.removed.size() );
for( const auto& item : head_undo.removed )
{
changed_ids.push_back( item.first );
removed.emplace_back( item.second.get() );
}
changed_objects(changed_ids);
}
} FC_CAPTURE_AND_RETHROW() }


processed_transaction database::apply_transaction(const signed_transaction& trx, uint32_t skip)
{
Expand Down
Loading