tcp_bpf: improve ingress redirection performance with message corking
The TCP_BPF ingress redirection path currently lacks the message corking
mechanism found in standard TCP. This causes the sender to wake up the
receiver for every message, even when messages are small, resulting in
reduced throughput compared to regular TCP in certain scenarios.

This change introduces a kernel worker-based intermediate layer to provide
automatic message corking for TCP_BPF. While this adds a slight latency
overhead, it significantly improves overall throughput by reducing
unnecessary wake-ups and sock lock contention.

Reviewed-by: Amery Hung <[email protected]>
Co-developed-by: Cong Wang <[email protected]>
Signed-off-by: Cong Wang <[email protected]>
Signed-off-by: Zijian Zhang <[email protected]>
Zijian Zhang authored and Kernel Patches Daemon committed Feb 26, 2025
1 parent e0dbba9 commit cc9cedf
Showing 3 changed files with 347 additions and 8 deletions.
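The corking decision at the heart of the patch is easiest to see in isolation before reading the diff. The sketch below is a hypothetical userspace model, not kernel code: only the TCP_BPF_GSO_SIZE threshold and the four notification conditions mirror the patch's backlog_notify(); the struct, helper, and main() harness are invented here for illustration.

/*
 * Minimal userspace sketch of the corking decision, assuming the same
 * threshold and conditions as backlog_notify() in the diff below.
 * Everything outside should_notify() is a hypothetical harness.
 */
#include <stdbool.h>
#include <stdio.h>

#define TCP_BPF_GSO_SIZE 65536	/* same threshold as in the patch */

struct backlog_state {
	unsigned int since_notify;	/* bytes corked since the last wake-up */
	bool work_delayed;		/* a delayed flush is already pending */
};

/* Mirrors the four conditions checked by backlog_notify() in the patch. */
static bool should_notify(const struct backlog_state *st,
			  bool rmem_schedule_failed, bool ingress_empty)
{
	return st->since_notify >= TCP_BPF_GSO_SIZE ||
	       st->work_delayed ||
	       rmem_schedule_failed ||
	       ingress_empty;
}

int main(void)
{
	struct backlog_state st = { .since_notify = 0, .work_delayed = false };
	unsigned int sizes[] = { 4096, 4096, 60000, 4096 };	/* sample sends */
	bool ingress_empty = true;	/* receiver queue starts empty */

	for (unsigned int i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) {
		st.since_notify += sizes[i];
		if (should_notify(&st, false, ingress_empty)) {
			printf("msg %u: wake receiver after %u corked bytes\n",
			       i, st.since_notify);
			st.since_notify = 0;
			ingress_empty = false;	/* receiver now has queued data */
		} else {
			printf("msg %u: cork (%u bytes pending)\n",
			       i, st.since_notify);
		}
	}
	return 0;
}

In the kernel code below, the same decision is made while holding backlog_msg_lock, and when notification is skipped the delayed worker is simply re-armed (sk_psock_run_backlog_work(psock, true)) so the corked data is flushed shortly afterwards.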
include/linux/skmsg.h: 19 additions & 0 deletions
@@ -15,6 +15,8 @@

#define MAX_MSG_FRAGS MAX_SKB_FRAGS
#define NR_MSG_FRAG_IDS (MAX_MSG_FRAGS + 1)
/* GSO size for TCP BPF backlog processing */
#define TCP_BPF_GSO_SIZE 65536

enum __sk_action {
	__SK_DROP = 0,
@@ -85,8 +87,10 @@ struct sk_psock {
	struct sock *sk_redir;
	u32 apply_bytes;
	u32 cork_bytes;
	u32 backlog_since_notify;
	unsigned int eval : 8;
	unsigned int redir_ingress : 1; /* undefined if sk_redir is null */
	unsigned int backlog_work_delayed : 1;
	struct sk_msg *cork;
	struct sk_psock_progs progs;
#if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
@@ -97,6 +101,9 @@
	struct sk_buff_head ingress_skb;
	struct list_head ingress_msg;
	spinlock_t ingress_lock;
	struct list_head backlog_msg;
	/* spin_lock for backlog_msg and backlog_since_notify */
	spinlock_t backlog_msg_lock;
	unsigned long state;
	struct list_head link;
	spinlock_t link_lock;
@@ -117,11 +124,13 @@ struct sk_psock {
	struct mutex work_mutex;
	struct sk_psock_work_state work_state;
	struct delayed_work work;
	struct delayed_work backlog_work;
	struct sock *sk_pair;
	struct rcu_work rwork;
};

struct sk_msg *sk_msg_alloc(gfp_t gfp);
bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce);
int sk_msg_expand(struct sock *sk, struct sk_msg *msg, int len,
		  int elem_first_coalesce);
int sk_msg_clone(struct sock *sk, struct sk_msg *dst, struct sk_msg *src,
@@ -396,9 +405,19 @@ static inline void sk_psock_report_error(struct sk_psock *psock, int err)
	sk_error_report(sk);
}

void sk_psock_backlog_msg(struct sk_psock *psock);
struct sk_psock *sk_psock_init(struct sock *sk, int node);
void sk_psock_stop(struct sk_psock *psock);

static inline void sk_psock_run_backlog_work(struct sk_psock *psock,
					     bool delayed)
{
	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
		return;
	psock->backlog_work_delayed = delayed;
	schedule_delayed_work(&psock->backlog_work, delayed ? 1 : 0);
}

#if IS_ENABLED(CONFIG_BPF_STREAM_PARSER)
int sk_psock_init_strp(struct sock *sk, struct sk_psock *psock);
void sk_psock_start_strp(struct sock *sk, struct sk_psock *psock);
net/core/skmsg.c: 138 additions & 1 deletion
@@ -12,7 +12,7 @@

struct kmem_cache *sk_msg_cachep;

static bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
bool sk_msg_try_coalesce_ok(struct sk_msg *msg, int elem_first_coalesce)
{
	if (msg->sg.end > msg->sg.start &&
	    elem_first_coalesce < msg->sg.end)
@@ -707,6 +707,118 @@ static void sk_psock_backlog(struct work_struct *work)
	mutex_unlock(&psock->work_mutex);
}

static bool backlog_notify(struct sk_psock *psock, bool m_sched_failed,
			   bool ingress_empty)
{
	/* Notify if:
	 * 1. We have corked enough bytes
	 * 2. We have already delayed notification
	 * 3. Memory allocation failed
	 * 4. Ingress queue was empty and we're about to add data
	 */
	return psock->backlog_since_notify >= TCP_BPF_GSO_SIZE ||
	       psock->backlog_work_delayed ||
	       m_sched_failed ||
	       ingress_empty;
}

static bool backlog_xfer_to_local(struct sk_psock *psock, struct sock *sk_from,
				  struct list_head *local_head, u32 *tot_size)
{
	struct sock *sk = psock->sk;
	struct sk_msg *msg, *tmp;
	u32 size = 0;

	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
		if (msg->sk != sk_from)
			break;

		if (!__sk_rmem_schedule(sk, msg->sg.size, false))
			return true;

		list_move_tail(&msg->list, local_head);
		sk_wmem_queued_add(msg->sk, -msg->sg.size);
		sock_put(msg->sk);
		msg->sk = NULL;
		psock->backlog_since_notify += msg->sg.size;
		size += msg->sg.size;
	}

	*tot_size = size;
	return false;
}

/* This function handles the transfer of backlogged messages from the sender
 * backlog queue to the ingress queue of the peer socket. Notification of data
 * availability will be sent under some conditions.
 */
void sk_psock_backlog_msg(struct sk_psock *psock)
{
	bool rmem_schedule_failed = false;
	struct sock *sk_from = NULL;
	struct sock *sk = psock->sk;
	LIST_HEAD(local_head);
	struct sk_msg *msg;
	bool should_notify;
	u32 tot_size = 0;

	if (!sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
		return;

	lock_sock(sk);
	spin_lock(&psock->backlog_msg_lock);

	msg = list_first_entry_or_null(&psock->backlog_msg,
				       struct sk_msg, list);
	if (!msg) {
		should_notify = !list_empty(&psock->ingress_msg);
		spin_unlock(&psock->backlog_msg_lock);
		goto notify;
	}

	sk_from = msg->sk;
	sock_hold(sk_from);

	rmem_schedule_failed = backlog_xfer_to_local(psock, sk_from,
						     &local_head, &tot_size);
	should_notify = backlog_notify(psock, rmem_schedule_failed,
				       list_empty(&psock->ingress_msg));
	spin_unlock(&psock->backlog_msg_lock);

	spin_lock_bh(&psock->ingress_lock);
	list_splice_tail_init(&local_head, &psock->ingress_msg);
	spin_unlock_bh(&psock->ingress_lock);

	atomic_add(tot_size, &sk->sk_rmem_alloc);
	sk_mem_charge(sk, tot_size);

notify:
	if (should_notify) {
		psock->backlog_since_notify = 0;
		sk_psock_data_ready(sk, psock);
		if (!list_empty(&psock->backlog_msg))
			sk_psock_run_backlog_work(psock, rmem_schedule_failed);
	} else {
		sk_psock_run_backlog_work(psock, true);
	}
	release_sock(sk);

	if (sk_from) {
		bool slow = lock_sock_fast(sk_from);

		sk_mem_uncharge(sk_from, tot_size);
		unlock_sock_fast(sk_from, slow);
		sock_put(sk_from);
	}
}

static void sk_psock_backlog_msg_work(struct work_struct *work)
{
	struct delayed_work *dwork = to_delayed_work(work);

	sk_psock_backlog_msg(container_of(dwork, struct sk_psock, backlog_work));
}

struct sk_psock *sk_psock_init(struct sock *sk, int node)
{
	struct sk_psock *psock;
@@ -744,8 +856,11 @@ struct sk_psock *sk_psock_init(struct sock *sk, int node)

	INIT_DELAYED_WORK(&psock->work, sk_psock_backlog);
	mutex_init(&psock->work_mutex);
	INIT_DELAYED_WORK(&psock->backlog_work, sk_psock_backlog_msg_work);
	INIT_LIST_HEAD(&psock->ingress_msg);
	spin_lock_init(&psock->ingress_lock);
	INIT_LIST_HEAD(&psock->backlog_msg);
	spin_lock_init(&psock->backlog_msg_lock);
	skb_queue_head_init(&psock->ingress_skb);

	sk_psock_set_state(psock, SK_PSOCK_TX_ENABLED);
@@ -799,6 +914,26 @@ static void __sk_psock_zap_ingress(struct sk_psock *psock)
	__sk_psock_purge_ingress_msg(psock);
}

static void __sk_psock_purge_backlog_msg(struct sk_psock *psock)
{
	struct sk_msg *msg, *tmp;

	spin_lock(&psock->backlog_msg_lock);
	list_for_each_entry_safe(msg, tmp, &psock->backlog_msg, list) {
		struct sock *sk_from = msg->sk;
		bool slow;

		list_del(&msg->list);
		slow = lock_sock_fast(sk_from);
		sk_wmem_queued_add(sk_from, -msg->sg.size);
		sock_put(sk_from);
		sk_msg_free(sk_from, msg);
		unlock_sock_fast(sk_from, slow);
		kfree_sk_msg(msg);
	}
	spin_unlock(&psock->backlog_msg_lock);
}

static void sk_psock_link_destroy(struct sk_psock *psock)
{
	struct sk_psock_link *link, *tmp;
@@ -828,7 +963,9 @@ static void sk_psock_destroy(struct work_struct *work)
	sk_psock_done_strp(psock);

	cancel_delayed_work_sync(&psock->work);
	cancel_delayed_work_sync(&psock->backlog_work);
	__sk_psock_zap_ingress(psock);
	__sk_psock_purge_backlog_msg(psock);
	mutex_destroy(&psock->work_mutex);

	psock_progs_drop(&psock->progs);
