From d9bfacbb9a0dc44d2e168f2ce812fb6be3439be5 Mon Sep 17 00:00:00 2001 From: Aditya Pande Date: Fri, 4 Nov 2022 15:55:02 -0700 Subject: [PATCH] Acoustic comms : Packet collision timeout (#1755) * Acoustic comms : Packet collision timeouts added. Signed-off-by: Aditya --- .../acoustic_comms_packet_collision.sdf | 256 ++++++++++++++++++ src/systems/acoustic_comms/AcousticComms.cc | 76 +++++- src/systems/acoustic_comms/AcousticComms.hh | 12 + test/integration/acoustic_comms.cc | 35 ++- 4 files changed, 364 insertions(+), 15 deletions(-) create mode 100644 examples/worlds/acoustic_comms_packet_collision.sdf diff --git a/examples/worlds/acoustic_comms_packet_collision.sdf b/examples/worlds/acoustic_comms_packet_collision.sdf new file mode 100644 index 0000000000..cde89b7015 --- /dev/null +++ b/examples/worlds/acoustic_comms_packet_collision.sdf @@ -0,0 +1,256 @@ + + + + + + + 0.001 + 1.0 + + + + + + + + + 6 + 1400 + 1 + 0.1 + + + + true + 0 0 10 0 0 0 + 1 1 1 1 + 0.5 0.5 0.5 1 + + 1000 + 0.9 + 0.01 + 0.001 + + -0.5 0.1 -0.9 + + + + true + + + + + 0 0 1 + 100 100 + + + + + + + 0 0 1 + 100 100 + + + + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + 0.8 0.8 0.8 1 + + + + + + + 2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr1
+ addr1/rx +
+
+ + + -2 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 0 0 1 1 + 0 0 1 1 + 0 0 1 1 + + + + + +
addr2
+ addr2/rx + + /broker/bind + /broker/unbind + +
+ +
+ + + 5 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr3
+ addr3/rx +
+
+ + + -5 0 0.5 0 0 0 + + + + 0.16666 + 0 + 0 + 0.16666 + 0 + 0.16666 + + 1.0 + + + + + 1 1 1 + + + + + + + + 1 1 1 + + + + 1 0 0 1 + 1 0 0 1 + 1 0 0 1 + + + + + +
addr4
+ addr4/rx +
+
+ +
+
diff --git a/src/systems/acoustic_comms/AcousticComms.cc b/src/systems/acoustic_comms/AcousticComms.cc index cfb4863c89..93699fdd70 100644 --- a/src/systems/acoustic_comms/AcousticComms.cc +++ b/src/systems/acoustic_comms/AcousticComms.cc @@ -19,7 +19,6 @@ * Development of this module has been funded by the Monterey Bay Aquarium * Research Institute (MBARI) and the David and Lucile Packard Foundation */ -#include #include #include @@ -42,11 +41,26 @@ class AcousticComms::Implementation /// \brief Default speed of sound in air in metres/sec. public: double speedOfSound = 343.0; + /// \brief Default collision time interval per byte in sec. + public: double collisionTimePerByte = 0; + + /// \brief Default collision time when packet is dropped + /// in sec. + public: double collisionTimePacketDrop = 0; + /// \brief Position of the transmitter at the time the message was /// sent, or first processed. public: std::unordered_map , math::Vector3d> poseSrcAtMsgTimestamp; + + /// \brief Map that holds data of the address of a receiver, + /// the timestamp, length of the last message recevied by it. + public: std::unordered_map + , + std::chrono::duration>> + lastMsgReceivedInfo; }; ////////////////////////////////////////////////// @@ -70,6 +84,11 @@ void AcousticComms::Load( { this->dataPtr->speedOfSound = _sdf->Get("speed_of_sound"); } + if (_sdf->HasElement("collision_time_per_byte")) + { + this->dataPtr->collisionTimePerByte = + _sdf->Get("collision_time_per_byte"); + } gzmsg << "AcousticComms configured with max range : " << this->dataPtr->maxRange << " m and speed of sound : " << @@ -175,8 +194,61 @@ void AcousticComms::Step( { if (distanceCoveredByMessage >= distanceToTransmitter) { + // This message has effectively reached the destination. + bool receivedSuccessfully = false; + + // Check for time collision + if (this->dataPtr->lastMsgReceivedInfo.count( + msg->dst_address()) == 0) + { + // This is the first message received by this address. + receivedSuccessfully = true; + } + else + { + // A previous msg was already received at this address. + // time gap = current time - time at which last msg was received. + std::chrono::duration timeGap = currTimestamp - + std::get<0>(this->dataPtr->lastMsgReceivedInfo[ + msg->dst_address()]); + + // drop interval = collision time interval per byte * + // length of last msg received. + auto dropInterval = std::chrono::duration( + std::get<1>(this->dataPtr->lastMsgReceivedInfo[ + msg->dst_address()])); + + if (timeGap >= dropInterval) + receivedSuccessfully = true; + } + // This message needs to be processed. - _newRegistry[msg->dst_address()].inboundMsgs.push_back(msg); + // Push the msg to inbound of the destination if + // receivedSuccessfully is true, else it is dropped. + if (receivedSuccessfully) + { + _newRegistry[msg->dst_address()].inboundMsgs.push_back(msg); + // Update the (receive time, length of msg) tuple + // for the last msg for this address. + auto blockingTime = std::chrono::duration( + this->dataPtr->collisionTimePerByte * + msg->data().length()); + + this->dataPtr->lastMsgReceivedInfo[msg->dst_address()] = + std::make_tuple(currTimestamp, blockingTime); + } + else + { + // Packet was dropped due to collision. + auto blockingTime = std::chrono::duration( + this->dataPtr->collisionTimePacketDrop + ); + + std::get<1>(this->dataPtr->lastMsgReceivedInfo[ + msg->dst_address()]) += blockingTime; + } + + // Stop keeping track of the position of its source. this->dataPtr->poseSrcAtMsgTimestamp.erase(msg); } else diff --git a/src/systems/acoustic_comms/AcousticComms.hh b/src/systems/acoustic_comms/AcousticComms.hh index a4e6adedd3..f443e3970c 100644 --- a/src/systems/acoustic_comms/AcousticComms.hh +++ b/src/systems/acoustic_comms/AcousticComms.hh @@ -23,6 +23,9 @@ #define GZ_SIM_SYSTEMS_ACOUSTICCOMMS_HH_ #include +#include +#include +#include #include #include "gz/sim/comms/ICommsModel.hh" @@ -49,6 +52,13 @@ namespace systems /// happen beyond this range. Default is 1000. /// * : Speed of sound in the medium (meters/sec). /// Default is 343.0 + /// * : If a subscriber receives a message + /// 'b' bytes long at time 't0', it won't receive + /// and other message till time : + /// 't0 + b * collision_time_per_byte' + /// * : If a packet is dropped at time + /// `t0`, the next packet won't be received until + /// time `t0 + collision_time_packet_drop` /// /// Here's an example: /// /// 6 /// 1400 + /// 0.001 + /// 0.001 /// class AcousticComms: diff --git a/test/integration/acoustic_comms.cc b/test/integration/acoustic_comms.cc index f7ea59db3b..a0d0c4e930 100644 --- a/test/integration/acoustic_comms.cc +++ b/test/integration/acoustic_comms.cc @@ -39,13 +39,16 @@ class AcousticCommsTestDefinition std::string worldFile; std::string srcAddr; std::string dstAddr; - int numMsgs; + int numMsgsSent; + int numMsgsReceived; AcousticCommsTestDefinition( std::string worldFile_, std::string srcAddr_, - std::string dstAddr_, int numMsgs_) : + std::string dstAddr_, int numMsgsSent_, + int numMsgsReceived_): worldFile(worldFile_), srcAddr(srcAddr_), - dstAddr(dstAddr_), numMsgs(numMsgs_) {} + dstAddr(dstAddr_), numMsgsSent(numMsgsSent_), + numMsgsReceived(numMsgsReceived_) {} }; ///////////////////////////////////////////////// @@ -80,7 +83,7 @@ TEST_P(AcousticCommsTestFixture, { // Verify msg content std::lock_guard lock(mutex); - std::string expected = "hello world " + std::to_string(msgCounter); + std::string expected = std::to_string(msgCounter); gz::msgs::StringMsg receivedMsg; receivedMsg.ParseFromString(_msg.data()); @@ -109,11 +112,11 @@ TEST_P(AcousticCommsTestFixture, // Publish some messages. gz::msgs::StringMsg payload; - int pubCount = param.numMsgs; + int pubCount = param.numMsgsSent; for (int i = 0; i < pubCount; ++i) { // Prepare the payload. - payload.set_data("hello world " + std::to_string(i)); + payload.set_data(std::to_string(i)); std::string serializedData; EXPECT_TRUE(payload.SerializeToString(&serializedData)) << payload.DebugString(); @@ -132,11 +135,11 @@ TEST_P(AcousticCommsTestFixture, done = (msgCounter == pubCount) && (pubCount != 0); } } - if (param.numMsgs != 0) + if (param.numMsgsSent != 0 && param.numMsgsSent == param.numMsgsReceived) { EXPECT_TRUE(done); } - EXPECT_EQ(pubCount, msgCounter); + EXPECT_EQ(param.numMsgsReceived, msgCounter); } INSTANTIATE_TEST_SUITE_P( @@ -144,14 +147,20 @@ INSTANTIATE_TEST_SUITE_P( AcousticCommsTestFixture, ::testing::Values( AcousticCommsTestDefinition( - "acoustic_comms.sdf", "addr2", "addr1", 3), + "acoustic_comms.sdf", "addr2", "addr1", 3, 3), + // 3 msgs will be sent, but 0 will be received as + // the distance between endpoints exceeds the max range set in the plugin. AcousticCommsTestDefinition( - "acoustic_comms.sdf", "addr4", "addr3", 0), + "acoustic_comms.sdf", "addr4", "addr3", 3, 0), // The source is moving and the destination is stationary. AcousticCommsTestDefinition( - "acoustic_comms_moving_targets.sdf", "addr2", "addr1", 3), + "acoustic_comms_moving_targets.sdf", "addr2", "addr1", 3, 3), // The source is stationary and the destnation is moving. AcousticCommsTestDefinition( - "acoustic_comms_moving_targets.sdf", "addr4", "addr3", 3) - ) + "acoustic_comms_moving_targets.sdf", "addr4", "addr3", 3, 3), + // Message packets will be dropped as they are sent too fast + // compared to "collision_time_interval". + AcousticCommsTestDefinition( + "acoustic_comms_packet_collision.sdf", "addr2", "addr1", 3, 1) + ) );