Skip to content

Commit

Permalink
in_mqtt: fix memory corruption on dropping packet (#1135)
Browse files Browse the repository at this point in the history
This patch adds an extra verification to the buffer counters to
avoid corruption when memmove() an extra byte.

In addition this patch implement a linked list for the active
connections so when closing Fluent Bit we have a clean exit.

Signed-off-by: Eduardo Silva <[email protected]>
  • Loading branch information
edsiper committed Mar 27, 2019
1 parent 55291c4 commit d978659
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions plugins/in_mqtt/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ static int in_mqtt_exit(void *data, struct flb_config *config)
(void) *config;
struct flb_in_mqtt_config *ctx = data;

mqtt_conn_destroy_all(ctx);
mqtt_config_free(ctx);

return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/in_mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ struct flb_in_mqtt_config {
char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */
struct flb_input_instance *i_ins; /* plugin input instance */
struct mk_event_loop *evl; /* Event loop file descriptor */
struct mk_list conns; /* Active connections */
};

int in_mqtt_collect(struct flb_input_instance *i_ins,
Expand Down
8 changes: 6 additions & 2 deletions plugins/in_mqtt/mqtt_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *i_ins)
char *listen;
struct flb_in_mqtt_config *config;

config = flb_malloc(sizeof(struct flb_in_mqtt_config));
memset(config, '\0', sizeof(struct flb_in_mqtt_config));
config = flb_calloc(1, sizeof(struct flb_in_mqtt_config));
if (!config) {
flb_errno();
return NULL;
}

/* Listen interface (if not set, defaults to 0.0.0.0) */
if (!i_ins->host.listen) {
Expand Down Expand Up @@ -59,6 +62,7 @@ struct flb_in_mqtt_config *mqtt_config_init(struct flb_input_instance *i_ins)
flb_debug("[in_mqtt] Listen='%s' TCP_Port=%s",
config->listen, config->tcp_port);

mk_list_init(&config->conns);
return config;
}

Expand Down
17 changes: 17 additions & 0 deletions plugins/in_mqtt/mqtt_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct mqtt_conn *mqtt_conn_add(int fd, struct flb_in_mqtt_config *ctx)

conn = flb_malloc(sizeof(struct mqtt_conn));
if (!conn) {
flb_errno();
return NULL;
}

Expand Down Expand Up @@ -99,6 +100,7 @@ struct mqtt_conn *mqtt_conn_add(int fd, struct flb_in_mqtt_config *ctx)
return NULL;
}

mk_list_add(&conn->_head, &ctx->conns);
return conn;
}

Expand All @@ -109,7 +111,22 @@ int mqtt_conn_del(struct mqtt_conn *conn)

/* Release resources */
close(conn->fd);
mk_list_del(&conn->_head);
flb_free(conn);

return 0;
}

int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx)
{
struct mk_list *tmp;
struct mk_list *head;
struct mqtt_conn *conn;

mk_list_foreach_safe(head, tmp, &ctx->conns) {
conn = mk_list_entry(head, struct mqtt_conn, _head);
mqtt_conn_del(conn);
}

return 0;
}
2 changes: 2 additions & 0 deletions plugins/in_mqtt/mqtt_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ struct mqtt_conn {
int buf_len; /* Buffer content length */
unsigned char buf[1024]; /* Buffer data */
struct flb_in_mqtt_config *ctx; /* Plugin configuration context */
struct mk_list _head; /* Link to flb_in_mqtt_config->conns */
};

struct mqtt_conn *mqtt_conn_add(int fd, struct flb_in_mqtt_config *ctx);
int mqtt_conn_del(struct mqtt_conn *conn);
int mqtt_conn_destroy_all(struct flb_in_mqtt_config *ctx);

#endif
8 changes: 8 additions & 0 deletions plugins/in_mqtt/mqtt_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ static inline int mqtt_packet_drop(struct mqtt_conn *conn)
{
int move_bytes;

if (conn->buf_pos == conn->buf_len) {
conn->buf_frame_end = 0;
conn->buf_len = 0;
conn->buf_pos = 0;
return 0;
}

move_bytes = conn->buf_pos + 1;
memmove(conn->buf,
conn->buf + move_bytes,
Expand Down Expand Up @@ -382,6 +389,7 @@ int mqtt_prot_parser(struct mqtt_conn *conn)
/* Prepare for next round */
conn->status = MQTT_NEXT;
conn->buf_pos = conn->buf_frame_end;

mqtt_packet_drop(conn);

if (conn->buf_len > 0) {
Expand Down

0 comments on commit d978659

Please sign in to comment.