Skip to content

Commit

Permalink
Fixed missing callbacks for pending events
Browse files Browse the repository at this point in the history
When creating an Entity with a Listener, any instantaneous events that occurred during creation would miss their callback.
This was caused by the C++ API not being able to wrap the underlying C API with a C++ object prior to C already trying to invoking the callback.
That is now resolved by not setting the Listener at creation time, but by setting it after successfully putting a C++ wrapper around the underlying C Entity.
This should fix issue #410
A prerequisite for applying this fix is to make sure pull request #eclipse-cyclonedds/cyclonedds#1717 is applied to the cyclonedds repository.

Signed-off-by: Erik Hendriks <[email protected]>
  • Loading branch information
e-hndrks authored and eboasson committed Jul 4, 2023
1 parent 02abba3 commit 0bdf3a0
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 182 deletions.
4 changes: 1 addition & 3 deletions src/ddscxx/include/dds/pub/detail/DataWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ class dds::pub::detail::DataWriter : public ::org::eclipse::cyclonedds::pub::Any

DataWriter(const dds::pub::Publisher& pub,
const ::dds::topic::Topic<T>& topic,
const dds::pub::qos::DataWriterQos& qos,
dds::pub::DataWriterListener<T>* listener,
const dds::core::status::StatusMask& mask);
const dds::pub::qos::DataWriterQos& qos);

virtual ~DataWriter();

Expand Down
48 changes: 8 additions & 40 deletions src/ddscxx/include/dds/pub/detail/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ DataWriter<T, DELEGATE>::DataWriter(
const dds::pub::Publisher& pub,
const dds::topic::Topic<T>& topic) :
dds::core::Reference< DELEGATE<T> >(
new DELEGATE<T>(pub, topic, pub.default_datawriter_qos(), NULL, dds::core::status::StatusMask::none()))
new DELEGATE<T>(pub, topic, pub.default_datawriter_qos()))
{
this->delegate()->listener(NULL, dds::core::status::StatusMask::none());
this->delegate()->init(this->impl_);
}

Expand All @@ -52,8 +53,9 @@ DataWriter<T, DELEGATE>::DataWriter(const dds::pub::Publisher& pub,
dds::pub::DataWriterListener<T>* listener,
const dds::core::status::StatusMask& mask) :
dds::core::Reference< DELEGATE<T> >(
new DELEGATE<T>(pub, topic, qos, listener, mask))
new DELEGATE<T>(pub, topic, qos))
{
this->delegate()->listener(listener, mask);
this->delegate()->init(this->impl_);
}

Expand Down Expand Up @@ -363,9 +365,7 @@ template <typename T>
dds::pub::detail::DataWriter<T>::DataWriter(
const dds::pub::Publisher& pub,
const ::dds::topic::Topic<T>& topic,
const dds::pub::qos::DataWriterQos& qos,
dds::pub::DataWriterListener<T>* listener,
const dds::core::status::StatusMask& mask)
const dds::pub::qos::DataWriterQos& qos)
: ::org::eclipse::cyclonedds::pub::AnyDataWriterDelegate(qos, topic), pub_(pub), topic_(topic)
{
DDSCXX_WARNING_MSVC_OFF(6326)
Expand All @@ -385,7 +385,7 @@ dds::pub::detail::DataWriter<T>::DataWriter(

std::string name = topic.name() + "_datawriter";

this->listener(listener, mask);
this->listener_set(nullptr, dds::core::status::StatusMask::all(), false);
dds_entity_t ddsc_writer = dds_create_writer (ddsc_pub, ddsc_topic, ddsc_qos, this->listener_callbacks);
dds_delete_qos(ddsc_qos);
ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_writer, "Could not create DataWriter.");
Expand Down Expand Up @@ -417,38 +417,6 @@ dds::pub::detail::DataWriter<T>::init(ObjectDelegate::weak_ref_type weak_ref)
/* Register writer at publisher. */
this->pub_.delegate()->add_datawriter(*this);

// Because listeners are added after writer is created (which is in enabled state, because
// disabled state is not yet supported), events could have occured before listeners were
// registered. Therefore the event handlers for those events are called here.
if (this->listener_get()) {
dds::core::status::StatusMask writerStatus = status_changes();

if (listener_mask.to_ulong() & dds::core::status::StatusMask::liveliness_lost().to_ulong()
&& writerStatus.test(DDS_LIVELINESS_LOST_STATUS_ID))
{
dds::core::status::LivelinessLostStatus status = liveliness_lost_status();
on_liveliness_lost(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::offered_deadline_missed().to_ulong()
&& writerStatus.test(DDS_OFFERED_DEADLINE_MISSED_STATUS_ID))
{
dds::core::status::OfferedDeadlineMissedStatus status = offered_deadline_missed_status();
on_offered_deadline_missed(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::offered_incompatible_qos().to_ulong()
&& writerStatus.test(DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID))
{
dds::core::status::OfferedIncompatibleQosStatus status = offered_incompatible_qos_status();
on_offered_incompatible_qos(this->ddsc_entity, status);
}
if (listener_mask.to_ulong() & dds::core::status::StatusMask::publication_matched().to_ulong()
&& writerStatus.test(DDS_PUBLICATION_MATCHED_STATUS_ID))
{
dds::core::status::PublicationMatchedStatus status = publication_matched_status();
on_publication_matched(this->ddsc_entity, status);
}
}

/* Enable when needed. */
if (this->pub_.delegate()->is_auto_enable()) {
this->enable();
Expand Down Expand Up @@ -878,7 +846,7 @@ dds::pub::detail::DataWriter<T>::listener(DataWriterListener<T>* listener,
{
org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this);
this->check();
this->listener_set(listener, mask);
this->listener_set(listener, mask, true);
}

template <typename T>
Expand All @@ -896,7 +864,7 @@ dds::pub::detail::DataWriter<T>::close()
{
org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this);

this->listener_set(NULL, dds::core::status::StatusMask::none());
this->listener_set(NULL, dds::core::status::StatusMask::none(), true);

topic_.delegate()->decrNrDependents();

Expand Down
11 changes: 3 additions & 8 deletions src/ddscxx/include/dds/sub/detail/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,13 @@ class dds::sub::detail::DataReader : public ::org::eclipse::cyclonedds::sub::Any

DataReader(const dds::sub::Subscriber& sub,
const dds::topic::Topic<T>& topic,
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener = NULL,
const dds::core::status::StatusMask& mask = ::dds::core::status::StatusMask::none());
const dds::sub::qos::DataReaderQos& qos);

DataReader(const dds::sub::Subscriber& sub,
const dds::topic::ContentFilteredTopic<T, dds::topic::detail::ContentFilteredTopic>& topic,
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener = NULL,
const dds::core::status::StatusMask& mask = ::dds::core::status::StatusMask::none());
const dds::sub::qos::DataReaderQos& qos);

void common_constructor(dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask);
void common_constructor();

virtual ~DataReader();

Expand Down
34 changes: 17 additions & 17 deletions src/ddscxx/include/dds/sub/detail/TDataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ DataReader<T, DELEGATE>::DataReader(
const dds::topic::Topic<T>& topic):
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, sub->default_datareader_qos()))
{
this->delegate()->listener(NULL, dds::core::status::StatusMask::none());
this->delegate()->init(this->impl_);
}

Expand All @@ -238,8 +239,9 @@ DataReader<T, DELEGATE>::DataReader(
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask) :
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos, listener, mask))
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos))
{
this->delegate()->listener(listener, mask);
this->delegate()->init(this->impl_);
}

Expand All @@ -250,6 +252,7 @@ DataReader<T, DELEGATE>::DataReader(
const dds::topic::ContentFilteredTopic<T>& topic) :
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, sub.default_datareader_qos()))
{
this->delegate()->listener(NULL, dds::core::status::StatusMask::none());
this->delegate()->init(this->impl_);
}

Expand All @@ -260,8 +263,9 @@ DataReader<T, DELEGATE>::DataReader(
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask) :
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos, listener, mask))
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos))
{
this->delegate()->listener(listener, mask);
this->delegate()->init(this->impl_);
}
#endif /* OMG_DDS_CONTENT_SUBSCRIPTION_SUPPORT */
Expand All @@ -273,6 +277,7 @@ DataReader<T, DELEGATE>::DataReader(
const dds::topic::MultiTopic<T>& topic) :
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic))
{
this->delegate()->listener(NULL, dds::core::status::StatusMask::none());
this->delegate()->init(this->impl_);
}

Expand All @@ -283,8 +288,9 @@ DataReader<T, DELEGATE>::DataReader(
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask) :
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos, listener, mask))
::dds::core::Reference< DELEGATE<T> >(new DELEGATE<T>(sub, topic, qos))
{
this->delegate()->listener(listener, mask);
this->delegate()->init(this->impl_);
}
#endif /* OMG_DDS_MULTI_TOPIC_SUPPORT */
Expand Down Expand Up @@ -448,33 +454,27 @@ DataReader<T, DELEGATE>::listener() const
template <typename T>
dds::sub::detail::DataReader<T>::DataReader(const dds::sub::Subscriber& sub,
const dds::topic::Topic<T>& topic,
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask)
const dds::sub::qos::DataReaderQos& qos)
: ::org::eclipse::cyclonedds::sub::AnyDataReaderDelegate(qos, topic), sub_(sub),
typed_sample_()
{
common_constructor(listener, mask);
common_constructor();
}

template <typename T>
dds::sub::detail::DataReader<T>::DataReader(const dds::sub::Subscriber& sub,
const dds::topic::ContentFilteredTopic<T, dds::topic::detail::ContentFilteredTopic>& topic,
const dds::sub::qos::DataReaderQos& qos,
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask)
const dds::sub::qos::DataReaderQos& qos)
: ::org::eclipse::cyclonedds::sub::AnyDataReaderDelegate(qos, topic), sub_(sub),
typed_sample_()

{
common_constructor(listener, mask);
common_constructor();
}

template <typename T>
void
dds::sub::detail::DataReader<T>::common_constructor(
dds::sub::DataReaderListener<T>* listener,
const dds::core::status::StatusMask& mask)
dds::sub::detail::DataReader<T>::common_constructor()
{
DDSCXX_WARNING_MSVC_OFF(4127)
DDSCXX_WARNING_MSVC_OFF(6326)
Expand All @@ -498,7 +498,7 @@ dds::sub::detail::DataReader<T>::common_constructor(
c_value *params = this->AnyDataReaderDelegate::td_.delegate()->reader_parameters();
#endif

this->listener(listener, mask);
this->listener_set(nullptr, dds::core::status::StatusMask::all(), false);
dds_entity_t ddsc_reader = dds_create_reader(ddsc_sub, ddsc_top, ddsc_qos, this->listener_callbacks);
dds_delete_qos(ddsc_qos);
ISOCPP_DDSC_RESULT_CHECK_AND_THROW(ddsc_reader, "Could not create DataReader.");
Expand Down Expand Up @@ -757,7 +757,7 @@ dds::sub::detail::DataReader<T>::close()
this->prevent_callbacks();
org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this);

this->listener_set(NULL, dds::core::status::StatusMask::none());
this->listener_set(NULL, dds::core::status::StatusMask::none(), true);

this->sub_.delegate()->remove_datareader(*this);

Expand Down Expand Up @@ -790,7 +790,7 @@ dds::sub::detail::DataReader<T>::listener(
const dds::core::status::StatusMask& event_mask)
{
org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this);
this->listener_set( l, event_mask ) ;
this->listener_set( l, event_mask, true ) ;
scopedLock.unlock();
}

Expand Down
4 changes: 2 additions & 2 deletions src/ddscxx/include/dds/topic/detail/TTopicImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ dds::topic::detail::Topic<T>::close()
ISOCPP_THROW_EXCEPTION(ISOCPP_PRECONDITION_NOT_MET_ERROR, "Topic still has unclosed dependencies (e.g. Readers/Writers/ContentFilteredTopics)");
}

this->listener_set(NULL, dds::core::status::StatusMask::none());
this->listener_set(NULL, dds::core::status::StatusMask::none(), true);

this->myParticipant.delegate()->remove_topic(*this);

Expand Down Expand Up @@ -260,7 +260,7 @@ dds::topic::detail::Topic<T>::listener(TopicListener<T>* listener,
const ::dds::core::status::StatusMask& mask)
{
org::eclipse::cyclonedds::core::ScopedObjectLock scopedLock(*this);
this->listener_set(listener, mask);
this->listener_set(listener, mask, true);
scopedLock.unlock();
}

Expand Down
14 changes: 12 additions & 2 deletions src/ddscxx/include/org/eclipse/cyclonedds/core/EntityDelegate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ namespace cyclonedds
{
namespace core
{
class OMG_DDS_API EntityDelegate;

struct ListenerArg {
EntityDelegate *cpp_ref;
bool reset_on_invoke;

ListenerArg(EntityDelegate *cpp_ref_, bool reset_on_invoke_);
};

class OMG_DDS_API EntityDelegate :
public virtual ::org::eclipse::cyclonedds::core::DDScObjectDelegate
Expand Down Expand Up @@ -62,7 +70,10 @@ class OMG_DDS_API EntityDelegate :

protected:
void listener_set(void *listener,
const dds::core::status::StatusMask& mask);
const dds::core::status::StatusMask& mask,
bool reset_on_invoke);

void prevent_callbacks();

public:
const dds::core::status::StatusMask get_listener_mask() const ;
Expand Down Expand Up @@ -118,7 +129,6 @@ class OMG_DDS_API EntityDelegate :
static volatile unsigned int entityID_;
bool enabled_;
dds::core::status::StatusMask listener_mask;
void prevent_callbacks();
long callback_count;
dds_listener_t *listener_callbacks;

Expand Down
Loading

0 comments on commit 0bdf3a0

Please sign in to comment.