Skip to content

Commit

Permalink
Switch to async state service request (gazebosim#461)
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Chen <[email protected]>
Signed-off-by: Guillaume Doisy <[email protected]>
  • Loading branch information
iche033 authored and Guillaume Doisy committed Dec 11, 2020
1 parent 3025646 commit 1a0524f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 14 deletions.
6 changes: 2 additions & 4 deletions include/ignition/gazebo/gui/GuiRunner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ class IGNITION_GAZEBO_VISIBLE GuiRunner : public QObject
/// \brief Make a new state request to the server.
public slots: void RequestState();

/// \brief Callback for the state service.
/// \brief Callback for the async state service.
/// \param[in] _res Response containing new state.
/// \param[in] _result True if successful.
private: void OnStateService(const msgs::SerializedStepMap &_res,
const bool _result);
private: void OnStateAsyncService(const msgs::SerializedStepMap &_res);

/// \brief Callback when a new state is received from the server.
/// \param[in] _msg New state message.
Expand Down
27 changes: 18 additions & 9 deletions src/gui/GuiRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ GuiRunner::~GuiRunner() = default;
/////////////////////////////////////////////////
void GuiRunner::RequestState()
{
this->node.Request(this->stateTopic, &GuiRunner::OnStateService, this);
// set up service for async state response callback
std::string id = std::to_string(gui::App()->applicationPid());
std::string reqSrv =
this->node.Options().NameSpace() + "/" + id + "/state_async";
this->node.Advertise(reqSrv, &GuiRunner::OnStateAsyncService, this);
ignition::msgs::StringMsg req;
req.set_data(reqSrv);

// send async state request
this->node.Request(this->stateTopic + "_async", req);
}

/////////////////////////////////////////////////
Expand All @@ -77,17 +86,17 @@ void GuiRunner::OnPluginAdded(const QString &_objectName)
}

/////////////////////////////////////////////////
void GuiRunner::OnStateService(const msgs::SerializedStepMap &_res,
const bool _result)
void GuiRunner::OnStateAsyncService(const msgs::SerializedStepMap &_res)
{
if (!_result)
{
ignerr << "Service call failed for [" << this->stateTopic << "]"
<< std::endl;
return;
}
this->OnState(_res);

// todo(anyone) store reqSrv string in a member variable and use it here
// and in RequestState()
std::string id = std::to_string(gui::App()->applicationPid());
std::string reqSrv =
this->node.Options().NameSpace() + "/" + id + "/state_async";
this->node.UnadvertiseSrv(reqSrv);

// Only subscribe to periodic updates after receiving initial state
if (this->node.SubscribedTopics().empty())
this->node.Subscribe(this->stateTopic, &GuiRunner::OnState, this);
Expand Down
1 change: 0 additions & 1 deletion src/systems/physics/Physics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ void PhysicsPrivate::CreatePhysicsEntities(const EntityComponentManager &_ecm)
<< std::endl;
return true;
}

// TODO(anyone) Don't load models unless they have collisions

// Check if parent world / model exists
Expand Down
40 changes: 40 additions & 0 deletions src/systems/scene_broadcaster/SceneBroadcaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <chrono>
#include <condition_variable>
#include <string>
#include <unordered_set>

#include <ignition/common/Profiler.hh>
#include <ignition/math/graph/Graph.hh>
Expand Down Expand Up @@ -75,6 +76,10 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate
/// \return True if successful.
public: bool StateService(ignition::msgs::SerializedStepMap &_res);

/// \brief Callback for state service - non blocking.
/// \param[out] _res Response containing the last available full state.
public: void StateAsyncService(const ignition::msgs::StringMsg &_req);

/// \brief Updates the scene graph when entities are added
/// \param[in] _manager The entity component manager
public: void SceneGraphAddEntities(const EntityComponentManager &_manager);
Expand Down Expand Up @@ -190,6 +195,9 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate

/// \brief Flag used to indicate if the state service was called.
public: bool stateServiceRequest{false};

/// \brief A list of async state requests
public: std::unordered_set<std::string> stateRequests;
};

//////////////////////////////////////////////////
Expand Down Expand Up @@ -270,6 +278,7 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info,

if (this->dataPtr->stateServiceRequest || shouldPublish)
{
std::unique_lock<std::mutex> lock(this->dataPtr->stateMutex);
this->dataPtr->stepMsg.Clear();

set(this->dataPtr->stepMsg.mutable_stats(), _info);
Expand All @@ -294,6 +303,16 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info,
this->dataPtr->stateCv.notify_all();
}

// process async state requests
if (!this->dataPtr->stateRequests.empty())
{
for (const auto &reqSrv : this->dataPtr->stateRequests)
{
this->dataPtr->node->Request(reqSrv, this->dataPtr->stepMsg);
}
this->dataPtr->stateRequests.clear();
}

// Poses periodically + change events
// TODO(louise) Send changed state periodically instead, once it reflects
// changed components
Expand Down Expand Up @@ -448,6 +467,8 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName)
<< graphService << "]" << std::endl;

// State service
// Note: GuiRunner used to call this service but it is now using the async
// version (state_async)
std::string stateService{"state"};

this->node->Advertise(stateService, &SceneBroadcasterPrivate::StateService,
Expand All @@ -456,6 +477,15 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName)
ignmsg << "Serving full state on [" << opts.NameSpace() << "/"
<< stateService << "]" << std::endl;

// Async State service
std::string stateAsyncService{"state_async"};

this->node->Advertise(stateAsyncService,
&SceneBroadcasterPrivate::StateAsyncService, this);

ignmsg << "Serving full state (async) on [" << opts.NameSpace() << "/"
<< stateAsyncService << "]" << std::endl;

// Scene info topic
std::string sceneTopic{"/world/" + _worldName + "/scene/info"};

Expand Down Expand Up @@ -523,6 +553,15 @@ bool SceneBroadcasterPrivate::SceneInfoService(ignition::msgs::Scene &_res)
return true;
}

//////////////////////////////////////////////////
void SceneBroadcasterPrivate::StateAsyncService(
const ignition::msgs::StringMsg &_req)
{
std::unique_lock<std::mutex> lock(this->stateMutex);
this->stateServiceRequest = true;
this->stateRequests.insert(_req.data());
}

//////////////////////////////////////////////////
bool SceneBroadcasterPrivate::StateService(
ignition::msgs::SerializedStepMap &_res)
Expand All @@ -531,6 +570,7 @@ bool SceneBroadcasterPrivate::StateService(

// Lock and wait for an iteration to be run and fill the state
std::unique_lock<std::mutex> lock(this->stateMutex);

this->stateServiceRequest = true;
auto success = this->stateCv.wait_for(lock, 5s, [&]
{
Expand Down
25 changes: 25 additions & 0 deletions test/integration/scene_broadcaster_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,13 @@ TEST_P(SceneBroadcasterTest, State)
checkMsg(_res, 3);
};

// async state request with full state response
std::function<void(const msgs::SerializedStepMap &)> cbAsync =
[&](const msgs::SerializedStepMap &_res)
{
checkMsg(_res, 16);
};

// The request is blocking even though it's meant to be async, so we spin a
// thread
auto request = [&]()
Expand Down Expand Up @@ -429,6 +436,24 @@ TEST_P(SceneBroadcasterTest, State)
while (!received && sleep++ < maxSleep)
IGN_SLEEP_MS(100);
EXPECT_TRUE(received);

// test async state request
received = false;
std::string reqSrv = "/state_async_callback_test";
node.Advertise(reqSrv, cbAsync);

ignition::msgs::StringMsg req;
req.set_data(reqSrv);
node.Request("/world/default/state_async", req);

sleep = 0;
while (!received && sleep++ < maxSleep)
{
// Run server
server.Run(true, 1, false);
IGN_SLEEP_MS(100);
}
EXPECT_TRUE(received);
}

/////////////////////////////////////////////////
Expand Down

0 comments on commit 1a0524f

Please sign in to comment.