diff --git a/plugins/in_forward/fw.c b/plugins/in_forward/fw.c index a329405a37b..0aec63cb28a 100644 --- a/plugins/in_forward/fw.c +++ b/plugins/in_forward/fw.c @@ -89,6 +89,11 @@ static int in_fw_collect(struct flb_input_instance *ins, return -1; } + if (config->is_ingestion_active == FLB_FALSE) { + mk_event_closesocket(fd); + return -1; + } + flb_plg_trace(ins, "new TCP connection arrived FD=%i", fd); conn = fw_conn_add(fd, ctx); if (!conn) { @@ -158,14 +163,32 @@ static int in_fw_init(struct flb_input_instance *ins, ctx->server_fd, config); if (ret == -1) { - flb_plg_error(ctx->ins, "could not set collector for IN_FW input plugin"); + flb_plg_error(ctx->ins, "could not set server socket collector"); fw_config_destroy(ctx); return -1; } + ctx->coll_fd = ret; return 0; } +static void in_fw_pause(void *data, struct flb_config *config) +{ + struct flb_in_fw_config *ctx = data; + + /* + * If the plugin is paused AND the ingestion not longer active, + * it means we are in a shutdown phase. This plugin can safetly + * close the socket server collector. + * + * This socket stop is a workaround since the server API will be + * refactored shortly. + */ + if (config->is_ingestion_active == FLB_FALSE) { + mk_event_closesocket(ctx->server_fd); + } +} + static int in_fw_exit(void *data, struct flb_config *config) { struct mk_list *tmp; @@ -191,6 +214,7 @@ struct flb_input_plugin in_forward_plugin = { .cb_pre_run = NULL, .cb_collect = in_fw_collect, .cb_flush_buf = NULL, + .cb_pause = in_fw_pause, .cb_exit = in_fw_exit, .flags = FLB_INPUT_NET }; diff --git a/plugins/in_forward/fw.h b/plugins/in_forward/fw.h index 7c23875d174..8624f6609ed 100644 --- a/plugins/in_forward/fw.h +++ b/plugins/in_forward/fw.h @@ -36,6 +36,7 @@ struct flb_in_fw_config { /* Unix Socket (TCP only) */ char *unix_path; /* Unix path for socket */ + int coll_fd; struct mk_list connections; /* List of active connections */ struct mk_event_loop *evl; /* Event loop file descriptor */ struct flb_input_instance *ins; /* Input plugin instace */