diff --git a/src/log_proxy.c b/src/log_proxy.c index 3afa332..81d19db 100644 --- a/src/log_proxy.c +++ b/src/log_proxy.c @@ -17,8 +17,9 @@ struct sigaction sigact; static gboolean first_iteration = TRUE; -static GMutex *mutex = NULL; -static gboolean stop_flag = FALSE; +static GMutex mutex; +static GAsyncQueue *queue = NULL; +static volatile gboolean stop_signal = FALSE; static GIOChannel *in = NULL; gint _list_compare(gconstpointer a, gconstpointer b) { @@ -97,14 +98,11 @@ gboolean rotate() { void signal_handler(int signum) { if ((signum == SIGTERM) || (signum == SIGTERM)) { - stop_flag = TRUE; + stop_signal = TRUE; } } static void every_second() { - if (stop_flag == TRUE) { - return; - } int fd = lock_control_file(log_file); if (fd >= 0) { if (first_iteration) { @@ -150,15 +148,48 @@ static void every_second() { } } -gpointer management_thread(gpointer data) { +gpointer stop_thread(gpointer data) { + // we do this here and not in signal_handler to avoid malloc in signal + // handlers UNUSED(data); - while (stop_flag == FALSE) { + while (stop_signal == FALSE) { sleep(1); - g_mutex_lock(mutex); + } + g_async_queue_push(queue, GINT_TO_POINTER(2)); + return NULL; +} + +gpointer management_thread(gpointer data) { + gpointer qdata; + GTimeVal tval; + gint stop_flag = 0; + UNUSED(data); + while (TRUE) { + g_get_current_time(&tval); + g_time_val_add(&tval, 1000000); + qdata = g_async_queue_timed_pop(queue, &tval); + if (qdata != NULL) { + stop_flag = GPOINTER_TO_INT(qdata); + } + g_mutex_lock(&mutex); every_second(); - g_mutex_unlock(mutex); + if (qdata != NULL) { + break; + } + g_mutex_unlock(&mutex); } - g_io_channel_shutdown(in, TRUE, NULL); + destroy_output_channel(); + if (rm_fifo_at_exit == TRUE) { + if (fifo != NULL) { + g_unlink(fifo); + } + } + if (stop_flag == 2) { + // we exit here for sigterm as main thread is blocked in reading + exit(0); + } + g_mutex_unlock(&mutex); + return NULL; } void init_or_reinit_output_channel(const gchar *lg_file, gboolean us_locks) { @@ -189,7 +220,7 @@ int main(int argc, char *argv[]) exit(1); } g_thread_init(NULL); - mutex = g_mutex_new(); + queue = g_async_queue_new(); signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); set_default_values_from_env(); @@ -218,12 +249,12 @@ int main(int argc, char *argv[]) GIOStatus in_status = G_IO_STATUS_NORMAL; GString *in_buffer = g_string_new(NULL); init_or_reinit_output_channel(log_file, use_locks); - GThread* management = g_thread_create(management_thread, NULL, FALSE, NULL); - UNUSED(management); - while ((stop_flag == FALSE) && (in_status != G_IO_STATUS_EOF) && (in_status != G_IO_STATUS_ERROR)) { + g_thread_create(stop_thread, NULL, FALSE, NULL); + GThread* management = g_thread_create(management_thread, NULL, TRUE, NULL); + while ((in_status != G_IO_STATUS_EOF) && (in_status != G_IO_STATUS_ERROR)) { in_status = g_io_channel_read_line_string(in, in_buffer, NULL, NULL); if (in_status == G_IO_STATUS_NORMAL) { - g_mutex_lock(mutex); + g_mutex_lock(&mutex); while (TRUE) { gboolean write_status = write_output_channel(in_buffer); if (write_status == FALSE) { @@ -233,21 +264,16 @@ int main(int argc, char *argv[]) } break; } - g_mutex_unlock(mutex); + g_mutex_unlock(&mutex); } } - every_second(); - destroy_output_channel(); + // if we are here, the "in" input channel is closed + g_async_queue_push(queue, GINT_TO_POINTER(1)); + g_thread_join(management); g_io_channel_shutdown(in, FALSE, NULL); g_io_channel_unref(in); g_string_free(in_buffer, TRUE); g_option_context_free(context); g_free(log_file); - g_mutex_free(mutex); - if (rm_fifo_at_exit == TRUE) { - if (fifo != NULL) { - g_unlink(fifo); - } - } return 0; }