From 4ccc26a526a03699635fa1898a629e3941b707b7 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Fri, 18 May 2018 16:00:29 +0200 Subject: [PATCH 01/13] Async Disconnect and Reconnect Attempt to solve problems related to disconnect/reconnect for timeouts on responses or no-work. On PoolManager the IO_SERVICE is now persistent and never exits On Linux * Disconnect/Reconnect without issues for plain TCP connections * SSL Connections get shut down properly but at first reconnect attempt it goes timeout. After that reconnection is immediate. On Windows * Feedback needed. --- ethminer/MinerAux.h | 7 +- libethcore/Farm.h | 1 + libpoolprotocols/PoolClient.h | 3 +- libpoolprotocols/PoolManager.cpp | 153 ++++---- libpoolprotocols/PoolManager.h | 14 +- libpoolprotocols/getwork/EthGetworkClient.h | 2 + libpoolprotocols/stratum/EthStratumClient.cpp | 345 ++++++++++++++---- libpoolprotocols/stratum/EthStratumClient.h | 25 +- libpoolprotocols/testing/SimulateClient.h | 1 + 9 files changed, 376 insertions(+), 175 deletions(-) diff --git a/ethminer/MinerAux.h b/ethminer/MinerAux.h index 508a662818..b408a956d8 100644 --- a/ethminer/MinerAux.h +++ b/ethminer/MinerAux.h @@ -197,7 +197,7 @@ class MinerCLI { string url = argv[++i]; if (url == "exit") // add fake scheme and port to 'exit' url - url = "stratum://exit:1"; + url = "stratum+tcp://-:x@exit:0"; URI uri; try { uri = url; @@ -809,8 +809,7 @@ class MinerCLI Farm f; f.setSealers(sealers); - PoolManager mgr(client, f, m_minerType); - mgr.setReconnectTries(m_maxFarmRetries); + PoolManager mgr(client, f, m_minerType, m_maxFarmRetries); // If we are in simulation mode we add a fake connection if (m_mode == OperationMode::Simulation) { @@ -848,7 +847,7 @@ class MinerCLI } mgr.stop(); - + cnote << "Terminated !"; exit(0); } diff --git a/libethcore/Farm.h b/libethcore/Farm.h index 448e4bb9f3..f4a0098ae1 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -178,6 +178,7 @@ class Farm: public FarmFace m_hashrateTimer.cancel(); m_io_service.stop(); + m_serviceThread.join(); m_lastProgresses.clear(); } diff --git a/libpoolprotocols/PoolClient.h b/libpoolprotocols/PoolClient.h index 24ce15aa3e..1fd230b089 100644 --- a/libpoolprotocols/PoolClient.h +++ b/libpoolprotocols/PoolClient.h @@ -28,6 +28,7 @@ namespace dev virtual void submitHashrate(string const & rate) = 0; virtual void submitSolution(Solution solution) = 0; virtual bool isConnected() = 0; + virtual bool isPendingState() = 0; virtual string ActiveEndPoint() = 0; using SolutionAccepted = std::function; @@ -46,7 +47,7 @@ namespace dev bool m_authorized = false; bool m_connected = false; bool m_connection_changed = false; - boost::asio::ip::tcp::endpoint m_endpoint; + boost::asio::ip::basic_endpoint m_endpoint; URI m_conn; diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index ad21a82392..e0bacd7bad 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -19,26 +19,17 @@ static string diffToDisplay(double diff) return ss.str(); } -PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType) : Worker("main"), m_farm(farm), m_minerType(minerType) +PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries) : Worker("main"), m_farm(farm), m_minerType(minerType) { p_client = client; - + m_maxConnectionAttempts = maxTries; + p_client->onConnected([&]() { + m_connectionAttempt = 0; cnote << "Connected to " << m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint(); - if (!m_farm.isMining()) - { - cnote << "Spinning up miners..."; - if (m_minerType == MinerType::CL) - m_farm.start("opencl", false); - else if (m_minerType == MinerType::CUDA) - m_farm.start("cuda", false); - else if (m_minerType == MinerType::Mixed) { - m_farm.start("cuda", false); - m_farm.start("opencl", true); - } - } }); + p_client->onDisconnected([&]() { cnote << "Disconnected from " + m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint(); @@ -48,13 +39,12 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine m_farm.stop(); } - if (m_running) - tryReconnect(); }); + p_client->onWorkReceived([&](WorkPackage const& wp) { - m_reconnectTry = 0; - m_farm.setWork(wp); + + cnote << "New job" << wp.header << " " + m_connections[m_activeConnectionIdx].Host() + p_client->ActiveEndPoint(); if (wp.boundary != m_lastBoundary) { using namespace boost::multiprecision; @@ -64,8 +54,24 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine const uint256_t divisor(string("0x") + m_lastBoundary.hex()); cnote << "New pool difficulty:" << EthWhite << diffToDisplay(double(dividend / divisor)) << EthReset; } - cnote << "New job" << wp.header << " " + m_connections[m_activeConnectionIdx].Host() + p_client->ActiveEndPoint(); + + if (!m_farm.isMining()) + { + cnote << "Spinning up miners..."; + if (m_minerType == MinerType::CL) + m_farm.start("opencl", false); + else if (m_minerType == MinerType::CUDA) + m_farm.start("cuda", false); + else if (m_minerType == MinerType::Mixed) { + m_farm.start("cuda", false); + m_farm.start("opencl", true); + } + } + + m_farm.setWork(wp); + }); + p_client->onSolutionAccepted([&](bool const& stale) { using namespace std::chrono; @@ -76,6 +82,7 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine cnote << EthLime "**Accepted" EthReset << (stale ? "(stale)" : "") << ss.str(); m_farm.acceptedSolution(stale); }); + p_client->onSolutionRejected([&](bool const& stale) { using namespace std::chrono; @@ -139,9 +146,8 @@ void PoolManager::stop() if (m_running) { cnote << "Shutting down..."; m_running = false; - - if (p_client->isConnected()) - p_client->disconnect(); + if (p_client->isConnected()) { p_client->disconnect(); } + stopWorking(); if (m_farm.isMining()) { @@ -155,9 +161,50 @@ void PoolManager::workLoop() { while (m_running) { - this_thread::sleep_for(chrono::seconds(1)); - m_hashrateReportingTimePassed++; + + // Take action only if not pending state (connecting/disconnecting) + // Otherwise do nothing and wait until connection state is NOT pending + if (!p_client->isPendingState()) { + + if (!p_client->isConnected()) { + + // Rotate connections if above max attempts threshold + if (m_connectionAttempt >= m_maxConnectionAttempts) { + + m_connectionAttempt = 0; + m_activeConnectionIdx++; + if (m_activeConnectionIdx == m_connections.size()) { + m_activeConnectionIdx = 0; + } + + } + + if (m_connections[m_activeConnectionIdx].Host() != "exit") { + + // Count connectionAttempts + m_connectionAttempt++; + + // Invoke connections + p_client->setConnection(m_connections[m_activeConnectionIdx]); + m_farm.set_pool_addresses(m_connections[m_activeConnectionIdx].Host(), m_connections[m_activeConnectionIdx].Port()); + cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port())); + p_client->connect(); + + } + else { + + dev::setThreadName("main"); + cnote << "No more failover connections."; + m_running = false; + } + + } + + } + // Hashrate reporting + m_hashrateReportingTimePassed++; + if (m_hashrateReportingTimePassed > m_hashrateReportingTime) { auto mp = m_farm.miningProgress(); std::string h = toHex(toCompactBigEndian(mp.rate(), 1)); @@ -171,6 +218,9 @@ void PoolManager::workLoop() p_client->submitHashrate("0x" + ss.str()); m_hashrateReportingTimePassed = 0; } + + this_thread::sleep_for(chrono::seconds(1)); + } } @@ -178,10 +228,6 @@ void PoolManager::addConnection(URI &conn) { m_connections.push_back(conn); - if (m_connections.size() == 1) { - p_client->setConnection(conn); - m_farm.set_pool_addresses(conn.Host(), conn.Port()); - } } void PoolManager::clearConnections() @@ -197,60 +243,9 @@ void PoolManager::start() if (m_connections.size() > 0) { m_running = true; startWorking(); - - // Try to connect to pool - cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port())); - p_client->connect(); } else { cwarn << "Manager has no connections defined!"; } } -void PoolManager::tryReconnect() -{ - // No connections available, so why bother trying to reconnect - if (m_connections.size() <= 0) { - cwarn << "Manager has no connections defined!"; - return; - } - - for (auto i = 4; --i; this_thread::sleep_for(chrono::seconds(1))) { - cnote << "Retrying in " << i << "... \r"; - } - - // We do not need awesome logic here, we just have one connection anyway - if (m_connections.size() == 1) { - - cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port())); - p_client->connect(); - return; - } - - // Fallback logic, tries current connection multiple times and then switches to - // one of the other connections. - if (m_reconnectTries > m_reconnectTry) { - - m_reconnectTry++; - cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port())); - p_client->connect(); - } - else { - m_reconnectTry = 0; - m_activeConnectionIdx++; - if (m_activeConnectionIdx >= m_connections.size()) { - m_activeConnectionIdx = 0; - } - if (m_connections[m_activeConnectionIdx].Host() == "exit") { - dev::setThreadName("main"); - cnote << "Exiting because reconnecting is not possible."; - stop(); - } - else { - p_client->setConnection(m_connections[m_activeConnectionIdx]); - m_farm.set_pool_addresses(m_connections[m_activeConnectionIdx].Host(), m_connections[m_activeConnectionIdx].Port()); - cnote << "Selected pool" << (m_connections[m_activeConnectionIdx].Host() + ":" + toString(m_connections[m_activeConnectionIdx].Port())); - p_client->connect(); - } - } -} diff --git a/libpoolprotocols/PoolManager.h b/libpoolprotocols/PoolManager.h index c21b795274..f559410d7d 100644 --- a/libpoolprotocols/PoolManager.h +++ b/libpoolprotocols/PoolManager.h @@ -19,12 +19,11 @@ namespace dev class PoolManager : public Worker { public: - PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType); + PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries); void addConnection(URI &conn); void clearConnections(); void start(); void stop(); - void setReconnectTries(unsigned const & reconnectTries) { m_reconnectTries = reconnectTries; }; bool isConnected() { return p_client->isConnected(); }; bool isRunning() { return m_running; }; @@ -34,17 +33,20 @@ namespace dev bool m_running = false; void workLoop() override; - unsigned m_reconnectTries = 3; - unsigned m_reconnectTry = 0; - std::vector m_connections; + + unsigned m_connectionAttempt = 0; + unsigned m_maxConnectionAttempts = 0; unsigned m_activeConnectionIdx = 0; + + std::vector m_connections; + h256 m_lastBoundary = h256(); PoolClient *p_client; Farm &m_farm; MinerType m_minerType; std::chrono::steady_clock::time_point m_submit_time; - void tryReconnect(); + }; } } diff --git a/libpoolprotocols/getwork/EthGetworkClient.h b/libpoolprotocols/getwork/EthGetworkClient.h index 4ece93112f..13a347346d 100644 --- a/libpoolprotocols/getwork/EthGetworkClient.h +++ b/libpoolprotocols/getwork/EthGetworkClient.h @@ -20,6 +20,8 @@ class EthGetworkClient : public PoolClient, Worker void disconnect() override; bool isConnected() override { return m_connected; } + bool isPendingState() override { return false; } + string ActiveEndPoint() override { return ""; }; void submitHashrate(string const & rate) override; diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 81ac707ab9..92a3035c03 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -43,21 +43,26 @@ static void diffToTarget(uint32_t *target, double diff) EthStratumClient::EthStratumClient(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate) : PoolClient(), m_worktimeout(worktimeout), m_responsetimeout(responsetimeout), + m_io_work(m_io_service), + m_io_work_timer(m_io_service), m_socket(nullptr), m_conntimer(m_io_service), m_worktimer(m_io_service), m_responsetimer(m_io_service), m_resolver(m_io_service), + m_endpoints(), m_email(email), m_submit_hashrate(submitHashrate) { if (m_submit_hashrate) m_submit_hashrate_id = h256::random().hex(); + } EthStratumClient::~EthStratumClient() { + m_io_work_timer.cancel(); m_io_service.stop(); m_serviceThread.join(); } @@ -65,13 +70,31 @@ EthStratumClient::~EthStratumClient() void EthStratumClient::connect() { + // Start service thread immediately + if (!m_serviceThread.joinable()) { + + m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); + m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, this, boost::asio::placeholders::error)); + + // Start io service + m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; + + } + + // Prevent unnecessary and potentially dangerous recursion + if (m_connecting.load(std::memory_order::memory_order_relaxed)) { + return; + } + else { + m_connecting.store(true, std::memory_order::memory_order_relaxed); + } + + m_connected.store(false, std::memory_order_relaxed); m_subscribed.store(false, std::memory_order_relaxed); m_authorized.store(false, std::memory_order_relaxed); - // Prepare Socket - if (m_conn.SecLevel() != SecureLevel::NONE) { boost::asio::ssl::context::method method = boost::asio::ssl::context::tls_client; @@ -81,6 +104,7 @@ void EthStratumClient::connect() boost::asio::ssl::context ctx(method); m_securesocket = std::make_shared >(m_io_service, ctx); m_socket = &m_securesocket->next_layer(); + m_securesocket->set_verify_mode(boost::asio::ssl::verify_peer); @@ -138,31 +162,40 @@ void EthStratumClient::connect() setsockopt(m_socket->native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); #endif - // Begin resolve and connect - tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); - m_resolver.async_resolve(q, - boost::bind(&EthStratumClient::resolve_handler, - this, boost::asio::placeholders::error, - boost::asio::placeholders::iterator)); + // Begin resolve all ips associated to hostname + if (m_endpoints.empty()) { + tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); + m_resolver.async_resolve(q, + boost::bind(&EthStratumClient::resolve_handler, + this, boost::asio::placeholders::error, + boost::asio::placeholders::iterator)); - // IMPORTANT !! - if (m_serviceThread.joinable()) - { - // If the service thread have been created try to reset the service. - m_io_service.reset(); } - else - { - // Otherwise, if the first time here, create new thread. - m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; + else { + start_connect(); } -} + //// IMPORTANT !! + //if (m_serviceThread.joinable()) + //{ + // // If the service thread have been created try to reset the service. + // //m_serviceThread.join(); + // //m_io_service.rese(); + //} + //else { + + // // Start io service + // m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; + + //} + + + -#define BOOST_ASIO_ENABLE_CANCELIO +} void EthStratumClient::disconnect() { @@ -174,9 +207,12 @@ void EthStratumClient::disconnect() m_disconnecting.store(true, std::memory_order::memory_order_relaxed); } - m_conntimer.cancel(); - m_worktimer.cancel(); - m_responsetimer.cancel(); + m_io_service.post([&] { + m_conntimer.cancel(); + m_worktimer.cancel(); + m_responsetimer.cancel(); + }); + m_response_pending = false; if (m_socket && m_socket->is_open()) { @@ -186,24 +222,61 @@ void EthStratumClient::disconnect() boost::system::error_code sec; if (m_conn.SecLevel() != SecureLevel::NONE) { - m_securesocket->shutdown(sec); + + cnote << "Initiated m_securesocket->async_shutdown"; + + // This will initiate the exchange of "close_notify" message among parties. + // If both client and server are connected then we expect the handler with success + // As there may be a connection issue we also endorse a timeout + m_securesocket->async_shutdown(boost::bind(&EthStratumClient::onSSLShutdownCompleted, this, boost::asio::placeholders::error)); + + m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + + + // Rest of disconnection is performed asynchronously + return; } else { + m_nonsecuresocket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, sec); + m_socket->close(); } - m_socket->close(); - m_io_service.stop(); + } catch (std::exception const& _e) { cwarn << "Error while disconnecting:" << _e.what(); } + disconnect_finalize(); + + } + + + +} + +void EthStratumClient::disconnect_finalize() { + + cnote << "Entered disconnect_finalize()"; + + if (m_conn.SecLevel() != SecureLevel::NONE) { + + m_securesocket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both); + m_securesocket->lowest_layer().close(); + m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); + m_socket->close(); m_securesocket = nullptr; - m_nonsecuresocket = nullptr; m_socket = nullptr; + } + else { + m_socket = nullptr; + m_nonsecuresocket = nullptr; + + } m_subscribed.store(false, std::memory_order_relaxed); m_authorized.store(false, std::memory_order_relaxed); @@ -213,25 +286,34 @@ void EthStratumClient::disconnect() m_disconnecting.store(false, std::memory_order::memory_order_relaxed); // Trigger handlers - if (m_onDisconnected) { m_onDisconnected(); } + if (m_onDisconnected) { m_onDisconnected(); } + } void EthStratumClient::resolve_handler(const boost::system::error_code& ec, tcp::resolver::iterator i) -{ - dev::setThreadName("stratum"); +{ if (!ec) { + dev::setThreadName("stratum"); - // Start Connection Process and set timeout timer - start_connect(i); - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + while (i != tcp::resolver::iterator()) + { + m_endpoints.push(i->endpoint()); + i++; + } + + // Resolver has finished so begin connecting + start_connect(); } else { + dev::setThreadName("stratum"); cwarn << "Could not resolve host " << m_conn.Host() << ", " << ec.message(); - disconnect(); + // Trigger handlers + if (m_onDisconnected) { m_onDisconnected(); } + } } @@ -240,27 +322,64 @@ void EthStratumClient::reset_work_timeout() m_worktimer.cancel(); m_worktimer.expires_from_now(boost::posix_time::seconds(m_worktimeout)); m_worktimer.async_wait(boost::bind(&EthStratumClient::work_timeout_handler, this, boost::asio::placeholders::error)); + //m_worktimer.async_wait(&work_timeout_handler); + } -void EthStratumClient::start_connect(tcp::resolver::iterator endpoint_iter) +void EthStratumClient::start_connect() { - if (endpoint_iter != tcp::resolver::iterator()) { + if (!m_endpoints.empty()) { - cnote << ("Trying " + toString(endpoint_iter->endpoint()) + " ..."); - - m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); + // Sets active end point and removes + // it from queue + m_endpoint = m_endpoints.front(); + m_endpoints.pop(); + dev::setThreadName("stratum"); + cnote << ("Trying " + toString(m_endpoint) + " ..."); + + m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + // Start connecting async - m_socket->async_connect(endpoint_iter->endpoint(), - boost::bind(&EthStratumClient::connect_handler, this, _1, endpoint_iter)); + if (m_conn.SecLevel() != SecureLevel::NONE) { + m_securesocket->lowest_layer().async_connect(m_endpoint, boost::bind(&EthStratumClient::connect_handler, this, _1)); + } + else { + m_socket->async_connect(m_endpoint, + boost::bind(&EthStratumClient::connect_handler, this, _1)); + } + } else { + + + dev::setThreadName("stratum"); + m_connecting.store(false, std::memory_order_relaxed); + cwarn << "No ip addresses to try for host:" << m_conn.Host(); - cwarn << "No more addresses to try !"; - disconnect(); + // Trigger handlers + if (m_onDisconnected) { m_onDisconnected(); } } + + + //if (endpoint_iter != tcp::resolver::iterator()) { + + // cnote << ("Trying " + toString(endpoint_iter->endpoint()) + " ..."); + // + // m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); + + // // Start connecting async + // m_socket->async_connect(endpoint_iter->endpoint(), + // boost::bind(&EthStratumClient::connect_handler, this, _1, endpoint_iter)); + + //} + //else { + + + //} } void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec) @@ -271,13 +390,26 @@ void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec // the current time since a new asynchronous operation may have moved the // deadline before this actor had a chance to run. - if (!isConnected()) { + if (isPendingState()) { if (m_conntimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { - // The deadline has passed. The socket is closed so that any outstanding - // asynchronous operations are cancelled. - m_socket->close(); + // The deadline has passed. + + if (m_connecting.load(std::memory_order_relaxed)) { + + // The socket is closed so that any outstanding + // asynchronous connection operations are cancelled. + m_socket->close(); + + } + + // This is set for SSL disconnection + if (m_disconnecting.load(std::memory_order_relaxed) && (m_conn.SecLevel() != SecureLevel::NONE)) { + if (m_securesocket->lowest_layer().is_open()) { + m_securesocket->lowest_layer().close(); + } + } // There is no longer an active deadline. The expiry is set to positive // infinity so that the actor takes no action until a new deadline is set. @@ -290,7 +422,7 @@ void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec } -void EthStratumClient::connect_handler(const boost::system::error_code& ec, tcp::resolver::iterator i) +void EthStratumClient::connect_handler(const boost::system::error_code& ec) { dev::setThreadName("stratum"); @@ -298,14 +430,14 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec, tcp: // Timeout has run before if (!m_socket->is_open()) { - cwarn << ("Error " + toString((i)->endpoint()) + " [Timeout]"); + cwarn << ("Error " + toString(m_endpoint) + " [Timeout]"); // Try the next available endpoint. - start_connect(++i); + start_connect(); } else if (ec) { - cwarn << ("Error " + toString((i)->endpoint()) + " [" + ec.message() + "]"); + cwarn << ("Error " + toString(m_endpoint) + " [" + ec.message() + "]"); // We need to close the socket used in the previous connection attempt // before starting a new one. @@ -313,22 +445,23 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec, tcp: m_socket->close(); // Try the next available endpoint. - start_connect(++i); + start_connect(); } else { - // Immediately set connected flag to prevent + // Immediately set connecting flag to prevent // occurrence of subsequents timeouts (if any) - m_connected.store(true, std::memory_order_relaxed); + m_connecting.store(false, std::memory_order_relaxed); m_conntimer.cancel(); - m_endpoint = (i)->endpoint(); - if (m_conn.SecLevel() != SecureLevel::NONE) { boost::system::error_code hec; + + cnote << "Start handshake ..."; m_securesocket->handshake(boost::asio::ssl::stream_base::client, hec); + cnote << "End handshake"; if (hec) { cwarn << "SSL/TLS Handshake failed: " << hec.message(); @@ -349,11 +482,17 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec, tcp: // Disconnection is triggered on no more IP available m_connected.store(false, std::memory_order_relaxed); m_socket->close(); - start_connect(++i); + start_connect(); return; } } + // Here is where we're properly connected + m_connected.store(true, std::memory_order_relaxed); + + // Clean buffer from any previous stale data + m_sendBuffer.consume(4096); + // Trigger event handlers and begin counting for the next job if (m_onConnected) { m_onConnected(); } reset_work_timeout(); @@ -978,13 +1117,25 @@ void EthStratumClient::processReponse(Json::Value& responseObject) } -void EthStratumClient::work_timeout_handler(const boost::system::error_code& ec) { +void EthStratumClient::io_work_timer_handler(const boost::system::error_code& ec) { - dev::setThreadName("stratum"); - m_worktimer.cancel(); + if (!ec) { + + // This does absolutely nothing aside resubmitting timer + // ensuring io_service's queue has always something to do + m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); + m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, this, boost::asio::placeholders::error)); + } + +} + + +void EthStratumClient::work_timeout_handler(const boost::system::error_code& ec) { + if (!ec) { if (isConnected()) { + dev::setThreadName("stratum"); cwarn << "No new work received in " << m_worktimeout << " seconds."; disconnect(); } @@ -994,10 +1145,9 @@ void EthStratumClient::work_timeout_handler(const boost::system::error_code& ec) void EthStratumClient::response_timeout_handler(const boost::system::error_code& ec) { - dev::setThreadName("stratum"); - if (!ec) { if (isConnected() && m_response_pending) { + dev::setThreadName("stratum"); cwarn << "No response received in" << m_responsetimeout << "seconds."; disconnect(); } @@ -1108,6 +1258,11 @@ void EthStratumClient::onRecvSocketDataCompleted(const boost::system::error_code dev::setThreadName("stratum"); + // Due to the nature of io_service's queue and + // the implementation of the loop this event may trigger + // late after clean disconnection. Check status of connection + // before triggering all stack of calls + if (!ec && bytes_transferred > 0) { // Extract received message @@ -1115,28 +1270,41 @@ void EthStratumClient::onRecvSocketDataCompleted(const boost::system::error_code std::string message; getline(is, message); - if (!message.empty()) { + if (isConnected()) { + + if (!message.empty()) { + + // Test validity of chunk and process + Json::Value jMsg; + Json::Reader jRdr; + if (jRdr.parse(message, jMsg)) { + processReponse(jMsg); + } + else { + cwarn << "Got invalid Json message :" + jRdr.getFormattedErrorMessages(); + } - // Test validity of chunk and process - Json::Value jMsg; - Json::Reader jRdr; - if (jRdr.parse(message, jMsg)) { - processReponse(jMsg); - } - else { - cwarn << "Got invalid Json message :" + jRdr.getFormattedErrorMessages(); } + // Eventually keep reading from socket + recvSocketData(); + } - // Eventually keep reading from socket - if (isConnected()) { recvSocketData(); } } else { if (isConnected()) { - if (ec == boost::asio::error::eof) + + if ( + (ec.category() == boost::asio::error::get_ssl_category()) && + (ERR_GET_REASON(ec.value()) == SSL_RECEIVED_SHUTDOWN) + ) + { + cnote << "SSL Stream remotely closed by" << m_conn.Host(); + } + else if (ec == boost::asio::error::eof) { cnote << "Connection remotely closed by" << m_conn.Host(); } @@ -1175,13 +1343,40 @@ void EthStratumClient::sendSocketData(Json::Value const & jReq) { void EthStratumClient::onSendSocketDataCompleted(const boost::system::error_code& ec) { - dev::setThreadName("stratum"); - if (ec) { + + if ((ec.category() == boost::asio::error::get_ssl_category()) && (SSL_R_PROTOCOL_IS_SHUTDOWN == ERR_GET_REASON(ec.value()))) { + cnote << "onSendSocketDataCompleted. Error code is " << ec.message(); + m_securesocket->lowest_layer().close(); + } + if (isConnected()) { + dev::setThreadName("stratum"); cwarn << "Socket write failed: " + ec.message(); disconnect(); } } } + +void EthStratumClient::onSSLShutdownCompleted(const boost::system::error_code& ec) { + + cnote << "onSSLShutdownCompleted Error code is : " << ec.message(); + disconnect_finalize(); + + //if (ec == boost::asio::error::operation_aborted) { + + // // Timeout has triggered before + // // We can only close the transport + // disconnect_finalize(); + //} + //else { + + //} + // + // (void)ec; + //m_securesocket->lowest_layer().shutdown(tcp::socket::shutdown_both); + //m_securesocket->lowest_layer().close(); + + +} \ No newline at end of file diff --git a/libpoolprotocols/stratum/EthStratumClient.h b/libpoolprotocols/stratum/EthStratumClient.h index 845781da01..5e5980c1da 100644 --- a/libpoolprotocols/stratum/EthStratumClient.h +++ b/libpoolprotocols/stratum/EthStratumClient.h @@ -31,11 +31,9 @@ class EthStratumClient : public PoolClient void disconnect(); // Connected and Connection Statuses - bool isConnected() - { - return m_connected.load(std::memory_order_relaxed) && - !m_disconnecting.load(std::memory_order_relaxed); - } + bool isConnected() override { return m_connected.load(std::memory_order_relaxed) && !isPendingState(); } + bool isPendingState() override { return (m_connecting.load(std::memory_order_relaxed) || m_disconnecting.load(std::memory_order_relaxed));} + bool isSubscribed() { return m_subscribed.load(std::memory_order_relaxed); } bool isAuthorized() { return m_authorized.load(std::memory_order_relaxed); } string ActiveEndPoint() { return " [" + toString(m_endpoint) + "]"; }; @@ -48,10 +46,13 @@ class EthStratumClient : public PoolClient private: + void disconnect_finalize(); + void resolve_handler(const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i); - void start_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iter); + void start_connect(); void check_connect_timeout(const boost::system::error_code& ec); - void connect_handler(const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator i); + void connect_handler(const boost::system::error_code& ec); + void io_work_timer_handler(const boost::system::error_code& ec); void work_timeout_handler(const boost::system::error_code& ec); void response_timeout_handler(const boost::system::error_code& ec); @@ -64,7 +65,7 @@ class EthStratumClient : public PoolClient void onRecvSocketDataCompleted(const boost::system::error_code& ec, std::size_t bytes_transferred); void sendSocketData(Json::Value const & jReq); void onSendSocketDataCompleted(const boost::system::error_code& ec); - + void onSSLShutdownCompleted(const boost::system::error_code& ec); string m_worker; // eth-proxy only; No ! It's for all !!! @@ -72,6 +73,7 @@ class EthStratumClient : public PoolClient std::atomic m_authorized = { false }; std::atomic m_connected = { false }; std::atomic m_disconnecting = { false }; + std::atomic m_connecting = { false }; // seconds to trigger a work_timeout (overwritten in constructor) int m_worktimeout; @@ -83,8 +85,10 @@ class EthStratumClient : public PoolClient bool m_stale = false; - std::thread m_serviceThread; ///< The IO service thread. - boost::asio::io_service m_io_service; + std::thread m_serviceThread; // The IO service thread. + boost::asio::io_service m_io_service; // The IO service itself + boost::asio::io_service::work m_io_work; // The IO work which prevents io_service.run() to return on no work thus terminating thread + boost::asio::deadline_timer m_io_work_timer; // A dummy timer to keep io_service with something to do boost::asio::ip::tcp::socket *m_socket; // Use shared ptrs to avoid crashes due to async_writes @@ -104,6 +108,7 @@ class EthStratumClient : public PoolClient bool m_response_pending = false; boost::asio::ip::tcp::resolver m_resolver; + std::queue> m_endpoints; string m_email; string m_rate; diff --git a/libpoolprotocols/testing/SimulateClient.h b/libpoolprotocols/testing/SimulateClient.h index 1c4552b6e8..e8d2dff6b8 100644 --- a/libpoolprotocols/testing/SimulateClient.h +++ b/libpoolprotocols/testing/SimulateClient.h @@ -21,6 +21,7 @@ class SimulateClient : public PoolClient, Worker void disconnect() override; bool isConnected() override { return m_connected; } + bool isPendingState() override { return false; } string ActiveEndPoint() override { return ""; }; void submitHashrate(string const & rate) override; From 6e1d650fd661e78c6c88e7afe7f1b511841b44e2 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Sun, 20 May 2018 20:54:07 +0200 Subject: [PATCH 02/13] Use of boost::shared_ptr --- ethminer/MinerAux.h | 44 ++++++++++--------- libpoolprotocols/PoolManager.cpp | 16 +++---- libpoolprotocols/PoolManager.h | 18 +++++--- libpoolprotocols/stratum/EthStratumClient.cpp | 30 ++++++------- libpoolprotocols/stratum/EthStratumClient.h | 11 ++++- 5 files changed, 69 insertions(+), 50 deletions(-) diff --git a/ethminer/MinerAux.h b/ethminer/MinerAux.h index b408a956d8..ba7da4bc5c 100644 --- a/ethminer/MinerAux.h +++ b/ethminer/MinerAux.h @@ -783,27 +783,29 @@ class MinerCLI sealers["cuda"] = Farm::SealerDescriptor{&CUDAMiner::instances, [](FarmFace& _farm, unsigned _index){ return new CUDAMiner(_farm, _index); }}; #endif - PoolClient *client = nullptr; - - if (m_mode == OperationMode::Stratum) { - client = new EthStratumClient(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); - } - else if (m_mode == OperationMode::Farm) { - client = new EthGetworkClient(m_farmRecheckPeriod); - } - else if (m_mode == OperationMode::Simulation) { - client = new SimulateClient(20, m_benchmarkBlock); - } - else { - cwarn << "Invalid OperationMode"; - exit(1); - } - - // Should not happen! - if (!client) { - cwarn << "Invalid PoolClient"; - exit(1); - } + EthStratumClient::pointer client = EthStratumClient::create(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); + + //PoolClient *client = nullptr; + + //if (m_mode == OperationMode::Stratum) { + // client = new EthStratumClient(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); + //} + //else if (m_mode == OperationMode::Farm) { + // client = new EthGetworkClient(m_farmRecheckPeriod); + //} + //else if (m_mode == OperationMode::Simulation) { + // client = new SimulateClient(20, m_benchmarkBlock); + //} + //else { + // cwarn << "Invalid OperationMode"; + // exit(1); + //} + + //// Should not happen! + //if (!client) { + // cwarn << "Invalid PoolClient"; + // exit(1); + //} //sealers, m_minerType Farm f; diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index e0bacd7bad..c045d77a0c 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -19,7 +19,7 @@ static string diffToDisplay(double diff) return ss.str(); } -PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries) : Worker("main"), m_farm(farm), m_minerType(minerType) +PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType const & minerType, unsigned maxTries) : m_farm(farm), m_minerType(minerType) { p_client = client; m_maxConnectionAttempts = maxTries; @@ -143,11 +143,10 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine void PoolManager::stop() { - if (m_running) { + if (m_running.load(std::memory_order_relaxed)) { cnote << "Shutting down..."; - m_running = false; + m_running.store(false, std::memory_order_relaxed); if (p_client->isConnected()) { p_client->disconnect(); } - stopWorking(); if (m_farm.isMining()) { @@ -159,7 +158,7 @@ void PoolManager::stop() void PoolManager::workLoop() { - while (m_running) + while (m_running.load(std::memory_order_relaxed)) { // Take action only if not pending state (connecting/disconnecting) @@ -195,7 +194,8 @@ void PoolManager::workLoop() dev::setThreadName("main"); cnote << "No more failover connections."; - m_running = false; + m_running.store(false, std::memory_order_relaxed); + continue; } } @@ -241,8 +241,8 @@ void PoolManager::clearConnections() void PoolManager::start() { if (m_connections.size() > 0) { - m_running = true; - startWorking(); + m_running.store (true, std::memory_order_relaxed); + m_workThread = std::thread{ boost::bind(&PoolManager::workLoop, this) }; } else { cwarn << "Manager has no connections defined!"; diff --git a/libpoolprotocols/PoolManager.h b/libpoolprotocols/PoolManager.h index f559410d7d..96cb446624 100644 --- a/libpoolprotocols/PoolManager.h +++ b/libpoolprotocols/PoolManager.h @@ -6,6 +6,12 @@ #include #include "PoolClient.h" +#include "stratum/EthStratumClient.h" +#include "getwork/EthGetworkClient.h" +#include "testing/SimulateClient.h" + + + #if ETH_DBUS #include "DBusInt.h" #endif @@ -16,10 +22,10 @@ namespace dev { namespace eth { - class PoolManager : public Worker + class PoolManager { public: - PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries); + PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType const & minerType, unsigned maxTries); void addConnection(URI &conn); void clearConnections(); void start(); @@ -31,18 +37,20 @@ namespace dev unsigned m_hashrateReportingTime = 60; unsigned m_hashrateReportingTimePassed = 0; - bool m_running = false; - void workLoop() override; + std::atomic m_running = { false }; + void workLoop(); unsigned m_connectionAttempt = 0; unsigned m_maxConnectionAttempts = 0; unsigned m_activeConnectionIdx = 0; std::vector m_connections; + std::thread m_workThread; h256 m_lastBoundary = h256(); - PoolClient *p_client; + EthStratumClient::pointer p_client; + //PoolClient *p_client; Farm &m_farm; MinerType m_minerType; std::chrono::steady_clock::time_point m_submit_time; diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 92a3035c03..4cbcefd052 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -74,7 +74,7 @@ void EthStratumClient::connect() if (!m_serviceThread.joinable()) { m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, this, boost::asio::placeholders::error)); + m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error)); // Start io service m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; @@ -168,7 +168,7 @@ void EthStratumClient::connect() tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); m_resolver.async_resolve(q, boost::bind(&EthStratumClient::resolve_handler, - this, boost::asio::placeholders::error, + shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator)); } @@ -228,10 +228,10 @@ void EthStratumClient::disconnect() // This will initiate the exchange of "close_notify" message among parties. // If both client and server are connected then we expect the handler with success // As there may be a connection issue we also endorse a timeout - m_securesocket->async_shutdown(boost::bind(&EthStratumClient::onSSLShutdownCompleted, this, boost::asio::placeholders::error)); + m_securesocket->async_shutdown(boost::bind(&EthStratumClient::onSSLShutdownCompleted, shared_from_this(), boost::asio::placeholders::error)); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); // Rest of disconnection is performed asynchronously @@ -321,7 +321,7 @@ void EthStratumClient::reset_work_timeout() { m_worktimer.cancel(); m_worktimer.expires_from_now(boost::posix_time::seconds(m_worktimeout)); - m_worktimer.async_wait(boost::bind(&EthStratumClient::work_timeout_handler, this, boost::asio::placeholders::error)); + m_worktimer.async_wait(boost::bind(&EthStratumClient::work_timeout_handler, shared_from_this(), boost::asio::placeholders::error)); //m_worktimer.async_wait(&work_timeout_handler); } @@ -339,15 +339,15 @@ void EthStratumClient::start_connect() cnote << ("Trying " + toString(m_endpoint) + " ..."); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); // Start connecting async if (m_conn.SecLevel() != SecureLevel::NONE) { - m_securesocket->lowest_layer().async_connect(m_endpoint, boost::bind(&EthStratumClient::connect_handler, this, _1)); + m_securesocket->lowest_layer().async_connect(m_endpoint, boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1)); } else { m_socket->async_connect(m_endpoint, - boost::bind(&EthStratumClient::connect_handler, this, _1)); + boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1)); } @@ -416,7 +416,7 @@ void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec m_conntimer.expires_at(boost::posix_time::pos_infin); } // Put the actor back to sleep. - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); } } @@ -1124,7 +1124,7 @@ void EthStratumClient::io_work_timer_handler(const boost::system::error_code& ec // This does absolutely nothing aside resubmitting timer // ensuring io_service's queue has always something to do m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, this, boost::asio::placeholders::error)); + m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error)); } @@ -1188,7 +1188,7 @@ void EthStratumClient::submitSolution(Solution solution) { m_responsetimer.cancel(); m_responsetimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_responsetimer.async_wait(boost::bind(&EthStratumClient::response_timeout_handler, this, boost::asio::placeholders::error)); + m_responsetimer.async_wait(boost::bind(&EthStratumClient::response_timeout_handler, shared_from_this(), boost::asio::placeholders::error)); Json::Value jReq; @@ -1242,13 +1242,13 @@ void EthStratumClient::recvSocketData() { if (m_conn.SecLevel() != SecureLevel::NONE) { async_read_until(*m_securesocket, m_recvBuffer, "\n", - boost::bind(&EthStratumClient::onRecvSocketDataCompleted, this, + boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } else { async_read_until(*m_nonsecuresocket, m_recvBuffer, "\n", - boost::bind(&EthStratumClient::onRecvSocketDataCompleted, this, + boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } @@ -1329,13 +1329,13 @@ void EthStratumClient::sendSocketData(Json::Value const & jReq) { if (m_conn.SecLevel() != SecureLevel::NONE) { async_write(*m_securesocket, m_sendBuffer, - boost::bind(&EthStratumClient::onSendSocketDataCompleted, this, boost::asio::placeholders::error)); + boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error)); } else { async_write(*m_nonsecuresocket, m_sendBuffer, - boost::bind(&EthStratumClient::onSendSocketDataCompleted, this, boost::asio::placeholders::error)); + boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error)); } diff --git a/libpoolprotocols/stratum/EthStratumClient.h b/libpoolprotocols/stratum/EthStratumClient.h index 5e5980c1da..ebce8704a2 100644 --- a/libpoolprotocols/stratum/EthStratumClient.h +++ b/libpoolprotocols/stratum/EthStratumClient.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -18,10 +20,17 @@ using namespace std; using namespace dev; using namespace dev::eth; -class EthStratumClient : public PoolClient +class EthStratumClient : public PoolClient, public boost::enable_shared_from_this { public: + typedef boost::shared_ptr pointer; + + static pointer create(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate) + { + return pointer(new EthStratumClient(worktimeout, responsetimeout, email, submitHashrate)); + }; + typedef enum { STRATUM = 0, ETHPROXY, ETHEREUMSTRATUM } StratumProtocol; EthStratumClient(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate); From 9d1f99dd90765247a3bb53fbfb74dd5154ab6245 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Mon, 21 May 2018 19:57:41 +0200 Subject: [PATCH 03/13] Working UNSTABLE Reconnects ok. --- ethminer/MinerAux.h | 2 +- libethcore/Farm.h | 6 +- libpoolprotocols/PoolManager.cpp | 29 +++--- libpoolprotocols/stratum/EthStratumClient.cpp | 98 +++++++++---------- libpoolprotocols/stratum/EthStratumClient.h | 7 +- 5 files changed, 66 insertions(+), 76 deletions(-) diff --git a/ethminer/MinerAux.h b/ethminer/MinerAux.h index ba7da4bc5c..6108a1755b 100644 --- a/ethminer/MinerAux.h +++ b/ethminer/MinerAux.h @@ -788,7 +788,7 @@ class MinerCLI //PoolClient *client = nullptr; //if (m_mode == OperationMode::Stratum) { - // client = new EthStratumClient(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); + // // client = new EthStratumClient(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); //} //else if (m_mode == OperationMode::Farm) { // client = new EthGetworkClient(m_farmRecheckPeriod); diff --git a/libethcore/Farm.h b/libethcore/Farm.h index f4a0098ae1..7c6a2b11c7 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -177,8 +177,8 @@ class Farm: public FarmFace } m_hashrateTimer.cancel(); - m_io_service.stop(); - m_serviceThread.join(); + //m_io_service.stop(); + //m_serviceThread.join(); m_lastProgresses.clear(); } @@ -446,7 +446,7 @@ class Farm: public FarmFace mutable SolutionStats m_solutionStats; std::chrono::steady_clock::time_point m_farm_launched = std::chrono::steady_clock::now(); - string m_pool_addresses; + string m_pool_addresses; uint64_t m_nonce_scrambler; wrap_nvml_handle *nvmlh = NULL; diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index c045d77a0c..a60525b62d 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -28,6 +28,20 @@ PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType { m_connectionAttempt = 0; cnote << "Connected to " << m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint(); + + if (!m_farm.isMining()) + { + cnote << "Spinning up miners..."; + if (m_minerType == MinerType::CL) + m_farm.start("opencl", false); + else if (m_minerType == MinerType::CUDA) + m_farm.start("cuda", false); + else if (m_minerType == MinerType::Mixed) { + m_farm.start("cuda", false); + m_farm.start("opencl", true); + } + } + }); p_client->onDisconnected([&]() @@ -36,7 +50,7 @@ PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType if (m_farm.isMining()) { cnote << "Shutting down miners..."; - m_farm.stop(); + // m_farm.stop(); } }); @@ -55,19 +69,6 @@ PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType cnote << "New pool difficulty:" << EthWhite << diffToDisplay(double(dividend / divisor)) << EthReset; } - if (!m_farm.isMining()) - { - cnote << "Spinning up miners..."; - if (m_minerType == MinerType::CL) - m_farm.start("opencl", false); - else if (m_minerType == MinerType::CUDA) - m_farm.start("cuda", false); - else if (m_minerType == MinerType::Mixed) { - m_farm.start("cuda", false); - m_farm.start("opencl", true); - } - } - m_farm.setWork(wp); }); diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 4cbcefd052..7393591be0 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -45,6 +45,7 @@ EthStratumClient::EthStratumClient(int worktimeout, int responsetimeout, string m_responsetimeout(responsetimeout), m_io_work(m_io_service), m_io_work_timer(m_io_service), + m_io_strand(m_io_service), m_socket(nullptr), m_conntimer(m_io_service), m_worktimer(m_io_service), @@ -74,7 +75,7 @@ void EthStratumClient::connect() if (!m_serviceThread.joinable()) { m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error)); + m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error))); // Start io service m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; @@ -163,18 +164,15 @@ void EthStratumClient::connect() #endif // Begin resolve all ips associated to hostname - if (m_endpoints.empty()) { + // empty queue from any previous listed ip + // calling the resolver each time is useful as most + // load balancer will give Ips in different order + m_endpoints = std::queue>(); + m_resolver = tcp::resolver(m_io_service); + tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); + m_resolver.async_resolve(q, + m_io_strand.wrap(boost::bind(&EthStratumClient::resolve_handler, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator))); - tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); - m_resolver.async_resolve(q, - boost::bind(&EthStratumClient::resolve_handler, - shared_from_this(), boost::asio::placeholders::error, - boost::asio::placeholders::iterator)); - - } - else { - start_connect(); - } @@ -192,9 +190,7 @@ void EthStratumClient::connect() //} - - - + } void EthStratumClient::disconnect() @@ -207,6 +203,9 @@ void EthStratumClient::disconnect() m_disconnecting.store(true, std::memory_order::memory_order_relaxed); } + // Cancel any outstanding async operation + m_socket->cancel(); + m_io_service.post([&] { m_conntimer.cancel(); m_worktimer.cancel(); @@ -224,11 +223,11 @@ void EthStratumClient::disconnect() if (m_conn.SecLevel() != SecureLevel::NONE) { cnote << "Initiated m_securesocket->async_shutdown"; - + // This will initiate the exchange of "close_notify" message among parties. // If both client and server are connected then we expect the handler with success // As there may be a connection issue we also endorse a timeout - m_securesocket->async_shutdown(boost::bind(&EthStratumClient::onSSLShutdownCompleted, shared_from_this(), boost::asio::placeholders::error)); + m_securesocket->async_shutdown(m_io_strand.wrap(boost::bind(&EthStratumClient::onSSLShutdownCompleted, shared_from_this(), boost::asio::placeholders::error))); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); @@ -302,9 +301,11 @@ void EthStratumClient::resolve_handler(const boost::system::error_code& ec, tcp: m_endpoints.push(i->endpoint()); i++; } + m_resolver.cancel(); - // Resolver has finished so begin connecting - start_connect(); + // Resolver has finished so invoke connection asynchronously + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); + } else @@ -321,8 +322,7 @@ void EthStratumClient::reset_work_timeout() { m_worktimer.cancel(); m_worktimer.expires_from_now(boost::posix_time::seconds(m_worktimeout)); - m_worktimer.async_wait(boost::bind(&EthStratumClient::work_timeout_handler, shared_from_this(), boost::asio::placeholders::error)); - //m_worktimer.async_wait(&work_timeout_handler); + m_worktimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::work_timeout_handler, shared_from_this(), boost::asio::placeholders::error))); } @@ -339,15 +339,15 @@ void EthStratumClient::start_connect() cnote << ("Trying " + toString(m_endpoint) + " ..."); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); + m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error))); // Start connecting async if (m_conn.SecLevel() != SecureLevel::NONE) { - m_securesocket->lowest_layer().async_connect(m_endpoint, boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1)); + m_securesocket->lowest_layer().async_connect(m_endpoint, m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1))); } else { m_socket->async_connect(m_endpoint, - boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1)); + m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1))); } @@ -357,7 +357,7 @@ void EthStratumClient::start_connect() dev::setThreadName("stratum"); m_connecting.store(false, std::memory_order_relaxed); - cwarn << "No ip addresses to try for host:" << m_conn.Host(); + cwarn << "No more ip addresses to try for host:" << m_conn.Host(); // Trigger handlers if (m_onDisconnected) { m_onDisconnected(); } @@ -365,21 +365,6 @@ void EthStratumClient::start_connect() } - //if (endpoint_iter != tcp::resolver::iterator()) { - - // cnote << ("Trying " + toString(endpoint_iter->endpoint()) + " ..."); - // - // m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - - // // Start connecting async - // m_socket->async_connect(endpoint_iter->endpoint(), - // boost::bind(&EthStratumClient::connect_handler, this, _1, endpoint_iter)); - - //} - //else { - - - //} } void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec) @@ -416,7 +401,7 @@ void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec m_conntimer.expires_at(boost::posix_time::pos_infin); } // Put the actor back to sleep. - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); + m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error))); } } @@ -433,7 +418,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) cwarn << ("Error " + toString(m_endpoint) + " [Timeout]"); // Try the next available endpoint. - start_connect(); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); } else if (ec) { @@ -445,7 +430,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) m_socket->close(); // Try the next available endpoint. - start_connect(); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); } else { @@ -458,6 +443,8 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) if (m_conn.SecLevel() != SecureLevel::NONE) { boost::system::error_code hec; + m_securesocket->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true)); + m_securesocket->lowest_layer().set_option(tcp::no_delay(true)); cnote << "Start handshake ..."; m_securesocket->handshake(boost::asio::ssl::stream_base::client, hec); @@ -482,10 +469,14 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) // Disconnection is triggered on no more IP available m_connected.store(false, std::memory_order_relaxed); m_socket->close(); - start_connect(); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); return; } } + else { + m_nonsecuresocket->set_option(boost::asio::socket_base::keep_alive(true)); + m_nonsecuresocket->set_option(tcp::no_delay(true)); + } // Here is where we're properly connected m_connected.store(true, std::memory_order_relaxed); @@ -1124,7 +1115,7 @@ void EthStratumClient::io_work_timer_handler(const boost::system::error_code& ec // This does absolutely nothing aside resubmitting timer // ensuring io_service's queue has always something to do m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error)); + m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error))); } @@ -1188,7 +1179,7 @@ void EthStratumClient::submitSolution(Solution solution) { m_responsetimer.cancel(); m_responsetimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_responsetimer.async_wait(boost::bind(&EthStratumClient::response_timeout_handler, shared_from_this(), boost::asio::placeholders::error)); + m_responsetimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::response_timeout_handler, shared_from_this(), boost::asio::placeholders::error))); Json::Value jReq; @@ -1242,14 +1233,12 @@ void EthStratumClient::recvSocketData() { if (m_conn.SecLevel() != SecureLevel::NONE) { async_read_until(*m_securesocket, m_recvBuffer, "\n", - boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), - boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } else { async_read_until(*m_nonsecuresocket, m_recvBuffer, "\n", - boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), - boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } } @@ -1278,7 +1267,7 @@ void EthStratumClient::onRecvSocketDataCompleted(const boost::system::error_code Json::Value jMsg; Json::Reader jRdr; if (jRdr.parse(message, jMsg)) { - processReponse(jMsg); + m_io_service.post(boost::bind(&EthStratumClient::processReponse, shared_from_this(), jMsg)); } else { cwarn << "Got invalid Json message :" + jRdr.getFormattedErrorMessages(); @@ -1329,13 +1318,13 @@ void EthStratumClient::sendSocketData(Json::Value const & jReq) { if (m_conn.SecLevel() != SecureLevel::NONE) { async_write(*m_securesocket, m_sendBuffer, - boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error)); + m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error))); } else { async_write(*m_nonsecuresocket, m_sendBuffer, - boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error)); + m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error))); } @@ -1362,7 +1351,8 @@ void EthStratumClient::onSendSocketDataCompleted(const boost::system::error_code void EthStratumClient::onSSLShutdownCompleted(const boost::system::error_code& ec) { cnote << "onSSLShutdownCompleted Error code is : " << ec.message(); - disconnect_finalize(); + m_io_service.post(boost::bind(&EthStratumClient::disconnect_finalize, shared_from_this())); + //if (ec == boost::asio::error::operation_aborted) { diff --git a/libpoolprotocols/stratum/EthStratumClient.h b/libpoolprotocols/stratum/EthStratumClient.h index ebce8704a2..84086d85c7 100644 --- a/libpoolprotocols/stratum/EthStratumClient.h +++ b/libpoolprotocols/stratum/EthStratumClient.h @@ -98,14 +98,13 @@ class EthStratumClient : public PoolClient, public boost::enable_shared_from_thi boost::asio::io_service m_io_service; // The IO service itself boost::asio::io_service::work m_io_work; // The IO work which prevents io_service.run() to return on no work thus terminating thread boost::asio::deadline_timer m_io_work_timer; // A dummy timer to keep io_service with something to do + boost::asio::io_service::strand m_io_strand; boost::asio::ip::tcp::socket *m_socket; // Use shared ptrs to avoid crashes due to async_writes // see https://stackoverflow.com/questions/41526553/can-async-write-cause-segmentation-fault-when-this-is-deleted - std::shared_ptr > - m_securesocket; - std::shared_ptr - m_nonsecuresocket; + std::shared_ptr> m_securesocket; + std::shared_ptr m_nonsecuresocket; boost::asio::streambuf m_sendBuffer; boost::asio::streambuf m_recvBuffer; From 11133f76e97b5dc372b2b2527c05c02308b72ec8 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Tue, 22 May 2018 13:39:45 +0200 Subject: [PATCH 04/13] Reconnect release candidate * Made boost's io_service global * No stops of io_service which prevent restart of resolver or sockets * On disconnects this behavior is implemented : mining is stopped only if switching pools (failovers) otherwise it keeps mining trying a fast reconnect on same pool * SSL and plain TCP reconnect properly on Linux Feed back needed on Windows. --- ethminer/MinerAux.h | 132 ++++++++++++----- libethcore/Farm.h | 29 ++-- libpoolprotocols/PoolManager.cpp | 61 ++++++-- libpoolprotocols/PoolManager.h | 10 +- libpoolprotocols/stratum/EthStratumClient.cpp | 139 +++++------------- libpoolprotocols/stratum/EthStratumClient.h | 19 +-- 6 files changed, 204 insertions(+), 186 deletions(-) diff --git a/ethminer/MinerAux.h b/ethminer/MinerAux.h index 6108a1755b..a3dc6391c3 100644 --- a/ethminer/MinerAux.h +++ b/ethminer/MinerAux.h @@ -32,6 +32,7 @@ #include #include #include +#include #include #include @@ -92,6 +93,47 @@ class MinerCLI Stratum }; + MinerCLI() : + m_io_work(m_io_service), + m_io_work_timer(m_io_service), + m_io_strand(m_io_service) + { + // Post first deadline timer to give io_service + // initial work + m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); + m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&MinerCLI::io_work_timer_handler, this, boost::asio::placeholders::error))); + + // Start io_service in it's own thread + m_io_thread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; + + // Io service is now live and running + // All components using io_service should post to reference of m_io_service + // and should not start/stop or even join threads (which heavily time consuming) + + + } + + void io_work_timer_handler(const boost::system::error_code& ec) { + + if (!ec) { + + // This does absolutely nothing aside resubmitting timer + // ensuring io_service's queue has always something to do + m_io_work_timer.expires_from_now(boost::posix_time::seconds(120)); + m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&MinerCLI::io_work_timer_handler, this, boost::asio::placeholders::error))); + + } + + } + + void stop_io_service() { + + // Here we stop all io_service's related activities + m_io_service.stop(); + m_io_thread.join(); + + } + static void signalHandler(int sig) { (void)sig; @@ -542,6 +584,7 @@ class MinerCLI if (m_minerType == MinerType::CUDA || m_minerType == MinerType::Mixed) CUDAMiner::listDevices(); #endif + stop_io_service(); exit(0); } @@ -562,16 +605,19 @@ class MinerCLI CLMiner::setThreadsPerHash(m_openclThreadsPerHash); if (!CLMiner::configureGPU( - m_localWorkSize, - m_globalWorkSizeMultiplier, - m_openclPlatform, - 0, - m_dagLoadMode, - m_dagCreateDevice, - m_noEval, - m_exit - )) + m_localWorkSize, + m_globalWorkSizeMultiplier, + m_openclPlatform, + 0, + m_dagLoadMode, + m_dagCreateDevice, + m_noEval, + m_exit + )) { + stop_io_service(); exit(1); + }; + CLMiner::setNumInstances(m_miningThreads); #else cerr << "Selected GPU mining without having compiled with -DETHASHCL=1" << endl; @@ -597,12 +643,16 @@ class MinerCLI m_dagCreateDevice, m_noEval, m_exit - )) + )) + { + stop_io_service(); exit(1); + } CUDAMiner::setParallelHash(m_parallelHash); #else cerr << "CUDA support disabled. Configure project build with -DETHASHCUDA=ON" << endl; + stop_io_service(); exit(1); #endif } @@ -706,7 +756,7 @@ class MinerCLI genesis.setNumber(m_benchmarkBlock); genesis.setDifficulty(u256(1) << 64); - Farm f; + Farm f(m_io_service); map sealers; #if ETH_ETHASHCL sealers["opencl"] = Farm::SealerDescriptor{ @@ -769,7 +819,7 @@ class MinerCLI } else cout << "inner mean: n/a" << endl; - + stop_io_service(); exit(0); } @@ -783,32 +833,33 @@ class MinerCLI sealers["cuda"] = Farm::SealerDescriptor{&CUDAMiner::instances, [](FarmFace& _farm, unsigned _index){ return new CUDAMiner(_farm, _index); }}; #endif - EthStratumClient::pointer client = EthStratumClient::create(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); - - //PoolClient *client = nullptr; - - //if (m_mode == OperationMode::Stratum) { - // // client = new EthStratumClient(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); - //} - //else if (m_mode == OperationMode::Farm) { - // client = new EthGetworkClient(m_farmRecheckPeriod); - //} - //else if (m_mode == OperationMode::Simulation) { - // client = new SimulateClient(20, m_benchmarkBlock); - //} - //else { - // cwarn << "Invalid OperationMode"; - // exit(1); - //} - - //// Should not happen! - //if (!client) { - // cwarn << "Invalid PoolClient"; - // exit(1); - //} + //EthStratumClient::pointer client = EthStratumClient::create(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); + + PoolClient *client = nullptr; + + if (m_mode == OperationMode::Stratum) { + client = new EthStratumClient(m_io_service, m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); + } + else if (m_mode == OperationMode::Farm) { + client = new EthGetworkClient(m_farmRecheckPeriod); + } + else if (m_mode == OperationMode::Simulation) { + client = new SimulateClient(20, m_benchmarkBlock); + } + else { + cwarn << "Invalid OperationMode"; + exit(1); + } + + // Should not happen! + if (!client) { + cwarn << "Invalid PoolClient"; + stop_io_service(); + exit(1); + } //sealers, m_minerType - Farm f; + Farm f(m_io_service); f.setSealers(sealers); PoolManager mgr(client, f, m_minerType, m_maxFarmRetries); @@ -849,6 +900,8 @@ class MinerCLI } mgr.stop(); + stop_io_service(); + cnote << "Terminated !"; exit(0); } @@ -856,6 +909,13 @@ class MinerCLI /// Operating mode. OperationMode m_mode = OperationMode::None; + /// Global boost's io_service + std::thread m_io_thread; // The IO service thread + boost::asio::io_service m_io_service; // The IO service itself + boost::asio::io_service::work m_io_work; // The IO work which prevents io_service.run() to return on no work thus terminating thread + boost::asio::deadline_timer m_io_work_timer; // A dummy timer to keep io_service with something to do and prevent io shutdown + boost::asio::io_service::strand m_io_strand; // A strand to serialize posts in multithreaded environment + /// Mining options MinerType m_minerType = MinerType::Mixed; unsigned m_openclPlatform = 0; diff --git a/libethcore/Farm.h b/libethcore/Farm.h index 7c6a2b11c7..a2225462c5 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -56,7 +56,10 @@ class Farm: public FarmFace std::function create; }; - Farm(): m_hashrateTimer(m_io_service) + Farm(boost::asio::io_service & io_service): + m_io_service(io_service), + m_io_strand(io_service), + m_hashrateTimer(io_service) { // Given that all nonces are equally likely to solve the problem // we could reasonably always start the nonce search ranges @@ -153,14 +156,14 @@ class Farm: public FarmFace // Start hashrate collector m_hashrateTimer.cancel(); m_hashrateTimer.expires_from_now(boost::posix_time::milliseconds(1000)); - m_hashrateTimer.async_wait(boost::bind(&Farm::processHashRate, this, boost::asio::placeholders::error)); + m_hashrateTimer.async_wait(m_io_strand.wrap(boost::bind(&Farm::processHashRate, this, boost::asio::placeholders::error))); - if (m_serviceThread.joinable()) { - m_io_service.reset(); - m_serviceThread.join(); - } + //if (m_serviceThread.joinable()) { + // m_io_service.reset(); + // m_serviceThread.join(); + //} - m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; + //m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; return true; } @@ -177,8 +180,6 @@ class Farm: public FarmFace } m_hashrateTimer.cancel(); - //m_io_service.stop(); - //m_serviceThread.join(); m_lastProgresses.clear(); } @@ -220,9 +221,8 @@ class Farm: public FarmFace collectHashRate(); // Restart timer - m_hashrateTimer.cancel(); m_hashrateTimer.expires_from_now(boost::posix_time::milliseconds(1000)); - m_hashrateTimer.async_wait(boost::bind(&Farm::processHashRate, this, boost::asio::placeholders::error)); + m_hashrateTimer.async_wait(m_io_strand.wrap(boost::bind(&Farm::processHashRate, this, boost::asio::placeholders::error))); } } @@ -438,8 +438,11 @@ class Farm: public FarmFace std::chrono::steady_clock::time_point m_lastStart; uint64_t m_hashrateSmoothInterval = 10000; - std::thread m_serviceThread; ///< The IO service thread. - boost::asio::io_service m_io_service; + + // std::thread m_serviceThread; ///< The IO service thread. + + boost::asio::io_service & m_io_service; // The IO service reference passed in the constructor + boost::asio::io_service::strand m_io_strand; boost::asio::deadline_timer m_hashrateTimer; std::vector m_lastProgresses; diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index a60525b62d..8897b93e8d 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -19,7 +19,7 @@ static string diffToDisplay(double diff) return ss.str(); } -PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType const & minerType, unsigned maxTries) : m_farm(farm), m_minerType(minerType) +PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries) : m_farm(farm), m_minerType(minerType) { p_client = client; m_maxConnectionAttempts = maxTries; @@ -48,10 +48,14 @@ PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType { cnote << "Disconnected from " + m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint(); - if (m_farm.isMining()) { - cnote << "Shutting down miners..."; - // m_farm.stop(); - } + // Do not stop mining here + // Workloop will determine if we're trying a fast reconnect to same pool + // or if we're switching to failover(s) + + //if (m_farm.isMining()) { + // cnote << "Shutting down miners..."; + // m_farm.stop(); + //} }); @@ -145,15 +149,28 @@ PoolManager::PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType void PoolManager::stop() { if (m_running.load(std::memory_order_relaxed)) { + cnote << "Shutting down..."; + m_running.store(false, std::memory_order_relaxed); - if (p_client->isConnected()) { p_client->disconnect(); } + if (p_client->isConnected()) + { + + // Disconnection will also trigger + // stop for mining + p_client->disconnect(); - if (m_farm.isMining()) - { - cnote << "Shutting down miners..."; - m_farm.stop(); } + else { + + if (m_farm.isMining()) + { + cnote << "Shutting down miners..."; + m_farm.stop(); + } + + } + } } @@ -171,12 +188,29 @@ void PoolManager::workLoop() // Rotate connections if above max attempts threshold if (m_connectionAttempt >= m_maxConnectionAttempts) { + unsigned lastConnectionIdx = m_activeConnectionIdx; + m_connectionAttempt = 0; m_activeConnectionIdx++; if (m_activeConnectionIdx == m_connections.size()) { m_activeConnectionIdx = 0; } + if (lastConnectionIdx != m_activeConnectionIdx) { + + // Stop mining if applicable as we're switching + if (m_farm.isMining()) { + cnote << "Shutting down miners..."; + m_farm.stop(); + } + + // Give some time to mining threads to shutdown + for (auto i = 4; --i; this_thread::sleep_for(chrono::seconds(1))) { + cnote << "Retrying in " << i << "... \r"; + } + } + + } if (m_connections[m_activeConnectionIdx].Host() != "exit") { @@ -195,6 +229,13 @@ void PoolManager::workLoop() dev::setThreadName("main"); cnote << "No more failover connections."; + + // Stop mining if applicable + if (m_farm.isMining()) { + cnote << "Shutting down miners..."; + m_farm.stop(); + } + m_running.store(false, std::memory_order_relaxed); continue; } diff --git a/libpoolprotocols/PoolManager.h b/libpoolprotocols/PoolManager.h index 96cb446624..b92b702cb5 100644 --- a/libpoolprotocols/PoolManager.h +++ b/libpoolprotocols/PoolManager.h @@ -6,11 +6,6 @@ #include #include "PoolClient.h" -#include "stratum/EthStratumClient.h" -#include "getwork/EthGetworkClient.h" -#include "testing/SimulateClient.h" - - #if ETH_DBUS #include "DBusInt.h" @@ -25,7 +20,7 @@ namespace dev class PoolManager { public: - PoolManager(EthStratumClient::pointer client, Farm &farm, MinerType const & minerType, unsigned maxTries); + PoolManager(PoolClient * client, Farm &farm, MinerType const & minerType, unsigned maxTries); void addConnection(URI &conn); void clearConnections(); void start(); @@ -49,8 +44,7 @@ namespace dev h256 m_lastBoundary = h256(); - EthStratumClient::pointer p_client; - //PoolClient *p_client; + PoolClient *p_client; Farm &m_farm; MinerType m_minerType; std::chrono::steady_clock::time_point m_submit_time; diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 7393591be0..40907b0c4a 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -40,17 +40,16 @@ static void diffToTarget(uint32_t *target, double diff) } -EthStratumClient::EthStratumClient(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate) : PoolClient(), +EthStratumClient::EthStratumClient(boost::asio::io_service & io_service, int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate) : PoolClient(), m_worktimeout(worktimeout), m_responsetimeout(responsetimeout), - m_io_work(m_io_service), - m_io_work_timer(m_io_service), - m_io_strand(m_io_service), + m_io_service(io_service), + m_io_strand(io_service), m_socket(nullptr), - m_conntimer(m_io_service), - m_worktimer(m_io_service), - m_responsetimer(m_io_service), - m_resolver(m_io_service), + m_conntimer(io_service), + m_worktimer(io_service), + m_responsetimer(io_service), + m_resolver(io_service), m_endpoints(), m_email(email), m_submit_hashrate(submitHashrate) @@ -63,25 +62,13 @@ EthStratumClient::EthStratumClient(int worktimeout, int responsetimeout, string EthStratumClient::~EthStratumClient() { - m_io_work_timer.cancel(); - m_io_service.stop(); - m_serviceThread.join(); + // Do not stop io service. + // It's global } void EthStratumClient::connect() { - // Start service thread immediately - if (!m_serviceThread.joinable()) { - - m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error))); - - // Start io service - m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; - - } - // Prevent unnecessary and potentially dangerous recursion if (m_connecting.load(std::memory_order::memory_order_relaxed)) { return; @@ -90,7 +77,6 @@ void EthStratumClient::connect() m_connecting.store(true, std::memory_order::memory_order_relaxed); } - m_connected.store(false, std::memory_order_relaxed); m_subscribed.store(false, std::memory_order_relaxed); m_authorized.store(false, std::memory_order_relaxed); @@ -171,25 +157,7 @@ void EthStratumClient::connect() m_resolver = tcp::resolver(m_io_service); tcp::resolver::query q(m_conn.Host(), toString(m_conn.Port())); m_resolver.async_resolve(q, - m_io_strand.wrap(boost::bind(&EthStratumClient::resolve_handler, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator))); - - - - - //// IMPORTANT !! - //if (m_serviceThread.joinable()) - //{ - // // If the service thread have been created try to reset the service. - // //m_serviceThread.join(); - // //m_io_service.rese(); - //} - //else { - - // // Start io service - // m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; - - //} - + m_io_strand.wrap(boost::bind(&EthStratumClient::resolve_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator))); } @@ -222,15 +190,13 @@ void EthStratumClient::disconnect() if (m_conn.SecLevel() != SecureLevel::NONE) { - cnote << "Initiated m_securesocket->async_shutdown"; - // This will initiate the exchange of "close_notify" message among parties. // If both client and server are connected then we expect the handler with success // As there may be a connection issue we also endorse a timeout - m_securesocket->async_shutdown(m_io_strand.wrap(boost::bind(&EthStratumClient::onSSLShutdownCompleted, shared_from_this(), boost::asio::placeholders::error))); + m_securesocket->async_shutdown(m_io_strand.wrap(boost::bind(&EthStratumClient::onSSLShutdownCompleted, this, boost::asio::placeholders::error))); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error)); + m_conntimer.async_wait(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error)); // Rest of disconnection is performed asynchronously @@ -258,14 +224,12 @@ void EthStratumClient::disconnect() void EthStratumClient::disconnect_finalize() { - cnote << "Entered disconnect_finalize()"; - if (m_conn.SecLevel() != SecureLevel::NONE) { - m_securesocket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both); - m_securesocket->lowest_layer().close(); - m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); - m_socket->close(); + if (m_securesocket->lowest_layer().is_open()) { + m_securesocket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both); + m_securesocket->lowest_layer().close(); + } m_securesocket = nullptr; m_socket = nullptr; @@ -304,7 +268,7 @@ void EthStratumClient::resolve_handler(const boost::system::error_code& ec, tcp: m_resolver.cancel(); // Resolver has finished so invoke connection asynchronously - m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, this))); } @@ -322,7 +286,7 @@ void EthStratumClient::reset_work_timeout() { m_worktimer.cancel(); m_worktimer.expires_from_now(boost::posix_time::seconds(m_worktimeout)); - m_worktimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::work_timeout_handler, shared_from_this(), boost::asio::placeholders::error))); + m_worktimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::work_timeout_handler, this, boost::asio::placeholders::error))); } @@ -339,15 +303,15 @@ void EthStratumClient::start_connect() cnote << ("Trying " + toString(m_endpoint) + " ..."); m_conntimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error))); + m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error))); // Start connecting async if (m_conn.SecLevel() != SecureLevel::NONE) { - m_securesocket->lowest_layer().async_connect(m_endpoint, m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1))); + m_securesocket->lowest_layer().async_connect(m_endpoint, m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, this, _1))); } else { m_socket->async_connect(m_endpoint, - m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, shared_from_this(), _1))); + m_io_strand.wrap(boost::bind(&EthStratumClient::connect_handler, this, _1))); } @@ -401,7 +365,7 @@ void EthStratumClient::check_connect_timeout(const boost::system::error_code& ec m_conntimer.expires_at(boost::posix_time::pos_infin); } // Put the actor back to sleep. - m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, shared_from_this(), boost::asio::placeholders::error))); + m_conntimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::check_connect_timeout, this, boost::asio::placeholders::error))); } } @@ -418,7 +382,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) cwarn << ("Error " + toString(m_endpoint) + " [Timeout]"); // Try the next available endpoint. - m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, this))); } else if (ec) { @@ -430,7 +394,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) m_socket->close(); // Try the next available endpoint. - m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, this))); } else { @@ -446,9 +410,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) m_securesocket->lowest_layer().set_option(boost::asio::socket_base::keep_alive(true)); m_securesocket->lowest_layer().set_option(tcp::no_delay(true)); - cnote << "Start handshake ..."; m_securesocket->handshake(boost::asio::ssl::stream_base::client, hec); - cnote << "End handshake"; if (hec) { cwarn << "SSL/TLS Handshake failed: " << hec.message(); @@ -469,7 +431,7 @@ void EthStratumClient::connect_handler(const boost::system::error_code& ec) // Disconnection is triggered on no more IP available m_connected.store(false, std::memory_order_relaxed); m_socket->close(); - m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, shared_from_this()))); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::start_connect, this))); return; } } @@ -1108,20 +1070,6 @@ void EthStratumClient::processReponse(Json::Value& responseObject) } -void EthStratumClient::io_work_timer_handler(const boost::system::error_code& ec) { - - if (!ec) { - - // This does absolutely nothing aside resubmitting timer - // ensuring io_service's queue has always something to do - m_io_work_timer.expires_from_now(boost::posix_time::seconds(60)); - m_io_work_timer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::io_work_timer_handler, shared_from_this(), boost::asio::placeholders::error))); - - } - -} - - void EthStratumClient::work_timeout_handler(const boost::system::error_code& ec) { if (!ec) { @@ -1179,7 +1127,7 @@ void EthStratumClient::submitSolution(Solution solution) { m_responsetimer.cancel(); m_responsetimer.expires_from_now(boost::posix_time::seconds(m_responsetimeout)); - m_responsetimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::response_timeout_handler, shared_from_this(), boost::asio::placeholders::error))); + m_responsetimer.async_wait(m_io_strand.wrap(boost::bind(&EthStratumClient::response_timeout_handler, this, boost::asio::placeholders::error))); Json::Value jReq; @@ -1233,12 +1181,12 @@ void EthStratumClient::recvSocketData() { if (m_conn.SecLevel() != SecureLevel::NONE) { async_read_until(*m_securesocket, m_recvBuffer, "\n", - m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); + m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } else { async_read_until(*m_nonsecuresocket, m_recvBuffer, "\n", - m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); + m_io_strand.wrap(boost::bind(&EthStratumClient::onRecvSocketDataCompleted, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); } } @@ -1267,7 +1215,7 @@ void EthStratumClient::onRecvSocketDataCompleted(const boost::system::error_code Json::Value jMsg; Json::Reader jRdr; if (jRdr.parse(message, jMsg)) { - m_io_service.post(boost::bind(&EthStratumClient::processReponse, shared_from_this(), jMsg)); + m_io_service.post(boost::bind(&EthStratumClient::processReponse, this, jMsg)); } else { cwarn << "Got invalid Json message :" + jRdr.getFormattedErrorMessages(); @@ -1318,13 +1266,13 @@ void EthStratumClient::sendSocketData(Json::Value const & jReq) { if (m_conn.SecLevel() != SecureLevel::NONE) { async_write(*m_securesocket, m_sendBuffer, - m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error))); + m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, this, boost::asio::placeholders::error))); } else { async_write(*m_nonsecuresocket, m_sendBuffer, - m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, shared_from_this(), boost::asio::placeholders::error))); + m_io_strand.wrap(boost::bind(&EthStratumClient::onSendSocketDataCompleted, this, boost::asio::placeholders::error))); } @@ -1335,8 +1283,8 @@ void EthStratumClient::onSendSocketDataCompleted(const boost::system::error_code if (ec) { if ((ec.category() == boost::asio::error::get_ssl_category()) && (SSL_R_PROTOCOL_IS_SHUTDOWN == ERR_GET_REASON(ec.value()))) { - cnote << "onSendSocketDataCompleted. Error code is " << ec.message(); - m_securesocket->lowest_layer().close(); + cnote << "SSL Stream error :" << ec.message(); + disconnect(); } if (isConnected()) { @@ -1350,23 +1298,8 @@ void EthStratumClient::onSendSocketDataCompleted(const boost::system::error_code void EthStratumClient::onSSLShutdownCompleted(const boost::system::error_code& ec) { - cnote << "onSSLShutdownCompleted Error code is : " << ec.message(); - m_io_service.post(boost::bind(&EthStratumClient::disconnect_finalize, shared_from_this())); + (void)ec; + // cnote << "onSSLShutdownCompleted Error code is : " << ec.message(); + m_io_service.post(m_io_strand.wrap(boost::bind(&EthStratumClient::disconnect_finalize, this))); - - //if (ec == boost::asio::error::operation_aborted) { - - // // Timeout has triggered before - // // We can only close the transport - // disconnect_finalize(); - //} - //else { - - //} - // - // (void)ec; - //m_securesocket->lowest_layer().shutdown(tcp::socket::shutdown_both); - //m_securesocket->lowest_layer().close(); - - } \ No newline at end of file diff --git a/libpoolprotocols/stratum/EthStratumClient.h b/libpoolprotocols/stratum/EthStratumClient.h index 84086d85c7..a90ce8da88 100644 --- a/libpoolprotocols/stratum/EthStratumClient.h +++ b/libpoolprotocols/stratum/EthStratumClient.h @@ -4,8 +4,6 @@ #include #include #include -#include -#include #include #include #include @@ -20,20 +18,13 @@ using namespace std; using namespace dev; using namespace dev::eth; -class EthStratumClient : public PoolClient, public boost::enable_shared_from_this +class EthStratumClient : public PoolClient { public: - typedef boost::shared_ptr pointer; - - static pointer create(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate) - { - return pointer(new EthStratumClient(worktimeout, responsetimeout, email, submitHashrate)); - }; - typedef enum { STRATUM = 0, ETHPROXY, ETHEREUMSTRATUM } StratumProtocol; - EthStratumClient(int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate); + EthStratumClient(boost::asio::io_service & io_service, int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate); ~EthStratumClient(); void connect(); @@ -61,7 +52,6 @@ class EthStratumClient : public PoolClient, public boost::enable_shared_from_thi void start_connect(); void check_connect_timeout(const boost::system::error_code& ec); void connect_handler(const boost::system::error_code& ec); - void io_work_timer_handler(const boost::system::error_code& ec); void work_timeout_handler(const boost::system::error_code& ec); void response_timeout_handler(const boost::system::error_code& ec); @@ -94,10 +84,7 @@ class EthStratumClient : public PoolClient, public boost::enable_shared_from_thi bool m_stale = false; - std::thread m_serviceThread; // The IO service thread. - boost::asio::io_service m_io_service; // The IO service itself - boost::asio::io_service::work m_io_work; // The IO work which prevents io_service.run() to return on no work thus terminating thread - boost::asio::deadline_timer m_io_work_timer; // A dummy timer to keep io_service with something to do + boost::asio::io_service & m_io_service; // The IO service reference passed in the constructor boost::asio::io_service::strand m_io_strand; boost::asio::ip::tcp::socket *m_socket; From a713f035d70f5b75419274165cc50d1b61e7a8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Bylica?= Date: Tue, 22 May 2018 14:30:19 +0200 Subject: [PATCH 05/13] Cleanup PoolClient --- libethcore/Farm.h | 2 -- libpoolprotocols/PoolClient.h | 3 +++ libpoolprotocols/stratum/EthStratumClient.h | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/libethcore/Farm.h b/libethcore/Farm.h index a2225462c5..405799212b 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -57,7 +57,6 @@ class Farm: public FarmFace }; Farm(boost::asio::io_service & io_service): - m_io_service(io_service), m_io_strand(io_service), m_hashrateTimer(io_service) { @@ -441,7 +440,6 @@ class Farm: public FarmFace // std::thread m_serviceThread; ///< The IO service thread. - boost::asio::io_service & m_io_service; // The IO service reference passed in the constructor boost::asio::io_service::strand m_io_strand; boost::asio::deadline_timer m_hashrateTimer; std::vector m_lastProgresses; diff --git a/libpoolprotocols/PoolClient.h b/libpoolprotocols/PoolClient.h index 1fd230b089..6809525a97 100644 --- a/libpoolprotocols/PoolClient.h +++ b/libpoolprotocols/PoolClient.h @@ -16,6 +16,9 @@ namespace dev class PoolClient { public: + virtual ~PoolClient() noexcept = default; + + void setConnection(URI &conn) { m_conn = conn; diff --git a/libpoolprotocols/stratum/EthStratumClient.h b/libpoolprotocols/stratum/EthStratumClient.h index a90ce8da88..3fa0a6636a 100644 --- a/libpoolprotocols/stratum/EthStratumClient.h +++ b/libpoolprotocols/stratum/EthStratumClient.h @@ -27,8 +27,8 @@ class EthStratumClient : public PoolClient EthStratumClient(boost::asio::io_service & io_service, int worktimeout, int responsetimeout, string const & email, bool const & submitHashrate); ~EthStratumClient(); - void connect(); - void disconnect(); + void connect() override; + void disconnect() override; // Connected and Connection Statuses bool isConnected() override { return m_connected.load(std::memory_order_relaxed) && !isPendingState(); } @@ -36,10 +36,10 @@ class EthStratumClient : public PoolClient bool isSubscribed() { return m_subscribed.load(std::memory_order_relaxed); } bool isAuthorized() { return m_authorized.load(std::memory_order_relaxed); } - string ActiveEndPoint() { return " [" + toString(m_endpoint) + "]"; }; + string ActiveEndPoint() override { return " [" + toString(m_endpoint) + "]"; }; - void submitHashrate(string const & rate); - void submitSolution(Solution solution); + void submitHashrate(string const & rate) override; + void submitSolution(Solution solution) override; h256 currentHeaderHash() { return m_current.header; } bool current() { return static_cast(m_current); } From f69078eaaf1fccb0ed39e46a9ec2d936a475cf76 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Tue, 22 May 2018 19:20:23 +0200 Subject: [PATCH 06/13] Reset connection pending state on resolver failure. --- libpoolprotocols/stratum/EthStratumClient.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 40907b0c4a..090af89ae9 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -276,6 +276,11 @@ void EthStratumClient::resolve_handler(const boost::system::error_code& ec, tcp: { dev::setThreadName("stratum"); cwarn << "Could not resolve host " << m_conn.Host() << ", " << ec.message(); + + // Release locking flag and set connection status + m_connected.store(false, std::memory_order_relaxed); + m_disconnecting.store(false, std::memory_order::memory_order_relaxed); + // Trigger handlers if (m_onDisconnected) { m_onDisconnected(); } From 0722124e1eda5136a31d23ad131849eb10eba864 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Tue, 22 May 2018 19:25:23 +0200 Subject: [PATCH 07/13] Amendment on connect pending state for resolver failure --- libpoolprotocols/stratum/EthStratumClient.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libpoolprotocols/stratum/EthStratumClient.cpp b/libpoolprotocols/stratum/EthStratumClient.cpp index 090af89ae9..5577fb0181 100644 --- a/libpoolprotocols/stratum/EthStratumClient.cpp +++ b/libpoolprotocols/stratum/EthStratumClient.cpp @@ -279,7 +279,7 @@ void EthStratumClient::resolve_handler(const boost::system::error_code& ec, tcp: // Release locking flag and set connection status m_connected.store(false, std::memory_order_relaxed); - m_disconnecting.store(false, std::memory_order::memory_order_relaxed); + m_connecting.store(false, std::memory_order::memory_order_relaxed); // Trigger handlers if (m_onDisconnected) { m_onDisconnected(); } From 5681a7cc02bfda538baaab7f577212951e956db7 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Tue, 22 May 2018 19:50:20 +0200 Subject: [PATCH 08/13] Add delay to reconnection on pool switching --- libpoolprotocols/PoolManager.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index 8897b93e8d..702a04b93f 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -188,29 +188,26 @@ void PoolManager::workLoop() // Rotate connections if above max attempts threshold if (m_connectionAttempt >= m_maxConnectionAttempts) { - unsigned lastConnectionIdx = m_activeConnectionIdx; - m_connectionAttempt = 0; m_activeConnectionIdx++; if (m_activeConnectionIdx == m_connections.size()) { m_activeConnectionIdx = 0; } - if (lastConnectionIdx != m_activeConnectionIdx) { - - // Stop mining if applicable as we're switching - if (m_farm.isMining()) { - cnote << "Shutting down miners..."; - m_farm.stop(); - } + // Stop mining if applicable as we're switching + if (m_farm.isMining()) { + cnote << "Shutting down miners..."; + m_farm.stop(); // Give some time to mining threads to shutdown for (auto i = 4; --i; this_thread::sleep_for(chrono::seconds(1))) { cnote << "Retrying in " << i << "... \r"; } + } + } if (m_connections[m_activeConnectionIdx].Host() != "exit") { From c297baf0a632176ca2463da55bd1112e23c6e5e5 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Tue, 22 May 2018 20:10:41 +0200 Subject: [PATCH 09/13] Give thread proper name --- libpoolprotocols/PoolManager.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index 702a04b93f..db64bb6f6e 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -46,6 +46,7 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine p_client->onDisconnected([&]() { + dev::setThreadName("main"); cnote << "Disconnected from " + m_connections[m_activeConnectionIdx].Host() << p_client->ActiveEndPoint(); // Do not stop mining here @@ -183,6 +184,8 @@ void PoolManager::workLoop() // Otherwise do nothing and wait until connection state is NOT pending if (!p_client->isPendingState()) { + dev::setThreadName("main"); + if (!p_client->isConnected()) { // Rotate connections if above max attempts threshold @@ -224,7 +227,6 @@ void PoolManager::workLoop() } else { - dev::setThreadName("main"); cnote << "No more failover connections."; // Stop mining if applicable From da3f08afc4b9793aae800444558d7d066183883e Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Thu, 24 May 2018 10:27:51 +0200 Subject: [PATCH 10/13] Code cleanup --- ethminer/MinerAux.h | 2 -- libethcore/Farm.h | 7 ------- libpoolprotocols/PoolManager.cpp | 5 ----- 3 files changed, 14 deletions(-) diff --git a/ethminer/MinerAux.h b/ethminer/MinerAux.h index a3dc6391c3..3da9498788 100644 --- a/ethminer/MinerAux.h +++ b/ethminer/MinerAux.h @@ -833,8 +833,6 @@ class MinerCLI sealers["cuda"] = Farm::SealerDescriptor{&CUDAMiner::instances, [](FarmFace& _farm, unsigned _index){ return new CUDAMiner(_farm, _index); }}; #endif - //EthStratumClient::pointer client = EthStratumClient::create(m_worktimeout, m_responsetimeout, m_email, m_report_stratum_hashrate); - PoolClient *client = nullptr; if (m_mode == OperationMode::Stratum) { diff --git a/libethcore/Farm.h b/libethcore/Farm.h index 405799212b..078a9e5f26 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -157,13 +157,6 @@ class Farm: public FarmFace m_hashrateTimer.expires_from_now(boost::posix_time::milliseconds(1000)); m_hashrateTimer.async_wait(m_io_strand.wrap(boost::bind(&Farm::processHashRate, this, boost::asio::placeholders::error))); - //if (m_serviceThread.joinable()) { - // m_io_service.reset(); - // m_serviceThread.join(); - //} - - //m_serviceThread = std::thread{ boost::bind(&boost::asio::io_service::run, &m_io_service) }; - return true; } diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index db64bb6f6e..eaa2dce9e3 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -53,11 +53,6 @@ PoolManager::PoolManager(PoolClient * client, Farm &farm, MinerType const & mine // Workloop will determine if we're trying a fast reconnect to same pool // or if we're switching to failover(s) - //if (m_farm.isMining()) { - // cnote << "Shutting down miners..."; - // m_farm.stop(); - //} - }); p_client->onWorkReceived([&](WorkPackage const& wp) From 4c70f0a3de6b0f7b4956491b44e8b332c800d240 Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Thu, 24 May 2018 10:45:50 +0200 Subject: [PATCH 11/13] Code cleanup --- libethcore/Farm.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/libethcore/Farm.h b/libethcore/Farm.h index 078a9e5f26..3c4feebc8a 100644 --- a/libethcore/Farm.h +++ b/libethcore/Farm.h @@ -431,8 +431,6 @@ class Farm: public FarmFace std::chrono::steady_clock::time_point m_lastStart; uint64_t m_hashrateSmoothInterval = 10000; - // std::thread m_serviceThread; ///< The IO service thread. - boost::asio::io_service::strand m_io_strand; boost::asio::deadline_timer m_hashrateTimer; std::vector m_lastProgresses; From 0989ed16cd4558ebeb24c693d6077063e82b59ca Mon Sep 17 00:00:00 2001 From: AndreaLanfranchi Date: Fri, 25 May 2018 12:58:52 +0200 Subject: [PATCH 12/13] Minutiae --- libpoolprotocols/PoolManager.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/libpoolprotocols/PoolManager.cpp b/libpoolprotocols/PoolManager.cpp index eaa2dce9e3..a5846d0eb2 100644 --- a/libpoolprotocols/PoolManager.cpp +++ b/libpoolprotocols/PoolManager.cpp @@ -172,6 +172,9 @@ void PoolManager::stop() void PoolManager::workLoop() { + + dev::setThreadName("main"); + while (m_running.load(std::memory_order_relaxed)) { @@ -179,8 +182,6 @@ void PoolManager::workLoop() // Otherwise do nothing and wait until connection state is NOT pending if (!p_client->isPendingState()) { - dev::setThreadName("main"); - if (!p_client->isConnected()) { // Rotate connections if above max attempts threshold @@ -204,8 +205,6 @@ void PoolManager::workLoop() } - - } if (m_connections[m_activeConnectionIdx].Host() != "exit") { From c66718690723a17645e2163f1b3859ff01ef2f25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Bylica?= Date: Fri, 25 May 2018 13:30:04 +0200 Subject: [PATCH 13/13] Add CHANGELOG entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd8c020472..700a020f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,5 +4,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ## Unreleased +### Fixed +- Reconnecting with mining pool improved [[#1135](https://github.com/ethereum-mining/ethminer/pull/1135)]. ### Removed - Disabled Debug configuration for Visual Studio [[#69](https://github.com/ethereum-mining/ethminer/issues/69)] [[#1131](https://github.com/ethereum-mining/ethminer/pull/1131)].