Skip to content

Commit

Permalink
in_forward: partial workaround to close connections on SIGTERM (fluen…
Browse files Browse the repository at this point in the history
…t#2610)

This patch adds some extra handling for service termination, so when
the engine paused ingestion, the server socket is close to avoid
having incoming connection and not-processed records.

This patch works when the service is not in a scenario with back-pressure,
so it can be considered a partial workaround.

There is an upcoming re-resign of server side API for scalability that
will address all these issues.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper authored and Magnus Sirviö committed Oct 7, 2020
1 parent 15ef9f2 commit 2548df5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
26 changes: 25 additions & 1 deletion plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ static int in_fw_collect(struct flb_input_instance *ins,
return -1;
}

if (config->is_ingestion_active == FLB_FALSE) {
mk_event_closesocket(fd);
return -1;
}

flb_plg_trace(ins, "new TCP connection arrived FD=%i", fd);
conn = fw_conn_add(fd, ctx);
if (!conn) {
Expand Down Expand Up @@ -158,14 +163,32 @@ static int in_fw_init(struct flb_input_instance *ins,
ctx->server_fd,
config);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not set collector for IN_FW input plugin");
flb_plg_error(ctx->ins, "could not set server socket collector");
fw_config_destroy(ctx);
return -1;
}
ctx->coll_fd = ret;

return 0;
}

static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;

/*
* If the plugin is paused AND the ingestion not longer active,
* it means we are in a shutdown phase. This plugin can safetly
* close the socket server collector.
*
* This socket stop is a workaround since the server API will be
* refactored shortly.
*/
if (config->is_ingestion_active == FLB_FALSE) {
mk_event_closesocket(ctx->server_fd);
}
}

static int in_fw_exit(void *data, struct flb_config *config)
{
struct mk_list *tmp;
Expand All @@ -191,6 +214,7 @@ struct flb_input_plugin in_forward_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_fw_collect,
.cb_flush_buf = NULL,
.cb_pause = in_fw_pause,
.cb_exit = in_fw_exit,
.flags = FLB_INPUT_NET
};
1 change: 1 addition & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct flb_in_fw_config {
/* Unix Socket (TCP only) */
char *unix_path; /* Unix path for socket */

int coll_fd;
struct mk_list connections; /* List of active connections */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
Expand Down

0 comments on commit 2548df5

Please sign in to comment.