Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upstream: premature connection destruction fix #7728

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/fluent-bit/flb_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ struct flb_connection {
*/
int busy_flag;

/* This flag is used to determine if the connection was shut down to ensure we
* don't do it twice when a timeout is detected.
*
* This is required in order to overcome a limitation in the async read / write
* functions that will be addressed as soon as possible.
*/
int shutdown_flag;

/*
* Recycle: if the connection is keepalive, this flag is always on, but if
* the caller wants to drop the connection once is released, it can set
Expand Down
2 changes: 2 additions & 0 deletions src/flb_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ int flb_connection_setup(struct flb_connection *connection,
connection->tls_session = NULL;
connection->ts_created = time(NULL);
connection->ts_assigned = time(NULL);
connection->busy_flag = FLB_FALSE;
connection->shutdown_flag = FLB_FALSE;

connection->net = &connection->stream->net;

Expand Down
72 changes: 64 additions & 8 deletions src/flb_upstream.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,23 @@ struct flb_upstream *flb_upstream_create_url(struct flb_config *config,
return u;
}

/* This function shuts the connection down in order to cause
* any client code trying to read or write from it to fail.
*/
static void shutdown_connection(struct flb_connection *u_conn)
{
struct flb_upstream *u;

u = u_conn->upstream;

if (u_conn->fd > 0 &&
!u_conn->shutdown_flag) {
shutdown(u_conn->fd, SHUT_RDWR);

u_conn->shutdown_flag = FLB_TRUE;
}
}

/*
* This function moves the 'upstream connection' into the queue to be
* destroyed. Note that the caller is responsible to validate and check
Expand All @@ -462,13 +479,18 @@ static int prepare_destroy_conn(struct flb_connection *u_conn)
#ifdef FLB_HAVE_TLS
if (u_conn->tls_session != NULL) {
flb_tls_session_destroy(u_conn->tls_session);

u_conn->tls_session = NULL;
}
#endif
shutdown(u_conn->fd, SHUT_RDWR);
shutdown_connection(u_conn);

flb_socket_close(u_conn->fd);

u_conn->fd = -1;
u_conn->event.fd = -1;
}

/* remove connection from the queue */
mk_list_del(&u_conn->_head);

Expand Down Expand Up @@ -843,7 +865,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
{
time_t now;
int drop;
int inject;
const char *reason;
struct mk_list *head;
struct mk_list *u_head;
Expand Down Expand Up @@ -904,18 +925,53 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
}
}

inject = FLB_FALSE;
if (u_conn->event.status != MK_EVENT_NONE) {
inject = FLB_TRUE;
}
u_conn->net_error = ETIMEDOUT;
prepare_destroy_conn(u_conn);
if (inject == FLB_TRUE) {

/* We need to shut the connection down
* in order to cause some functions that are not
* aware of the connection error signaling
* mechanism to fail and abort.
*
* These functions do not check the net_error field
* in the connection instance upon being awakened
* so we need to ensure that any read/write
* operations on the socket generate an error.
*
* net_io_write_async
* net_io_read_async
* flb_tls_net_write_async
* flb_tls_net_read_async
*
* This operation could be selectively performed for
* connections that have already been established
* with no side effects because the connection
* establishment code honors `net_error` but
* taking in account that the previous version of
* the code did it unconditionally with no noticeable
* side effects leaving it that way is the safest
* choice at the moment.
*/

if (MK_EVENT_IS_REGISTERED((&u_conn->event))) {
shutdown_connection(u_conn);

mk_event_inject(u_conn->evl,
&u_conn->event,
u_conn->event.mask,
FLB_TRUE);
}
else {
/* I can't think of a valid reason for this code path
* to be taken but considering that it was previously
* possible for it to happen (maybe wesley can shed
* some light on it if he remembers) I'll leave this
* for the moment.
* In any case, it's proven not to interfere with the
* coroutine awakening issue this change addresses.
*/

prepare_destroy_conn(u_conn);
}

flb_upstream_decrement_busy_connections_count(u);
}
Expand Down