diff --git a/Inc/dataStream.h b/Inc/dataStream.h deleted file mode 100644 index e0f8cb85..00000000 --- a/Inc/dataStream.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef _DATA_STREAM_H_ -#define _DATA_STREAM_H_ - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -enum ReceiveResult -{ - RECEIVE_RESULT_OK, - RECEIVE_RESULT_TIMEOUT, - RECEIVE_RESULT_EOF, - RECEIVE_RESULT_ERROR, -}; - -struct DataStream -{ - enum ReceiveResult (*receive)(struct DataStream* stream, void* buffer, size_t bufferSize, struct timeval* timeout, size_t* receivedSize); - void (*close)(struct DataStream* stream); -}; - -struct DataStream* dataStreamCreateSocket(const char* server, int port); -struct DataStream* dataStreamCreateFile(const char* file); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/Inc/stream.h b/Inc/stream.h new file mode 100644 index 00000000..345cfe76 --- /dev/null +++ b/Inc/stream.h @@ -0,0 +1,33 @@ +#ifndef _STREAM_H_ +#define _STREAM_H_ + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +enum ReceiveResult +{ + RECEIVE_RESULT_OK, + RECEIVE_RESULT_TIMEOUT, + RECEIVE_RESULT_EOF, + RECEIVE_RESULT_ERROR, +}; + +struct Stream +{ + enum ReceiveResult ( *receive )( struct Stream *stream, void *buffer, size_t bufferSize, + struct timeval *timeout, size_t *receivedSize ); + void ( *close )( struct Stream *stream ); +}; + +struct Stream *streamCreateSocket( const char *server, int port ); +struct Stream *streamCreateFile( const char *file ); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/Makefile b/Makefile index 6d94e8e6..9089cbd4 100644 --- a/Makefile +++ b/Makefile @@ -105,11 +105,11 @@ endif # ========== ORBLIB_CFILES = $(App_DIR)/itmDecoder.c $(App_DIR)/tpiuDecoder.c $(App_DIR)/msgDecoder.c $(App_DIR)/msgSeq.c $(App_DIR)/traceDecoder.c -ORBLIB_CFILES = $(App_DIR)/itmDecoder.c $(App_DIR)/tpiuDecoder.c $(App_DIR)/msgDecoder.c $(App_DIR)/msgSeq.c $(App_DIR)/traceDecoder.c $(App_DIR)/dataStream_socket.c +ORBLIB_CFILES = $(App_DIR)/itmDecoder.c $(App_DIR)/tpiuDecoder.c $(App_DIR)/msgDecoder.c $(App_DIR)/msgSeq.c $(App_DIR)/traceDecoder.c $(App_DIR)/stream_socket.c ifdef WINDOWS else - ORBLIB_CFILES += $(App_DIR)/dataStream_file_posix.c + ORBLIB_CFILES += $(App_DIR)/stream_file_posix.c endif ORBUCULUM_CFILES = $(App_DIR)/$(ORBUCULUM).c $(App_DIR)/nwclient.c diff --git a/Src/dataStream_file_posix.c b/Src/dataStream_file_posix.c deleted file mode 100644 index 5781f2a5..00000000 --- a/Src/dataStream_file_posix.c +++ /dev/null @@ -1,84 +0,0 @@ -#include "dataStream.h" -#include -#include -#include - -#include "generics.h" - - -struct PosixFileDataStream -{ - struct DataStream base; - int file; -}; - -#define SELF(stream) ((struct PosixFileDataStream*)(stream)) - -static enum ReceiveResult posixFileStreamReceive(struct DataStream* stream, void* buffer, size_t bufferSize, struct timeval* timeout, size_t* receivedSize) -{ - struct PosixFileDataStream* self = SELF(stream); - fd_set readFd; - FD_ZERO(&readFd); - FD_SET(self->file, &readFd); - - int r = select(self->file + 1, &readFd, NULL, NULL, timeout); - - if(r < 0) - { - return RECEIVE_RESULT_ERROR; - } - - if(r == 0) - { - *receivedSize = 0; - return RECEIVE_RESULT_TIMEOUT; - } - - *receivedSize = read(self->file, buffer, bufferSize); - if(*receivedSize == 0) - { - return RECEIVE_RESULT_EOF; - } - - return RECEIVE_RESULT_OK; -} - -static void posixFileStreamClose(struct DataStream* stream) -{ - struct PosixFileDataStream* self = SELF(stream); - close(self->file); -} - -static int posixFileStreamCreate(const char* file) -{ - int f = open(file, O_RDONLY); - - if (f < 0) - { - genericsExit( -4, "Can't open file %s" EOL, file ); - } - - return f; -} - -struct DataStream* dataStreamCreateFile(const char* file) -{ - struct PosixFileDataStream* stream = SELF(calloc(1, sizeof(struct PosixFileDataStream))); - - if(stream == NULL) - { - return NULL; - } - - stream->base.receive = posixFileStreamReceive; - stream->base.close = posixFileStreamClose; - stream->file = posixFileStreamCreate(file); - - if(stream->file == -1) - { - free(stream); - return NULL; - } - - return &stream->base; -} \ No newline at end of file diff --git a/Src/dataStream_socket.c b/Src/dataStream_socket.c deleted file mode 100644 index 4f53fc9e..00000000 --- a/Src/dataStream_socket.c +++ /dev/null @@ -1,134 +0,0 @@ -#include "dataStream.h" -#include -#include -#include -#include -#ifdef WIN32 - #include -#else - #include - #include - #include -#endif - -#include "generics.h" - - -struct SocketDataStream -{ - struct DataStream base; - int socket; -}; - -#define SELF(stream) ((struct SocketDataStream*)(stream)) - -#ifdef WIN32 - // https://stackoverflow.com/a/14388707/995351 - #define SO_REUSEPORT SO_REUSEADDR -#endif - -static enum ReceiveResult socketStreamReceive(struct DataStream* stream, void* buffer, size_t bufferSize, struct timeval* timeout, size_t* receivedSize) -{ - struct SocketDataStream* self = SELF(stream); - - *receivedSize = 0; - - fd_set readFd; - FD_ZERO(&readFd); - FD_SET(self->socket, &readFd); - - int r = select(self->socket + 1, &readFd, NULL, NULL, timeout); - - if(r < 0) - { - return RECEIVE_RESULT_ERROR; - } - - if(r == 0) - { - *receivedSize = 0; - return RECEIVE_RESULT_TIMEOUT; - } - - ssize_t result = recv(self->socket, buffer, bufferSize, 0); - if(result <= 0) - { - // report connection broken as error - return RECEIVE_RESULT_ERROR; - } - - *receivedSize = result; - return RECEIVE_RESULT_OK; -} - -static void socketStreamClose(struct DataStream* stream) -{ - struct SocketDataStream* self = SELF(stream); - close(self->socket); -} - -static int socketStreamCreate(const char* server, int port) -{ - #ifdef WIN32 - WSADATA wsaData; - WSAStartup(MAKEWORD(2, 2), &wsaData); - #endif - - int sockfd = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); - - int flag = 1; - setsockopt( sockfd, SOL_SOCKET, SO_REUSEPORT, (const void*)&flag, sizeof( flag ) ); - - if ( sockfd < 0 ) - { - genericsReport( V_ERROR, "Error creating socket" EOL ); - return -1; - } - - struct hostent* serverEnt = gethostbyname( server ); - if ( !serverEnt ) - { - close(sockfd); - genericsReport( V_ERROR, "Cannot find host" EOL ); - return -1; - } - - struct sockaddr_in serv_addr; - /* Now open the network connection */ - memset( &serv_addr, 0, sizeof( serv_addr ) ); - - serv_addr.sin_family = AF_INET; - memcpy(&serv_addr.sin_addr.s_addr, serverEnt->h_addr, serverEnt->h_length); - serv_addr.sin_port = htons(port); - - if ( connect( sockfd, ( struct sockaddr * ) &serv_addr, sizeof( serv_addr ) ) < 0 ) - { - close(sockfd); - genericsReport( V_ERROR, "Could not connect" EOL ); - return -1; - } - - return sockfd; -} - -struct DataStream* dataStreamCreateSocket(const char* server, int port) -{ - struct SocketDataStream* stream = SELF(calloc(1, sizeof(struct SocketDataStream))); - - if(stream == NULL) - { - return NULL; - } - - stream->base.receive = socketStreamReceive; - stream->base.close = socketStreamClose; - stream->socket = socketStreamCreate(server, port); - - if(stream->socket == -1) - { - free(stream); - return NULL; - } - - return &stream->base; -} \ No newline at end of file diff --git a/Src/orbcat.c b/Src/orbcat.c index ddd22d0d..7ac67e93 100644 --- a/Src/orbcat.c +++ b/Src/orbcat.c @@ -20,7 +20,7 @@ #include "tpiuDecoder.h" #include "itmDecoder.h" #include "msgDecoder.h" -#include "dataStream.h" +#include "stream.h" #define NUM_CHANNELS 32 #define HW_CHANNEL (NUM_CHANNELS) /* Make the hardware fifo on the end of the software ones */ @@ -504,34 +504,35 @@ int _processOptions( int argc, char *argv[] ) } // ==================================================================================================== -static struct DataStream* tryOpenDataStream() +static struct Stream *_tryOpenStream() { - if(options.file != NULL) + if ( options.file != NULL ) { - return dataStreamCreateFile(options.file); + return streamCreateFile( options.file ); } else { - return dataStreamCreateSocket(options.server, options.port); + return streamCreateSocket( options.server, options.port ); } } +// ==================================================================================================== -static void feedDataStream(struct DataStream* stream) +static void _feedStream( struct Stream *stream ) { unsigned char cbw[TRANSFER_SIZE]; - while(true) + while ( true ) { size_t receivedSize; - enum ReceiveResult result = stream->receive(stream, cbw, TRANSFER_SIZE, NULL, &receivedSize); + enum ReceiveResult result = stream->receive( stream, cbw, TRANSFER_SIZE, NULL, &receivedSize ); - if(result != RECEIVE_RESULT_OK) + if ( result != RECEIVE_RESULT_OK ) { - if(result == RECEIVE_RESULT_EOF && options.endTerminate) + if ( result == RECEIVE_RESULT_EOF && options.endTerminate ) { return; } - else if (result == RECEIVE_RESULT_ERROR) + else if ( result == RECEIVE_RESULT_ERROR ) { break; } @@ -564,21 +565,22 @@ int main( int argc, char *argv[] ) TPIUDecoderInit( &_r.t ); ITMDecoderInit( &_r.i, options.forceITMSync ); - while(true) + while ( true ) { - struct DataStream* stream = NULL; - while(true) + struct Stream *stream = NULL; + + while ( true ) { - stream = tryOpenDataStream(); + stream = _tryOpenStream(); - if(stream != NULL) + if ( stream != NULL ) { break; } genericsReport( V_ERROR, "Failed to open data stream" EOL ); - if(options.endTerminate) + if ( options.endTerminate ) { break; } @@ -588,17 +590,17 @@ int main( int argc, char *argv[] ) } } - if(stream == NULL) + if ( stream == NULL ) { break; } - feedDataStream(stream); + _feedStream( stream ); - stream->close(stream); - free(stream); + stream->close( stream ); + free( stream ); - if(options.endTerminate) + if ( options.endTerminate ) { break; } @@ -607,6 +609,6 @@ int main( int argc, char *argv[] ) usleep( 100000 ); } } - + } // ==================================================================================================== diff --git a/Src/orbmortem.c b/Src/orbmortem.c index 0bbdddda..e8cbf5a8 100644 --- a/Src/orbmortem.c +++ b/Src/orbmortem.c @@ -1128,6 +1128,13 @@ static void _doExit( void ) usleep( 200 ); SIOterminate( _r.sio ); } + +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== +// Publicly available routines +// ==================================================================================================== +// ==================================================================================================== // ==================================================================================================== int main( int argc, char *argv[] ) diff --git a/Src/orbtop.c b/Src/orbtop.c index c960c2e1..52bc8875 100644 --- a/Src/orbtop.c +++ b/Src/orbtop.c @@ -27,7 +27,7 @@ #include "symbols.h" #include "msgSeq.h" #include "nw.h" -#include "dataStream.h" +#include "stream.h" #define CUTOFF (10) /* Default cutoff at 0.1% */ #define TOP_UPDATE_INTERVAL (1000) /* Interval between each on screen update */ @@ -1093,17 +1093,17 @@ int _processOptions( int argc, char *argv[] ) return OK; } +// ==================================================================================================== - -static struct DataStream* openDataStream() +static struct Stream *_openStream() { - if(options.file != NULL) + if ( options.file != NULL ) { - return dataStreamCreateFile(options.file); + return streamCreateFile( options.file ); } else { - return dataStreamCreateSocket(options.server, options.port); + return streamCreateSocket( options.server, options.port ); } } @@ -1168,12 +1168,12 @@ int main( int argc, char *argv[] ) while ( 1 ) { - struct DataStream* stream = openDataStream(); + struct Stream *stream = _openStream(); - if(stream == NULL) + if ( stream == NULL ) { genericsReport( V_ERROR, "Failed to open data stream" EOL ); - usleep(500 * 1000); + usleep( 500 * 1000 ); continue; } @@ -1196,7 +1196,7 @@ int main( int argc, char *argv[] ) tv.tv_sec = remainTime / 1000000; tv.tv_usec = remainTime % 1000000; - receiveResult = stream->receive(stream, cbw, TRANSFER_SIZE, &tv, &receivedSize); + receiveResult = stream->receive( stream, cbw, TRANSFER_SIZE, &tv, &receivedSize ); } else { @@ -1204,13 +1204,13 @@ int main( int argc, char *argv[] ) receivedSize = 0; } - if(receiveResult == RECEIVE_RESULT_ERROR) + if ( receiveResult == RECEIVE_RESULT_ERROR ) { /* Something went wrong in the receive */ break; } - if(receiveResult == RECEIVE_RESULT_EOF) + if ( receiveResult == RECEIVE_RESULT_EOF ) { /* We are at EOF, hopefully next loop will get more data. */ } @@ -1241,7 +1241,7 @@ int main( int argc, char *argv[] ) } /* See if its time to post-process it */ - if ( receiveResult == RECEIVE_RESULT_TIMEOUT || remainTime <= 0) + if ( receiveResult == RECEIVE_RESULT_TIMEOUT || remainTime <= 0 ) { /* Create the report that we will output */ total = _consolodateReport( &report, &reportLines ); @@ -1284,8 +1284,8 @@ int main( int argc, char *argv[] ) } } - stream->close(stream); - free(stream); + stream->close( stream ); + free( stream ); } if ( ( !ITMDecoderGetStats( &_r.i )->tpiuSyncCount ) ) diff --git a/Src/stream_file_posix.c b/Src/stream_file_posix.c new file mode 100644 index 00000000..e408a718 --- /dev/null +++ b/Src/stream_file_posix.c @@ -0,0 +1,98 @@ +#include "stream.h" +#include +#include +#include + +#include "generics.h" + + +struct PosixFileStream +{ + struct Stream base; + int file; +}; + +#define SELF(stream) ((struct PosixFileStream*)(stream)) + +// ==================================================================================================== +static enum ReceiveResult _posixFileStreamReceive( struct Stream *stream, void *buffer, size_t bufferSize, + struct timeval *timeout, size_t *receivedSize ) +{ + struct PosixFileStream *self = SELF( stream ); + fd_set readFd; + FD_ZERO( &readFd ); + FD_SET( self->file, &readFd ); + + int r = select( self->file + 1, &readFd, NULL, NULL, timeout ); + + if ( r < 0 ) + { + return RECEIVE_RESULT_ERROR; + } + + if ( r == 0 ) + { + *receivedSize = 0; + return RECEIVE_RESULT_TIMEOUT; + } + + *receivedSize = read( self->file, buffer, bufferSize ); + + if ( *receivedSize == 0 ) + { + return RECEIVE_RESULT_EOF; + } + + return RECEIVE_RESULT_OK; +} + +// ==================================================================================================== +static void _posixFileStreamClose( struct Stream *stream ) +{ + struct PosixFileStream *self = SELF( stream ); + close( self->file ); +} + +// ==================================================================================================== +static int _posixFileStreamCreate( const char *file ) +{ + int f = open( file, O_RDONLY ); + + if ( f < 0 ) + { + genericsExit( -4, "Can't open file %s" EOL, file ); + } + + return f; +} + +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== +// Publicly available routines +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== + +struct Stream *streamCreateFile( const char *file ) +{ + struct PosixFileStream *stream = SELF( calloc( 1, sizeof( struct PosixFileStream ) ) ); + + if ( stream == NULL ) + { + return NULL; + } + + stream->base.receive = _posixFileStreamReceive; + stream->base.close = _posixFileStreamClose; + stream->file = _posixFileStreamCreate( file ); + + if ( stream->file == -1 ) + { + free( stream ); + return NULL; + } + + return &stream->base; +} +// ==================================================================================================== diff --git a/Src/stream_socket.c b/Src/stream_socket.c new file mode 100644 index 00000000..10b61cee --- /dev/null +++ b/Src/stream_socket.c @@ -0,0 +1,158 @@ +#include "stream.h" +#include +#include +#include +#include +#ifdef WIN32 + #include +#else + #include + #include + #include +#endif + +#include "generics.h" + + +struct SocketStream +{ + struct Stream base; + int socket; +}; + +#define SELF(stream) ((struct SocketStream*)(stream)) + +#ifdef WIN32 + // https://stackoverflow.com/a/14388707/995351 + #define SO_REUSEPORT SO_REUSEADDR +#endif + +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== +// Private routines +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== +static enum ReceiveResult _socketStreamReceive( struct Stream *stream, void *buffer, size_t bufferSize, + struct timeval *timeout, size_t *receivedSize ) +{ + struct SocketStream *self = SELF( stream ); + + *receivedSize = 0; + + fd_set readFd; + FD_ZERO( &readFd ); + FD_SET( self->socket, &readFd ); + + int r = select( self->socket + 1, &readFd, NULL, NULL, timeout ); + + if ( r < 0 ) + { + return RECEIVE_RESULT_ERROR; + } + + if ( r == 0 ) + { + *receivedSize = 0; + return RECEIVE_RESULT_TIMEOUT; + } + + ssize_t result = recv( self->socket, buffer, bufferSize, 0 ); + + if ( result <= 0 ) + { + // report connection broken as error + return RECEIVE_RESULT_ERROR; + } + + *receivedSize = result; + return RECEIVE_RESULT_OK; +} + +// ==================================================================================================== +static void _socketStreamClose( struct Stream *stream ) +{ + struct SocketStream *self = SELF( stream ); + close( self->socket ); +} + +// ==================================================================================================== +static int _socketStreamCreate( const char *server, int port ) +{ +#ifdef WIN32 + WSADATA wsaData; + WSAStartup( MAKEWORD( 2, 2 ), &wsaData ); +#endif + + int sockfd = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); + + int flag = 1; + setsockopt( sockfd, SOL_SOCKET, SO_REUSEPORT, ( const void * )&flag, sizeof( flag ) ); + + if ( sockfd < 0 ) + { + genericsReport( V_ERROR, "Error creating socket" EOL ); + return -1; + } + + struct hostent *serverEnt = gethostbyname( server ); + + if ( !serverEnt ) + { + close( sockfd ); + genericsReport( V_ERROR, "Cannot find host" EOL ); + return -1; + } + + struct sockaddr_in serv_addr; + + /* Now open the network connection */ + memset( &serv_addr, 0, sizeof( serv_addr ) ); + + serv_addr.sin_family = AF_INET; + + memcpy( &serv_addr.sin_addr.s_addr, serverEnt->h_addr, serverEnt->h_length ); + + serv_addr.sin_port = htons( port ); + + if ( connect( sockfd, ( struct sockaddr * ) &serv_addr, sizeof( serv_addr ) ) < 0 ) + { + close( sockfd ); + genericsReport( V_ERROR, "Could not connect" EOL ); + return -1; + } + + return sockfd; +} + +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== +// Publicly available routines +// ==================================================================================================== +// ==================================================================================================== +// ==================================================================================================== + +struct Stream *streamCreateSocket( const char *server, int port ) +{ + struct SocketStream *stream = SELF( calloc( 1, sizeof( struct SocketStream ) ) ); + + if ( stream == NULL ) + { + return NULL; + } + + stream->base.receive = _socketStreamReceive; + stream->base.close = _socketStreamClose; + stream->socket = _socketStreamCreate( server, port ); + + if ( stream->socket == -1 ) + { + free( stream ); + return NULL; + } + + return &stream->base; +} +// ====================================================================================================