Skip to content

Commit

Permalink
Distributed (IntelLabs#45)
Browse files Browse the repository at this point in the history
* Adding Kafka-VDMS Connectors for Multi-Modal Scenario

* clean the code, removing evaluation files and debugging messages

* Change in VDMSConfig.h

DIRECTORIES_PER_LAYER back to 5

* Delete archive_db.sh

* Delete csvfile.h

* Update kafka_receiver.h

* Update kafka_sender.h

* Fix conflict with the develop branch

* Add librdkafka-dev to docker file

* remove dependency on kafka source code, fix zookeeper port number, and remove the CSV file deps from sender

* Update dockerfiles

* Delete distributed config file

* Fix docker-compose to build the vdms image
  • Loading branch information
Ragaad authored May 26, 2022
1 parent 67472a5 commit dcd4aaf
Show file tree
Hide file tree
Showing 19 changed files with 785 additions and 17 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ else()
add_subdirectory(src/vcl)
add_subdirectory(client/cpp)
add_subdirectory(ext/custom_vcl)
# Kafka-based distributed connectors (built from distributed/CMakeLists.txt).
add_subdirectory(distributed)

# NOTE(review): directory-scoped link_directories/include_directories leak into
# every target defined below this point; prefer target_include_directories /
# target_link_libraries with explicit PRIVATE/PUBLIC visibility on `dms`.
link_directories(/usr/local/lib /usr/lib/x86_64-linux-gnu/)
include_directories(/usr/include/jsoncpp utils/include/ src/pmgd/include src/pmgd/util include/ src/vcl ${CMAKE_CURRENT_BINARY_DIR}/utils/src/protobuf)
add_library(dms SHARED src/BoundingBoxCommand.cc src/CommunicationManager.cc src/DescriptorsCommand.cc src/DescriptorsManager.cc src/ExceptionsCommand.cc src/ImageCommand.cc src/PMGDIterators.cc src/PMGDQuery.cc src/PMGDQueryHandler.cc src/QueryHandler.cc src/QueryMessage.cc src/RSCommand.cc src/SearchExpression.cc src/Server.cc src/VDMSConfig.cc src/VideoCommand.cc src/AutoDeleteNode.cc ${PROTO_SRCS} ${PROTO_HDRS})
Expand Down
28 changes: 28 additions & 0 deletions distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Builds the Kafka-VDMS distributed connector demos:
#   meta_data   - metadata-only query round-trip (kafka_test.cpp)
#   image_data  - multi-modal image pipeline     (mutli_modal.cpp)
#   multi-modal - adaptive platform driver       (adaptive_platform.cpp)
# 3.14 is required for reliable INTERFACE-library usage requirements and
# fixes the typo'd "kaka_test" project name.
cmake_minimum_required(VERSION 3.14)
project(kafka_test VERSION 0.1.0 LANGUAGES CXX)

add_compile_options(-g -fPIC)
# Debug + coverage instrumentation for the demos.
# NOTE(review): consider guarding the coverage flags behind an option().
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -Wall -coverage -fprofile-arcs -ftest-coverage")

find_package(Protobuf REQUIRED)
find_package(Threads REQUIRED)

# System library search paths for rdkafka/jsoncpp/glog installed outside
# the default linker path.
link_directories(/usr/lib /usr/local/lib /usr/lib/x86_64-linux-gnu/)

# One INTERFACE target carries the include paths and link list shared by
# all three executables instead of repeating them per target.
add_library(kafka_demo_common INTERFACE)
target_include_directories(kafka_demo_common INTERFACE
    ${CMAKE_CURRENT_BINARY_DIR}
    ${CMAKE_CURRENT_SOURCE_DIR}/../client/cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../utils/include
    ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/src
    /usr/include/jsoncpp
    ${CMAKE_CURRENT_SOURCE_DIR}
    ${CMAKE_CURRENT_SOURCE_DIR}/..)
target_link_libraries(kafka_demo_common INTERFACE
    jsoncpp
    protobuf
    vdms-utils
    vdms-client
    vdms_protobuf
    rdkafka
    rdkafka++
    Threads::Threads
    glog)

add_executable(meta_data kafka_test.cpp)
target_link_libraries(meta_data PRIVATE kafka_demo_common)

add_executable(image_data mutli_modal.cpp)
target_link_libraries(image_data PRIVATE kafka_demo_common)

add_executable(multi-modal adaptive_platform.cpp)
target_link_libraries(multi-modal PRIVATE kafka_demo_common)

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})

58 changes: 58 additions & 0 deletions distributed/adaptive_platform.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#include "helpers.h"
#include "kafka_receiver.h"
#include "kafka_sender.h"

// Demo driver for the adaptive multi-modal scenario: builds one fixed VDMS
// query, fans it out over N Kafka topics via N senders, reads each reply
// back via the paired receiver, and forwards the reply to a second VDMS
// server. Runs 100 rounds, then exits.
int main(int argc, char* argv[]) {
    std::cout << "adaptive-multi-modal" << std::endl;

    std::vector<std::unique_ptr<KafkaReceiver>> receivers;
    std::vector<std::unique_ptr<KafkaSender>> senders;

    const int num_receivers = 5;
    const int num_senders = 5;
    const int num_topics = 5;
    const int num_rounds = 100;  // total send/receive rounds before exit

    // std::vector replaces the original `std::string topics[num_topics]`,
    // which is a variable-length array -- a non-standard C++ extension.
    std::vector<std::string> topics(num_topics);
    for (int i = 0; i < num_topics; i++) {
        topics[i] = "topic_" + std::to_string(i);
        std::cout << topics[i] << std::endl;
    }

    Json::Value result = construct_query();

    // NOTE(review): `writer` is not declared in this translation unit --
    // presumably a JSON writer provided by helpers.h; confirm.
    std::string q = writer.write(result);

    std::string msg_meta = query_body(q);
    std::string msg;

    // sender_endpoint / receiver_endpoint are presumably defined in
    // helpers.h -- TODO confirm.
    for (int i = 0; i < num_senders; i++) {
        senders.push_back(std::make_unique<KafkaSender>(sender_endpoint));
        senders[i]->Init();
    }
    for (int i = 0; i < num_receivers; i++) {
        receivers.push_back(std::make_unique<KafkaReceiver>(receiver_endpoint));
        receivers.at(i)->Init();
    }

    for (int round = 0; round < num_rounds; round++) {
        for (int i = 0; i < num_senders; i++) {
            senders[i]->Send(msg_meta, topics[i], MAGENTA);
            msg = (receivers[i]->Receive(topics[i], CYAN))->str();
            std::cout << msg << std::endl;

            // Forward the reply to the second VDMS instance on port 55561.
            send_to_vdms(vdms_server2, 55561, msg);
        }
    }

    return 0;
}
98 changes: 98 additions & 0 deletions distributed/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.0.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "19092:19092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Internal clients use broker:9092; host clients use localhost:19092.
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://broker:19092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_MESSAGE_MAX_BYTES: 200000
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      # The registry's default listener is 8081 (and the broker's
      # KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL points at 8081); the original
      # "8082:8082" mapping published a port nothing listens on.
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  create-topics:
    image: confluentinc/cp-kafka:5.5.0
    hostname: create-topics
    container_name: create-topics
    depends_on:
      - broker
    # Wait for the broker on its internal listener (broker:9092 per
    # KAFKA_LISTENERS above -- the original broker:29092 is not bound),
    # then create the demo topic.
    command: "
      bash -c 'cub kafka-ready -b broker:9092 1 120 && \
      kafka-topics --create --if-not-exists --zookeeper zookeeper:2181 --partitions 2 --replication-factor 1 --topic vdms-query'"
    environment:
      KAFKA_BROKER_ID: ignored
      KAFKA_ZOOKEEPER_CONNECT: ignored
  data_store_1:
    build: '../docker/base'
    image: vdms_dist:1
    container_name: 'data_store_1'
    ports:
      - '55560:55560'
    networks:
      - default
  data_store_2:
    image: 'vdms_dist:1'
    container_name: 'data_store_2'
    # Uses host networking, so the VDMS port (55561) is reachable directly
    # on the host without a ports mapping.
    network_mode: host

Binary file added distributed/examples/brain.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit dcd4aaf

Please sign in to comment.