diff --git a/include/fluent-bit/flb_connection.h b/include/fluent-bit/flb_connection.h
index b2d1e48c6a4..46fd80c4bad 100644
--- a/include/fluent-bit/flb_connection.h
+++ b/include/fluent-bit/flb_connection.h
@@ -95,6 +95,14 @@ struct flb_connection {
      */
     int busy_flag;
 
+    /* This flag is used to determine if the connection was shut down to ensure we
+     * don't do it twice when a timeout is detected.
+     *
+     * This is required in order to overcome a limitation in the async read / write
+     * functions that will be addressed as soon as possible.
+     */
+    int shutdown_flag;
+
     /*
      * Recycle: if the connection is keepalive, this flag is always on, but if
      * the caller wants to drop the connection once is released, it can set
diff --git a/src/flb_connection.c b/src/flb_connection.c
index 051a6c9a305..a3ed402651b 100644
--- a/src/flb_connection.c
+++ b/src/flb_connection.c
@@ -24,6 +24,8 @@ int flb_connection_setup(struct flb_connection *connection,
     connection->tls_session = NULL;
     connection->ts_created = time(NULL);
     connection->ts_assigned = time(NULL);
+    connection->busy_flag = FLB_FALSE;
+    connection->shutdown_flag = FLB_FALSE;
 
     connection->net = &connection->stream->net;
 
diff --git a/src/flb_upstream.c b/src/flb_upstream.c
index 4701cc3d68f..142f9680396 100644
--- a/src/flb_upstream.c
+++ b/src/flb_upstream.c
@@ -437,6 +437,25 @@ struct flb_upstream *flb_upstream_create_url(struct flb_config *config,
     return u;
 }
 
+/*
+ * Shuts down both directions of the connection socket so that any client
+ * code trying to read from or write to it fails.
+ *
+ * The call is a no-op when the descriptor is not greater than zero or when
+ * 'shutdown_flag' is already set; the flag is raised after the first
+ * shutdown so the timeout handler and prepare_destroy_conn() never shut
+ * the same socket down twice.
+ */
+static void shutdown_connection(struct flb_connection *u_conn)
+{
+    if (u_conn->fd > 0 &&
+        !u_conn->shutdown_flag) {
+        shutdown(u_conn->fd, SHUT_RDWR);
+
+        u_conn->shutdown_flag = FLB_TRUE;
+    }
+}
+
 /*
  * This function moves the 'upstream connection' into the queue to be
  * destroyed. Note that the caller is responsible to validate and check
@@ -462,13 +481,18 @@ static int prepare_destroy_conn(struct flb_connection *u_conn)
 #ifdef FLB_HAVE_TLS
         if (u_conn->tls_session != NULL) {
             flb_tls_session_destroy(u_conn->tls_session);
+
+            u_conn->tls_session = NULL;
         }
 #endif
-        shutdown(u_conn->fd, SHUT_RDWR);
+        shutdown_connection(u_conn);
+
         flb_socket_close(u_conn->fd);
+
         u_conn->fd = -1;
         u_conn->event.fd = -1;
     }
+
 
     /* remove connection from the queue */
     mk_list_del(&u_conn->_head);
@@ -843,7 +867,6 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
 {
     time_t now;
     int drop;
-    int inject;
     const char *reason;
     struct mk_list *head;
     struct mk_list *u_head;
@@ -904,18 +927,53 @@ int flb_upstream_conn_timeouts(struct mk_list *list)
                     }
                 }
 
-                inject = FLB_FALSE;
-                if (u_conn->event.status != MK_EVENT_NONE) {
-                    inject = FLB_TRUE;
-                }
                 u_conn->net_error = ETIMEDOUT;
-                prepare_destroy_conn(u_conn);
-                if (inject == FLB_TRUE) {
+
+                /* We need to shut the connection down
+                 * in order to cause some functions that are not
+                 * aware of the connection error signaling
+                 * mechanism to fail and abort.
+                 *
+                 * These functions do not check the net_error field
+                 * in the connection instance upon being awakened
+                 * so we need to ensure that any read/write
+                 * operations on the socket generate an error.
+                 *
+                 * net_io_write_async
+                 * net_io_read_async
+                 * flb_tls_net_write_async
+                 * flb_tls_net_read_async
+                 *
+                 * This operation could be selectively performed for
+                 * connections that have already been established
+                 * with no side effects because the connection
+                 * establishment code honors `net_error` but,
+                 * taking into account that the previous version of
+                 * the code did it unconditionally with no noticeable
+                 * side effects, leaving it that way is the safest
+                 * choice at the moment.
+                 */
+
+                if (MK_EVENT_IS_REGISTERED((&u_conn->event))) {
+                    shutdown_connection(u_conn);
+
                     mk_event_inject(u_conn->evl,
                                     &u_conn->event,
                                     u_conn->event.mask,
                                     FLB_TRUE);
                 }
+                else {
+                    /* No valid reason for this code path to be taken
+                     * is known, but the previous version of the code
+                     * could reach it, so the connection is still
+                     * destroyed directly for the moment.
+                     *
+                     * In any case, it's proven not to interfere with the
+                     * coroutine awakening issue this change addresses.
+                     */
+
+                    prepare_destroy_conn(u_conn);
+                }
 
                 flb_upstream_decrement_busy_connections_count(u);
             }