Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Macos2async #732

Merged
merged 17 commits into from
Aug 8, 2018
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if( INCLUDE_PYTHON )
set( MACOS_USE_PYTHON_MODULE_DESC "Specifies which Python module to build Malmo on Apple MacOS" )
set( USE_PYTHON_VERSIONS 3.6 CACHE STRING ${USE_PYTHON_VERSIONS_DESC} )
# Boost has switched to using a 2 digit naming convention for python on MacOS.
set( MACOS_USE_PYTHON_MODULE "python36" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} )
set( MACOS_USE_PYTHON_MODULE "python37" CACHE STRING ${MACOS_USE_PYTHON_MODULE_DESC} )
endif()

set( WARNINGS_AS_ERRORS OFF )
Expand Down
37 changes: 28 additions & 9 deletions Malmo/src/AgentHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace malmo

// start the io_service on background threads
this->work = boost::in_place(boost::ref(this->io_service));
const int NUM_BACKGROUND_THREADS = 1; // can be increased if I/O becomes a bottleneck
const int NUM_BACKGROUND_THREADS = 3; // can be increased if I/O becomes a bottleneck
for( int i = 0; i < NUM_BACKGROUND_THREADS; i++ )
this->background_threads.push_back( boost::make_shared<boost::thread>( boost::bind( &boost::asio::io_service::run, &this->io_service ) ) );
}
Expand Down Expand Up @@ -113,7 +113,7 @@ namespace malmo
std::string line = "";
std::string version = "";
// Keep concatenating lines until we have a match, or we run out of schema.
while (version.empty() && getline(stream, line))
while (version.empty() && !stream.eof() && getline(stream, line))
{
boost::trim(line);
xml += line;
Expand Down Expand Up @@ -314,9 +314,10 @@ namespace malmo
{
reply = rpc.sendStringAndGetShortReply(this->io_service, item->ip_address, item->control_port, request, false);
}
catch (std::exception&)
catch (std::exception& e)
{
// This is expected quite often - client is likely not running.
LOGINFO(LT("Client could not be contacted: "), item->ip_address, LT(":"), item->control_port, LT(" "), e.what());
continue;
}
LOGINFO(LT("Reserving client, received reply from "), item->ip_address, LT(": "), reply);
Expand Down Expand Up @@ -452,7 +453,8 @@ namespace malmo
{
boost::lock_guard<boost::mutex> scope_guard(this->world_state_mutex);

return this->world_state;
WorldState current_world_state(this->world_state); // Copy while holding lock.
return current_world_state;
}

WorldState AgentHost::getWorldState()
Expand Down Expand Up @@ -498,8 +500,13 @@ namespace malmo
{
return; // can re-use existing server
}

if (this->mission_control_server != 0) {
this->mission_control_server->close();
}

this->mission_control_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onMissionControlMessage, this, _1), "mcp");
this->mission_control_server->start();
this->mission_control_server->start(mission_control_server);
}

boost::shared_ptr<VideoServer> AgentHost::listenForVideo(boost::shared_ptr<VideoServer> video_server, int port, short width, short height, short channels, TimestampedVideoFrame::FrameType frametype)
Expand Down Expand Up @@ -530,6 +537,10 @@ namespace malmo
video_server->getChannels() != channels ||
video_server->getFrameType() != frametype)
{
if (video_server != 0) {
video_server->close();
}

// Can't use the server passed in - create a new one.
ret_server = boost::make_shared<VideoServer>( this->io_service, port, width, height, channels, frametype, boost::bind(&AgentHost::onVideo, this, _1));

Expand All @@ -540,8 +551,8 @@ namespace malmo
ret_server->recordBmps(this->current_mission_record->getTemporaryDirectory());
}

ret_server->start();
}
ret_server->start(ret_server);
}
else {
// re-use the existing video_server
// but now we need to re-create the file writers with the new file names
Expand All @@ -562,8 +573,12 @@ namespace malmo
{
if( !this->rewards_server || ( port != 0 && this->rewards_server->getPort() != port ) )
{
if (rewards_server != nullptr) {
rewards_server->close();
}

this->rewards_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onReward, this, _1), "rew");
this->rewards_server->start();
this->rewards_server->start(rewards_server);
}

if (this->current_mission_record->isRecordingRewards()){
Expand All @@ -575,8 +590,12 @@ namespace malmo
{
if( !this->observations_server || ( port != 0 && this->observations_server->getPort() != port ) )
{
if (observations_server != nullptr) {
observations_server->close();
}

this->observations_server = boost::make_shared<StringServer>(this->io_service, port, boost::bind(&AgentHost::onObservation, this, _1), "obs");
this->observations_server->start();
this->observations_server->start(observations_server);
}

if (this->current_mission_record->isRecordingObservations()){
Expand Down
3 changes: 2 additions & 1 deletion Malmo/src/ClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ namespace malmo
if (ec)
LOGERROR(LT("Error resolving remote endpoint: "), ec.message());
boost::lock_guard<boost::mutex> scope_guard(this->outbox_mutex);
this->outbox.pop_front();
if (!this->outbox.empty())
this->outbox.pop_front();
}
if (!this->outbox.empty())
this->write();
Expand Down
15 changes: 13 additions & 2 deletions Malmo/src/StringServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,27 @@
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <iostream>

namespace malmo {
StringServer::StringServer(boost::asio::io_service& io_service, int port, const boost::function<void(const TimestampedString string_message)> handle_string, const std::string& log_name)
: handle_string(handle_string)
, server(io_service, port, boost::bind(&StringServer::handleMessage, this, _1), log_name)
{
}

void StringServer::start()
void StringServer::start(boost::shared_ptr<StringServer>& scope)
{
this->server.start();
this->scope = scope;
this->server.start(scope.get());
}

void StringServer::close() {
this->server.close();
}

void StringServer::release() {
this->scope = 0;
}

StringServer& StringServer::record(std::string path)
Expand Down
12 changes: 10 additions & 2 deletions Malmo/src/StringServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
// Boost:
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>

// STL:
#include <fstream>
Expand All @@ -36,7 +37,7 @@
namespace malmo
{
//! A TCP server that receives strings and can optionally persist to file.
class StringServer
class StringServer : ServerScope
{
public:

Expand All @@ -57,7 +58,12 @@ namespace malmo
void recordMessage(const TimestampedString message);

//! Starts the string server.
void start();

void start(boost::shared_ptr<StringServer>& scope);

virtual void release();

void close();

private:

Expand All @@ -67,6 +73,8 @@ namespace malmo
TCPServer server;
std::ofstream writer;
boost::mutex write_mutex;

boost::shared_ptr<StringServer> scope = nullptr;
};
}

Expand Down
61 changes: 44 additions & 17 deletions Malmo/src/TCPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,32 +141,59 @@ namespace malmo
else
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::handle_read_line("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes_transferred: "), bytes_transferred, LT(" - ERROR: "), error.message());
}

void TCPConnection::processMessage()
{
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::processMessage("), safe_local_endpoint(), LT("/"), safe_remote_endpoint(), LT(") - bytes received: "), this->body_buffer.size());

if( this->confirm_with_fixed_reply )
sendReply();
this->onMessageReceived( TimestampedUnsignedCharVector( boost::posix_time::microsec_clock::universal_time(),
this->body_buffer ) );
this->read();
if (this->confirm_with_fixed_reply)
{
reply();
}
else
{
deliverMessage();
}
}

void TCPConnection::sendReply()
void TCPConnection::reply()
{
const int REPLY_SIZE_HEADER_LENGTH = 4;
boost::system::error_code ec;
u_long reply_size_header = htonl((u_long)this->fixed_reply.size());
size_t bytes_written = boost::asio::write(this->socket, boost::asio::buffer(&reply_size_header, REPLY_SIZE_HEADER_LENGTH), ec);
if (bytes_written != REPLY_SIZE_HEADER_LENGTH || ec)
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - ONLY SENT "), bytes_written, LT(" BYTES: "), ec.message());

bytes_written = boost::asio::write( this->socket, boost::asio::buffer(this->fixed_reply), boost::asio::transfer_all(), ec );
if (ec)
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::sendReply - failed to send body of message: "), ec.message());
this->reply_size_header = htonl((u_long)this->fixed_reply.size());

// Send header and continue after with response body.
boost::asio::async_write(this->socket, boost::asio::buffer(&this->reply_size_header, REPLY_SIZE_HEADER_LENGTH), boost::bind(&TCPConnection::transferredHeader, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void TCPConnection::transferredHeader(const boost::system::error_code& error, std::size_t bytes_transferred) {
if (!error)
{
// Send body and continue after with message delivery.
boost::asio::async_write(this->socket, boost::asio::buffer(this->fixed_reply), boost::bind(&TCPConnection::transferredBody, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::sendReply sent "), bytes_written, LT(" bytes"));
{
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredHeader - failed to send header of message: "), error.message());
}
}

void TCPConnection::transferredBody(const boost::system::error_code& error, std::size_t bytes_transferred) {
if (!error)
{
LOGFINE(LT("TCPConnection("), this->log_name, LT(")::transferredBody sent "), bytes_transferred, LT(" bytes"));

this->deliverMessage();
}
else
{
LOGERROR(LT("TCPConnection("), this->log_name, LT(")::transferredBody - failed to send body of message: "), error.message());
}
}

void TCPConnection::deliverMessage()
{
this->onMessageReceived(TimestampedUnsignedCharVector(boost::posix_time::microsec_clock::universal_time(), this->body_buffer));
this->read(); // Continue on with reading of next request message.
}

TCPConnection::TCPConnection(boost::asio::io_service& io_service, boost::function<void(const TimestampedUnsignedCharVector) > callback, bool expect_size_header, const std::string& log_name)
Expand Down
8 changes: 7 additions & 1 deletion Malmo/src/TCPConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ namespace malmo
void handle_read_line( const boost::system::error_code& error, size_t bytes_transferred );

size_t getSizeFromHeader();

void processMessage();
void sendReply();
void reply();
void deliverMessage();

void transferredHeader(const boost::system::error_code& ec, std::size_t transferred);
void transferredBody(const boost::system::error_code& ec, std::size_t transferred);

private:

Expand All @@ -84,6 +89,7 @@ namespace malmo
std::string fixed_reply;
bool expect_size_header;
std::string log_name;
u_long reply_size_header;
};
}

Expand Down
46 changes: 40 additions & 6 deletions Malmo/src/TCPServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@ namespace malmo
}
}

void TCPServer::start()
void TCPServer::start(ServerScope* scope)
{
this->scope = scope;
this->startAccept();
}


void TCPServer::close() {
this->closing = true;
this->acceptor->close();
}

void TCPServer::confirmWithFixedReply(std::string reply)
{
this->confirm_with_fixed_reply = true;
Expand All @@ -72,9 +78,14 @@ namespace malmo

void TCPServer::startAccept()
{
boost::function<void(const TimestampedUnsignedCharVector) > deliverMsgIfNotClosed = [this](const TimestampedUnsignedCharVector msg) {
if (!this->closing)
this->onMessageReceived(msg);
};

boost::shared_ptr<TCPConnection> new_connection = TCPConnection::create(
this->acceptor->get_io_service(),
this->onMessageReceived,
deliverMsgIfNotClosed,
this->expect_size_header,
this->log_name
);
Expand All @@ -89,17 +100,40 @@ namespace malmo
boost::asio::placeholders::error));
}

void TCPServer::handleAccept(
void TCPServer::handleAccept(
boost::shared_ptr<TCPConnection> new_connection,
const boost::system::error_code& error)
{
// On closing or on error release scope of async io processing which can be us.

if (!error)
{
new_connection->read();
this->startAccept();
if (this->closing)
{
new_connection.get()->getSocket().close();
if (this->scope != nullptr)
this->scope->release();
}
else {
new_connection->read();
if (!this->closing)
{
this->startAccept();
}
else
{
if (this->scope != nullptr)
this->scope->release();
}
}
}
else
{
LOGERROR(LT("TCPServer::handleAccept("), this->log_name, LT(") - "), error.message());
if (this->scope != nullptr) {
this->scope->release();
}
}
}

int TCPServer::getPort() const
Expand Down
Loading