Skip to content
This repository has been archived by the owner on Aug 23, 2018. It is now read-only.

Commit

Permalink
Merge pull request #28 from osrf/realtime_connext
Browse files Browse the repository at this point in the history
Realtime testing RTI Connext example
  • Loading branch information
jacquelinekay committed Jul 15, 2015
2 parents b5f942d + 988ef6d commit 96d0648
Show file tree
Hide file tree
Showing 32 changed files with 3,012 additions and 30 deletions.
3 changes: 3 additions & 0 deletions large_message_cpp/dds_lms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static void catch_function(int signo) {

int main(int argc, char *argv[])
{
int msgs_received = 0;
/* Register a signal handler so DDS doesn't just sit there... */
if (signal(SIGINT, catch_function) == SIG_ERR)
{
Expand Down Expand Up @@ -111,6 +112,7 @@ int main(int argc, char *argv[])
{
LargeMsg::LargeMessage *msg = &(large_msg_seq[i]);
std::cout << "[" << msg->seq << "]: " << strlen(msg->content.m_ptr) << std::endl;
++msgs_received;
}

status = data_reader->return_loan(large_msg_seq, sample_info_seq);
Expand All @@ -124,6 +126,7 @@ int main(int argc, char *argv[])
#endif
}

std::cout << "Received " << msgs_received << " messages." << std::endl;
/* Shutdown */
{
status = participant->delete_subscriber(subscriber.in());
Expand Down
27 changes: 27 additions & 0 deletions realtime/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
cmake_minimum_required(VERSION 2.8)
project(realtime)
set (CMAKE_CXX_FLAGS "-std=c++11")

find_package(rttest REQUIRED)

link_directories(${rttest_LIBRARY_DIR})
include_directories(${rttest_INCLUDE_DIRS})

find_package(opensplice REQUIRED)
include_directories(${OPENSPLICE_INCLUDE_DIRS})

ospl_add_idl(idl/LargeMsg.idl)

add_executable(dds_lms dds_lms.cpp)
target_link_libraries(dds_lms ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES})

add_executable(dds_lmp dds_lmp.cpp)
target_link_libraries(dds_lmp ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES})

find_package(Threads)
if(NOT Threads_FOUND)
message(WARNING "pthread not found. Real-time intra-process pub/sub example will not be built.")
elseif(Threads_FOUND)
add_executable(dds_intraproc dds_intraproc.cpp)
target_link_libraries(dds_intraproc ${OPENSPLICE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${rttest_LIBRARIES})
endif()
156 changes: 156 additions & 0 deletions realtime/ExamplePublisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#include "ccpp_dds_dcps.h"
#include "check_status.h"

#include "ccpp_LargeMsg.h"


class ExamplePublisher
{
private:
DDS::Publisher_var publisher;
DDS::DomainParticipantFactory_var dpf;
DDS::DomainParticipant_var participant;
DDS::Topic_var large_message_topic;
DDS::ReturnCode_t status;

public:

DDS::InstanceHandle_t instance_handle;
LargeMsg::LargeMessage msg;
LargeMsg::LargeMessageDataWriter_var data_writer;
int i;

ExamplePublisher(unsigned int message_size = 16777216);
unsigned int message_size;
void callback();
bool init();
bool teardown();
};

ExamplePublisher::ExamplePublisher(unsigned int message_size) : message_size(message_size)
{
}

bool ExamplePublisher::init()
{
this->i = 0;
DDS::DomainId_t domain = DDS::DOMAIN_ID_DEFAULT;
const char * partition_name = "Default";
const char * topic_name = "big_chatter";

/* Create Domain Participant Factory */
this->dpf = DDS::DomainParticipantFactory::get_instance();
checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance");

/* Create Domain Participant */
std::cout << "Creating domain participant in publisher" << std::endl;
this->participant = dpf->create_participant(
domain,
PARTICIPANT_QOS_DEFAULT,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(participant.in(), "DDS::DomainParticipantFactory::create_participant");


/* Create a default QoS for Topics */
DDS::TopicQos default_topic_qos;
status = participant->get_default_topic_qos(default_topic_qos);
checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos");
// default_topic_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
default_topic_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;

/* Register the LargeMessage Type */
LargeMsg::LargeMessageTypeSupport_var large_message_ts = new LargeMsg::LargeMessageTypeSupport();
checkHandle(large_message_ts.in(), "new LargeMessageTypeSupport");
const char * large_message_type_name = large_message_ts->get_type_name();
status = large_message_ts->register_type(participant.in(), large_message_type_name);
checkStatus(status, "LargeMsg::LargeMessageTypeSupport::register_type");

/* Setup the Publisher's QoS */
DDS::PublisherQos pub_qos;
status = participant->get_default_publisher_qos(pub_qos);
checkStatus(status, "DDS::DomainParticipant::get_default_publisher_qos");
pub_qos.partition.name.length(1);
pub_qos.partition.name[0] = partition_name;

/* Create the publisher */
this->publisher = participant->create_publisher(pub_qos, NULL, DDS::STATUS_MASK_NONE);
checkHandle(publisher.in(), "DDS::DomainParticipant::create_publisher");

/* Create the Topic */
this->large_message_topic = participant->create_topic(
topic_name,
large_message_type_name,
default_topic_qos,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(large_message_topic.in(), "DDS::DomainParticipant::create_topic(LargeMessage)");

/* Create Topic DataWriter */
DDS::DataWriter_var topic_writer = publisher->create_datawriter(
large_message_topic.in(),
DATAWRITER_QOS_USE_TOPIC_QOS,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(topic_writer.in(), "DDS::Publisher::create_datawriter(LargeMessage)");

/* Narrow the Topic DataWriter to be only for LargeMessage */
this->data_writer = LargeMsg::LargeMessageDataWriter::_narrow(topic_writer.in());
checkHandle(data_writer.in(), "LargeMsg::LargeMessageDataWriter::_narrow");

this->msg.content = std::string(this->message_size, '.').c_str();
checkHandle(&msg, "new LargeMsg::LargeMessage");
this->instance_handle = this->data_writer->register_instance(msg);
status = data_writer->write(msg, instance_handle);
/*
if (this->instance_handle == DDS::HANDLE_NIL)
{
std::cout << "Instance handle was NIL" << std::endl;
exit(-1);
}
*/

std::cout << "Ready to send LargeMessage's" << std::endl;
return true;
}

void ExamplePublisher::callback()
{
this->msg.seq = this->i;
++i;
for (int j = 0; j < this->message_size; ++j)
{
this->msg.content[j] = i;
}

status = data_writer->write(msg, instance_handle);

checkStatus(status, "LargeMsg::LargeMessageDataWriter::write");
}


bool ExamplePublisher::teardown()
{
std::cout << "Finished" << std::endl;
//getchar();
/* Shutdown */
{
status = this->publisher->delete_datawriter(this->data_writer.in());
checkStatus(this->status, "DDS::Publisher::delete_datawriter(data_writer)");

status = this->participant->delete_publisher(publisher.in());
checkStatus(status, "DDS::DomainParticipant::delete_publisher");

status = participant->delete_topic(this->large_message_topic.in());
checkStatus(status, "DDS::DomainParticipant::delete_topic (large_message_topic)");

//DDS::string_free(this->large_message_type_name);

status = this->dpf->delete_participant(participant.in());
checkStatus(status, "DDS::DomainParticipantFactory::delete_participant");
}
return true;
}
157 changes: 157 additions & 0 deletions realtime/ExampleSubscriber.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#include "ccpp_dds_dcps.h"
#include "check_status.h"

#include "ccpp_LargeMsg.h"

class ExampleSubscriber
{
private:
DDS::DomainParticipantFactory_var dpf;
DDS::DomainParticipant_var participant;
DDS::Subscriber_var subscriber;
LargeMsg::LargeMessageDataReader_var data_reader;
DDS::Topic_var large_message_topic;
char * large_message_type_name;
LargeMsg::LargeMessageSeq_var large_msg_seq;
DDS::SampleInfoSeq_var sample_info_seq;
bool subscriber_running;
DDS::ReturnCode_t status;
DDS::DomainId_t domain;
const char * partition_name;
const char * topic_name;
DDS::TopicQos default_topic_qos;
LargeMsg::LargeMessageTypeSupport_var large_message_ts;
DDS::SubscriberQos sub_qos;
DDS::DataReader_var topic_reader;

public:
unsigned long msgs_count;
void callback();
bool init();
bool teardown();
};


bool ExampleSubscriber::init()
{
this->msgs_count = 0;
domain = DDS::DOMAIN_ID_DEFAULT;
partition_name = "Default";
topic_name = "big_chatter";

/* Create Domain Participant Factory */
this->dpf = DDS::DomainParticipantFactory::get_instance();
checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance");

/* Create Domain Participant */
std::cout << "Creating domain participant in subscriber" << std::endl;
this->participant = dpf->create_participant(
domain,
PARTICIPANT_QOS_DEFAULT,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(participant.in(), "DDS::DomainParticipantFactory::create_participant");


/* Create a default QoS for Topics */
status = participant->get_default_topic_qos(default_topic_qos);
checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos");
// default_topic_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
default_topic_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;

/* Register the LargeMessage Type */
large_message_ts = new LargeMsg::LargeMessageTypeSupport();
checkHandle(large_message_ts.in(), "new LargeMessageTypeSupport");
this->large_message_type_name = large_message_ts->get_type_name();
status = large_message_ts->register_type(participant.in(), large_message_type_name);
checkStatus(status, "LargeMsg::LargeMessageTypeSupport::register_type");

/* Setup the Subscribers's QoS */
status = participant->get_default_subscriber_qos(sub_qos);
checkStatus(status, "DDS::DomainParticipant::get_default_subscriber_qos");
sub_qos.partition.name.length(1);
sub_qos.partition.name[0] = partition_name;

/* Create the subscriber */
this->subscriber = participant->create_subscriber(
sub_qos,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(subscriber.in(), "DDS::DomainParticipant::create_subscriber");

/* Create the Topic */
this->large_message_topic = participant->create_topic(
topic_name,
large_message_type_name,
default_topic_qos,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(large_message_topic.in(), "DDS::DomainParticipant::create_topic(LargeMessage)");

/* Create Topic specific DataReader */
topic_reader = subscriber->create_datareader(
large_message_topic.in(),
DATAREADER_QOS_USE_TOPIC_QOS,
NULL,
DDS::STATUS_MASK_NONE
);
checkHandle(topic_reader.in(), "DDS::Subscriber::create_datareader");

/* Narrow topic_reader down to LargeMessage specific DataReader */
this->data_reader = LargeMsg::LargeMessageDataReader::_narrow(topic_reader.in());
checkHandle(data_reader.in(), "LargeMsg::LargeMessageDataReader::_narrow");

this->large_msg_seq = new LargeMsg::LargeMessageSeq();
this->sample_info_seq = new DDS::SampleInfoSeq();

std::cout << "Polling DataReader..." << std::endl;
return true;
}

void ExampleSubscriber::callback()
{
status = data_reader->take(
large_msg_seq,
sample_info_seq,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ALIVE_INSTANCE_STATE
);
checkStatus(status, "LargeMsg::LargeMessageDataReader::take");

for (DDS::ULong i = 0; i < large_msg_seq->length(); i++)
{
//msg = &(large_msg_seq[i]);
++this->msgs_count;
}

status = data_reader->return_loan(large_msg_seq, sample_info_seq);
checkStatus(status, "LargeMsg::LargeMessageDataReader::return_loan");
}

bool ExampleSubscriber::teardown()
{
/* Shutdown */
if (participant != NULL)
{
if (subscriber.in() != NULL) {
status = participant->delete_subscriber(subscriber.in());
checkStatus(status, "DDS::DomainParticipant::delete_subscriber");
}

status = participant->delete_topic(large_message_topic.in());
checkStatus(status, "DDS::DomainParticipant::delete_topic (large_message_topic)");

status = dpf->delete_participant(participant.in());
checkStatus(status, "DDS::DomainParticipantFactory::delete_participant");
}

DDS::string_free(large_message_type_name);

return true;
}

Loading

0 comments on commit 96d0648

Please sign in to comment.