Skip to content

Commit

Permalink
Image streaming over websocket (#97)
Browse files Browse the repository at this point in the history
* image streaming using ws

Signed-off-by: Ian Chen <[email protected]>

* fix build

Signed-off-by: Ian Chen <[email protected]>

* add doc

Signed-off-by: Ian Chen <[email protected]>

* Comment out authorization

Signed-off-by: Nate Koenig <[email protected]>

Co-authored-by: Nate Koenig <[email protected]>
  • Loading branch information
iche033 and Nate Koenig authored Apr 7, 2021
1 parent 77f736a commit 3296a3e
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 5 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ign_find_package(TINYXML2 REQUIRED PRIVATE PRETTY tinyxml2)

#--------------------------------------
# Find ignition-common
ign_find_package(ignition-common3 REQUIRED PRIVATE)
ign_find_package(ignition-common3 REQUIRED PRIVATE COMPONENTS graphics)
set(IGN_COMMON_MAJOR_VER ${ignition-common3_VERSION_MAJOR})

#--------------------------------------
Expand Down
9 changes: 6 additions & 3 deletions examples/websocket.ign
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
filename="ignition-launch-websocket-server">
<publication_hz>30</publication_hz>
<port>9002</port>
<authorization_key>auth_key</authorization_key>
<admin_authorization_key>admin_key</admin_authorization_key>

<!-- The following elements support authorization keys-->
<!-- <authorization_key>auth_key</authorization_key>
<admin_authorization_key>admin_key</admin_authorization_key>
-->

<!-- The maximum number of allowed connections. If this element is not
defined or negative, then a limit is not enforced. -->
<max_connections>0</max_connections>
<max_connections>-1</max_connections>

<!-- SSL configuration -->
<!-- Specify the path to both the certificate and private key. -->
Expand Down
1 change: 1 addition & 0 deletions plugins/websocket_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ if (websockets_FOUND)
${websockets_LIBRARIES}
${PROJECT_LIBRARY_TARGET_NAME}
ignition-common${IGN_COMMON_MAJOR_VER}::ignition-common${IGN_COMMON_MAJOR_VER}
ignition-common${IGN_COMMON_MAJOR_VER}::graphics
ignition-msgs${IGN_MSGS_MAJOR_VER}::core
ignition-plugin${IGN_PLUGIN_MAJOR_VER}::core
ignition-transport${IGN_TRANSPORT_MAJOR_VER}::core
Expand Down
82 changes: 81 additions & 1 deletion plugins/websocket_server/WebsocketServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <algorithm>
#include <ignition/common/Console.hh>
#include <ignition/common/Image.hh>
#include <ignition/common/Util.hh>
#include <ignition/msgs.hh>

Expand Down Expand Up @@ -260,7 +261,6 @@ int rootCallback(struct lws *_wsi,
igndbg << "LWS_CALLBACK_HTTP\n";
return httpCallback(_wsi, _reason, _user, _in, _len);
break;

// Publish outboud messages
case LWS_CALLBACK_SERVER_WRITEABLE:
{
Expand Down Expand Up @@ -761,6 +761,42 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)
this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}
else if (frameParts[0] == "image")
{
// Store the relation of socketId to subscribed topic.
this->topicConnections[frameParts[1]].insert(_socketId);
this->topicTimestamps[frameParts[1]] =
std::chrono::steady_clock::now() - this->publishPeriod;

std::vector<std::string> allTopics;
std::set<std::string> imageTopics;
this->node.TopicList(allTopics);
for (auto queryTopic: allTopics)
{
std::vector<transport::MessagePublisher> publishers;
this->node.TopicInfo(queryTopic, publishers);
for (auto pub: publishers)
{
if (pub.MsgTypeName() == "ignition.msgs.Image")
{
imageTopics.insert(queryTopic);
break;
}
}
}
std::string topic = frameParts[1];
if (!imageTopics.count(topic))
{
igndbg << "Could not find topic: " << topic << " to stream"
<< std::endl;
return;
}

igndbg << "Subscribe request to image topic[" << frameParts[1] << "]\n";
this->node.Subscribe(frameParts[1],
&WebsocketServer::OnWebsocketSubscribedImageMessage, this);
}

}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -809,3 +845,47 @@ void WebsocketServer::OnWebsocketSubscribedMessage(
}
}
}

//////////////////////////////////////////////////
void WebsocketServer::OnWebsocketSubscribedImageMessage(
const ignition::msgs::Image &_msg,
const ignition::transport::MessageInfo &_info)
{
std::map<std::string, std::set<int>>::const_iterator iter =
this->topicConnections.find(_info.Topic());

if (iter != this->topicConnections.end())
{
std::lock_guard<std::mutex> mainLock(this->subscriptionMutex);
std::chrono::time_point<std::chrono::steady_clock> systemTime =
std::chrono::steady_clock::now();

std::chrono::nanoseconds timeDelta =
systemTime - this->topicTimestamps[_info.Topic()];

if (timeDelta > this->publishPeriod)
{
// Store the last time this topic was published.
this->topicTimestamps[_info.Topic()] = systemTime;

// send raw png data
std::vector<unsigned char> buffer;
common::Image image;
image.SetFromData(
(unsigned char*) _msg.data().c_str(),
_msg.width(), _msg.height(), common::Image::RGB_INT8);
image.SavePNGToBuffer(buffer);
std::string img(reinterpret_cast<char *>(buffer.data()), buffer.size());

// Send the message
for (const int &socketId : iter->second)
{
if (this->connections.find(socketId) != this->connections.end())
{
this->QueueMessage(this->connections[socketId].get(),
img.c_str(), img.length());
}
}
}
}
}
7 changes: 7 additions & 0 deletions plugins/websocket_server/WebsocketServer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ namespace ignition
const size_t _size,
const ignition::transport::MessageInfo &_info);

/// \brief Callback when an image is received on a topic
/// \param[in] _msg Image msg
/// \param[in] _info ign transport message info
private: void OnWebsocketSubscribedImageMessage(
const ignition::msgs::Image &_msg,
const ignition::transport::MessageInfo &_info);

/// \brief Callback that is triggered when a new connection is
/// established.
/// \param[in] _socketId ID of the socket.
Expand Down

0 comments on commit 3296a3e

Please sign in to comment.