Skip to content

Commit

Permalink
Update log_proxy.c
Browse files Browse the repository at this point in the history
  • Loading branch information
thefab committed Aug 16, 2021
1 parent 9b97aea commit af1b2a0
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions src/log_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
struct sigaction sigact;
static gboolean first_iteration = TRUE;
static GMutex *mutex = NULL;
static gboolean stop_flag = FALSE;
static GAsyncQueue *queue = NULL;
static volatile gboolean stop_signal = FALSE;
static GIOChannel *in = NULL;

gint _list_compare(gconstpointer a, gconstpointer b) {
Expand Down Expand Up @@ -97,14 +98,11 @@ gboolean rotate() {

void signal_handler(int signum) {
if ((signum == SIGTERM) || (signum == SIGTERM)) {
stop_flag = TRUE;
stop_signal = TRUE;
}
}

static void every_second() {
if (stop_flag == TRUE) {
return;
}
int fd = lock_control_file(log_file);
if (fd >= 0) {
if (first_iteration) {
Expand Down Expand Up @@ -150,15 +148,48 @@ static void every_second() {
}
}

gpointer management_thread(gpointer data) {
gpointer stop_thread(gpointer data) {
// we do this here and not in signal_handler to avoid malloc in signal
// handlers
UNUSED(data);
while (stop_flag == FALSE) {
while (stop_signal == FALSE) {
sleep(1);
}
g_async_queue_push(queue, GINT_TO_POINTER(2));
return NULL;
}

gpointer management_thread(gpointer data) {
gpointer qdata;
GTimeVal tval;
gint stop_flag = 0;
UNUSED(data);
while (TRUE) {
g_get_current_time(&tval);
g_time_val_add(&tval, 1000000);
qdata = g_async_queue_timed_pop(queue, &tval);
if (qdata != NULL) {
stop_flag = GPOINTER_TO_INT(qdata);
}
g_mutex_lock(mutex);
every_second();
if (qdata != NULL) {
break;
}
g_mutex_unlock(mutex);
}
g_io_channel_shutdown(in, TRUE, NULL);
destroy_output_channel();
if (rm_fifo_at_exit == TRUE) {
if (fifo != NULL) {
g_unlink(fifo);
}
}
if (stop_flag == 2) {
// we exit here for sigterm as main thread is blocked in reading
exit(0);
}
g_mutex_unlock(mutex);
return NULL;
}

void init_or_reinit_output_channel(const gchar *lg_file, gboolean us_locks) {
Expand Down Expand Up @@ -190,6 +221,7 @@ int main(int argc, char *argv[])
}
g_thread_init(NULL);
mutex = g_mutex_new();
queue = g_async_queue_new();
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
set_default_values_from_env();
Expand Down Expand Up @@ -218,9 +250,9 @@ int main(int argc, char *argv[])
GIOStatus in_status = G_IO_STATUS_NORMAL;
GString *in_buffer = g_string_new(NULL);
init_or_reinit_output_channel(log_file, use_locks);
GThread* management = g_thread_create(management_thread, NULL, FALSE, NULL);
UNUSED(management);
while ((stop_flag == FALSE) && (in_status != G_IO_STATUS_EOF) && (in_status != G_IO_STATUS_ERROR)) {
g_thread_create(stop_thread, NULL, FALSE, NULL);
GThread* management = g_thread_create(management_thread, NULL, TRUE, NULL);
while ((in_status != G_IO_STATUS_EOF) && (in_status != G_IO_STATUS_ERROR)) {
in_status = g_io_channel_read_line_string(in, in_buffer, NULL, NULL);
if (in_status == G_IO_STATUS_NORMAL) {
g_mutex_lock(mutex);
Expand All @@ -236,18 +268,14 @@ int main(int argc, char *argv[])
g_mutex_unlock(mutex);
}
}
every_second();
destroy_output_channel();
// if we are here, the "in" input channel is closed
g_async_queue_push(queue, GINT_TO_POINTER(1));
g_thread_join(management);
g_io_channel_shutdown(in, FALSE, NULL);
g_io_channel_unref(in);
g_string_free(in_buffer, TRUE);
g_option_context_free(context);
g_free(log_file);
g_mutex_free(mutex);
if (rm_fifo_at_exit == TRUE) {
if (fifo != NULL) {
g_unlink(fifo);
}
}
g_mutex_clear(mutex);
return 0;
}

0 comments on commit af1b2a0

Please sign in to comment.