Skip to content

Commit

Permalink
Merge branch 'hotfix/read-complete-responses'
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelgruner committed Jul 31, 2020
2 parents db32602 + 7640f20 commit 35d3ee1
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 61 deletions.
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ dnl required version of autoconf
AC_PREREQ([2.53])

dnl Gstreamer's daemon package name and version
AC_INIT([gstd],[0.10.0])
AC_INIT([gstd],[0.10.1])

dnl required version of gstreamer and gst-plugins-base
GST_REQUIRED=1.0.0
Expand Down
119 changes: 79 additions & 40 deletions gst_client/gst_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <gio/gio.h>
#include <gio/gunixsocketaddress.h>
#include <glib/gstdio.h>
#include <gmodule.h>
#include <locale.h>
#include <setjmp.h>
#include <stdio.h>
Expand Down Expand Up @@ -62,6 +63,10 @@ extern gint read_history ();
#define GSTD_CLIENT_DEFAULT_UNIX_BASE_NAME "gstd_unix_socket"
#define GSTD_CLIENT_DEFAULT_TCP_PORT 5000
#define GSTD_CLIENT_DEFAULT_UNIX_PORT 0
#define GSTD_CLIENT_MAX_RESPONSE 10485760 /* 10*1024*1024 */
#define GSTD_CLIENT_DOMAIN "gst-client"

static GQuark quark;

typedef struct _GstdClientData GstdClientData;
typedef struct _GstdClientCmd GstdClientCmd;
Expand Down Expand Up @@ -97,6 +102,7 @@ static gint gstd_client_cmd_source (gchar *, gchar *, GstdClientData *);
static gchar *gstd_client_completer (const gchar *, gint);
static gint gstd_client_execute (gchar *, GstdClientData *);
static void gstd_client_header (gboolean quiet);
static void gstd_client_process_error (GError * error);

/* Global variables */

Expand Down Expand Up @@ -281,6 +287,7 @@ main (gint argc, gchar * argv[])
const gchar *history_env = "GSTC_HISTORY";
gchar *history_full = NULL;
GstdClientData *data;
quark = g_quark_from_static_string (GSTD_CLIENT_DOMAIN);

/* Cmdline options */
gboolean quiet;
Expand Down Expand Up @@ -562,78 +569,110 @@ gstd_client_cmd_help (gchar * name, gchar * arg, GstdClientData * data)
return 0;
}

static void
gstd_client_process_error (GError * error)
{
g_return_if_fail (error);

g_printerr ("%s\n", error->message);
g_error_free (error);
}

static gint
gstd_client_cmd_socket (gchar * name, gchar * arg, GstdClientData * data)
{
gchar *cmd;
GError *err = NULL;
GInputStream *istream;
GOutputStream *ostream;
gchar buffer[1024 * 1024];
gint read;
gchar buffer[1024];
GString *response = NULL;
gchar *array = NULL;
gint read = 0;
gint acc_read = 0;
gchar *path_name;
const gchar terminator = '\0';
gint ret = -1;

g_return_val_if_fail (name, -1);
g_return_val_if_fail (arg, -1);
g_return_val_if_fail (data, -1);

cmd = g_strconcat (name, " ", arg, NULL);

if (!data->con) {
if (data->use_unix) {
GSocketAddress *socket_address;
g_socket_client_set_family (data->client, G_SOCKET_FAMILY_UNIX);
path_name = g_strdup_printf ("%s_%d", data->address, data->unix_port);
socket_address = g_unix_socket_address_new (path_name);
g_free (path_name);
if (data->use_unix) {
GSocketAddress *socket_address;
g_socket_client_set_family (data->client, G_SOCKET_FAMILY_UNIX);
path_name = g_strdup_printf ("%s_%d", data->address, data->unix_port);
socket_address = g_unix_socket_address_new (path_name);
g_free (path_name);

data->con = g_socket_client_connect (data->client,
(GSocketConnectable *) socket_address, NULL, &err);
} else {
data->con = g_socket_client_connect_to_host (data->client,
data->address, data->tcp_port, NULL, &err);
}
data->con = g_socket_client_connect (data->client,
(GSocketConnectable *) socket_address, NULL, &err);
} else {
data->con = g_socket_client_connect_to_host (data->client,
data->address, data->tcp_port, NULL, &err);
}

if (err)
goto error;
if (err) {
gstd_client_process_error (err);
goto out;
}

istream = g_io_stream_get_input_stream (G_IO_STREAM (data->con));
ostream = g_io_stream_get_output_stream (G_IO_STREAM (data->con));

cmd = g_strconcat (name, " ", arg, NULL);

err = NULL;
g_output_stream_write (ostream, cmd, strlen (cmd), NULL, &err);
g_free (cmd);
if (err)
goto error;

if (err) {
gstd_client_process_error (err);
goto write_error;
}

//Paranoia flush
g_output_stream_flush (ostream, NULL, NULL);

read = g_input_stream_read (istream, &buffer, sizeof (buffer), NULL, &err);
if (err)
goto error;
response = g_string_new ("");

do {
read = g_input_stream_read (istream, &buffer, sizeof (buffer), NULL, &err);
if (err) {
gstd_client_process_error (err);
goto read_error;
}

g_string_append_len (response, buffer, read);

acc_read += read;
if (acc_read >= GSTD_CLIENT_MAX_RESPONSE) {
g_set_error (&err, quark, -1,
"Response exceeded %d bytes limit, probably the trailing "
"NULL character was missing.", GSTD_CLIENT_MAX_RESPONSE);
gstd_client_process_error (err);
goto read_error;
}

} while (buffer[read - 1] != terminator);

//Make sure it has its sentinel and print
buffer[read] = '\0';
g_print ("%s\n", buffer);
array = g_string_free (response, FALSE);
g_print ("%s\n", array);
g_free (array);

// FIXME: Hack to open a new connection with every message
ret = 0;
goto out;

read_error:
g_string_free (response, TRUE);

write_error:
g_io_stream_close (G_IO_STREAM (data->con), NULL, NULL);
g_object_unref (data->con);
data->con = NULL;

return 0;

error:
{
g_printerr ("%s\n", err->message);
g_error_free (err);
if (data->con) {
g_object_unref (data->con);
data->con = NULL;
}
return -1;
}
out:
return ret;
}

static gint
Expand Down
5 changes: 4 additions & 1 deletion libgstc/libgstc.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ extern "C"
* @GSTC_SOCKET_ERROR: Unable to open the network socket
* @GSTC_THREAD_ERROR: Unable to create a new thread
* @GSTC_BUS_TIMEOUT: A timeout was received while waiting on the bus
* @GSTC_LONG_RESPONSE: The response exceeds our maximum, typically
* meaning a missing null terminator
*
* Return codes for the different libgstc operations
*/
Expand All @@ -118,7 +120,8 @@ typedef enum
GSTC_SOCKET_ERROR = -10,
GSTC_THREAD_ERROR = -11,
GSTC_BUS_TIMEOUT = -12,
GSTC_SOCKET_TIMEOUT = -13
GSTC_SOCKET_TIMEOUT = -13,
GSTC_LONG_RESPONSE = -14
} GstcStatus;

/**
Expand Down
77 changes: 59 additions & 18 deletions libgstc/libgstc_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@

/* Allow the user to override this value at build time */
#ifndef GSTC_MAX_RESPONSE_LENGTH
# define GSTC_MAX_RESPONSE_LENGTH 8192
# define GSTC_MAX_RESPONSE_LENGTH 10485760 //10 * 1024 * 1024
#endif

#define NUMBER_OF_SOCKETS (1)

static int create_new_socket ();
static GstcStatus open_socket (GstcSocket * self);
static GstcStatus accumulate_response (int socket, char **response);

struct _GstcSocket
{
Expand Down Expand Up @@ -129,6 +130,47 @@ gstc_socket_new (const char *address, const unsigned int port,
return ret;
}

static GstcStatus
accumulate_response (int socket, char **response)
{
char buffer[1024];
ssize_t read = 0;
size_t acc = 0;
int flags = 0;
const char terminator = '\0';
GstcStatus ret = GSTC_OK;
char *dst = NULL;

gstc_assert_and_ret_val (NULL != response, GSTC_NULL_ARGUMENT);

*response = NULL;

do {
read = recv (socket, buffer, sizeof(buffer), flags);

if (read < 0) {
ret = GSTC_RECV_ERROR;
break;
}

*response = realloc (*response, acc + read);

dst = *response + acc;
acc += read;

if (acc >= GSTC_MAX_RESPONSE_LENGTH) {
ret = GSTC_LONG_RESPONSE;
free (*response);
*response = NULL;
break;
}

memcpy (dst, buffer, read);
} while (buffer[read - 1] != terminator);

return ret;
}

GstcStatus
gstc_socket_send (GstcSocket * self, const char *request, char **response,
const int timeout)
Expand All @@ -150,41 +192,40 @@ gstc_socket_send (GstcSocket * self, const char *request, char **response,

if (send (self->socket, request, strlen (request), 0) < 0) {
ret = GSTC_SEND_ERROR;
goto out;
goto close_con;
}

*response = malloc (GSTC_MAX_RESPONSE_LENGTH);

ufds[0].fd = self->socket;
ufds[0].events = POLLIN;

rv = poll (ufds, NUMBER_OF_SOCKETS, timeout);

/* Error ocurred in poll */
if (rv == -1) {
return GSTC_SOCKET_ERROR;
ret = GSTC_SOCKET_ERROR;
goto close_con;
}

/* Timeout ocurred */
else if (rv == 0) {
return GSTC_SOCKET_TIMEOUT;
} else {
/* Check for events on the socket */
if (ufds[0].revents & POLLIN) {
if (recv (self->socket, *response, GSTC_MAX_RESPONSE_LENGTH, 0) < 0) {
return GSTC_RECV_ERROR;
}
} else {
return GSTC_SOCKET_ERROR;
}
if (rv == 0) {
ret = GSTC_SOCKET_TIMEOUT;
goto close_con;
}

ret = GSTC_OK;
/* Check for events on the socket */
if (0 == (ufds[0].revents & POLLIN)) {
ret = GSTC_SOCKET_ERROR;
goto close_con;
}

out:
ret = accumulate_response (self->socket, response);

close_con:
if (!self->keep_connection_open) {
close (self->socket);
}

out:
return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
project('gstd', 'c',
version : '0.10.0',
version : '0.10.1',
meson_version : '>= 0.50',)

gstd_version = meson.project_version()
Expand Down
39 changes: 39 additions & 0 deletions tests/libgstc/test_libgstc_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,44 @@ GST_START_TEST (test_socket_null_resp_placeholder)

GST_END_TEST;

GST_START_TEST (test_socket_long_response)
{
GstcSocket *gstc_socket;
GstcStatus ret;
const gchar *address = "127.0.0.1";
const gint port = 54321;
const long wait_time = -1;
const gint keep_open = FALSE;
const gchar *request = "ping";
gchar *expected;
gint size = 1024*1024;
gchar *response;
int i = 0;

ret = gstc_socket_new (address, port, keep_open, &gstc_socket);
assert_equals_int (GSTC_OK, ret);
fail_if (NULL == gstc_socket);

expected = g_malloc (size);
for (i = 0; i < size; ++i) {
expected[i] = 'a' + (i%10);
}
expected[size - 1] = '\0';

_mock_expected = expected;
ret = gstc_socket_send (gstc_socket, request, &response, wait_time);

assert_equals_int (GSTC_OK, ret);
assert_equals_string (expected, response);

g_free (expected);

g_free (response);
gstc_socket_free (gstc_socket);
}

GST_END_TEST;

static Suite *
libgstc_client_suite (void)
{
Expand All @@ -459,6 +497,7 @@ libgstc_client_suite (void)
tcase_add_test (tc, test_socket_null_socket);
tcase_add_test (tc, test_socket_null_request);
tcase_add_test (tc, test_socket_null_resp_placeholder);
tcase_add_test (tc, test_socket_long_response);

return suite;
}
Expand Down

0 comments on commit 35d3ee1

Please sign in to comment.