Skip to content

Commit

Permalink
Fixes for most of valgrind errors related to unintialized values
Browse files Browse the repository at this point in the history
Including one additional debug code (related to read_loop MT access).
  • Loading branch information
vogel76 committed Aug 28, 2017
1 parent 0147618 commit 0bda58f
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 34 deletions.
4 changes: 2 additions & 2 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace steemit { namespace chain {
std::fstream index_stream;
fc::path block_file;
fc::path index_file;
bool block_write;
bool index_write;
bool block_write = false;
bool index_write = false;

inline void check_block_read()
{
Expand Down
4 changes: 2 additions & 2 deletions libraries/net/include/graphene/net/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ namespace graphene { namespace net {
*/
struct message_header
{
uint32_t size; // number of bytes in message, capped at MAX_MESSAGE_SIZE
uint32_t msg_type; // every channel gets a 16 bit message type specifier
uint32_t size = 0; // number of bytes in message, capped at MAX_MESSAGE_SIZE
uint32_t msg_type = 0; // every channel gets a 16 bit message type specifier
};

typedef fc::uint160_t message_hash_type;
Expand Down
40 changes: 20 additions & 20 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,25 @@ namespace graphene { namespace net
};


size_t _total_queued_messages_size;
size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
fc::time_point connection_terminated_time;
peer_connection_direction direction;
peer_connection_direction direction = peer_connection_direction::unknown;
//connection_state state;
firewalled_state is_firewalled;
firewalled_state is_firewalled = firewalled_state::unknown;
fc::microseconds clock_offset;
fc::microseconds round_trip_delay;

our_connection_state our_state;
bool they_have_requested_close;
their_connection_state their_state;
bool we_have_requested_close;
our_connection_state our_state = our_connection_state::disconnected;
bool they_have_requested_close = false;
their_connection_state their_state = their_connection_state::disconnected;
bool we_have_requested_close = false;

connection_negotiation_status negotiation_status;
connection_negotiation_status negotiation_status = connection_negotiation_status::disconnected;
fc::oexception connection_closed_error;

fc::time_point get_connection_time()const { return _message_connection.get_connection_time(); }
Expand All @@ -199,7 +199,7 @@ namespace graphene { namespace net
* from the user_data field of the hello, or if none is present it will be filled with a
* copy of node_public_key */
node_id_t node_id;
uint32_t core_protocol_version;
uint32_t core_protocol_version = 0;
std::string user_agent;
fc::optional<std::string> graphene_git_revision_sha;
fc::optional<fc::time_point_sec> graphene_git_revision_unix_timestamp;
Expand All @@ -213,8 +213,8 @@ namespace graphene { namespace net
// its hello message. For outbound, they record what we sent the peer
// in our hello message
fc::ip::address inbound_address;
uint16_t inbound_port;
uint16_t outbound_port;
uint16_t inbound_port = 0;
uint16_t outbound_port = 0;
/// @}

typedef std::unordered_map<item_id, fc::time_point> item_to_time_map_type;
Expand All @@ -223,15 +223,15 @@ namespace graphene { namespace net
/// @{
boost::container::deque<item_hash_t> ids_of_items_to_get; /// id of items in the blockchain that this peer has told us about
std::set<item_hash_t> ids_of_items_being_processed; /// list of all items this peer has offered use that we've already handed to the client but the client hasn't finished processing
uint32_t number_of_unfetched_item_ids; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us;
bool we_need_sync_items_from_peer;
uint32_t number_of_unfetched_item_ids = 0; /// number of items in the blockchain that follow ids_of_items_to_get but the peer hasn't yet told us their ids
bool peer_needs_sync_items_from_us = false;
bool we_need_sync_items_from_peer = false;
fc::optional<boost::tuple<std::vector<item_hash_t>, fc::time_point> > item_ids_requested_from_peer; /// we check this to detect a timed-out request and in busy()
fc::time_point last_sync_item_received_time; /// the time we received the last sync item or the time we sent the last batch of sync item requests to this peer
std::set<item_hash_t> sync_items_requested_from_peer; /// ids of blocks we've requested from this peer during sync. fetch from another peer if this peer disconnects
item_hash_t last_block_delegate_has_seen; /// the hash of the last block this peer has told us about that the peer knows
fc::time_point_sec last_block_time_delegate_has_seen;
bool inhibit_fetching_sync_blocks;
bool inhibit_fetching_sync_blocks = false;
/// @}

/// non-synchronization state data
Expand Down Expand Up @@ -261,17 +261,17 @@ namespace graphene { namespace net
// blockchain catch up
fc::time_point transaction_fetching_inhibited_until;

uint32_t last_known_fork_block_number;
uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state;
firewall_check_state_data *firewall_check_state = nullptr;
#ifndef NDEBUG
private:
fc::thread* _thread;
unsigned _send_message_queue_tasks_running; // temporary debugging
fc::thread* _thread = nullptr;
unsigned _send_message_queue_tasks_running = 0; // temporary debugging
#endif
bool _currently_handling_message; // true while we're in the middle of handling a message from the remote system
bool _currently_handling_message = false; // true while we're in the middle of handling a message from the remote system
private:
peer_connection(peer_connection_delegate* delegate);
void destroy();
Expand Down
28 changes: 26 additions & 2 deletions libraries/net/message_oriented_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include <graphene/net/stcp_socket.hpp>
#include <graphene/net/config.hpp>

#include <atomic>

#ifdef DEFAULT_LOGGER
# undef DEFAULT_LOGGER
#endif
Expand Down Expand Up @@ -61,7 +63,7 @@ namespace graphene { namespace net {
fc::time_point _last_message_sent_time;

bool _send_message_in_progress;

std::atomic_bool readLoopInProgress;
#ifndef NDEBUG
fc::thread* _thread;
#endif
Expand Down Expand Up @@ -97,7 +99,8 @@ namespace graphene { namespace net {
_delegate(delegate),
_bytes_received(0),
_bytes_sent(0),
_send_message_in_progress(false)
_send_message_in_progress(false),
readLoopInProgress(false)
#ifndef NDEBUG
,_thread(&fc::thread::current())
#endif
Expand Down Expand Up @@ -137,6 +140,20 @@ namespace graphene { namespace net {
_sock.bind(local_endpoint);
}

class THelper final
{
std::atomic_bool* Flag;
public:
explicit THelper(std::atomic_bool* flag) : Flag(flag)
{
FC_ASSERT(*flag == false, "Only one thread at time can visit it");
*flag = true;
}
~THelper()
{
*Flag = false;
}
};

void message_oriented_connection_impl::read_loop()
{
Expand All @@ -145,6 +162,8 @@ namespace graphene { namespace net {
const int LEFTOVER = BUFFER_SIZE - sizeof(message_header);
static_assert(BUFFER_SIZE >= sizeof(message_header), "insufficient buffer");

THelper helper(&this->readLoopInProgress);

_connected_time = fc::time_point::now();

fc::oexception exception_to_rethrow;
Expand Down Expand Up @@ -261,8 +280,13 @@ namespace graphene { namespace net {
//pad the message we send to a multiple of 16 bytes
size_t size_with_padding = 16 * ((size_of_message_and_header + 15) / 16);
std::unique_ptr<char[]> padded_message(new char[size_with_padding]);

memcpy(padded_message.get(), (char*)&message_to_send, sizeof(message_header));
memcpy(padded_message.get() + sizeof(message_header), message_to_send.data.data(), message_to_send.size );
char* paddingSpace = padded_message.get() + sizeof(message_header) + message_to_send.size;
size_t toClean = size_with_padding - size_of_message_and_header;
memset(paddingSpace, 0, toClean);

_sock.write(padded_message.get(), size_with_padding);
_sock.flush();
_bytes_sent += size_with_padding;
Expand Down
24 changes: 16 additions & 8 deletions libraries/net/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5477,17 +5477,21 @@ namespace graphene { namespace net { namespace detail {
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
try \
{ \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
if (_thread->is_current()) \
{ \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
return _node_delegate->method_name(__VA_ARGS__); \
} \
else \
return _thread->async([&](){ \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
return _node_delegate->method_name(__VA_ARGS__); \
}, "invoke " BOOST_STRINGIZE(method_name)).wait(); \
Expand All @@ -5509,17 +5513,21 @@ namespace graphene { namespace net { namespace detail {
}
#else
# define INVOKE_AND_COLLECT_STATISTICS(method_name, ...) \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
if (_thread->is_current()) \
{ \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
return _node_delegate->method_name(__VA_ARGS__); \
} \
else \
return _thread->async([&](){ \
call_statistics_collector statistics_collector(#method_name, \
&_ ## method_name ## _execution_accumulator, \
&_ ## method_name ## _delay_before_accumulator, \
&_ ## method_name ## _delay_after_accumulator); \
call_statistics_collector::actual_execution_measurement_helper helper(statistics_collector); \
return _node_delegate->method_name(__VA_ARGS__); \
}, "invoke " BOOST_STRINGIZE(method_name)).wait()
Expand Down

0 comments on commit 0bda58f

Please sign in to comment.