From 1a0524fd3fed8607c3911bc255a5e98380c0afe4 Mon Sep 17 00:00:00 2001 From: Ian Chen Date: Wed, 9 Dec 2020 20:29:48 -0800 Subject: [PATCH] Switch to async state service request (#461) Signed-off-by: Ian Chen Signed-off-by: Guillaume Doisy --- include/ignition/gazebo/gui/GuiRunner.hh | 6 +-- src/gui/GuiRunner.cc | 27 ++++++++----- src/systems/physics/Physics.cc | 1 - .../scene_broadcaster/SceneBroadcaster.cc | 40 +++++++++++++++++++ test/integration/scene_broadcaster_system.cc | 25 ++++++++++++ 5 files changed, 85 insertions(+), 14 deletions(-) diff --git a/include/ignition/gazebo/gui/GuiRunner.hh b/include/ignition/gazebo/gui/GuiRunner.hh index e77e9bd3610..43a33a795e2 100644 --- a/include/ignition/gazebo/gui/GuiRunner.hh +++ b/include/ignition/gazebo/gui/GuiRunner.hh @@ -55,11 +55,9 @@ class IGNITION_GAZEBO_VISIBLE GuiRunner : public QObject /// \brief Make a new state request to the server. public slots: void RequestState(); - /// \brief Callback for the state service. + /// \brief Callback for the async state service. /// \param[in] _res Response containing new state. - /// \param[in] _result True if successful. - private: void OnStateService(const msgs::SerializedStepMap &_res, - const bool _result); + private: void OnStateAsyncService(const msgs::SerializedStepMap &_res); /// \brief Callback when a new state is received from the server. /// \param[in] _msg New state message. diff --git a/src/gui/GuiRunner.cc b/src/gui/GuiRunner.cc index a433cc7a4a3..88c7f4d46aa 100644 --- a/src/gui/GuiRunner.cc +++ b/src/gui/GuiRunner.cc @@ -59,7 +59,16 @@ GuiRunner::~GuiRunner() = default; ///////////////////////////////////////////////// void GuiRunner::RequestState() { - this->node.Request(this->stateTopic, &GuiRunner::OnStateService, this); + // set up service for async state response callback + std::string id = std::to_string(gui::App()->applicationPid()); + std::string reqSrv = + this->node.Options().NameSpace() + "/" + id + "/state_async"; + this->node.Advertise(reqSrv, &GuiRunner::OnStateAsyncService, this); + ignition::msgs::StringMsg req; + req.set_data(reqSrv); + + // send async state request + this->node.Request(this->stateTopic + "_async", req); } ///////////////////////////////////////////////// @@ -77,17 +86,17 @@ void GuiRunner::OnPluginAdded(const QString &_objectName) } ///////////////////////////////////////////////// -void GuiRunner::OnStateService(const msgs::SerializedStepMap &_res, - const bool _result) +void GuiRunner::OnStateAsyncService(const msgs::SerializedStepMap &_res) { - if (!_result) - { - ignerr << "Service call failed for [" << this->stateTopic << "]" - << std::endl; - return; - } this->OnState(_res); + // todo(anyone) store reqSrv string in a member variable and use it here + // and in RequestState() + std::string id = std::to_string(gui::App()->applicationPid()); + std::string reqSrv = + this->node.Options().NameSpace() + "/" + id + "/state_async"; + this->node.UnadvertiseSrv(reqSrv); + // Only subscribe to periodic updates after receiving initial state if (this->node.SubscribedTopics().empty()) this->node.Subscribe(this->stateTopic, &GuiRunner::OnState, this); diff --git a/src/systems/physics/Physics.cc b/src/systems/physics/Physics.cc index 3cf1fccada4..84b5276e313 100644 --- a/src/systems/physics/Physics.cc +++ b/src/systems/physics/Physics.cc @@ -702,7 +702,6 @@ void PhysicsPrivate::CreatePhysicsEntities(const EntityComponentManager &_ecm) << std::endl; return true; } - // TODO(anyone) Don't load models unless they have collisions // Check if parent world / model exists diff --git a/src/systems/scene_broadcaster/SceneBroadcaster.cc b/src/systems/scene_broadcaster/SceneBroadcaster.cc index e466d4fb7dc..576a081bc02 100644 --- a/src/systems/scene_broadcaster/SceneBroadcaster.cc +++ b/src/systems/scene_broadcaster/SceneBroadcaster.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -75,6 +76,10 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate /// \return True if successful. public: bool StateService(ignition::msgs::SerializedStepMap &_res); + /// \brief Callback for state service - non blocking. + /// \param[out] _res Response containing the last available full state. + public: void StateAsyncService(const ignition::msgs::StringMsg &_req); + /// \brief Updates the scene graph when entities are added /// \param[in] _manager The entity component manager public: void SceneGraphAddEntities(const EntityComponentManager &_manager); @@ -190,6 +195,9 @@ class ignition::gazebo::systems::SceneBroadcasterPrivate /// \brief Flag used to indicate if the state service was called. public: bool stateServiceRequest{false}; + + /// \brief A list of async state requests + public: std::unordered_set stateRequests; }; ////////////////////////////////////////////////// @@ -270,6 +278,7 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info, if (this->dataPtr->stateServiceRequest || shouldPublish) { + std::unique_lock lock(this->dataPtr->stateMutex); this->dataPtr->stepMsg.Clear(); set(this->dataPtr->stepMsg.mutable_stats(), _info); @@ -294,6 +303,16 @@ void SceneBroadcaster::PostUpdate(const UpdateInfo &_info, this->dataPtr->stateCv.notify_all(); } + // process async state requests + if (!this->dataPtr->stateRequests.empty()) + { + for (const auto &reqSrv : this->dataPtr->stateRequests) + { + this->dataPtr->node->Request(reqSrv, this->dataPtr->stepMsg); + } + this->dataPtr->stateRequests.clear(); + } + // Poses periodically + change events // TODO(louise) Send changed state periodically instead, once it reflects // changed components @@ -448,6 +467,8 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName) << graphService << "]" << std::endl; // State service + // Note: GuiRunner used to call this service but it is now using the async + // version (state_async) std::string stateService{"state"}; this->node->Advertise(stateService, &SceneBroadcasterPrivate::StateService, @@ -456,6 +477,15 @@ void SceneBroadcasterPrivate::SetupTransport(const std::string &_worldName) ignmsg << "Serving full state on [" << opts.NameSpace() << "/" << stateService << "]" << std::endl; + // Async State service + std::string stateAsyncService{"state_async"}; + + this->node->Advertise(stateAsyncService, + &SceneBroadcasterPrivate::StateAsyncService, this); + + ignmsg << "Serving full state (async) on [" << opts.NameSpace() << "/" + << stateAsyncService << "]" << std::endl; + // Scene info topic std::string sceneTopic{"/world/" + _worldName + "/scene/info"}; @@ -523,6 +553,15 @@ bool SceneBroadcasterPrivate::SceneInfoService(ignition::msgs::Scene &_res) return true; } +////////////////////////////////////////////////// +void SceneBroadcasterPrivate::StateAsyncService( + const ignition::msgs::StringMsg &_req) +{ + std::unique_lock lock(this->stateMutex); + this->stateServiceRequest = true; + this->stateRequests.insert(_req.data()); +} + ////////////////////////////////////////////////// bool SceneBroadcasterPrivate::StateService( ignition::msgs::SerializedStepMap &_res) @@ -531,6 +570,7 @@ bool SceneBroadcasterPrivate::StateService( // Lock and wait for an iteration to be run and fill the state std::unique_lock lock(this->stateMutex); + this->stateServiceRequest = true; auto success = this->stateCv.wait_for(lock, 5s, [&] { diff --git a/test/integration/scene_broadcaster_system.cc b/test/integration/scene_broadcaster_system.cc index 22f47cc0287..b402fe62692 100644 --- a/test/integration/scene_broadcaster_system.cc +++ b/test/integration/scene_broadcaster_system.cc @@ -397,6 +397,13 @@ TEST_P(SceneBroadcasterTest, State) checkMsg(_res, 3); }; + // async state request with full state response + std::function cbAsync = + [&](const msgs::SerializedStepMap &_res) + { + checkMsg(_res, 16); + }; + // The request is blocking even though it's meant to be async, so we spin a // thread auto request = [&]() @@ -429,6 +436,24 @@ TEST_P(SceneBroadcasterTest, State) while (!received && sleep++ < maxSleep) IGN_SLEEP_MS(100); EXPECT_TRUE(received); + + // test async state request + received = false; + std::string reqSrv = "/state_async_callback_test"; + node.Advertise(reqSrv, cbAsync); + + ignition::msgs::StringMsg req; + req.set_data(reqSrv); + node.Request("/world/default/state_async", req); + + sleep = 0; + while (!received && sleep++ < maxSleep) + { + // Run server + server.Run(true, 1, false); + IGN_SLEEP_MS(100); + } + EXPECT_TRUE(received); } /////////////////////////////////////////////////