diff --git a/large_message_cpp/dds_lms.cpp b/large_message_cpp/dds_lms.cpp index eec6999..1761916 100644 --- a/large_message_cpp/dds_lms.cpp +++ b/large_message_cpp/dds_lms.cpp @@ -14,6 +14,7 @@ static void catch_function(int signo) { int main(int argc, char *argv[]) { + int msgs_received = 0; /* Register a signal handler so DDS doesn't just sit there... */ if (signal(SIGINT, catch_function) == SIG_ERR) { @@ -111,6 +112,7 @@ int main(int argc, char *argv[]) { LargeMsg::LargeMessage *msg = &(large_msg_seq[i]); std::cout << "[" << msg->seq << "]: " << strlen(msg->content.m_ptr) << std::endl; + ++msgs_received; } status = data_reader->return_loan(large_msg_seq, sample_info_seq); @@ -124,6 +126,7 @@ int main(int argc, char *argv[]) #endif } + std::cout << "Received " << msgs_received << " messages." << std::endl; /* Shutdown */ { status = participant->delete_subscriber(subscriber.in()); diff --git a/realtime/CMakeLists.txt b/realtime/CMakeLists.txt new file mode 100644 index 0000000..8a6b33e --- /dev/null +++ b/realtime/CMakeLists.txt @@ -0,0 +1,27 @@ +cmake_minimum_required(VERSION 2.8) +project(realtime) +set (CMAKE_CXX_FLAGS "-std=c++11") + +find_package(rttest REQUIRED) + +link_directories(${rttest_LIBRARY_DIR}) +include_directories(${rttest_INCLUDE_DIRS}) + +find_package(opensplice REQUIRED) +include_directories(${OPENSPLICE_INCLUDE_DIRS}) + +ospl_add_idl(idl/LargeMsg.idl) + +add_executable(dds_lms dds_lms.cpp) +target_link_libraries(dds_lms ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES}) + +add_executable(dds_lmp dds_lmp.cpp) +target_link_libraries(dds_lmp ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES}) + +find_package(Threads) +if(NOT Threads_FOUND) + message(WARNING "pthread not found. Real-time intra-process pub/sub example will not be built.") +elseif(Threads_FOUND) + add_executable(dds_intraproc dds_intraproc.cpp) + target_link_libraries(dds_intraproc ${OPENSPLICE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} ${rttest_LIBRARIES}) +endif() diff --git a/realtime/ExamplePublisher.hpp b/realtime/ExamplePublisher.hpp new file mode 100644 index 0000000..6abbbf2 --- /dev/null +++ b/realtime/ExamplePublisher.hpp @@ -0,0 +1,156 @@ +#include "ccpp_dds_dcps.h" +#include "check_status.h" + +#include "ccpp_LargeMsg.h" + + +class ExamplePublisher +{ + private: + DDS::Publisher_var publisher; + DDS::DomainParticipantFactory_var dpf; + DDS::DomainParticipant_var participant; + DDS::Topic_var large_message_topic; + DDS::ReturnCode_t status; + + public: + + DDS::InstanceHandle_t instance_handle; + LargeMsg::LargeMessage msg; + LargeMsg::LargeMessageDataWriter_var data_writer; + int i; + + ExamplePublisher(unsigned int message_size = 16777216); + unsigned int message_size; + void callback(); + bool init(); + bool teardown(); +}; + +ExamplePublisher::ExamplePublisher(unsigned int message_size) : message_size(message_size) +{ +} + +bool ExamplePublisher::init() +{ + this->i = 0; + DDS::DomainId_t domain = DDS::DOMAIN_ID_DEFAULT; + const char * partition_name = "Default"; + const char * topic_name = "big_chatter"; + + /* Create Domain Participant Factory */ + this->dpf = DDS::DomainParticipantFactory::get_instance(); + checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance"); + + /* Create Domain Participant */ + std::cout << "Creating domain participant in publisher" << std::endl; + this->participant = dpf->create_participant( + domain, + PARTICIPANT_QOS_DEFAULT, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(participant.in(), "DDS::DomainParticipantFactory::create_participant"); + + + /* Create a default QoS for Topics */ + DDS::TopicQos default_topic_qos; + status = participant->get_default_topic_qos(default_topic_qos); + checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos"); + // default_topic_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; + default_topic_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS; + + /* Register the LargeMessage Type */ + LargeMsg::LargeMessageTypeSupport_var large_message_ts = new LargeMsg::LargeMessageTypeSupport(); + checkHandle(large_message_ts.in(), "new LargeMessageTypeSupport"); + const char * large_message_type_name = large_message_ts->get_type_name(); + status = large_message_ts->register_type(participant.in(), large_message_type_name); + checkStatus(status, "LargeMsg::LargeMessageTypeSupport::register_type"); + + /* Setup the Publisher's QoS */ + DDS::PublisherQos pub_qos; + status = participant->get_default_publisher_qos(pub_qos); + checkStatus(status, "DDS::DomainParticipant::get_default_publisher_qos"); + pub_qos.partition.name.length(1); + pub_qos.partition.name[0] = partition_name; + + /* Create the publisher */ + this->publisher = participant->create_publisher(pub_qos, NULL, DDS::STATUS_MASK_NONE); + checkHandle(publisher.in(), "DDS::DomainParticipant::create_publisher"); + + /* Create the Topic */ + this->large_message_topic = participant->create_topic( + topic_name, + large_message_type_name, + default_topic_qos, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(large_message_topic.in(), "DDS::DomainParticipant::create_topic(LargeMessage)"); + + /* Create Topic DataWriter */ + DDS::DataWriter_var topic_writer = publisher->create_datawriter( + large_message_topic.in(), + DATAWRITER_QOS_USE_TOPIC_QOS, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(topic_writer.in(), "DDS::Publisher::create_datawriter(LargeMessage)"); + + /* Narrow the Topic DataWriter to be only for LargeMessage */ + this->data_writer = LargeMsg::LargeMessageDataWriter::_narrow(topic_writer.in()); + checkHandle(data_writer.in(), "LargeMsg::LargeMessageDataWriter::_narrow"); + + this->msg.content = std::string(this->message_size, '.').c_str(); + checkHandle(&msg, "new LargeMsg::LargeMessage"); + this->instance_handle = this->data_writer->register_instance(msg); + status = data_writer->write(msg, instance_handle); + /* + if (this->instance_handle == DDS::HANDLE_NIL) + { + std::cout << "Instance handle was NIL" << std::endl; + exit(-1); + } + */ + + std::cout << "Ready to send LargeMessage's" << std::endl; + return true; +} + +void ExamplePublisher::callback() +{ + this->msg.seq = this->i; + ++i; + for (int j = 0; j < this->message_size; ++j) + { + this->msg.content[j] = i; + } + + status = data_writer->write(msg, instance_handle); + + checkStatus(status, "LargeMsg::LargeMessageDataWriter::write"); +} + + +bool ExamplePublisher::teardown() +{ + std::cout << "Finished" << std::endl; + //getchar(); + /* Shutdown */ + { + status = this->publisher->delete_datawriter(this->data_writer.in()); + checkStatus(this->status, "DDS::Publisher::delete_datawriter(data_writer)"); + + status = this->participant->delete_publisher(publisher.in()); + checkStatus(status, "DDS::DomainParticipant::delete_publisher"); + + status = participant->delete_topic(this->large_message_topic.in()); + checkStatus(status, "DDS::DomainParticipant::delete_topic (large_message_topic)"); + + //DDS::string_free(this->large_message_type_name); + + status = this->dpf->delete_participant(participant.in()); + checkStatus(status, "DDS::DomainParticipantFactory::delete_participant"); + } + return true; +} diff --git a/realtime/ExampleSubscriber.hpp b/realtime/ExampleSubscriber.hpp new file mode 100644 index 0000000..3aa0513 --- /dev/null +++ b/realtime/ExampleSubscriber.hpp @@ -0,0 +1,157 @@ +#include "ccpp_dds_dcps.h" +#include "check_status.h" + +#include "ccpp_LargeMsg.h" + +class ExampleSubscriber +{ + private: + DDS::DomainParticipantFactory_var dpf; + DDS::DomainParticipant_var participant; + DDS::Subscriber_var subscriber; + LargeMsg::LargeMessageDataReader_var data_reader; + DDS::Topic_var large_message_topic; + char * large_message_type_name; + LargeMsg::LargeMessageSeq_var large_msg_seq; + DDS::SampleInfoSeq_var sample_info_seq; + bool subscriber_running; + DDS::ReturnCode_t status; + DDS::DomainId_t domain; + const char * partition_name; + const char * topic_name; + DDS::TopicQos default_topic_qos; + LargeMsg::LargeMessageTypeSupport_var large_message_ts; + DDS::SubscriberQos sub_qos; + DDS::DataReader_var topic_reader; + + public: + unsigned long msgs_count; + void callback(); + bool init(); + bool teardown(); +}; + + +bool ExampleSubscriber::init() +{ + this->msgs_count = 0; + domain = DDS::DOMAIN_ID_DEFAULT; + partition_name = "Default"; + topic_name = "big_chatter"; + + /* Create Domain Participant Factory */ + this->dpf = DDS::DomainParticipantFactory::get_instance(); + checkHandle(dpf.in(), "DDS::DomainParticipantFactory::get_instance"); + + /* Create Domain Participant */ + std::cout << "Creating domain participant in subscriber" << std::endl; + this->participant = dpf->create_participant( + domain, + PARTICIPANT_QOS_DEFAULT, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(participant.in(), "DDS::DomainParticipantFactory::create_participant"); + + + /* Create a default QoS for Topics */ + status = participant->get_default_topic_qos(default_topic_qos); + checkStatus(status, "DDS::DomainParticipant::get_default_topic_qos"); + // default_topic_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS; + default_topic_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS; + + /* Register the LargeMessage Type */ + large_message_ts = new LargeMsg::LargeMessageTypeSupport(); + checkHandle(large_message_ts.in(), "new LargeMessageTypeSupport"); + this->large_message_type_name = large_message_ts->get_type_name(); + status = large_message_ts->register_type(participant.in(), large_message_type_name); + checkStatus(status, "LargeMsg::LargeMessageTypeSupport::register_type"); + + /* Setup the Subscribers's QoS */ + status = participant->get_default_subscriber_qos(sub_qos); + checkStatus(status, "DDS::DomainParticipant::get_default_subscriber_qos"); + sub_qos.partition.name.length(1); + sub_qos.partition.name[0] = partition_name; + + /* Create the subscriber */ + this->subscriber = participant->create_subscriber( + sub_qos, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(subscriber.in(), "DDS::DomainParticipant::create_subscriber"); + + /* Create the Topic */ + this->large_message_topic = participant->create_topic( + topic_name, + large_message_type_name, + default_topic_qos, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(large_message_topic.in(), "DDS::DomainParticipant::create_topic(LargeMessage)"); + + /* Create Topic specific DataReader */ + topic_reader = subscriber->create_datareader( + large_message_topic.in(), + DATAREADER_QOS_USE_TOPIC_QOS, + NULL, + DDS::STATUS_MASK_NONE + ); + checkHandle(topic_reader.in(), "DDS::Subscriber::create_datareader"); + + /* Narrow topic_reader down to LargeMessage specific DataReader */ + this->data_reader = LargeMsg::LargeMessageDataReader::_narrow(topic_reader.in()); + checkHandle(data_reader.in(), "LargeMsg::LargeMessageDataReader::_narrow"); + + this->large_msg_seq = new LargeMsg::LargeMessageSeq(); + this->sample_info_seq = new DDS::SampleInfoSeq(); + + std::cout << "Polling DataReader..." << std::endl; + return true; +} + +void ExampleSubscriber::callback() +{ + status = data_reader->take( + large_msg_seq, + sample_info_seq, + DDS::LENGTH_UNLIMITED, + DDS::ANY_SAMPLE_STATE, + DDS::ANY_VIEW_STATE, + DDS::ALIVE_INSTANCE_STATE + ); + checkStatus(status, "LargeMsg::LargeMessageDataReader::take"); + + for (DDS::ULong i = 0; i < large_msg_seq->length(); i++) + { + //msg = &(large_msg_seq[i]); + ++this->msgs_count; + } + + status = data_reader->return_loan(large_msg_seq, sample_info_seq); + checkStatus(status, "LargeMsg::LargeMessageDataReader::return_loan"); +} + +bool ExampleSubscriber::teardown() +{ + /* Shutdown */ + if (participant != NULL) + { + if (subscriber.in() != NULL) { + status = participant->delete_subscriber(subscriber.in()); + checkStatus(status, "DDS::DomainParticipant::delete_subscriber"); + } + + status = participant->delete_topic(large_message_topic.in()); + checkStatus(status, "DDS::DomainParticipant::delete_topic (large_message_topic)"); + + status = dpf->delete_participant(participant.in()); + checkStatus(status, "DDS::DomainParticipantFactory::delete_participant"); + } + + DDS::string_free(large_message_type_name); + + return true; +} + diff --git a/realtime/check_status.h b/realtime/check_status.h new file mode 100644 index 0000000..2f5b804 --- /dev/null +++ b/realtime/check_status.h @@ -0,0 +1,46 @@ +#ifndef __CHECKSTATUS_H__ +#define __CHECKSTATUS_H__ + +#include "ccpp_dds_dcps.h" +#include + +const char *RetCodeName[13] = { + "DDS_RETCODE_OK", + "DDS_RETCODE_ERROR", + "DDS_RETCODE_UNSUPPORTED", + "DDS_RETCODE_BAD_PARAMETER", + "DDS_RETCODE_PRECONDITION_NOT_MET", + "DDS_RETCODE_OUT_OF_RESOURCES", + "DDS_RETCODE_NOT_ENABLED", + "DDS_RETCODE_IMMUTABLE_POLICY", + "DDS_RETCODE_INCONSISTENT_POLICY", + "DDS_RETCODE_ALREADY_DELETED", + "DDS_RETCODE_TIMEOUT", + "DDS_RETCODE_NO_DATA", + "DDS_RETCODE_ILLEGAL_OPERATION" +}; + +const char * getErrorName(DDS::ReturnCode_t status) +{ + return RetCodeName[status]; +} + +void checkStatus(DDS::ReturnCode_t status, const char *info) +{ + if (status != DDS::RETCODE_OK && status != DDS::RETCODE_NO_DATA) + { + std::cerr << "Error in " << info << ": " << getErrorName(status) << std::endl; + exit (1); + } +} + +void checkHandle(void *handle, const char *info) +{ + if (!handle) + { + std::cerr << "Error in " << info << ": Creation failed: invalid handle" << std::endl; + exit (1); + } +} + +#endif diff --git a/realtime/dds_intraproc.cpp b/realtime/dds_intraproc.cpp new file mode 100644 index 0000000..cc2b41d --- /dev/null +++ b/realtime/dds_intraproc.cpp @@ -0,0 +1,133 @@ +#include +#include +#include + +#include +#include "ExampleSubscriber.hpp" +#include "ExamplePublisher.hpp" + +ExamplePublisher *pub; +ExampleSubscriber *sub; +unsigned int message_length; + +static void start_rt_thread(void *(*f)(void*)) +{ + pthread_t thread; + pthread_attr_t attr; + + /* init to default values */ + if (pthread_attr_init(&attr)) + { + fprintf(stderr, "Couldn't initialize pthread to default value"); + exit(1); + } + + /* Set the requested stacksize for this thread */ + // put this in rttest? + if (pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN + 2*message_length + sizeof(*sub))) + { + fprintf(stderr, "Couldn't set requested stack size for pthread"); + exit(1); + } + + /* And finally start the actual thread */ + pthread_create(&thread, &attr, f, NULL); +} + +void* pub_callback(void * unused) +{ + if (pub != NULL) + pub->callback(); +} + +void* sub_callback(void * unused) +{ + if (sub != NULL) + sub->callback(); +} + +void *publisher_thread(void *unused) +{ + rttest_spin(pub_callback, NULL); + + rttest_write_results_file("rttest_publisher_results"); + if (pub != NULL) + pub->teardown(); + rttest_finish(); +} + +void *subscriber_thread(void *unused) +{ + rttest_init_new_thread(); + if (rttest_set_sched_priority(98, SCHED_RR) != 0) + { + perror("Failed to set scheduling priorty and policy of thread"); + } + + rttest_spin(sub_callback, NULL); + + rttest_write_results_file("rttest_subscriber_results"); + std::cout << "Subscriber received " << sub->msgs_count << " messages." << std::endl; + /*if (sub != NULL) + sub->teardown();*/ + rttest_finish(); +} + +int main(int argc, char *argv[]) +{ + int c; + + // l stands for message length + opterr = 0; + optind = 1; + int argc_copy = argc; + char *argv_copy[argc]; + for (int i = 0; i < argc; ++i) + { + size_t len = strlen(argv[i]); + argv_copy[i] = (char*) malloc(len); + memcpy(argv_copy[i], argv[i], len); + } + + while ((c = getopt(argc_copy, argv_copy, "l:")) != -1) + { + switch(c) + { + case 'l': + message_length = std::stoul(std::string(optarg)); + break; + default: + break; + } + } + rttest_read_args(argc, argv); + + pub = new ExamplePublisher(); + pub->message_size = message_length; + pub->init(); + sub = new ExampleSubscriber(); + sub->init(); + + if (rttest_lock_memory() != 0) + { + perror("Failed to lock dynamic memory"); + } + rttest_prefault_stack(); + + start_rt_thread(&subscriber_thread); + + if (rttest_set_sched_priority(98, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy"); + } + + publisher_thread(NULL); + /* + for (int i = 0; i < argc; ++i) + { + free(argv_copy[i]); + } + */ + + //delete sub; +} diff --git a/realtime/dds_lmp.cpp b/realtime/dds_lmp.cpp new file mode 100644 index 0000000..982332d --- /dev/null +++ b/realtime/dds_lmp.cpp @@ -0,0 +1,70 @@ +#include +#include + +#include +#include "ExamplePublisher.hpp" + +ExamplePublisher *pub; + +void* pub_callback(void * unused) +{ + pub->callback(); +} + +int main(int argc, char *argv[]) +{ + unsigned int message_length = 1; + { + int c; + // l stands for message length + opterr = 0; + optind = 1; + int argc_copy = argc; + char *argv_copy[argc]; + for (int i = 0; i < argc; ++i) + { + size_t len = strlen(argv[i]); + argv_copy[i] = (char*) malloc(len); + memcpy(argv_copy[i], argv[i], len); + } + + while ((c = getopt(argc_copy, argv_copy, "l:")) != -1) + { + switch(c) + { + case 'l': + message_length = std::stoul(std::string(optarg)); + break; + default: + break; + } + } + } + rttest_read_args(argc, argv); + + pub = new ExamplePublisher; + pub->message_size = message_length; + + pub->init(); + + if (rttest_set_sched_priority(98, SCHED_RR) != 0) + { + std::cout << "Failed to set realtime priority of thread" << std::endl; + } + + + if (rttest_lock_memory() != 0) + { + perror("Failed to lock dynamic memory"); + } + + rttest_prefault_stack(); + + rttest_spin(pub_callback, NULL); + + rttest_write_results(); + rttest_finish(); + + pub->teardown(); + delete pub; +} diff --git a/realtime/dds_lms.cpp b/realtime/dds_lms.cpp new file mode 100644 index 0000000..a0934d5 --- /dev/null +++ b/realtime/dds_lms.cpp @@ -0,0 +1,42 @@ +#include +#include + +#include +#include "ExampleSubscriber.hpp" + +ExampleSubscriber *sub; + +void* sub_callback(void * unused) +{ + sub->callback(); +} + +int main(int argc, char *argv[]) +{ + sub = new ExampleSubscriber(); + sub->init(); + + rttest_read_args(argc, argv); + if (rttest_set_sched_priority(90, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy of thread"); + } + + size_t pool_size = 1024*1024*1024; + size_t stack_size = sizeof(*sub) + 1024*1024; + if (rttest_lock_memory() != 0) + { + perror("Failed to lock memory"); + } + + rttest_prefault_stack(); + + rttest_spin(sub_callback, NULL); + + rttest_write_results(); + rttest_finish(); + + std::cout << "Subscriber received " << sub->msgs_count << " messages." << std::endl; + sub->teardown(); + delete sub; +} diff --git a/realtime/idl/LargeMsg.idl b/realtime/idl/LargeMsg.idl new file mode 100644 index 0000000..e22ded6 --- /dev/null +++ b/realtime/idl/LargeMsg.idl @@ -0,0 +1,8 @@ +module LargeMsg { + struct LargeMessage { + long key; + long seq; // Message sequence number + string content; // message body + }; +#pragma keylist LargeMessage key +}; diff --git a/realtime/msg/LargeMessage.msg b/realtime/msg/LargeMessage.msg new file mode 100644 index 0000000..0c8dfe0 --- /dev/null +++ b/realtime/msg/LargeMessage.msg @@ -0,0 +1,2 @@ +int64 seq +string content diff --git a/realtime_c/CMakeLists.txt b/realtime_c/CMakeLists.txt new file mode 100644 index 0000000..d106a4b --- /dev/null +++ b/realtime_c/CMakeLists.txt @@ -0,0 +1,28 @@ +cmake_minimum_required(VERSION 2.8) +project(realtime_c) +set (CMAKE_CXX_FLAGS "-std=c++11") + +find_package(rttest REQUIRED) + +find_package(opensplice REQUIRED) + +find_package(Threads) + +# find_package only exposes the c++ include directories but not the c headers +include_directories(c /usr/include/opensplice/dcps/C/SAC ${OPENSPLICE_INCLUDE_DIRS} ${rttest_INCLUDE_DIRS}) +link_directories(${OPENSPLICE_LIBRARY_DIRS} ${rttest_LIBRARY_DIR}) + +set(MSG_C_FILES c/LargeMsgSacDcps.c c/LargeMsgSplDcps.c) + +add_executable(publisher rt_publisher.cpp ${MSG_C_FILES}) +target_link_libraries(publisher ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES}) + +add_executable(subscriber rt_subscriber.cpp ${MSG_C_FILES}) +target_link_libraries(subscriber ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES}) + +if(NOT Threads_FOUND) + message(WARNING "pthread not found. Real-time intra-process pub/sub example will not be built.") +elseif(Threads_FOUND) + add_executable(intraproc rt_intraproc.cpp ${MSG_C_FILES}) + target_link_libraries(intraproc ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +endif() diff --git a/realtime_c/ExamplePublisher.hpp b/realtime_c/ExamplePublisher.hpp new file mode 100644 index 0000000..0a0cf2b --- /dev/null +++ b/realtime_c/ExamplePublisher.hpp @@ -0,0 +1,248 @@ +#include "dds_dcps.h" +#include "LargeMsg.h" +#include "check_status.h" +#include "unistd.h" + +#define LargeMsg_MAX_NAME 128 +#define MAX_MSG_LEN 16777216 + +class ExamplePublisher +{ + private: + DDS_DomainParticipantFactory dpf; + DDS_DomainParticipant dp; + DDS_PublisherQos pub_qos; + DDS_TopicQos topic_qos; + DDS_Publisher chatPublisher; + DDS_Topic chatMessageTopic; + LargeMsg_LargeMessageDataWriter talker; + LargeMsg_LargeMessage msg; + DDS_ReturnCode_t status; + DDS_InstanceHandle_t userHandle; + + int i; + + public: + unsigned int message_size; + void callback(); + bool init(); + bool teardown(); +}; + +bool ExamplePublisher::init() +{ + if (message_size > MAX_MSG_LEN) + { + std::cout << "Clamping message size to maximum" << std::endl; + message_size = MAX_MSG_LEN; + } + if (message_size == 0) + { + std::cout << "Cannot have 0 message size, clamping to 1" << std::endl; + message_size = 1; + } + + DDS_DomainId_t domain = DDS_DOMAIN_ID_DEFAULT; + this->i = 0; + /* Create a DomainParticipantFactory and a DomainParticipant */ + /* (using Default QoS settings). */ + this->dpf = DDS_DomainParticipantFactory_get_instance(); + if (!dpf) { + printf("Creating ParticipantFactory failed!!\n"); + exit(-1); + } + printf("Created ParticipantFactory.\n"); + + this->dp = DDS_DomainParticipantFactory_create_participant ( + dpf, + domain, + DDS_PARTICIPANT_QOS_DEFAULT, + NULL, + DDS_STATUS_MASK_NONE); + if (!dp) { + printf("Creating Participant failed!!\n"); + exit(-1); + } + printf("Created Participant.\n"); + + /* Register the required data type for LargeMessage. */ + LargeMsg_LargeMessageTypeSupport chatMessageTS = LargeMsg_LargeMessageTypeSupport__alloc(); + if (!chatMessageTS) { + printf ("Allocating TypeSupport failed!!\n"); + exit(-1); + } + + char *chatMessageTypeName = LargeMsg_LargeMessageTypeSupport_get_type_name(chatMessageTS); + status = LargeMsg_LargeMessageTypeSupport_register_type(chatMessageTS, dp, chatMessageTypeName); + if (status != DDS_RETCODE_OK) { + printf("Registering data type failed. Status = %d\n", status); + exit(-1); + }; + printf("Registered data type.\n"); + DDS_free(chatMessageTS); + + /* Set QoS policty to BEST_EFFORT */ + //topic_qos = DDS_TopicQos__alloc(); + status = DDS_DomainParticipant_get_default_topic_qos(dp, &topic_qos); + checkStatus(status, "DDS_DomainParticipant_get_default_topic_qos"); + topic_qos.reliability.kind = DDS_BEST_EFFORT_RELIABILITY_QOS; + //topic_qos->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; + + /* Make the tailored QoS the new default. */ + status = DDS_DomainParticipant_set_default_topic_qos(dp, + &topic_qos); + checkStatus(status, "DDS_DomainParticipant_set_default_topic_qos"); + + /*Create the LargeMessage topic */ + this->chatMessageTopic = DDS_DomainParticipant_create_topic( + dp, + "LargeMsg_LargeMessage", + chatMessageTypeName, + &topic_qos, + NULL, + DDS_STATUS_MASK_NONE); + if (!chatMessageTopic) { + printf("Creating LargeMessage topic failed!!\n"); + exit(-1); + }; + printf("Created LargeMessage topic.\n"); + + /* Adapt the default PublisherQos to write into the + "LargeMessageRoom" Partition. */ + + const char *partitionName = "chatter"; + + /* + pub_qos = DDS_PublisherQos__alloc(); + if (!pub_qos) { + printf("Allocating PublisherQos failed!!\n"); + exit(-1); + }*/ + + status = DDS_DomainParticipant_get_default_publisher_qos ( + dp, &pub_qos); + if (status != DDS_RETCODE_OK) { + printf("Getting default publisher qos failed!!\n"); + exit(-1); + } + pub_qos.partition.name._length = 1; + pub_qos.partition.name._maximum = 1; + pub_qos.partition.name._buffer = DDS_StringSeq_allocbuf(1); + if (!pub_qos.partition.name._buffer) { + printf("Allocating partition name failed!!\n"); + exit(-1); + } + pub_qos.partition.name._buffer[0] = DDS_string_alloc(strlen(partitionName)); + if (!pub_qos.partition.name._buffer[0]) { + printf("Allocating partition name failed!!\n"); + exit(-1); + } + strcpy(pub_qos.partition.name._buffer[0], partitionName); + + /* Create a Publisher for the chatter application. */ + this->chatPublisher = DDS_DomainParticipant_create_publisher( + dp, &pub_qos, NULL, DDS_STATUS_MASK_NONE); + if (!chatPublisher) { + printf("Creating publisher failed!!\n"); + exit(-1); + } + printf("Created publisher.\n"); + + /* Create a DataWriter for the LargeMessage Topic + (using the appropriate QoS). */ + this->talker = DDS_Publisher_create_datawriter( + chatPublisher, + chatMessageTopic, + DDS_DATAWRITER_QOS_USE_TOPIC_QOS, + NULL, + DDS_STATUS_MASK_NONE); + if (!talker) { + printf("Creating datawriter failed!!\n"); + exit(-1); + } + checkHandle(talker, "DDS_Publisher_create_datawriter"); + printf("Created datawriter.\n"); + + int ownID = 0; + + //this->msg = LargeMsg_LargeMessage__alloc(); + //checkHandle(msg, "LargeMsg_LargeMessage__alloc"); + msg.seq = 0; + //const unsigned int message_size_const = message_size; + msg.content = DDS_string_alloc(message_size); + checkHandle(msg.content, "DDS_string__alloc"); + int j; + for (j < 0; j < this->message_size; ++j) + { + msg.content[j] = '.'; + } + + // int j; + // snprintf(msg->content, message_size_const, msg_content, msg->seq); + + // register a chat message + + userHandle = LargeMsg_LargeMessageDataWriter_register_instance(talker, &msg); + printf("Created user handle and preallocated message.\n"); + return true; +} + +void ExamplePublisher::callback() +{ + this->msg.seq = this->i; + + int j; + for (j < 0; j < this->message_size; ++j) + { + msg.content[j] = i; + } + + status = LargeMsg_LargeMessageDataWriter_write(talker, &msg, userHandle); + checkStatus(status, "LargeMsg_LargeMessageDataWriter_write"); + ++i; +} + + +bool ExamplePublisher::teardown() +{ + /* Remove the DataWriters */ + status = DDS_Publisher_delete_datawriter(this->chatPublisher, + this->talker); + if (status != DDS_RETCODE_OK) { + printf("Deleting datawriter failed!!\n"); + exit(-1); + } + printf("Deleted datawriter.\n"); + + /* Remove the Publisher. */ + status = DDS_DomainParticipant_delete_publisher( + this->dp, this->chatPublisher); + if (status != DDS_RETCODE_OK) { + printf("Deleting publisher failed!!\n"); + exit(-1); + } + /* De-allocate the PublisherQoS holder. */ + //DDS_free(this->pub_qos); // Note that DDS_free recursively + // de-allocates all indirections!! + printf("Deleted publisher.\n"); + + /* Deleting the Topic. */ + status = DDS_DomainParticipant_delete_topic( + dp, chatMessageTopic); + if (status != DDS_RETCODE_OK) { + printf("Deleting topic failed. Status = %d\n", status); + exit(-1); + }; + //DDS_free(this->topic_qos); + printf("Deleted LargeMessage topic.\n"); + + /* Deleting the DomainParticipant */ + status = DDS_DomainParticipantFactory_delete_participant( + this->dpf, this->dp); + if (status != DDS_RETCODE_OK) { + printf("Deleting participant failed. Status = %d\n", status); + exit(-1); + } + + printf("Deleted Participant.\n"); +} diff --git a/realtime_c/ExampleSubscriber.hpp b/realtime_c/ExampleSubscriber.hpp new file mode 100644 index 0000000..a02fffe --- /dev/null +++ b/realtime_c/ExampleSubscriber.hpp @@ -0,0 +1,171 @@ +#include "dds_dcps.h" +#include "LargeMsg.h" +#include "check_status.h" +#include "unistd.h" + +class ExampleSubscriber +{ + + private: + + DDS_DomainParticipantFactory dpf; + DDS_DomainParticipant dp; + DDS_SubscriberQos *sub_qos; + DDS_Subscriber chatSubscriber; + LargeMsg_LargeMessageDataReader mbReader; + DDS_ReturnCode_t status; + + DDS_sequence_LargeMsg_LargeMessage *msgSeq; + DDS_SampleInfoSeq *infoSeq; + LargeMsg_LargeMessage *msg; + + unsigned long received_msgs_count; + + public: + void callback(); + bool init(); + bool teardown(); +}; + +bool ExampleSubscriber::init() +{ + DDS_DomainId_t domain = DDS_DOMAIN_ID_DEFAULT; + /* Create a DomainParticipantFactory and a DomainParticipant */ + /* (using Default QoS settings). */ + this->dpf = DDS_DomainParticipantFactory_get_instance(); + if (!dpf) { + printf("Creating ParticipantFactory failed!!\n"); + exit(-1); + } + printf("Created ParticipantFactory.\n"); + + this->dp = DDS_DomainParticipantFactory_create_participant ( + dpf, + domain, + DDS_PARTICIPANT_QOS_DEFAULT, + NULL, + DDS_STATUS_MASK_NONE); + if (!dp) { + printf("Creating Participant failed!!\n"); + exit(-1); + } + printf("Created Participant.\n"); + + /* Register the required data type for LargeMessage. */ + LargeMsg_LargeMessageTypeSupport chatMessageTS = LargeMsg_LargeMessageTypeSupport__alloc(); + if (!chatMessageTS) { + printf ("Allocating TypeSupport failed!!\n"); + exit(-1); + } + + char *chatMessageTypeName = LargeMsg_LargeMessageTypeSupport_get_type_name(chatMessageTS); + status = LargeMsg_LargeMessageTypeSupport_register_type( + chatMessageTS, dp, chatMessageTypeName); + if (status != DDS_RETCODE_OK) { + printf("Registering data type failed. Status = %d\n", status); + exit(-1); + }; + printf("Registered data type.\n"); + + /* Set QoS policty to BEST_EFFORT */ + DDS_TopicQos *topic_qos = DDS_TopicQos__alloc(); + status = DDS_DomainParticipant_get_default_topic_qos(dp, topic_qos); + checkStatus(status, "DDS_DomainParticipant_get_default_topic_qos"); + topic_qos->reliability.kind = DDS_BEST_EFFORT_RELIABILITY_QOS; + //topic_qos->reliability.kind = DDS_RELIABLE_RELIABILITY_QOS; + + /* Make the tailored QoS the new default. */ + status = DDS_DomainParticipant_set_default_topic_qos(dp, + topic_qos); + checkStatus(status, "DDS_DomainParticipant_set_default_topic_qos"); + + /*Create the LargeMessage topic */ + DDS_Topic chatMessageTopic = DDS_DomainParticipant_create_topic( + dp, + "LargeMsg_LargeMessage", + chatMessageTypeName, + topic_qos, + NULL, + DDS_STATUS_MASK_NONE); + if (!chatMessageTopic) { + printf("Creating LargeMessage topic failed!!\n"); + exit(-1); + }; + + printf("Created LargeMessage topic.\n"); + + + /* Adapt the default SubscriberQos to read from the + "ChatRoom" Partition. */ + const char *partitionName = "chatter"; + this->sub_qos = DDS_SubscriberQos__alloc(); + checkHandle(sub_qos, "DDS_SubscriberQos__alloc"); + status = DDS_DomainParticipant_get_default_subscriber_qos(this->dp, sub_qos); + checkStatus(status, "DDS_DomainParticipant_get_default_subscriber_qos"); + sub_qos->partition.name._length = 1; + sub_qos->partition.name._maximum = 1; + sub_qos->partition.name._buffer = DDS_StringSeq_allocbuf (1); + checkHandle(sub_qos->partition.name._buffer, "DDS_StringSeq_allocbuf"); + sub_qos->partition.name._buffer[0] = + DDS_string_alloc (strlen(partitionName)); + checkHandle(sub_qos->partition.name._buffer[0], "DDS_string_alloc"); + strcpy(sub_qos->partition.name._buffer[0], partitionName); + /* Create a Subscriber for the MessageBoard application. */ + chatSubscriber = DDS_DomainParticipant_create_subscriber( + this->dp, sub_qos, NULL, DDS_STATUS_MASK_NONE); + checkHandle(chatSubscriber, "DDS_DomainParticipant_create_subscriber"); + /* Create a DataReader for the ChatMessage Topic + (using the appropriate QoS). */ + this->mbReader = DDS_Subscriber_create_datareader( + chatSubscriber, + chatMessageTopic, + DDS_DATAREADER_QOS_USE_TOPIC_QOS, + NULL, + DDS_STATUS_MASK_NONE); + checkHandle(mbReader, "DDS_Subscriber_create_datareader"); + + msgSeq = DDS_sequence_LargeMsg_LargeMessage__alloc(); + checkHandle(msgSeq, "DDS_sequence_Chat_NamedMessage__alloc"); + infoSeq = DDS_SampleInfoSeq__alloc(); + checkHandle(infoSeq, "DDS_SampleInfoSeq__alloc"); + received_msgs_count = 0; + + return true; +} + +void ExampleSubscriber::callback() +{ + status = LargeMsg_LargeMessageDataReader_take( + mbReader, + msgSeq, + infoSeq, + DDS_LENGTH_UNLIMITED, + DDS_ANY_SAMPLE_STATE, + DDS_ANY_VIEW_STATE, + DDS_ALIVE_INSTANCE_STATE); + checkStatus(status, "Chat_NamedMessageDataReader_take"); + + DDS_unsigned_long i; + for (i = 0; i < msgSeq->_length; i++) { + msg = &(msgSeq->_buffer[i]); + ++this->received_msgs_count; + } + + status = LargeMsg_LargeMessageDataReader_return_loan(mbReader, msgSeq, infoSeq); + checkStatus(status, "LargeMsg_LargeMessageDataReader_return_loan"); +} + +bool ExampleSubscriber::teardown() +{ + /* Remove the DataReader */ + DDS_Subscriber_delete_datareader(this->chatSubscriber, this->mbReader); + checkStatus(status, "DDS_Subscriber_delete_datareader"); + /* Remove the Subscriber. */ + status = DDS_DomainParticipant_delete_subscriber(this->dp, chatSubscriber); + checkStatus(status, "DDS_DomainParticipant_delete_subscriber"); + /* De-allocate the SubscriberQoS holder. */ + DDS_free(this->sub_qos); // Note that DDS_free recursively + // de-allocates all indirections!! + printf("Received %lu total messages.\n", this->received_msgs_count); + return false; +} diff --git a/realtime_c/LargeMsg.idl b/realtime_c/LargeMsg.idl new file mode 100644 index 0000000..e22ded6 --- /dev/null +++ b/realtime_c/LargeMsg.idl @@ -0,0 +1,8 @@ +module LargeMsg { + struct LargeMessage { + long key; + long seq; // Message sequence number + string content; // message body + }; +#pragma keylist LargeMessage key +}; diff --git a/realtime_c/build.sh b/realtime_c/build.sh new file mode 100755 index 0000000..a6d3ddf --- /dev/null +++ b/realtime_c/build.sh @@ -0,0 +1,13 @@ +rm -fr c +mkdir c +cd c +OSPL_TMPL_PATH=/usr/etc/opensplice/idlpp idlpp -S -l c ../LargeMsg.idl +echo "Generated C code under '`pwd`'" +cd .. + +rm -fr build +mkdir build +cd build +cmake .. -DCMAKE_INSTALL_PREFIX=~/local +make +cd .. diff --git a/realtime_c/check_status.h b/realtime_c/check_status.h new file mode 100644 index 0000000..eefab3d --- /dev/null +++ b/realtime_c/check_status.h @@ -0,0 +1,46 @@ +#ifndef __CHECKSTATUS_H__ +#define __CHECKSTATUS_H__ + +#include "ccpp_dds_dcps.h" +#include + +const char *RetCodeName[13] = { + "DDS_RETCODE_OK", + "DDS_RETCODE_ERROR", + "DDS_RETCODE_UNSUPPORTED", + "DDS_RETCODE_BAD_PARAMETER", + "DDS_RETCODE_PRECONDITION_NOT_MET", + "DDS_RETCODE_OUT_OF_RESOURCES", + "DDS_RETCODE_NOT_ENABLED", + "DDS_RETCODE_IMMUTABLE_POLICY", + "DDS_RETCODE_INCONSISTENT_POLICY", + "DDS_RETCODE_ALREADY_DELETED", + "DDS_RETCODE_TIMEOUT", + "DDS_RETCODE_NO_DATA", + "DDS_RETCODE_ILLEGAL_OPERATION" +}; + +const char * getErrorName(DDS::ReturnCode_t status) +{ + return RetCodeName[status]; +} + +void checkStatus(DDS::ReturnCode_t status, const char *info) +{ + if (status != DDS::RETCODE_OK && status != DDS::RETCODE_NO_DATA) + { + std::cerr << "Error in " << info << ": " << getErrorName(status) << std::endl; + //exit (1); + } +} + +void checkHandle(void *handle, const char *info) +{ + if (!handle) + { + std::cerr << "Error in " << info << ": Creation failed: invalid handle" << std::endl; + exit (1); + } +} + +#endif diff --git a/realtime_c/rt_intraproc.cpp b/realtime_c/rt_intraproc.cpp new file mode 100644 index 0000000..11b1896 --- /dev/null +++ b/realtime_c/rt_intraproc.cpp @@ -0,0 +1,121 @@ +#include + +#include "ExampleSubscriber.hpp" +#include "ExamplePublisher.hpp" + +ExamplePublisher *pub; +ExampleSubscriber *sub; +unsigned int message_length; + +static void start_rt_thread(void *(*f)(void*)) +{ + pthread_t thread; + pthread_attr_t attr; + + /* init to default values */ + if (pthread_attr_init(&attr)) + { + fprintf(stderr, "Couldn't initialize pthread to default value"); + exit(1); + } + + /* Set the requested stacksize for this thread */ + // put this in rttest? + if (pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN + 2*message_length + sizeof(*sub))) + { + fprintf(stderr, "Couldn't set requested stack size for pthread"); + exit(1); + } + + /* And finally start the actual thread */ + pthread_create(&thread, &attr, f, NULL); +} + +void* pub_callback(void * unused) +{ + if (pub != NULL) + pub->callback(); +} + +void* sub_callback(void * unused) +{ + if (sub != NULL) + sub->callback(); +} + +void *publisher_thread(void *unused) +{ + rttest_spin(pub_callback, NULL); + + rttest_write_results_file("rttest_publisher_results"); + if (pub != NULL) + pub->teardown(); + rttest_finish(); +} + +void *subscriber_thread(void *unused) +{ + rttest_init_new_thread(); + if (rttest_set_sched_priority(97, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy of thread"); + } + + + rttest_spin(sub_callback, NULL); + + rttest_write_results_file("rttest_subscriber_results"); + if (sub != NULL) + sub->teardown(); + rttest_finish(); +} + +int main(int argc, char *argv[]) +{ + int c; + + // l stands for message length + opterr = 0; + optind = 1; + int argc_copy = argc; + char *argv_copy[argc]; + for (int i = 0; i < argc; ++i) + { + size_t len = strlen(argv[i]); + argv_copy[i] = (char*) malloc(len); + memcpy(argv_copy[i], argv[i], len); + } + + while ((c = getopt(argc_copy, argv_copy, "l:")) != -1) + { + switch(c) + { + case 'l': + message_length = std::stoul(std::string(optarg)); + break; + default: + break; + } + } + rttest_read_args(argc, argv); + + pub = new ExamplePublisher(); + pub->message_size = message_length; + pub->init(); + sub = new ExampleSubscriber(); + sub->init(); + + if (rttest_lock_memory() != 0) + { + perror("Failed to lock dynamic memory. Process might not be real-time safe"); + } + rttest_prefault_stack(); + + start_rt_thread(&subscriber_thread); + + if (rttest_set_sched_priority(98, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy of thread"); + } + publisher_thread(NULL); +} diff --git a/realtime_c/rt_publisher.cpp b/realtime_c/rt_publisher.cpp new file mode 100644 index 0000000..80fbb0b --- /dev/null +++ b/realtime_c/rt_publisher.cpp @@ -0,0 +1,65 @@ +#include + +#include +#include "ExamplePublisher.hpp" + +#define STACK_SIZE 1024*1024*1024 + +ExamplePublisher pub; + +void* pub_callback(void * unused) +{ + pub.callback(); +} + +int main(int argc, char *argv[]) +{ + unsigned int message_size = 1; + { + int c; + // l stands for message length + opterr = 0; + optind = 1; + int argc_copy = argc; + char *argv_copy[argc]; + for (int i = 0; i < argc; ++i) + { + size_t len = strlen(argv[i]); + argv_copy[i] = (char*) malloc(len); + memcpy(argv_copy[i], argv[i], len); + } + + while ((c = getopt(argc_copy, argv_copy, "l:")) != -1) + { + switch(c) + { + case 'l': + message_size = std::stoul(std::string(optarg)); + break; + default: + break; + } + } + } + pub.message_size = message_size; + pub.init(); + + rttest_read_args(argc, argv); + if (rttest_set_sched_priority(90, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy of thread"); + } + + if (rttest_lock_memory() != 0) + { + perror("Failed to lock memory"); + } + rttest_prefault_stack(); + + rttest_spin(pub_callback, NULL); + + rttest_write_results(); + rttest_finish(); + + pub.teardown(); +} diff --git a/realtime_c/rt_subscriber.cpp b/realtime_c/rt_subscriber.cpp new file mode 100644 index 0000000..1894bf5 --- /dev/null +++ b/realtime_c/rt_subscriber.cpp @@ -0,0 +1,37 @@ +#include +#include "ExampleSubscriber.hpp" + +#define STACK_SIZE 1024*1024*1024 + +ExampleSubscriber sub; + +void* sub_callback(void * unused) +{ + sub.callback(); +} + +int main(int argc, char *argv[]) +{ + sub.init(); + + rttest_read_args(argc, argv); + + if (rttest_set_sched_priority(90, SCHED_RR) != 0) + { + perror("Failed to set scheduling priority and policy of thread"); + } + + if (rttest_lock_memory() != 0) + { + perror("Failed to lock memory"); + } + + rttest_prefault_stack(); + + rttest_spin(sub_callback, NULL); + + rttest_write_results(); + rttest_finish(); + + sub.teardown(); +} diff --git a/realtime_connext/CMakeLists.txt b/realtime_connext/CMakeLists.txt new file mode 100644 index 0000000..00044a3 --- /dev/null +++ b/realtime_connext/CMakeLists.txt @@ -0,0 +1,31 @@ +cmake_minimum_required(VERSION 2.8 FATAL_ERROR) + +project(realtime_connext) + + +find_package(nddscpp REQUIRED) +find_package(rttest REQUIRED) + +add_definitions(${nddscpp_DEFINITIONS}) + +link_directories(${rttest_LIBRARY_DIR}) +include_directories(${rttest_INCLUDE_DIRS} ${nddscpp_INCLUDE_DIRS}) + +set(CMAKE_CXX_FLAGS "-std=c++11") + +add_library(poll_msg poll.cxx pollPlugin.cxx pollSupport.cxx) +target_link_libraries(poll_msg ${nddscpp_LIBRARIES}) + +add_executable(poll_publisher poll_publisher.cxx) +target_link_libraries(poll_publisher poll_msg ${rttest_LIBRARIES}) + +add_executable(poll_subscriber poll_subscriber.cxx) +target_link_libraries(poll_subscriber poll_msg ${rttest_LIBRARIES}) + +find_package(Threads) +if(NOT Threads_FOUND) + message(WARNING "pthread not found. Intra-process pub/sub example will not be built") +elseif (Threads_FOUND) + add_executable(poll_intraproc poll_intraproc.cxx) + target_link_libraries(poll_intraproc poll_msg ${rttest_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +endif() diff --git a/realtime_connext/USER_QOS_PROFILES.xml b/realtime_connext/USER_QOS_PROFILES.xml new file mode 100644 index 0000000..374b827 --- /dev/null +++ b/realtime_connext/USER_QOS_PROFILES.xml @@ -0,0 +1,114 @@ + + + + + + + + + + + + pollDataWriter + + + + + + + pollDataReader + + + + + + + pollParticipant + pollParticipantRole + + + + + + + diff --git a/realtime_connext/build.sh b/realtime_connext/build.sh new file mode 100644 index 0000000..5509f1e --- /dev/null +++ b/realtime_connext/build.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +rtiddsgen2 poll.idl -replace -language C++ -example x64Linux3.xgcc4.6.3 + +mkdir build +cd build +cmake .. $@ +make diff --git a/realtime_connext/makefile b/realtime_connext/makefile new file mode 100644 index 0000000..46a812e --- /dev/null +++ b/realtime_connext/makefile @@ -0,0 +1,76 @@ +###################################################################### +# makefile_async_x64Linux3.xgcc4.6.3 +# +# (c) Copyright, Real-Time Innovations, 2012. All rights reserved. +# No duplications, whole or partial, manual or electronic, may be made +# without express written permission. Any such copies, or +# revisions thereof, must display this notice unaltered. +# This code contains trade secrets of Real-Time Innovations, Inc. +# +# +# This makefile was automatically generated by rtiddsgen. +# +# To compile, type: +# gmake -f makefile_async_x64Linux3.xgcc4.6.3 +# +# Note: This makefile is only meant to build our example applications and +# may require alterations to build on your system. +# +# This makefile assumes that your build environment is already correctly +# configured. (For example, the correct version of your compiler and +# linker should be on your PATH.) +###################################################################### + +c_cc_x64Linux3.xgcc4.6.3 = gcc +c_cc_flags_x64Linux3.xgcc4.6.3 = -m64 -Wall +c_ld_x64Linux3.xgcc4.6.3 = gcc +c_ld_flags_x64Linux3.xgcc4.6.3 = -m64 +cxx_cc_x64Linux3.xgcc4.6.3 = g++ +cxx_cc_flags_x64Linux3.xgcc4.6.3 = -m64 -Wall +cxx_ld_x64Linux3.xgcc4.6.3 = g++ +cxx_ld_flags_x64Linux3.xgcc4.6.3 = -m64 -static-libgcc +syslibs_x64Linux3.xgcc4.6.3 = -ldl -lnsl -lm -lpthread -lrt +DEFINES_ARCH_SPECIFIC = -DRTI_UNIX -DRTI_LINUX -DRTI_64BIT + + +DEFINES = $(DEFINES_ARCH_SPECIFIC) $(cxx_DEFINES_ARCH_SPECIFIC) + +INCLUDES = -I. -I$(NDDSHOME)/include -I$(NDDSHOME)/include/ndds + +LIBS = -L$(NDDSHOME)/lib/x64Linux3.xgcc4.6.3 \ + -lnddscppz -lnddscz -lnddscorez $(syslibs_x64Linux3.xgcc4.6.3) + +CDRSOURCES = async.idl +COMMONSOURCES = asyncPlugin.cxx asyncSupport.cxx async.cxx +EXEC = async_publisher async_subscriber +DIRECTORIES = objs.dir objs/x64Linux3.xgcc4.6.3.dir +COMMONOBJS = $(COMMONSOURCES:%.cxx=objs/x64Linux3.xgcc4.6.3/%.o) + +# We actually stick the objects in a sub directory to keep your directory clean. +x64Linux3.xgcc4.6.3 : $(DIRECTORIES) $(COMMONOBJS) \ + $(EXEC:%=objs/x64Linux3.xgcc4.6.3/%.o) \ + $(EXEC:%=objs/x64Linux3.xgcc4.6.3/%.out) + +objs/x64Linux3.xgcc4.6.3/%.out : objs/x64Linux3.xgcc4.6.3/%.o + $(cxx_ld_x64Linux3.xgcc4.6.3) $(cxx_ld_flags_x64Linux3.xgcc4.6.3) -o $(@:%.out=%) $(@:%.out=%.o) $(COMMONOBJS) $(LIBS) + +objs/x64Linux3.xgcc4.6.3/%.o : %.cxx + $(cxx_cc_x64Linux3.xgcc4.6.3) $(cxx_cc_flags_x64Linux3.xgcc4.6.3) -o $@ $(DEFINES) $(INCLUDES) -c $< + +# +# Uncomment these lines if you want the support files regenerated when idl +# file is modified +# +async.cxx async.h \ +asyncPlugin.cxx asyncPlugin.h \ +asyncSupport.cxx asyncSupport.h : \ + async.idl + rtiddsgen2 async.idl -replace + +# Here is how we create those subdirectories automatically. +%.dir : + @echo "Checking directory $*" + @if [ ! -d $* ]; then \ + echo "Making directory $*"; \ + mkdir -p $* ; \ + fi; diff --git a/realtime_connext/poll.idl b/realtime_connext/poll.idl new file mode 100644 index 0000000..7a8c9e0 --- /dev/null +++ b/realtime_connext/poll.idl @@ -0,0 +1,16 @@ +/******************************************************************************* + (c) 2005-2014 Copyright, Real-Time Innovations, Inc. All rights reserved. + RTI grants Licensee a license to use, modify, compile, and create derivative + works of the Software. Licensee has the right to distribute object form only + for use with RTI products. The Software is provided "as is", with no warranty + of any type, including any warranty for fitness for any purpose. RTI is under + no obligation to maintain or support the Software. RTI shall not be liable for + any incidental or consequential damages arising out of the use or inability to + use the software. + ******************************************************************************/ +struct poll { + long key; + long seq; + string content; +}; +#pragma keylist poll key diff --git a/realtime_connext/poll_intraproc.cxx b/realtime_connext/poll_intraproc.cxx new file mode 100644 index 0000000..24ff8f8 --- /dev/null +++ b/realtime_connext/poll_intraproc.cxx @@ -0,0 +1,42 @@ +#include "poll_utils.cxx" + +#if !(defined(RTI_VXWORKS) && !defined(__RTP__)) && !defined(RTI_PSOS) +int main(int argc, char *argv[]) +{ + + /* Uncomment this to turn on additional logging + NDDSConfigLogger::get_instance()-> + set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); + */ + Arguments args; + args.argc = argc; + args.argv = argv; + + pthread_t thread; + pthread_attr_t attr; + if (pthread_attr_init(&attr)) + { + fprintf(stderr, "Couldn't initialize pthread to default value"); + exit(1); + } + pthread_create(&thread, &attr, publisher_main, static_cast(&args)); + return subscriber_main(&args); +} +#endif + +#ifdef RTI_VX653 +const unsigned char* __ctype = *(__ctypePtrGet()); + +extern "C" void usrAppInit () +{ +#ifdef USER_APPL_INIT + USER_APPL_INIT; /* for backwards compatibility */ +#endif + + /* add application specific code here */ + taskSpawn("pub", RTI_OSAPI_THREAD_PRIORITY_NORMAL, 0x8, 0x150000, (FUNCPTR)publisher_main, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + +} +#endif + diff --git a/realtime_connext/poll_publisher.cxx b/realtime_connext/poll_publisher.cxx new file mode 100644 index 0000000..3e26727 --- /dev/null +++ b/realtime_connext/poll_publisher.cxx @@ -0,0 +1,276 @@ + +/* poll_publisher.cxx + + A publication of data of type poll + + This file is derived from code automatically generated by the rtiddsgen + command: + +rtiddsgen -language C++ -example poll.idl + + Example publication of type poll automatically generated by + 'rtiddsgen'. To test them follow these steps: + +(1) Compile this file and the example subscription. + + (2) Start the subscription with the command + objs//poll_subscriber + + (3) Start the publication with the command + objs//poll_publisher + + (4) [Optional] Specify the list of discovery initial peers and + multicast receive addresses via an environment variable or a file + (in the current working directory) called NDDS_DISCOVERY_PEERS. + + You can run any number of publishers and subscribers programs, and can + add and remove them dynamically from the domain. + + + Example: + +To run the example application on domain : + +On Unix: + +objs//poll_publisher o + objs//poll_subscriber + + On Windows: + +objs\\poll_publisher + objs\\poll_subscriber + + +modification history +------------ ------- +*/ + +#include +#include +#ifdef RTI_VX653 +#include +#endif +#include "poll.h" +#include "pollSupport.h" +#include "ndds/ndds_cpp.h" + +/* Delete all entities */ +static int publisher_shutdown( + DDSDomainParticipant *participant) +{ + DDS_ReturnCode_t retcode; + int status = 0; + + if (participant != NULL) { + retcode = participant->delete_contained_entities(); + if (retcode != DDS_RETCODE_OK) { + printf("delete_contained_entities error %d\n", retcode); + status = -1; + } + + retcode = DDSTheParticipantFactory->delete_participant(participant); + if (retcode != DDS_RETCODE_OK) { + printf("delete_participant error %d\n", retcode); + status = -1; + } + } + + /* RTI Connext provides finalize_instance() method on + domain participant factory for people who want to release memory used + by the participant factory. Uncomment the following block of code for + clean destruction of the singleton. */ + /* + + retcode = DDSDomainParticipantFactory::finalize_instance(); + if (retcode != DDS_RETCODE_OK) { + printf("finalize_instance error %d\n", retcode); + status = -1; + } + */ + + return status; +} + +extern "C" int publisher_main(int domainId, int sample_count) +{ + DDSDomainParticipant *participant = NULL; + DDSPublisher *publisher = NULL; + DDSTopic *topic = NULL; + DDSDataWriter *writer = NULL; + pollDataWriter * poll_writer = NULL; + poll *instance = NULL; + DDS_ReturnCode_t retcode; + DDS_InstanceHandle_t instance_handle = DDS_HANDLE_NIL; + const char *type_name = NULL; + int count = 0; + DDS_Duration_t send_period = {4,0}; + + /* To customize participant QoS, use + the configuration file USER_QOS_PROFILES.xml */ + participant = DDSTheParticipantFactory->create_participant( + domainId, DDS_PARTICIPANT_QOS_DEFAULT, + NULL /* listener */, DDS_STATUS_MASK_NONE); + if (participant == NULL) { + printf("create_participant error\n"); + publisher_shutdown(participant); + return -1; + } + + /* To customize publisher QoS, use + the configuration file USER_QOS_PROFILES.xml */ + publisher = participant->create_publisher( + DDS_PUBLISHER_QOS_DEFAULT, NULL /* listener */, DDS_STATUS_MASK_NONE); + if (publisher == NULL) { + printf("create_publisher error\n"); + publisher_shutdown(participant); + return -1; + } + + /* Register type before creating topic */ + type_name = pollTypeSupport::get_type_name(); + retcode = pollTypeSupport::register_type( + participant, type_name); + if (retcode != DDS_RETCODE_OK) { + printf("register_type error %d\n", retcode); + publisher_shutdown(participant); + return -1; + } + + /* To customize topic QoS, use + the configuration file USER_QOS_PROFILES.xml */ + topic = participant->create_topic( + "Example poll", + type_name, DDS_TOPIC_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (topic == NULL) { + printf("create_topic error\n"); + publisher_shutdown(participant); + return -1; + } + + /* To customize data writer QoS, use + the configuration file USER_QOS_PROFILES.xml */ + writer = publisher->create_datawriter( + topic, DDS_DATAWRITER_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (writer == NULL) { + printf("create_datawriter error\n"); + publisher_shutdown(participant); + return -1; + } + poll_writer = pollDataWriter::narrow(writer); + if (poll_writer == NULL) { + printf("DataWriter narrow error\n"); + publisher_shutdown(participant); + return -1; + } + + /* Create data sample for writing */ + instance = pollTypeSupport::create_data(); + if (instance == NULL) { + printf("pollTypeSupport::create_data error\n"); + publisher_shutdown(participant); + return -1; + } + + /* For a data type that has a key, if the same instance is going to be + written multiple times, initialize the key here + and register the keyed instance prior to writing */ + /* + instance_handle = poll_writer->register_instance(*instance); + */ + + /* Main loop */ + for (count=0; (sample_count == 0) || (count < sample_count); ++count) { + + printf("Writing poll, count %d\n", count); + + /* Modify the data to be sent here */ + + retcode = poll_writer->write(*instance, instance_handle); + if (retcode != DDS_RETCODE_OK) { + printf("write error %d\n", retcode); + } + + NDDSUtility::sleep(send_period); + } + + /* + retcode = poll_writer->unregister_instance( + *instance, instance_handle); + if (retcode != DDS_RETCODE_OK) { + printf("unregister instance error %d\n", retcode); + } + */ + + /* Delete data sample */ + retcode = pollTypeSupport::delete_data(instance); + if (retcode != DDS_RETCODE_OK) { + printf("pollTypeSupport::delete_data error %d\n", retcode); + } + + /* Delete all entities */ + return publisher_shutdown(participant); +} + +#if defined(RTI_WINCE) +int wmain(int argc, wchar_t** argv) +{ + int domainId = 0; + int sample_count = 0; /* infinite loop */ + + if (argc >= 2) { + domainId = _wtoi(argv[1]); + } + if (argc >= 3) { + sample_count = _wtoi(argv[2]); + } + + /* Uncomment this to turn on additional logging + NDDSConfigLogger::get_instance()-> + set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); + */ + + return publisher_main(domainId, sample_count); +} + +#elif !(defined(RTI_VXWORKS) && !defined(__RTP__)) && !defined(RTI_PSOS) +int main(int argc, char *argv[]) +{ + int domainId = 0; + int sample_count = 0; /* infinite loop */ + + if (argc >= 2) { + domainId = atoi(argv[1]); + } + if (argc >= 3) { + sample_count = atoi(argv[2]); + } + + /* Uncomment this to turn on additional logging + NDDSConfigLogger::get_instance()-> + set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); + */ + + return publisher_main(domainId, sample_count); +} +#endif + +#ifdef RTI_VX653 +const unsigned char* __ctype = *(__ctypePtrGet()); + +extern "C" void usrAppInit () +{ + #ifdef USER_APPL_INIT + USER_APPL_INIT; /* for backwards compatibility */ + #endif + + /* add application specific code here */ + taskSpawn("pub", RTI_OSAPI_THREAD_PRIORITY_NORMAL, 0x8, 0x150000, (FUNCPTR)publisher_main, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + +} +#endif + diff --git a/realtime_connext/poll_subscriber.cxx b/realtime_connext/poll_subscriber.cxx new file mode 100644 index 0000000..aae517d --- /dev/null +++ b/realtime_connext/poll_subscriber.cxx @@ -0,0 +1,305 @@ + +/* poll_subscriber.cxx + + A subscription example + + This file is derived from code automatically generated by the rtiddsgen + command: + +rtiddsgen -language C++ -example poll.idl + + Example subscription of type poll automatically generated by + 'rtiddsgen'. To test them follow these steps: + +(1) Compile this file and the example publication. + + (2) Start the subscription with the command + objs//poll_subscriber + + (3) Start the publication with the command + objs//poll_publisher + + (4) [Optional] Specify the list of discovery initial peers and + multicast receive addresses via an environment variable or a file + (in the current working directory) called NDDS_DISCOVERY_PEERS. + + You can run any number of publishers and subscribers programs, and can + add and remove them dynamically from the domain. + + + Example: + +To run the example application on domain : + +On Unix: + +objs//poll_publisher + objs//poll_subscriber + + On Windows: + +objs\\poll_publisher + objs\\poll_subscriber + + +modification history +------------ ------- +*/ + +#include +#include +#ifdef RTI_VX653 +#include +#endif +#include "poll.h" +#include "pollSupport.h" +#include "ndds/ndds_cpp.h" + +class pollListener : public DDSDataReaderListener { + public: + virtual void on_requested_deadline_missed( + DDSDataReader* /*reader*/, + const DDS_RequestedDeadlineMissedStatus& /*status*/) {} + + virtual void on_requested_incompatible_qos( + DDSDataReader* /*reader*/, + const DDS_RequestedIncompatibleQosStatus& /*status*/) {} + + virtual void on_sample_rejected( + DDSDataReader* /*reader*/, + const DDS_SampleRejectedStatus& /*status*/) {} + + virtual void on_liveliness_changed( + DDSDataReader* /*reader*/, + const DDS_LivelinessChangedStatus& /*status*/) {} + + virtual void on_sample_lost( + DDSDataReader* /*reader*/, + const DDS_SampleLostStatus& /*status*/) {} + + virtual void on_subscription_matched( + DDSDataReader* /*reader*/, + const DDS_SubscriptionMatchedStatus& /*status*/) {} + + virtual void on_data_available(DDSDataReader* reader); +}; + +void pollListener::on_data_available(DDSDataReader* reader) +{ + pollDataReader *poll_reader = NULL; + pollSeq data_seq; + DDS_SampleInfoSeq info_seq; + DDS_ReturnCode_t retcode; + int i; + + poll_reader = pollDataReader::narrow(reader); + if (poll_reader == NULL) { + printf("DataReader narrow error\n"); + return; + } + + retcode = poll_reader->take( + data_seq, info_seq, DDS_LENGTH_UNLIMITED, + DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); + + if (retcode == DDS_RETCODE_NO_DATA) { + return; + } else if (retcode != DDS_RETCODE_OK) { + printf("take error %d\n", retcode); + return; + } + + for (i = 0; i < data_seq.length(); ++i) { + if (info_seq[i].valid_data) { + pollTypeSupport::print_data(&data_seq[i]); + } + } + + retcode = poll_reader->return_loan(data_seq, info_seq); + if (retcode != DDS_RETCODE_OK) { + printf("return loan error %d\n", retcode); + } +} + +/* Delete all entities */ +static int subscriber_shutdown( + DDSDomainParticipant *participant) +{ + DDS_ReturnCode_t retcode; + int status = 0; + + if (participant != NULL) { + retcode = participant->delete_contained_entities(); + if (retcode != DDS_RETCODE_OK) { + printf("delete_contained_entities error %d\n", retcode); + status = -1; + } + + retcode = DDSTheParticipantFactory->delete_participant(participant); + if (retcode != DDS_RETCODE_OK) { + printf("delete_participant error %d\n", retcode); + status = -1; + } + } + + /* RTI Connext provides the finalize_instance() method on + domain participant factory for people who want to release memory used + by the participant factory. Uncomment the following block of code for + clean destruction of the singleton. */ + /* + + retcode = DDSDomainParticipantFactory::finalize_instance(); + if (retcode != DDS_RETCODE_OK) { + printf("finalize_instance error %d\n", retcode); + status = -1; + } + */ + return status; +} + +extern "C" int subscriber_main(int domainId, int sample_count) +{ + DDSDomainParticipant *participant = NULL; + DDSSubscriber *subscriber = NULL; + DDSTopic *topic = NULL; + pollListener *reader_listener = NULL; + DDSDataReader *reader = NULL; + DDS_ReturnCode_t retcode; + const char *type_name = NULL; + int count = 0; + DDS_Duration_t receive_period = {4,0}; + int status = 0; + + /* To customize the participant QoS, use + the configuration file USER_QOS_PROFILES.xml */ + participant = DDSTheParticipantFactory->create_participant( + domainId, DDS_PARTICIPANT_QOS_DEFAULT, + NULL /* listener */, DDS_STATUS_MASK_NONE); + if (participant == NULL) { + printf("create_participant error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* To customize the subscriber QoS, use + the configuration file USER_QOS_PROFILES.xml */ + subscriber = participant->create_subscriber( + DDS_SUBSCRIBER_QOS_DEFAULT, NULL /* listener */, DDS_STATUS_MASK_NONE); + if (subscriber == NULL) { + printf("create_subscriber error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* Register the type before creating the topic */ + type_name = pollTypeSupport::get_type_name(); + retcode = pollTypeSupport::register_type( + participant, type_name); + if (retcode != DDS_RETCODE_OK) { + printf("register_type error %d\n", retcode); + subscriber_shutdown(participant); + return -1; + } + + /* To customize the topic QoS, use + the configuration file USER_QOS_PROFILES.xml */ + topic = participant->create_topic( + "Example poll", + type_name, DDS_TOPIC_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (topic == NULL) { + printf("create_topic error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* Create a data reader listener */ + reader_listener = new pollListener(); + + /* To customize the data reader QoS, use + the configuration file USER_QOS_PROFILES.xml */ + reader = subscriber->create_datareader( + topic, DDS_DATAREADER_QOS_DEFAULT, reader_listener, + DDS_STATUS_MASK_ALL); + if (reader == NULL) { + printf("create_datareader error\n"); + subscriber_shutdown(participant); + delete reader_listener; + return -1; + } + + /* Main loop */ + for (count=0; (sample_count == 0) || (count < sample_count); ++count) { + + printf("poll subscriber sleeping for %d sec...\n", + receive_period.sec); + + NDDSUtility::sleep(receive_period); + } + + /* Delete all entities */ + status = subscriber_shutdown(participant); + delete reader_listener; + + return status; +} + +#if defined(RTI_WINCE) +int wmain(int argc, wchar_t** argv) +{ + int domainId = 0; + int sample_count = 0; /* infinite loop */ + + if (argc >= 2) { + domainId = _wtoi(argv[1]); + } + if (argc >= 3) { + sample_count = _wtoi(argv[2]); + } + + /* Uncomment this to turn on additional logging + NDDSConfigLogger::get_instance()-> + set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); + */ + + return subscriber_main(domainId, sample_count); +} + +#elif !(defined(RTI_VXWORKS) && !defined(__RTP__)) && !defined(RTI_PSOS) +int main(int argc, char *argv[]) +{ + int domainId = 0; + int sample_count = 0; /* infinite loop */ + + if (argc >= 2) { + domainId = atoi(argv[1]); + } + if (argc >= 3) { + sample_count = atoi(argv[2]); + } + + /* Uncomment this to turn on additional logging + NDDSConfigLogger::get_instance()-> + set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, + NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); + */ + + return subscriber_main(domainId, sample_count); +} +#endif + +#ifdef RTI_VX653 +const unsigned char* __ctype = *(__ctypePtrGet()); + +extern "C" void usrAppInit () +{ + #ifdef USER_APPL_INIT + USER_APPL_INIT; /* for backwards compatibility */ + #endif + + /* add application specific code here */ + taskSpawn("sub", RTI_OSAPI_THREAD_PRIORITY_NORMAL, 0x8, 0x150000, (FUNCPTR)subscriber_main, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); + +} +#endif diff --git a/realtime_connext/poll_utils.cxx b/realtime_connext/poll_utils.cxx new file mode 100644 index 0000000..2c7df1a --- /dev/null +++ b/realtime_connext/poll_utils.cxx @@ -0,0 +1,468 @@ +#ifndef EXAMPLE_POLL_UTILS_H +#define EXAMPLE_POLL_UTILS_H + +#include +#include +#include + +#ifdef RTI_VX653 +#include +#endif +#include "poll.h" +#include "pollSupport.h" +#include "ndds/ndds_cpp.h" + +#include + +unsigned int samples_received = 0; + +struct PublisherNode +{ + pollDataWriter * poll_writer; + poll *instance; + DDS_InstanceHandle_t instance_handle; + int count; + unsigned int message_length; +}; + +struct Arguments +{ + int argc; + char **argv; +}; + +/* Delete all entities */ +static int subscriber_shutdown( + DDSDomainParticipant *participant) +{ + printf("Samples received by subscriber: %d\n", samples_received); + DDS_ReturnCode_t retcode; + int status = 0; + + if (participant != NULL) { + retcode = participant->delete_contained_entities(); + if (retcode != DDS_RETCODE_OK) { + printf("delete_contained_entities error %d\n", retcode); + status = -1; + } + + retcode = DDSTheParticipantFactory->delete_participant(participant); + if (retcode != DDS_RETCODE_OK) { + printf("delete_participant error %d\n", retcode); + status = -1; + } + } + + /* RTI Connext provides the finalize_instance() method on + domain participant factory for people who want to release memory used + by the participant factory. Uncomment the following block of code for + clean destruction of the singleton. */ + /* + retcode = DDSDomainParticipantFactory::finalize_instance(); + if (retcode != DDS_RETCODE_OK) { + printf("finalize_instance error %d\n", retcode); + //status = -1; + } + */ + return status; +} + +void *subscriber_callback(void *args) +{ + pollDataReader *poll_reader = static_cast(args); + DDS_SampleInfoSeq info_seq; + pollSeq data_seq; + + /* Check for new data calling the DataReader's take() method */ + DDS_ReturnCode_t retcode = poll_reader->take( + data_seq, info_seq, DDS_LENGTH_UNLIMITED, + DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE); + if (retcode == DDS_RETCODE_NO_DATA) { + /// Not an error + return NULL; + } else if (retcode != DDS_RETCODE_OK) { + // Is an error + printf("take error: %d\n", retcode); + exit(-1); + } + + int len = 0; + double sum = 0; + + /* Iterate through the samples read using the take() method, getting + * the number of samples got and, adding the value of x on each of + * them to calculate the average afterwards. */ + for (int i = 0; i < data_seq.length(); ++i) { + if (!info_seq[i].valid_data) + continue; + len ++; + sum += data_seq[i].seq; + ++samples_received; + } + + /* + if (len > 0) + printf("Got %d samples. Avg = %.1f\n", len, sum/len); + */ + + retcode = poll_reader->return_loan(data_seq, info_seq); + if (retcode != DDS_RETCODE_OK) { + printf("return loan error %d\n", retcode); + } + +} + +int subscriber_main(int argc, char *argv[]) +{ + rttest_read_args(argc, argv); + + int domainId = 0; + DDSDomainParticipant *participant = NULL; + DDSSubscriber *subscriber = NULL; + DDSTopic *topic = NULL; + DDSDataReader *reader = NULL; + DDS_ReturnCode_t retcode; + const char *type_name = NULL; + int count = 0; + /* Poll for new samples every 5 seconds */ + DDS_Duration_t receive_period = {5,0}; + int status = 0; + + /* To customize the participant QoS, use + the configuration file USER_QOS_PROFILES.xml */ + participant = DDSTheParticipantFactory->create_participant( + domainId, DDS_PARTICIPANT_QOS_DEFAULT, + NULL /* listener */, DDS_STATUS_MASK_NONE); + if (participant == NULL) { + printf("create_participant error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* To customize the subscriber QoS, use + the configuration file USER_QOS_PROFILES.xml */ + subscriber = participant->create_subscriber( + DDS_SUBSCRIBER_QOS_DEFAULT, NULL /* listener */, DDS_STATUS_MASK_NONE); + if (subscriber == NULL) { + printf("create_subscriber error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* Register the type before creating the topic */ + type_name = pollTypeSupport::get_type_name(); + retcode = pollTypeSupport::register_type( + participant, type_name); + if (retcode != DDS_RETCODE_OK) { + printf("register_type error %d\n", retcode); + subscriber_shutdown(participant); + return -1; + } + + /* To customize the topic QoS, use + the configuration file USER_QOS_PROFILES.xml */ + topic = participant->create_topic( + "Example poll", + type_name, DDS_TOPIC_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (topic == NULL) { + printf("create_topic error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* Call create_datareader passing NULL in the listener parameter */ + reader = subscriber->create_datareader( + topic, DDS_DATAREADER_QOS_DEFAULT, NULL, + DDS_STATUS_MASK_ALL); + if (reader == NULL) { + printf("create_datareader error\n"); + subscriber_shutdown(participant); + return -1; + } + + /* If you want to change datareader_qos.history.kind programmatically rather + * than using the XML file, you will need to add the following lines to your + * code and comment out the create_datareader call above. */ + + /* + DDS_DataReaderQos datareader_qos; + retcode = subscriber->get_default_datareader_qos(datareader_qos); + if (retcode != DDS_RETCODE_OK) { + printf("get_default_datareader_qos error\n"); + return -1; + } + + datareader_qos.history.kind = DDS_KEEP_ALL_HISTORY_QOS; + + reader = subscriber->create_datareader( + topic, datareader_qos, NULL, + DDS_STATUS_MASK_ALL); + if (reader == NULL) { + printf("create_datareader error\n"); + subscriber_shutdown(participant); + return -1; + } + */ + + pollDataReader *poll_reader = pollDataReader::narrow(reader); + if (poll_reader == NULL) { + printf("DataReader narrow error\n"); + return -1; + } + + if (rttest_set_sched_priority(97, SCHED_RR) != 0) { + perror("Failed to set scheduling priority"); + } + + if (rttest_lock_memory() != 0) { + perror("Couldn't lock memory"); + } + rttest_prefault_stack(); + + /* Main loop */ + rttest_spin(subscriber_callback, static_cast(poll_reader)); + + /* Delete all entities */ + status = subscriber_shutdown(participant); + + rttest_write_results_file("rttest_subscriber_results"); + rttest_finish(); + + return status; +} + +int subscriber_main(Arguments *args) +{ + return subscriber_main(args->argc, args->argv); +} + +void *subscriber_main(void *args) +{ + Arguments *new_args = static_cast(args); + subscriber_main(new_args); + return NULL; +} + +/* Delete all entities */ +static int publisher_shutdown( + DDSDomainParticipant *participant) +{ + DDS_ReturnCode_t retcode; + int status = 0; + + if (participant != NULL) { + retcode = participant->delete_contained_entities(); + if (retcode != DDS_RETCODE_OK) { + printf("delete_contained_entities error %d\n", retcode); + status = -1; + } + + retcode = DDSTheParticipantFactory->delete_participant(participant); + if (retcode != DDS_RETCODE_OK) { + printf("delete_participant error %d\n", retcode); + status = -1; + } + } + + /* RTI Connext provides finalize_instance() method on + domain participant factory for people who want to release memory used + by the participant factory. Uncomment the following block of code for + clean destruction of the singleton. */ + retcode = DDSDomainParticipantFactory::finalize_instance(); + if (retcode != DDS_RETCODE_OK) { + printf("finalize_instance error %d\n", retcode); + //status = -1; + } + + return status; +} + +void *publisher_callback(void *args) +{ + PublisherNode *pub_node = static_cast(args); + // message instance, instance_handle, poll_writer + /* Set x to a random number between 0 and 9 */ + pub_node->instance->seq = pub_node->count; + for (int j = 0; j < pub_node->message_length; j++) { + pub_node->instance->content[j] = pub_node->count; + } + + DDS_ReturnCode_t retcode = pub_node->poll_writer->write(*pub_node->instance, pub_node->instance_handle); + if (retcode != DDS_RETCODE_OK) { + printf("write error %d\n", retcode); + } + ++pub_node->count; +} + +int publisher_main(int argc, char *argv[]) +{ + unsigned int message_length = 1; + { + int c; + // l stands for message length + opterr = 0; + optind = 1; + int argc_copy = argc; + char *argv_copy[argc]; + for (int i = 0; i < argc; ++i) + { + size_t len = strlen(argv[i]); + argv_copy[i] = (char*) malloc(len); + memcpy(argv_copy[i], argv[i], len); + } + + while ((c = getopt(argc_copy, argv_copy, "l:")) != -1) + { + switch(c) + { + case 'l': + message_length = std::stoul(std::string(optarg)); + break; + default: + break; + } + } + } + + rttest_read_args(argc, argv); + int domainId = 0; + DDSDomainParticipant *participant = NULL; + DDSPublisher *publisher = NULL; + DDSTopic *topic = NULL; + DDSDataWriter *writer = NULL; + pollDataWriter * poll_writer = NULL; + poll *instance = NULL; + DDS_ReturnCode_t retcode; + DDS_InstanceHandle_t instance_handle = DDS_HANDLE_NIL; + const char *type_name = NULL; + int count = 0; + + /* To customize participant QoS, use + the configuration file USER_QOS_PROFILES.xml */ + participant = DDSTheParticipantFactory->create_participant( + domainId, DDS_PARTICIPANT_QOS_DEFAULT, + NULL /* listener */, DDS_STATUS_MASK_NONE); + if (participant == NULL) { + printf("create_participant error\n"); + publisher_shutdown(participant); + return -1; + } + + /* To customize publisher QoS, use + the configuration file USER_QOS_PROFILES.xml */ + publisher = participant->create_publisher( + DDS_PUBLISHER_QOS_DEFAULT, NULL /* listener */, DDS_STATUS_MASK_NONE); + if (publisher == NULL) { + printf("create_publisher error\n"); + publisher_shutdown(participant); + return -1; + } + + /* Register type before creating topic */ + type_name = pollTypeSupport::get_type_name(); + retcode = pollTypeSupport::register_type( + participant, type_name); + if (retcode != DDS_RETCODE_OK) { + printf("register_type error %d\n", retcode); + publisher_shutdown(participant); + return -1; + } + + /* To customize topic QoS, use + the configuration file USER_QOS_PROFILES.xml */ + topic = participant->create_topic( + "Example poll", + type_name, DDS_TOPIC_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (topic == NULL) { + printf("create_topic error\n"); + publisher_shutdown(participant); + return -1; + } + + /* To customize data writer QoS, use + the configuration file USER_QOS_PROFILES.xml */ + writer = publisher->create_datawriter( + topic, DDS_DATAWRITER_QOS_DEFAULT, NULL /* listener */, + DDS_STATUS_MASK_NONE); + if (writer == NULL) { + printf("create_datawriter error\n"); + publisher_shutdown(participant); + return -1; + } + poll_writer = pollDataWriter::narrow(writer); + if (poll_writer == NULL) { + printf("DataWriter narrow error\n"); + publisher_shutdown(participant); + return -1; + } + + /* Create data sample for writing */ + + instance = pollTypeSupport::create_data(); + + if (instance == NULL) { + printf("pollTypeSupport::create_data error\n"); + publisher_shutdown(participant); + return -1; + } + strcpy(instance->content, std::string(message_length, '.').c_str()); + + /* For a data type that has a key, if the same instance is going to be + written multiple times, initialize the key here + and register the keyed instance prior to writing */ + instance_handle = poll_writer->register_instance(*instance); + + PublisherNode pub_node; + pub_node.instance = instance; + pub_node.instance_handle = instance_handle; + pub_node.poll_writer = poll_writer; + pub_node.count = 0; + pub_node.message_length = message_length; + + /* Initialize random seed before entering the loop */ + // srand(time(NULL)); + + if (rttest_set_sched_priority(98, SCHED_RR) != 0) { + perror("Failed to set scheduling priority"); + } + + if (rttest_lock_memory() != 0) { + perror("Couldn't lock memory"); + } + rttest_prefault_stack(); + + rttest_spin(publisher_callback, static_cast(&pub_node)); + rttest_write_results_file("rttest_publisher_results"); + rttest_finish(); + + retcode = poll_writer->unregister_instance( + *instance, instance_handle); + if (retcode != DDS_RETCODE_OK) { + printf("unregister instance error %d\n", retcode); + } + + /* Delete data sample */ + retcode = pollTypeSupport::delete_data(instance); + if (retcode != DDS_RETCODE_OK) { + printf("pollTypeSupport::delete_data error %d\n", retcode); + } + + /* Delete all entities */ + return publisher_shutdown(participant); +} + +int publisher_main(Arguments *args) +{ + return publisher_main(args->argc, args->argv); +} + +void *publisher_main(void *args) +{ + Arguments *new_args = static_cast(args); + int ret = publisher_main(new_args); + return static_cast(&ret); +} + +#endif diff --git a/tutorial_c/publisher/CMakeLists.txt b/tutorial_c/publisher/CMakeLists.txt index 5324fe6..c7ea618 100644 --- a/tutorial_c/publisher/CMakeLists.txt +++ b/tutorial_c/publisher/CMakeLists.txt @@ -1,10 +1,16 @@ cmake_minimum_required(VERSION 2.8) project(publisher) +find_package(rttest REQUIRED) + find_package(opensplice REQUIRED) + # find_package only exposes the c++ include directories but not the c headers -include_directories(c /usr/include/opensplice/dcps/C/SAC ${OPENSPLICE_INCLUDE_DIRS}) -link_directories(${OPENSPLICE_LIBRARY_DIRS}) +include_directories(c /usr/include/opensplice/dcps/C/SAC ${OPENSPLICE_INCLUDE_DIRS} ${rttest_INCLUDE_DIRS}) +link_directories(${OPENSPLICE_LIBRARY_DIRS} ${rttest_LIBRARY_DIR}) add_executable(publisher main.c c/ChatSacDcps.c c/ChatSplDcps.c) target_link_libraries(publisher ${OPENSPLICE_LIBRARIES}) + +add_executable(rtt_publisher instrumented_main.c c/ChatSacDcps.c c/ChatSplDcps.c) +target_link_libraries(rtt_publisher ${OPENSPLICE_LIBRARIES} ${rttest_LIBRARIES} m stdc++) diff --git a/tutorial_c/publisher/build.sh b/tutorial_c/publisher/build.sh index a3b4aec..8ee6e2d 100755 --- a/tutorial_c/publisher/build.sh +++ b/tutorial_c/publisher/build.sh @@ -8,6 +8,6 @@ cd .. rm -fr build mkdir build cd build -cmake .. +cmake .. -DCMAKE_INSTALL_PREFIX=~/local make cd .. diff --git a/tutorial_c/publisher/instrumented_main.c b/tutorial_c/publisher/instrumented_main.c new file mode 100644 index 0000000..77de96c --- /dev/null +++ b/tutorial_c/publisher/instrumented_main.c @@ -0,0 +1,239 @@ +/* CreateTopics.c */ +#include "dds_dcps.h" +#include "Chat.h" +#include "unistd.h" + +#include + +#define MAX_MSG_LEN 16777216 + +int i = 0; +Chat_ChatMessageDataWriter talker; +Chat_ChatMessage *msg; +DDS_ReturnCode_t status; +DDS_InstanceHandle_t userHandle; +char *msg_content; + +void* callback(void* unused) +{ + msg->index = i; + snprintf(msg->content, MAX_MSG_LEN, msg_content, msg->index); + status = Chat_ChatMessageDataWriter_write(talker, msg, userHandle); + // checkStatus(status, "Chat_ChatMessageDataWriter_write"); + ++i; +} + +int main ( + int argc, + char *argv[]) +{ + // begin init + + DDS_DomainParticipantFactory dpf; + DDS_DomainParticipant dp; + DDS_DomainId_t domain = DDS_DOMAIN_ID_DEFAULT; + Chat_ChatMessageTypeSupport chatMessageTS; + DDS_Topic chatMessageTopic; + char *chatMessageTypeName; + + DDS_PublisherQos *pub_qos; + DDS_DataWriterQos *dw_qos; + DDS_Publisher chatPublisher; + Chat_NameServiceDataWriter nameServer; + char *partitionName = NULL; + + DDS_Topic nameServiceTopic; + + int ownID; + + Chat_NameService ns; + + + /* Create a DomainParticipantFactory and a DomainParticipant */ + /* (using Default QoS settings). */ + dpf = DDS_DomainParticipantFactory_get_instance(); + if (!dpf) { + printf("Creating ParticipantFactory failed!!\n"); + exit(-1); + } + printf("Created ParticipantFactory.\n"); + + dp = DDS_DomainParticipantFactory_create_participant ( + dpf, + domain, + DDS_PARTICIPANT_QOS_DEFAULT, + NULL, + DDS_STATUS_MASK_NONE); + if (!dp) { + printf("Creating Participant failed!!\n"); + exit(-1); + } + printf("Created Participant.\n"); + + + /* Register the required data type for ChatMessage. */ + chatMessageTS = Chat_ChatMessageTypeSupport__alloc(); + if (!chatMessageTS) { + printf ("Allocating TypeSupport failed!!\n"); + exit(-1); + }; + chatMessageTypeName = Chat_ChatMessageTypeSupport_get_type_name(chatMessageTS); + status = Chat_ChatMessageTypeSupport_register_type( + chatMessageTS, dp, chatMessageTypeName); + if (status != DDS_RETCODE_OK) { + printf("Registering data type failed. Status = %d\n", status); + exit(-1); + }; + printf("Registered data type.\n"); + + /*Create the ChatMessage topic */ + chatMessageTopic = DDS_DomainParticipant_create_topic( + dp, + "Chat_ChatMessage", + chatMessageTypeName, + DDS_TOPIC_QOS_DEFAULT, + NULL, + DDS_STATUS_MASK_NONE); + if (!chatMessageTopic) { + printf("Creating ChatMessage topic failed!!\n"); + exit(-1); + }; + printf("Created ChatMessage topic.\n"); + + /* Adapt the default PublisherQos to write into the + "ChatRoom" Partition. */ + partitionName = "ChatRoom"; + pub_qos = DDS_PublisherQos__alloc(); + if (!pub_qos) { + printf("Allocating PublisherQos failed!!\n"); + exit(-1); + } + status = DDS_DomainParticipant_get_default_publisher_qos ( + dp, pub_qos); + if (status != DDS_RETCODE_OK) { + printf("Getting default publisher qos failed!!\n"); + exit(-1); + } + pub_qos->partition.name._length = 1; + pub_qos->partition.name._maximum = 1; + pub_qos->partition.name._buffer = DDS_StringSeq_allocbuf (1); + if (!pub_qos->partition.name._buffer) { + printf("Allocating partition name failed!!\n"); + exit(-1); + } + pub_qos->partition.name._buffer[0] = DDS_string_alloc ( + strlen(partitionName)); + if (!pub_qos->partition.name._buffer[0]) { + printf("Allocating partition name failed!!\n"); + exit(-1); + } + strcpy (pub_qos->partition.name._buffer[0], partitionName); + + /* Create a Publisher for the chatter application. */ + chatPublisher = DDS_DomainParticipant_create_publisher( + dp, pub_qos, NULL, DDS_STATUS_MASK_NONE); + if (!chatPublisher) { + printf("Creating publisher failed!!\n"); + exit(-1); + } + printf("Created publisher.\n"); + + /* Create a DataWriter for the ChatMessage Topic + (using the appropriate QoS). */ + talker = DDS_Publisher_create_datawriter( + chatPublisher, + chatMessageTopic, + DDS_DATAWRITER_QOS_USE_TOPIC_QOS, + NULL, + DDS_STATUS_MASK_NONE); + if (!talker) { + printf("Creating datawriter failed!!\n"); + exit(-1); + } + printf("Created datawriter.\n"); + + ownID = 0; + + msg = Chat_ChatMessage__alloc(); + //checkHandle(msg, "Chat_ChatMessage__alolc"); + msg->userID = ownID; + msg->index = 0; + msg->content = DDS_string_alloc(MAX_MSG_LEN); + //checkHandle(msg->content, "DDS_string_alloc"); + snprintf(msg->content, MAX_MSG_LEN, "hello world"); + + // register a chat message + userHandle = Chat_ChatMessageDataWriter_register_instance(talker, msg); + + ns.userID = ownID; + ns.name = DDS_string_alloc(Chat_MAX_NAME+1); + //checkHandle(ns.name, "DDS_string_alloc"); + char *chatterName; + snprintf(ns.name, Chat_MAX_NAME+1, "Chatter %d", ownID); + + int j; + msg_content = (char*) malloc(MAX_MSG_LEN); + for (j < 0; j < MAX_MSG_LEN; ++j) + { + msg_content[j] = '.'; + } + + // Write user information + status = Chat_NameServiceDataWriter_write(nameServer, &ns, DDS_HANDLE_NIL); + //checkStatus(status, "Chat_ChatMessageDataWriter_write"); + + printf("Created user handle and preallocated message.\n"); + + // end init + + rttest_read_args(argc, argv); + rttest_set_sched_priority(90, SCHED_RR); + + rttest_spin(callback, NULL); + rttest_write_results(); + rttest_finish(); + + // begin teardown + + /* Remove the DataWriters */ + status = DDS_Publisher_delete_datawriter(chatPublisher, + talker); + if (status != DDS_RETCODE_OK) { + printf("Deleting datawriter failed!!\n"); + exit(-1); + } + printf("Deleted datawriter.\n"); + + /* Remove the Publisher. */ + status = DDS_DomainParticipant_delete_publisher( + dp, chatPublisher); + if (status != DDS_RETCODE_OK) { + printf("Deleting publisher failed!!\n"); + exit(-1); + } + /* De-allocate the PublisherQoS holder. */ + DDS_free(pub_qos); // Note that DDS_free recursively + // de-allocates all indirections!! + printf("Deleted publisher.\n"); + + /* Deleting the Topic. */ + status = DDS_DomainParticipant_delete_topic( + dp, chatMessageTopic); + if (status != DDS_RETCODE_OK) { + printf("Deleting topic failed. Status = %d\n", status); + exit(-1); + }; + printf("Deleted ChatMessage topic.\n"); + + /* Deleting the DomainParticipant */ + status = DDS_DomainParticipantFactory_delete_participant( + dpf, dp); + if (status != DDS_RETCODE_OK) { + printf("Deleting participant failed. Status = %d\n", status); + exit(-1); + }; + printf("Deleted Participant.\n"); + + /* Everything is fine, return normally. */ + return 0; +}; diff --git a/tutorial_c/publisher/main.c b/tutorial_c/publisher/main.c index 15506f6..aaf0ef3 100644 --- a/tutorial_c/publisher/main.c +++ b/tutorial_c/publisher/main.c @@ -3,6 +3,8 @@ #include "Chat.h" #include "unistd.h" +#define MAX_MSG_LEN 128 + int main ( int argc, char *argv[]) @@ -127,36 +129,54 @@ int main ( } printf("Created datawriter.\n"); - /* Create a DataWriter for the NameService Topic - (using the appropriate QoS). */ - /*dw_qos = DDS_DataWriterQos__alloc(); - if (!dw_qos) { - printf("Allocating datawriter QoS failed!!\n"); - exit(-1); + // Initialize message + int ownID = 0; + + Chat_ChatMessage *msg; + msg = Chat_ChatMessage__alloc(); + //checkHandle(msg, "Chat_ChatMessage__alolc"); + msg->userID = ownID; + msg->index = 0; + msg->content = DDS_string_alloc(MAX_MSG_LEN); + //checkHandle(msg->content, "DDS_string_alloc"); + snprintf(msg->content, MAX_MSG_LEN, "hello world"); + + // register a chat message + DDS_InstanceHandle_t userHandle; + userHandle = Chat_ChatMessageDataWriter_register_instance(talker, msg); + + Chat_NameService ns; + ns.userID = ownID; + ns.name = DDS_string_alloc(Chat_MAX_NAME+1); + //checkHandle(ns.name, "DDS_string_alloc"); + char *chatterName; + if (chatterName) { + strncpy(ns.name, chatterName, Chat_MAX_NAME + 1); } - status = DDS_Publisher_get_default_datawriter_qos( - chatPublisher, dw_qos); - if (status != DDS_RETCODE_OK) { - printf("Getting default datawriter qos failed!!\n"); - exit(-1); + else { + snprintf(ns.name, Chat_MAX_NAME+1, "Chatter %d", ownID); } - //status = DDS_Publisher_copy_from_topic_qos( - // chatPublisher, dw_qos, setting_topic_qos); - //checkStatus(status, "DDS_Publisher_copy_from_topic_qos"); - dw_qos->writer_data_lifecycle.autodispose_unregistered_instances = FALSE; - nameServer = DDS_Publisher_create_datawriter( - chatPublisher, - nameServiceTopic, - dw_qos, - NULL, - DDS_STATUS_MASK_NONE); - if (!nameServer) { - printf("Creating datawriter (NameService) failed!!\n"); - exit(-1); - } - printf("Created datawriter (NameService).\n");*/ - sleep(5); + // Write user information + status = Chat_NameServiceDataWriter_write(nameServer, &ns, DDS_HANDLE_NIL); + //checkStatus(status, "Chat_ChatMessageDataWriter_write"); + + // Write a message + status = Chat_ChatMessageDataWriter_write(talker, msg, userHandle); + // checkStatus(status, "Chat_ChatMessageDataWriter_write"); + + // pause + sleep(1); + + int i = 0; + for (i = 1; i < 6; ++i) + { + msg->index = i; + snprintf(msg->content, MAX_MSG_LEN, "Message number: %d", msg->index); + status = Chat_ChatMessageDataWriter_write(talker, msg, userHandle); + // checkStatus(status, "Chat_ChatMessageDataWriter_write"); + sleep(1); + } /* Remove the DataWriters */