Skip to content

Commit

Permalink
Handle backlog of UNIX socket too
Browse files Browse the repository at this point in the history
The cheaper subsystem specifies that "backlog is only available on Linux
and only on TCP sockets (not UNIX domain sockets)."
This commit specifically implement this: UNIX domain socket support for
backlog on Linux, using Netlink to call the kernel and get the queue status.
  • Loading branch information
Pierre Ducroquet committed Oct 17, 2022
1 parent 2fe305f commit e1c8ec1
Showing 1 changed file with 149 additions and 5 deletions.
154 changes: 149 additions & 5 deletions core/master.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ static void get_tcp_info(struct uwsgi_socket *uwsgi_sock) {

#ifdef UNBIT
#define SIOBKLGQ 0x8908
#else

#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <linux/unix_diag.h>
#include <linux/sock_diag.h>

#endif

#ifdef SIOBKLGQ
Expand All @@ -272,7 +279,143 @@ static void get_linux_unbit_SIOBKLGQ(struct uwsgi_socket *uwsgi_sock) {
uwsgi_sock->max_queue = (uint64_t) uwsgi.listen_queue;
}
}
#else

static int get_socket_inode_from_fd(int fd) {
int inode = -1;
char source_link_path[256];
char link_target[256];
sprintf(source_link_path, "/proc/self/fd/%i", fd);
readlink(source_link_path, link_target, 256);
sscanf(link_target, "socket:[%i]", &inode);
return inode;
}

static void send_netlink_query_for_inode(int fd, int target_ino) {
struct sockaddr_nl nladdr = {.nl_family = AF_NETLINK};
struct {
struct nlmsghdr nlh;
struct unix_diag_req udr;
} req = {
.nlh = {
.nlmsg_len = sizeof(req),
.nlmsg_type = SOCK_DIAG_BY_FAMILY,
.nlmsg_flags = NLM_F_REQUEST
},
.udr = {
.sdiag_family = AF_UNIX,
.udiag_show = UDIAG_SHOW_RQLEN,
.udiag_ino = target_ino,
.udiag_cookie = {-1, -1}
}
};
struct iovec iov = {.iov_base = &req, .iov_len = sizeof(req)};
struct msghdr msg = {
.msg_name = &nladdr,
.msg_namelen = sizeof(nladdr),
.msg_iov = &iov,
.msg_iovlen = 1
};

for (;;) {
if (sendmsg(fd, &msg, 0) < 0) {
if (errno == EINTR)
continue;

perror("sendmsg");
return;
}

return;
}
}

static int receive_netlink_answers(int fd, uint64_t *current_queue, uint64_t *max_queue) {
long buf[8192 / sizeof(long)];
struct sockaddr_nl nladdr;
struct iovec iov = {.iov_base = buf, .iov_len = sizeof(buf)};
int flags = 0;

for (;;) {
struct msghdr msg = {
.msg_name = &nladdr,
.msg_namelen = sizeof(nladdr),
.msg_iov = &iov,
.msg_iovlen = 1
};

ssize_t ret = recvmsg(fd, &msg, flags);

if (ret < 0) {
if (errno == EINTR)
continue;

perror("recvmsg");
return -1;
}
if (ret == 0)
return 0;

if (nladdr.nl_family != AF_NETLINK) {
fputs("!AF_NETLINK\n", stderr);
return -1;
}

const struct nlmsghdr *h = (struct nlmsghdr *)buf;

if (!NLMSG_OK(h, ret)) {
fputs("!NLMSG_OK\n", stderr);
return -1;
}

for (; NLMSG_OK(h, ret); h = NLMSG_NEXT(h, ret)) {
if (h->nlmsg_type == NLMSG_DONE)
return 0;
if (h->nlmsg_type == NLMSG_ERROR) {
const struct nlmsgerr *err = NLMSG_DATA(h);
if (h->nlmsg_len < NLMSG_LENGTH(sizeof(*err))) {
fputs("NLMSG_ERROR\n", stderr);
} else {
errno = -err->error;
perror("NLMSG_ERROR");
}
return -1;
}
if (h->nlmsg_type != SOCK_DIAG_BY_FAMILY) {
fprintf(stderr, "unexpected nlmsg_type %u\n", (unsigned)h->nlmsg_type);
return -1;
}

// Now extract queue len from results
const struct unix_diag_msg *diag = NLMSG_DATA(h);
unsigned int rta_len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*diag));

for (struct rtattr *attr = (struct rtattr *)(diag + 1); RTA_OK(attr, rta_len); attr = RTA_NEXT(attr, rta_len)) {
switch (attr->rta_type) {
case UNIX_DIAG_RQLEN:
struct unix_diag_rqlen queue;
memcpy(&queue, RTA_DATA(attr), sizeof(queue));
*current_queue = queue.udiag_rqueue;
*max_queue = queue.udiag_wqueue;
break;
}
}
}
return 0;
}
}

static void get_linux_unix_socket_queue(struct uwsgi_socket *uwsgi_sock) {
int fd = uwsgi_sock->fd;
int inode = get_socket_inode_from_fd(fd);
int diag_socket = socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
send_netlink_query_for_inode(diag_socket, inode);
receive_netlink_answers(diag_socket, &uwsgi_sock->queue, &uwsgi_sock->max_queue);
close(diag_socket);
}

#endif

#endif

static void master_check_listen_queue() {
Expand All @@ -282,15 +425,16 @@ static void master_check_listen_queue() {
while(uwsgi_sock) {
if (uwsgi_sock->family == AF_INET) {
get_tcp_info(uwsgi_sock);
}
}
else if (uwsgi_sock->family == AF_UNIX) {
#ifdef __linux__
#ifdef SIOBKLGQ
else if (uwsgi_sock->family == AF_UNIX) {
get_linux_unbit_SIOBKLGQ(uwsgi_sock);
}
get_linux_unbit_SIOBKLGQ(uwsgi_sock);
#else
get_linux_unix_socket_queue(uwsgi_sock);
#endif
#endif

}
if (uwsgi_sock->queue > backlog) {
backlog = uwsgi_sock->queue;
}
Expand Down

0 comments on commit e1c8ec1

Please sign in to comment.