From 11bbff1a2a46cb3a7cdd1f5c298a27e5b5c88f3c Mon Sep 17 00:00:00 2001 From: Joshua Miller Date: Sat, 9 Nov 2013 16:38:53 -0600 Subject: [PATCH] complete refactor to udpipe --- src/udpipe.cpp | 198 +++++++++++++++++ src/udpipe.h | 60 ++++++ src/udpipe_client.cpp | 158 ++++++++++++++ src/udpipe_client.h | 26 +++ src/udpipe_server.cpp | 166 ++++++++++++++ src/udpipe_server.h | 24 +++ src/udpipe_threads.cpp | 477 +++++++++++++++++++++++++++++++++++++++++ src/udpipe_threads.h | 25 +++ 8 files changed, 1134 insertions(+) create mode 100644 src/udpipe.cpp create mode 100644 src/udpipe.h create mode 100644 src/udpipe_client.cpp create mode 100644 src/udpipe_client.h create mode 100644 src/udpipe_server.cpp create mode 100644 src/udpipe_server.h create mode 100644 src/udpipe_threads.cpp create mode 100644 src/udpipe_threads.h diff --git a/src/udpipe.cpp b/src/udpipe.cpp new file mode 100644 index 0000000..19d5c02 --- /dev/null +++ b/src/udpipe.cpp @@ -0,0 +1,198 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udpipe + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ +#include +#include +#include +#include +#include +#include + +#include "udpipe.h" +#include "udpipe_server.h" +#include "udpipe_client.h" + +using std::cerr; +using std::endl; + +void usage(){ + fprintf(stderr, "usage: udpipe [udpipe options] host port\n"); + fprintf(stderr, "options:\n"); + // fprintf(stderr, "\t\t%s %s\t%s\n", "", "", ""); + + exit(1); +} + +void initialize_thread_args(thread_args *args){ + args->ip = NULL; + args->port = NULL; + args->blast = 0; + args->blast_rate = 1000; + args->udt_buff = BUFF_SIZE; + args->udp_buff = BUFF_SIZE; + args->mss = 8400; + args->use_crypto = 0; + args->verbose = 0; + args->n_crypto_threads = 1; + args->print_speed = 0; +} + +int main(int argc, char *argv[]){ + + int opt; + enum {NONE, SERVER, CLIENT}; + int operation = CLIENT; + + thread_args args; + initialize_thread_args(&args); + int use_crypto = 0; + char* path_to_key = NULL; + char* key = NULL; + int n_crypto_threads = 1; + + // ----------- [ Read in options + while ((opt = getopt (argc, argv, "hvsn:lp:f:")) != -1){ + switch (opt){ + + case 's': + args.print_speed = 1; + break; + + case 'l': + operation = SERVER; + break; + + case 'v': + args.verbose = 1; + break; + + case 'n': + args.use_crypto = 1; + use_crypto = 1; + n_crypto_threads = atoi(optarg); + break; + + case 'p': + args.use_crypto = 1; + use_crypto = 1; + key = strdup(optarg); + break; + + case 'f': + args.use_crypto = 1; + use_crypto = 1; + path_to_key = strdup(optarg); + break; + + case 'h': + usage(); + break; + + default: + fprintf(stderr, "Unknown command line arg. -h for help.\n"); + usage(); + exit(1); + + } + } + + if (use_crypto && (path_to_key && key)){ + fprintf(stderr, "error: Please specify either key or key file, not both.\n"); + exit(1); + } + + if (path_to_key){ + FILE*key_file = fopen(path_to_key, "r"); + if (!key_file){ + fprintf(stderr, "key file: %s.\n", strerror(errno)); + exit(1); + } + + fseek(key_file, 0, SEEK_END); + long size = ftell(key_file); + fseek(key_file, 0, SEEK_SET); + key = (char*)malloc(size); + fread(key, 1, size, key_file); + + } + + // if (!use_crypto && key){ + // fprintf(stderr, "warning: You've specified a key, but you don't have encryption turned on.\nProceeding without encryption.\n"); + // } + + if (use_crypto && !key){ + fprintf(stderr, "Please either: \n (1) %s\n (2) %s\n (3) %s\n", + "include key in cli [-p key]", + "read on in from file [-f /path/to/key/file]", + "choose not to use encryption, remove [-n]"); + exit(1); + } + + // Setup host + if (operation == CLIENT){ + if (optind < argc){ + args.ip = strdup(argv[optind++]); + } else { + cerr << "error: Please specify server host." << endl; + exit(1); + } + } + + // Check port input + if (optind < argc){ + args.port = strdup(argv[optind++]); + } else { + cerr << "error: Please specify port num." << endl; + exit(1); + } + + // Initialize crypto + if (use_crypto){ + + char* cipher = (char*) "aes-128"; + crypto enc(EVP_ENCRYPT, PASSPHRASE_SIZE, (unsigned char*)key, cipher, n_crypto_threads); + crypto dec(EVP_DECRYPT, PASSPHRASE_SIZE, (unsigned char*)key, cipher, n_crypto_threads); + args.enc = &enc; + args.dec = &dec; + + args.n_crypto_threads = n_crypto_threads; + + } else { + + args.enc = NULL; + args.dec = NULL; + + } + + if (key) + memset(key, 0, strlen(key)); + + // ----------- [ Spawn correct process + if (operation == SERVER){ + run_server(&args); + + } else if (operation == CLIENT){ + run_client(&args); + + } else { + cerr << "Operation type not known" << endl; + + } + + + +} diff --git a/src/udpipe.h b/src/udpipe.h new file mode 100644 index 0000000..4ddf4b7 --- /dev/null +++ b/src/udpipe.h @@ -0,0 +1,60 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udr/udt + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cc.h" +#include "udpipe_threads.h" +#include "crypto.h" + +/* #define BUFF_SIZE 327680 */ +#define BUFF_SIZE 67108864 + +typedef struct rs_args{ + UDTSOCKET*usocket; + crypto *c; + int use_crypto; + int verbose; + int n_crypto_threads; + +} rs_args; + +typedef struct thread_args{ + crypto *enc; + crypto *dec; + char *ip; + char *port; + int blast; + int blast_rate; + size_t udt_buff; + size_t udp_buff; + int mss; + int use_crypto; + int verbose; + int n_crypto_threads; + int print_speed; +} thread_args; + +void* send_buf_threaded(void*_args); diff --git a/src/udpipe_client.cpp b/src/udpipe_client.cpp new file mode 100644 index 0000000..4469b89 --- /dev/null +++ b/src/udpipe_client.cpp @@ -0,0 +1,158 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udpipe. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "udpipe.h" +#include "udpipe_client.h" + +#define prii(x) fprintf(stderr,"debug:%d\n",x) +#define pris(x) fprintf(stderr,"debug: %s\n",x) +#define uc_err(x) {fprintf(stderr,"error:%s\n",x);exit(1);} + +using std::cerr; +using std::endl; + +int run_client(thread_args *args) +{ + + if (args->verbose) + fprintf(stderr, "[client] Running client...\n"); + + char *ip = args->ip; + char *port = args->port; + int blast = args->blast; + int blast_rate = args->blast_rate; + int udt_buff = args->udt_buff; + int udp_buff = args->udp_buff; // 67108864; + int mss = args->mss; + + + if (args->verbose) + fprintf(stderr, "Starting UDT...\n"); + + UDT::startup(); + + struct addrinfo hints, *local, *peer; + + memset(&hints, 0, sizeof(struct addrinfo)); + + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + if (0 != getaddrinfo(NULL, port, &hints, &local)){ + cerr << "incorrect network address.\n" << endl; + return 1; + } + + + if (args->verbose) + fprintf(stderr, "[client] Creating socket...\n"); + + + UDTSOCKET client; + client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol); + + // UDT Options + if (blast) + UDT::setsockopt(client, 0, UDT_CC, new CCCFactory, sizeof(CCCFactory)); + + UDT::setsockopt(client, 0, UDT_MSS, &mss, sizeof(int)); + UDT::setsockopt(client, 0, UDT_SNDBUF, &udt_buff, sizeof(int)); + UDT::setsockopt(client, 0, UDP_SNDBUF, &udp_buff, sizeof(int)); + + // freeaddrinfo(local); + + if (0 != getaddrinfo(ip, port, &hints, &peer)) { + cerr << "incorrect server/peer address. " << ip << ":" << port << endl; + return 1; + } + + if (args->verbose) + fprintf(stderr, "[client] Connecting to server...\n"); + + if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) { + + // cerr << "connect: " << UDT::getlasterror().getErrorCode() << endl; + cerr << "connect: " << UDT::getlasterror().getErrorMessage() << endl; + + return 1; + } + + if (args->verbose) + fprintf(stderr, "[client] Creating receive thread...\n"); + + pthread_t rcvthread, sndthread; + rs_args rcvargs; + rcvargs.usocket = new UDTSOCKET(client); + rcvargs.use_crypto = args->use_crypto; + rcvargs.verbose = args->verbose; + rcvargs.n_crypto_threads = args->n_crypto_threads; + rcvargs.c = args->dec; + + pthread_create(&rcvthread, NULL, recvdata, &rcvargs); + pthread_detach(rcvthread); + + + if (args->verbose) + fprintf(stderr, "[client] Creating send thread...\n"); + + + rs_args send_args; + send_args.usocket = new UDTSOCKET(client); + send_args.use_crypto = args->use_crypto; + send_args.verbose = args->verbose; + send_args.n_crypto_threads = args->n_crypto_threads; + send_args.c = args->enc; + + // freeaddrinfo(peer); + + if (blast) { + CUDPBlast* cchandle = NULL; + int temp; + UDT::getsockopt(client, 0, UDT_CC, &cchandle, &temp); + if (NULL != cchandle) + cchandle->setRate(blast_rate); + } + + if (args->print_speed){ + pthread_t mon_thread; + pthread_create(&mon_thread, NULL, monitor, &client); + + } + + pthread_create(&sndthread, NULL, senddata, &send_args); + + + void * retval; + pthread_join(sndthread, &retval); + + UDT::cleanup(); + + return 0; +} + diff --git a/src/udpipe_client.h b/src/udpipe_client.h new file mode 100644 index 0000000..536ad46 --- /dev/null +++ b/src/udpipe_client.h @@ -0,0 +1,26 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udr/udt + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ +#include +#include +#include +#include +#include + + + +int run_client(thread_args *args); diff --git a/src/udpipe_server.cpp b/src/udpipe_server.cpp new file mode 100644 index 0000000..4a13a43 --- /dev/null +++ b/src/udpipe_server.cpp @@ -0,0 +1,166 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udpipe + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +#include "udpipe.h" + +using std::cerr; +using std::endl; +using std::string; + +void* recvdata(void*); +void* senddata(void*); + +int buffer_size; + +int run_server(thread_args *args){ + + if (args->verbose) + fprintf(stderr, "[server] Running server...\n"); + + char *port = args->port; + int blast = args->blast; + int udt_buff = args->udt_buff; + int udp_buff = args->udp_buff; // 67108864; + int mss = args->mss; + + + if (args->verbose) + fprintf(stderr, "[server] Starting UDT...\n"); + UDT::startup(); + + addrinfo hints; + addrinfo* res; + + memset(&hints, 0, sizeof(struct addrinfo)); + + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + string service(port); + + if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) { + cerr << "illegal port number or port is busy.\n" << endl; + return 1; + } + + buffer_size = udt_buff; + + + if (args->verbose) + fprintf(stderr, "[server] Creating socket...\n"); + + UDTSOCKET serv; + serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol); + + // UDT Options + if (blast) + UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory, sizeof(CCCFactory)); + + UDT::setsockopt(serv, 0, UDT_MSS, &mss, sizeof(int)); + UDT::setsockopt(serv, 0, UDT_RCVBUF, &udt_buff, sizeof(int)); + UDT::setsockopt(serv, 0, UDP_RCVBUF, &udp_buff, sizeof(int)); + + + if (args->verbose) + fprintf(stderr, "[server] Binding socket...\n"); + + + if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) { + cerr << "bind: " << UDT::getlasterror().getErrorMessage() << endl; + return 1; + } + + if (UDT::ERROR == UDT::listen(serv, 10)){ + cerr << "listen: " << UDT::getlasterror().getErrorMessage() << endl; + return 1; + } + + sockaddr_storage clientaddr; + int addrlen = sizeof(clientaddr); + + + UDTSOCKET recver; + pthread_t rcvthread, sndthread; + + + if (args->verbose) + fprintf(stderr, "[server] Listening for client...\n"); + + if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, + (sockaddr*)&clientaddr, &addrlen))) { + + cerr << "accept: " << UDT::getlasterror().getErrorMessage() << endl; + return 1; + } + + if (args->verbose) + fprintf(stderr, "[server] New client connection...\n"); + + char clienthost[NI_MAXHOST]; + char clientservice[NI_MAXSERV]; + getnameinfo((sockaddr *)&clientaddr, addrlen, clienthost, + sizeof(clienthost), clientservice, sizeof(clientservice), + NI_NUMERICHOST|NI_NUMERICSERV); + + + if (args->verbose) + fprintf(stderr, "[server] Creating receve thread...\n"); + + rs_args rcvargs; + rcvargs.usocket = new UDTSOCKET(recver); + rcvargs.use_crypto = args->use_crypto; + rcvargs.verbose = args->verbose; + rcvargs.n_crypto_threads = args->n_crypto_threads; + rcvargs.c = args->dec; + + pthread_create(&rcvthread, NULL, recvdata, &rcvargs); + pthread_detach(rcvthread); + + if (args->verbose) + fprintf(stderr, "[server] Creating send thread.\n"); + + rs_args send_args; + send_args.usocket = new UDTSOCKET(recver); + send_args.use_crypto = args->use_crypto; + send_args.verbose = args->verbose; + send_args.n_crypto_threads = args->n_crypto_threads; + send_args.c = args->enc; + + if (args->print_speed){ + pthread_t mon_thread; + pthread_create(&mon_thread, NULL, monitor, &recver); + + } + + + pthread_create(&sndthread, NULL, senddata, &send_args); + pthread_join(sndthread, NULL); + + UDT::cleanup(); + + return 0; +} + diff --git a/src/udpipe_server.h b/src/udpipe_server.h new file mode 100644 index 0000000..dc410cb --- /dev/null +++ b/src/udpipe_server.h @@ -0,0 +1,24 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udr/udt + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ +#include +#include +#include +#include +#include + +int run_server(thread_args* args); diff --git a/src/udpipe_threads.cpp b/src/udpipe_threads.cpp new file mode 100644 index 0000000..dfb46d2 --- /dev/null +++ b/src/udpipe_threads.cpp @@ -0,0 +1,477 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of udpipe. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ +#include +#include +#include +#include +#include +#include + +#include "udpipe.h" +#include "udpipe_threads.h" + +#define DEBUG 0 +#define EXIT_FAILURE 1 + +#define prii(x) fprintf(stderr,"debug:%d\n",x) +#define pris(x) fprintf(stderr,"debug: %s\n",x) +#define prisi(x,y) fprintf(stderr,"%s: %d\n",x,y) +#define uc_err(x) {fprintf(stderr,"error:%s\n",x);exit(EXIT_FAILURE);} + +const int ECONNLOST = 2001; + +using std::cerr; +using std::endl; + +int READ_IN = 0; + +void print_bytes(const void *object, size_t size) +{ + size_t i; + + fprintf(stderr, "[ "); + for(i = 0; i < size; i++){ + fprintf(stderr, "%02x ", ((const unsigned char *) object)[i] & 0xff); + } + fprintf(stderr, "]\n"); +} + +void send_full(UDTSOCKET sock, char* buffer, int len){ + + int sent = 0; + int rs = 0; + while (sent < len){ + rs = UDT::send(sock, buffer+sent, len-sent, 0); + if (UDT::ERROR == rs) { + if (UDT::getlasterror().getErrorCode() != ECONNLOST) + cerr << "recv:" << UDT::getlasterror().getErrorMessage() << + "send_full: Unable to send data." << endl; + exit(1); + } + sent += rs; + } + + +} + +void recv_full(UDTSOCKET sock, char* buffer, int len) +{ + + int recvd = 0; + int rs = 0; + while (recvd < len){ + rs = UDT::recv(sock, buffer+recvd, len-recvd, 0); + if (UDT::ERROR == rs) { + if (UDT::getlasterror().getErrorCode() != ECONNLOST) + cerr << "recv:" << UDT::getlasterror().getErrorMessage() << + "send_full: Unable to send data." << endl; + exit(1); + } + recvd += rs; + } + + +} + +const int KEY_LEN = 1026; +int signed_auth; + + +void auth_peer(rs_args* args) +{ + + + char key[KEY_LEN]; + char signed_key[KEY_LEN]; + + RAND_bytes((unsigned char*)key, KEY_LEN); + + signed_auth = 0; + + send_full(*args->usocket, key, KEY_LEN); + + while (!signed_auth); + + recv_full(*args->usocket, signed_key, KEY_LEN); + + // fprintf(stderr, "key: "); print_bytes(key, 16); + // fprintf(stderr, "signed_key: "); print_bytes(signed_key, 16); + + int crypt_len = KEY_LEN/4; + for (int i = 0; i < crypt_len; i += crypt_len) + pass_to_enc_thread(signed_key+i, signed_key+i, crypt_len, args->c); + join_all_encryption_threads(args->c); + + if (memcmp(key, signed_key, KEY_LEN)){ + fprintf(stderr, "Authorization failed\n"); + exit(1); + + } + +} + + +void sign_auth(rs_args* args) +{ + + char key[KEY_LEN]; + + recv_full(*args->usocket, key, KEY_LEN); + + // fprintf(stderr, "signing: "); print_bytes(key, 16); + + int crypt_len = KEY_LEN/4; + for (int i = 0; i < crypt_len; i += crypt_len) + pass_to_enc_thread(key+i, key+i, crypt_len, args->c); + + join_all_encryption_threads(args->c); + + // fprintf(stderr, "signed: "); print_bytes(key, 16); + + send_full(*args->usocket, key, KEY_LEN); + + signed_auth = 1; + +} + + +void* recvdata(void * _args) +{ + + rs_args * args = (rs_args*)_args; + + if (args->verbose) + fprintf(stderr, "[recv thread] Initializing receive thread...\n"); + + if (args->verbose && args->use_crypto) + fprintf(stderr, "[recv thread] Receive encryption is on.\n"); + + UDTSOCKET recver = *args->usocket; + + int crypto_buff_len = BUFF_SIZE / args->n_crypto_threads; + int buffer_cursor; + + char* indata = (char*) malloc(BUFF_SIZE*sizeof(char)); + if (!indata){ + fprintf(stderr, "Unable to allocate decryption buffer"); + exit(EXIT_FAILURE); + } + + if (args->use_crypto) + auth_peer(args); + + // if (args->verbose) + // fprintf(stderr, "[recv thread] Checking encryption...\n"); + + // long remote_ssl_version = 0; + // int rs = UDT::recv(recver, (char*)&remote_ssl_version, sizeof(long), 0); + + // if (UDT::ERROR == rs) { + // if (UDT::getlasterror().getErrorCode() != ECONNLOST) + // cerr << "recv:" << UDT::getlasterror().getErrorMessage() << + // "Unable to determine remote crypto method" << endl; + // exit(1); + // } + + // if (args->use_crypto){ + // if (remote_ssl_version == 0){ + // cerr << "recv: Encryption mismatch: local[None] to remote[OpenSSL]" << endl; + // UDT::close(recver); + // exit(1); + // } + + // if (remote_ssl_version != OPENSSL_VERSION_NUMBER){ + // // versions don't match + // } + + // } else { + // if (remote_ssl_version != 0) { + // cerr << "recv: Encryption mismatch: local[OpenSSL] to remote[None]" << endl; + // write(fileno(stderr), &remote_ssl_version, sizeof(long)); + // UDT::close(recver); + // exit(1); + // } + // } + + READ_IN = 1; + + int new_block = 1; + int block_size = 0; + int offset = sizeof(int)/sizeof(char); + int crypto_cursor; + + if (args->verbose) + fprintf(stderr, "[recv thread] Listening on receive thread.\n"); + + if(args->use_crypto) { + while(true) { + int rs; + + if (new_block){ + + block_size = 0; + rs = UDT::recv(recver, (char*)&block_size, offset, 0); + + if (UDT::ERROR == rs) { + if (UDT::getlasterror().getErrorCode() != ECONNLOST){ + cerr << "recv:" << UDT::getlasterror().getErrorMessage() << endl; + exit(1); + } + exit(0); + } + + new_block = 0; + buffer_cursor = 0; + crypto_cursor = 0; + + } + + rs = UDT::recv(recver, indata+buffer_cursor, + block_size-buffer_cursor, 0); + + + if (UDT::ERROR == rs) { + if (UDT::getlasterror().getErrorCode() != ECONNLOST){ + cerr << "recv:" << UDT::getlasterror().getErrorMessage() << endl; + exit(1); + } + exit(0); + } + + + buffer_cursor += rs; + + // Decrypt any full encryption buffer sectors + while (crypto_cursor + crypto_buff_len < buffer_cursor){ + + pass_to_enc_thread(indata+crypto_cursor, indata+crypto_cursor, + crypto_buff_len, args->c); + crypto_cursor += crypto_buff_len; + + } + + // If we received the whole block + if (buffer_cursor == block_size){ + + int size = buffer_cursor - crypto_cursor; + pass_to_enc_thread(indata+crypto_cursor, indata+crypto_cursor, + size, args->c); + crypto_cursor += size; + + join_all_encryption_threads(args->c); + + write(fileno(stdout), indata, block_size); + + buffer_cursor = 0; + crypto_cursor = 0; + new_block = 1; + + } + } + + } else { + + int rs; + while (1){ + + rs = UDT::recv(recver, indata, BUFF_SIZE, 0); + + if (UDT::ERROR == rs) { + if (UDT::getlasterror().getErrorCode() != ECONNLOST){ + cerr << "recv:" << UDT::getlasterror().getErrorMessage() << endl; + exit(1); + } + exit(0); + } + + write(fileno(stdout), indata, rs); + + } + + + } + + UDT::close(recver); + return NULL; + +} + + +void* senddata(void* _args) +{ + rs_args * args = (rs_args*) _args; + + if (args->verbose) + fprintf(stderr, "[send thread] Initializing send thread...\n"); + + UDTSOCKET client = *(UDTSOCKET*)args->usocket; + + if (args->verbose && args->use_crypto) + fprintf(stderr, "[send thread] Send encryption is on.\n"); + + char* outdata = (char*)malloc(BUFF_SIZE*sizeof(char)); + int crypto_buff_len = BUFF_SIZE / args->n_crypto_threads; + + int offset = sizeof(int)/sizeof(char); + int bytes_read; + + if (args->verbose) + fprintf(stderr, "[send thread] Sending encryption status...\n"); + + if (args->use_crypto) + sign_auth(args); + + // long local_openssl_version; + // if (args->use_crypto) + // local_openssl_version = OPENSSL_VERSION_NUMBER; + // else + // local_openssl_version = 0; + + + // if (UDT::send(client, (char*)&local_openssl_version, sizeof(long), 0) < 0){ + // // cerr << "send:" << UDT::getlasterror().getErrorMessage() << endl; + // // UDT::close(client); + // // exit(1); + // } + + while (!READ_IN); + + if (args->verbose) + fprintf(stderr, "[send thread] Send thread listening on stdin.\n"); + + if (args->use_crypto){ + + while(true) { + int ss; + + bytes_read = read(fileno(stdin), outdata+offset, BUFF_SIZE); + + if(bytes_read < 0){ + cerr << "send:" << UDT::getlasterror().getErrorMessage() << endl; + UDT::close(client); + exit(1); + } + + if(bytes_read == 0) { + sleep(1); + UDT::close(client); + exit(0); + } + + if(args->use_crypto){ + + *((int*)outdata) = bytes_read; + int crypto_cursor = 0; + + while (crypto_cursor < bytes_read){ + int size = min(crypto_buff_len, bytes_read-crypto_cursor); + pass_to_enc_thread(outdata+crypto_cursor+offset, + outdata+crypto_cursor+offset, + size, args->c); + + crypto_cursor += size; + + } + + join_all_encryption_threads(args->c); + + bytes_read += offset; + + } + + int ssize = 0; + while(ssize < bytes_read) { + + if (UDT::ERROR == (ss = UDT::send(client, outdata + ssize, + bytes_read - ssize, 0))) { + + cerr << "send:" << UDT::getlasterror().getErrorMessage() << endl; + return NULL; + } + + ssize += ss; + + } + + + } + + } else { + + while (1) { + bytes_read = read(fileno(stdin), outdata, BUFF_SIZE); + int ssize = 0; + int ss; + + if(bytes_read == 0) { + UDT::close(client); + sleep(1); + exit(0); + } + + + while(ssize < bytes_read) { + + if (UDT::ERROR == (ss = UDT::send(client, outdata + ssize, + bytes_read - ssize, 0))) { + cerr << "send:" << UDT::getlasterror().getErrorMessage() << endl; + return NULL; + } + + ssize += ss; + + } + + } + } + + UDT::close(client); + + return NULL; +} + + +void* monitor(void* s) +{ + UDTSOCKET u = *(UDTSOCKET*)s; + + UDT::TRACEINFO perf; + + cerr << "Snd(Mb/s)\tRcv(Mb/s)\tRTT(ms)\tLoss\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl; + + while (true) { + sleep(1); + + if (UDT::ERROR == UDT::perfmon(u, &perf)) { + cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl; + break; + } + + cerr << perf.mbpsSendRate << "\t\t" + << perf.mbpsRecvRate << "\t\t" + << perf.msRTT << "\t" + << perf.pktRcvLoss << "\t" + << perf.pktRecv << "\t\t\t" + << perf.pktRecvACK << "\t" + << perf.pktRecvNAK << endl; + } + + return NULL; + +} + diff --git a/src/udpipe_threads.h b/src/udpipe_threads.h new file mode 100644 index 0000000..cee8578 --- /dev/null +++ b/src/udpipe_threads.h @@ -0,0 +1,25 @@ +/***************************************************************************** +Copyright 2013 Laboratory for Advanced Computing at the University of Chicago + +This file is part of . + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions +and limitations under the License. +*****************************************************************************/ + +// this should be maxline size +/* #define BUF_SIZE 100000 */ +#include "crypto.h" + +void* recvdata(void*); +void* senddata(void*); +void* monitor(void*);