Skip to content

Commit

Permalink
in_forward: partial workaround to close connections on SIGTERM (#2610)
Browse files Browse the repository at this point in the history
This patch adds some extra handling for service termination, so when
the engine paused ingestion, the server socket is close to avoid
having incoming connection and not-processed records.

This patch works when the service is not in a scenario with back-pressure,
so it can be considered a partial workaround.

There is an upcoming re-resign of server side API for scalability that
will address all these issues.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Oct 2, 2020
1 parent 868cea9 commit 4e0e687
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
26 changes: 25 additions & 1 deletion plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ static int in_fw_collect(struct flb_input_instance *ins,
return -1;
}

if (config->is_ingestion_active == FLB_FALSE) {
mk_event_closesocket(fd);
return -1;
}

flb_plg_trace(ins, "new TCP connection arrived FD=%i", fd);
conn = fw_conn_add(fd, ctx);
if (!conn) {
Expand Down Expand Up @@ -158,14 +163,32 @@ static int in_fw_init(struct flb_input_instance *ins,
ctx->server_fd,
config);
if (ret == -1) {
flb_plg_error(ctx->ins, "could not set collector for IN_FW input plugin");
flb_plg_error(ctx->ins, "could not set server socket collector");
fw_config_destroy(ctx);
return -1;
}
ctx->coll_fd = ret;

return 0;
}

static void in_fw_pause(void *data, struct flb_config *config)
{
struct flb_in_fw_config *ctx = data;

/*
* If the plugin is paused AND the ingestion not longer active,
* it means we are in a shutdown phase. This plugin can safetly
* close the socket server collector.
*
* This socket stop is a workaround since the server API will be
* refactored shortly.
*/
if (config->is_ingestion_active == FLB_FALSE) {
mk_event_closesocket(ctx->server_fd);
}
}

static int in_fw_exit(void *data, struct flb_config *config)
{
struct mk_list *tmp;
Expand All @@ -191,6 +214,7 @@ struct flb_input_plugin in_forward_plugin = {
.cb_pre_run = NULL,
.cb_collect = in_fw_collect,
.cb_flush_buf = NULL,
.cb_pause = in_fw_pause,
.cb_exit = in_fw_exit,
.flags = FLB_INPUT_NET
};
1 change: 1 addition & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct flb_in_fw_config {
/* Unix Socket (TCP only) */
char *unix_path; /* Unix path for socket */

int coll_fd;
struct mk_list connections; /* List of active connections */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
Expand Down

0 comments on commit 4e0e687

Please sign in to comment.