Skip to content

Commit

Permalink
in_forward: add mutex for connection
Browse files Browse the repository at this point in the history
==================
WARNING: ThreadSanitizer: data race (pid=54513)
  Read of size 8 at 0x7b4400019fe8 by thread T3 (mutexes: write M21):
    #0 mk_list_add /home/taka/git/fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h:65 (fluent-bit+0x144ca2)
    #1 flb_downstream_conn_get /home/taka/git/fluent-bit/src/flb_downstream.c:302 (fluent-bit+0x14a71d)
    #2 in_fw_collect /home/taka/git/fluent-bit/plugins/in_forward/fw.c:130 (fluent-bit+0x49a2d3)
    #3 input_collector_fd /home/taka/git/fluent-bit/src/flb_input_thread.c:168 (fluent-bit+0xe1395)
    #4 engine_handle_event /home/taka/git/fluent-bit/src/flb_input_thread.c:183 (fluent-bit+0xe2111)
    #5 input_thread /home/taka/git/fluent-bit/src/flb_input_thread.c:384 (fluent-bit+0xe2111)
    #6 step_callback /home/taka/git/fluent-bit/src/flb_worker.c:43 (fluent-bit+0x158b87)

  Previous write of size 8 at 0x7b4400019fe8 by thread T1:
    #0 __mk_list_del /home/taka/git/fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h:142 (fluent-bit+0x144d08)
    #1 mk_list_del /home/taka/git/fluent-bit/lib/monkey/include/monkey/mk_core/mk_list.h:147 (fluent-bit+0x144d6a)
    #2 prepare_destroy_conn /home/taka/git/fluent-bit/src/flb_downstream.c:209 (fluent-bit+0x14a41a)
    #3 prepare_destroy_conn_safe /home/taka/git/fluent-bit/src/flb_downstream.c:232 (fluent-bit+0x14a4b4)
    #4 flb_downstream_conn_release /home/taka/git/fluent-bit/src/flb_downstream.c:394 (fluent-bit+0x14ab5b)
    #5 fw_conn_del /home/taka/git/fluent-bit/plugins/in_forward/fw_conn.c:179 (fluent-bit+0x4a08c8)
    #6 fw_conn_event /home/taka/git/fluent-bit/plugins/in_forward/fw_conn.c:101 (fluent-bit+0x4a0349)
    #7 flb_engine_start /home/taka/git/fluent-bit/src/flb_engine.c:961 (fluent-bit+0x1273d2)
    #8 flb_lib_worker /home/taka/git/fluent-bit/src/flb_lib.c:629 (fluent-bit+0xa1ced)

Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 committed Feb 18, 2023
1 parent 439ae4c commit 268a7db
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
4 changes: 4 additions & 0 deletions plugins/in_forward/fw.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ static int in_fw_collect(struct flb_input_instance *ins,

ctx = in_context;

pthread_mutex_lock(&ctx->connections_mutex);
connection = flb_downstream_conn_get(ctx->downstream);
pthread_mutex_unlock(&ctx->connections_mutex);

if (connection == NULL) {
flb_plg_error(ctx->ins, "could not accept new connection");
Expand All @@ -136,7 +138,9 @@ static int in_fw_collect(struct flb_input_instance *ins,
}

if (!config->is_ingestion_active) {
pthread_mutex_lock(&ctx->connections_mutex);
flb_downstream_conn_release(connection);
pthread_mutex_unlock(&ctx->connections_mutex);

return -1;
}
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_forward/fw.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <msgpack.h>
#include <fluent-bit/flb_input.h>
#include <fluent-bit/flb_pthread.h>

struct flb_in_fw_config {
size_t buffer_max_size; /* Max Buffer size */
Expand All @@ -41,6 +42,7 @@ struct flb_in_fw_config {
int coll_fd;
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* List of active connections */
pthread_mutex_t connections_mutex;
struct mk_event_loop *evl; /* Event loop file descriptor */
struct flb_input_instance *ins; /* Input plugin instace */
};
Expand Down
1 change: 1 addition & 0 deletions plugins/in_forward/fw_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct flb_in_fw_config *fw_config_init(struct flb_input_instance *i_ins)
flb_debug("[in_fw] Listen='%s' TCP_Port=%s",
config->listen, config->tcp_port);
}
pthread_mutex_init(&config->connections_mutex, NULL);
return config;
}

Expand Down
20 changes: 12 additions & 8 deletions plugins/in_forward/fw_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int fw_conn_event(void *data)
if (conn->buf_size >= ctx->buffer_max_size) {
flb_plg_warn(ctx->ins, "fd=%i incoming data exceed limit (%lu bytes)",
event->fd, (ctx->buffer_max_size));
fw_conn_del(conn);
fw_conn_del(ctx, conn);
return -1;
}
else if (conn->buf_size + ctx->buffer_chunk_size > ctx->buffer_max_size) {
Expand Down Expand Up @@ -91,21 +91,21 @@ int fw_conn_event(void *data)

ret = fw_prot_process(ctx->ins, conn);
if (ret == -1) {
fw_conn_del(conn);
fw_conn_del(ctx, conn);
return -1;
}
return bytes;
}
else {
flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
fw_conn_del(conn);
fw_conn_del(ctx, conn);
return -1;
}
}

if (event->mask & MK_EVENT_CLOSE) {
flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
fw_conn_del(conn);
fw_conn_del(ctx, conn);
return -1;
}
return 0;
Expand Down Expand Up @@ -162,14 +162,16 @@ struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_

return NULL;
}

pthread_mutex_lock(&ctx->connections_mutex);
mk_list_add(&conn->_head, &ctx->connections);
pthread_mutex_unlock(&ctx->connections_mutex);

return conn;
}

int fw_conn_del(struct fw_conn *conn)
int fw_conn_del(struct flb_in_fw_config *ctx, struct fw_conn *conn)
{
pthread_mutex_lock(&ctx->connections_mutex);
/* The downstream unregisters the file descriptor from the event-loop
* so there's nothing to be done by the plugin
*/
Expand All @@ -178,6 +180,8 @@ int fw_conn_del(struct fw_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

pthread_mutex_unlock(&ctx->connections_mutex);

flb_free(conn->buf);
flb_free(conn);

Expand All @@ -192,8 +196,8 @@ int fw_conn_del_all(struct flb_in_fw_config *ctx)

mk_list_foreach_safe(head, tmp, &ctx->connections) {
conn = mk_list_entry(head, struct fw_conn, _head);
fw_conn_del(conn);
fw_conn_del(ctx, conn);
}

return 0;
}
}
2 changes: 1 addition & 1 deletion plugins/in_forward/fw_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct fw_conn {
};

struct fw_conn *fw_conn_add(struct flb_connection *connection, struct flb_in_fw_config *ctx);
int fw_conn_del(struct fw_conn *conn);
int fw_conn_del(struct flb_in_fw_config *ctx, struct fw_conn *conn);
int fw_conn_del_all(struct flb_in_fw_config *ctx);

#endif

0 comments on commit 268a7db

Please sign in to comment.