diff --git a/zebra/zserv.c b/zebra/zserv.c index 6a64176d98b3..c444eebee344 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -306,6 +306,12 @@ static void zserv_write(struct event *thread) * this task reschedules itself. * * Any failure in any of these actions is handled by terminating the client. + * + * The client's input buffer ibuf_fifo can have a maximum items as configured + * in the packets_to_process. Once the maximum item limit is reached, + * the client pthread expects main to signal for it to continue reading the + * incoming buffer. This way we are not filling up the FIFO more than the + * maximum when the zebra main is busy. */ static void zserv_read(struct event *thread) { @@ -314,9 +320,9 @@ static void zserv_read(struct event *thread) size_t already; struct stream_fifo *cache; uint32_t p2p_orig; - uint32_t p2p; struct zmsghdr hdr; + size_t client_ibuf_fifo_cnt = stream_fifo_count_safe(client->ibuf_fifo); p2p_orig = atomic_load_explicit(&zrouter.packets_to_process, memory_order_relaxed); @@ -324,7 +330,7 @@ static void zserv_read(struct event *thread) p2p = p2p_orig; sock = EVENT_FD(thread); - while (p2p) { + while (p2p - client_ibuf_fifo_cnt) { ssize_t nb; bool hdrvalid; char errmsg[256]; @@ -435,6 +441,9 @@ static void zserv_read(struct event *thread) while (cache->head) stream_fifo_push(client->ibuf_fifo, stream_fifo_pop(cache)); + /*Need to update count since main thread could have processed few*/ + client_ibuf_fifo_cnt = + stream_fifo_count_safe(client->ibuf_fifo); } /* Schedule job to process those packets */ @@ -442,14 +451,50 @@ static void zserv_read(struct event *thread) } - if (IS_ZEBRA_DEBUG_PACKET) - zlog_debug("Read %d packets from client: %s", p2p_orig - p2p, - zebra_route_string(client->proto)); + /* + * We arrive at this point in three cases. + * Case-1: Initially Client buffer is already to its max limit i.e. + * client_ibuf_fifo == p2p_orig. + * Case-2: Initially Client buffer is Not at the max limit but becomes full + * i.e. Initial client buffer had 90 elements with max (say 100). + * So we add 10 more items to fifo and reach the max limit. + * Case-3: Initially Client buffer is Not at the max limit and neither is + * it after the handling of new incoming buffer msg + * i.e. Initial client buffer had 40 elements with max (say 100). + * So now we add 20 more items to fifo. + * + * Only in case-3 we reschedule ourselves. + * In case-1 and case-2, the zserv_process_messages() (main thread) + * after processing the msgs from client input buffer (ibuf_fifo), wakes up + * the current client pthread to continue reading incoming messages. + */ + if (client_ibuf_fifo_cnt == p2p_orig) { + /* Case-1 */ + if (p2p == p2p_orig) { + if (IS_ZEBRA_DEBUG_PACKET) + zlog_debug("Client: %s input buffer (ibuf_fifo) has reached its limit: %d", + zebra_route_string(client->proto), + p2p_orig); + } else { + /* Case-2*/ + if (IS_ZEBRA_DEBUG_PACKET) + zlog_debug("Client: %s input buffer has reached its limit %d after reading %d packets", + zebra_route_string(client->proto), + p2p_orig, p2p_orig - p2p); + stream_fifo_free(cache); + } + } else { + /* Case-3*/ + if (IS_ZEBRA_DEBUG_PACKET) + zlog_debug("Read %d packets from client: %s", + p2p_orig - p2p, + zebra_route_string(client->proto)); - /* Reschedule ourselves */ - zserv_client_event(client, ZSERV_CLIENT_READ); + /* Reschedule ourselves since we have space in ibuf_fifo*/ + zserv_client_event(client, ZSERV_CLIENT_READ); - stream_fifo_free(cache); + stream_fifo_free(cache); + } return; @@ -483,14 +528,18 @@ static void zserv_client_event(struct zserv *client, * as the task argument. * * Each message is popped off the client's input queue and the action associated - * with the message is executed. This proceeds until there are no more messages, - * an error occurs, or the processing limit is reached. + * with the message is executed. This proceeds until an error occurs, or the + * processing limit is reached. * * The client's I/O thread can push at most zrouter.packets_to_process messages * onto the input buffer before notifying us there are packets to read. As long * as we always process zrouter.packets_to_process messages here, then we can * rely on the read thread to handle queuing this task enough times to process * everything on the input queue. + * + * If the client ibuf has maximum items(packets_to_process) then the main thread + * schedules a call to the client pthread to pump in more items on the client's + * input buffer fifo. */ static void zserv_process_messages(struct event *thread) { @@ -498,21 +547,16 @@ static void zserv_process_messages(struct event *thread) struct stream *msg; struct stream_fifo *cache = stream_fifo_new(); uint32_t p2p = zrouter.packets_to_process; - bool need_resched = false; + size_t client_ibuf_fifo_cnt; frr_with_mutex (&client->ibuf_mtx) { + client_ibuf_fifo_cnt = stream_fifo_count_safe(client->ibuf_fifo); uint32_t i; for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo); ++i) { msg = stream_fifo_pop(client->ibuf_fifo); stream_fifo_push(cache, msg); } - - /* Need to reschedule processing work if there are still - * packets in the fifo. - */ - if (stream_fifo_head(client->ibuf_fifo)) - need_resched = true; } /* Process the batch of messages */ @@ -520,10 +564,13 @@ static void zserv_process_messages(struct event *thread) zserv_handle_commands(client, cache); stream_fifo_free(cache); + if (IS_ZEBRA_DEBUG_PACKET) + zlog_debug("Zebra Main Processed %zu packets from client: %s", + client_ibuf_fifo_cnt, + zebra_route_string(client->proto)); - /* Reschedule ourselves if necessary */ - if (need_resched) - zserv_event(client, ZSERV_PROCESS_MESSAGES); + if (client_ibuf_fifo_cnt == p2p) + zserv_client_event(client, ZSERV_CLIENT_READ); } int zserv_send_message(struct zserv *client, struct stream *msg)