Skip to content

Commit

Permalink
Coallesce small buffers to send them as one
Browse files Browse the repository at this point in the history
Summary: The new async client function to write a vector of buffers can cause lots of small buffers to go through SSL encoding which can be expensive.  This diff attempts to combine the first portions (which are most likely to be small) into a single buffer.  The original code already had a small buffer to build the header into.  This expands that buffer and then continues copying data into it until the next buffer won't fit.  This diff also avoids adding 0 length buffers which could occur with the original implementation.  Note that this buffer coalescing is disabled when the overall packet is very large and gets turned off part way through if network compression is enabled.

Differential Revision: D21867050
  • Loading branch information
jkedgar authored and inikep committed Jul 19, 2024
1 parent 24344dc commit d75beb2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 46 deletions.
12 changes: 10 additions & 2 deletions include/mysql_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ struct io_vec {
size_t iov_len; /**< Number of bytes to transfer */
};

#define ASYNC_COALESCE_BUFFER_SIZE 1024

// Make sure the coalesce buffer size is larger than we used to have
#define ASYNC_INLINE_ORIGINAL_SIZE \
NET_HEADER_SIZE + COMP_HEADER_SIZE + NET_HEADER_SIZE + 1
#if ASYNC_COALESCE_BUFFER_SIZE < ASYNC_INLINE_ORIGINAL_SIZE
#error The coalesce buffer size is too small
#endif

typedef struct NET_ASYNC {
/**
The position in buff we continue reads from when data is next
Expand Down Expand Up @@ -162,8 +171,7 @@ typedef struct NET_ASYNC {
B12 : COM_COMMAND
~~~~~~~~~~~~~~~~~~~
*/
unsigned char inline_async_write_header[NET_HEADER_SIZE + COMP_HEADER_SIZE +
NET_HEADER_SIZE + 1];
unsigned char inline_async_write_header[ASYNC_COALESCE_BUFFER_SIZE];
struct io_vec inline_async_write_vector[3];

/** Keep track of compressed buffers */
Expand Down
109 changes: 65 additions & 44 deletions sql-common/net_serv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ static int begin_packet_write_state(NET *net, uchar command,
struct io_vec *vec;
uchar *headers;
uchar **compressed_buffers = nullptr;
bool try_coalesce = true;

if (total_len < MAX_PACKET_LENGTH) {
/*
Most writes hit this case, ie, less than MAX_PACKET_LENGTH of
Expand All @@ -574,6 +576,11 @@ static int begin_packet_write_state(NET *net, uchar command,
my_free(vec);
return 0;
}

// Don't try any coalescing on messages that require multiple network
// packets - the efficiency gains would be minimal and the code would
// be more complex.
try_coalesce = false;
}
/*
Regardless of where vec and headers come from, these are what we
Expand Down Expand Up @@ -631,10 +638,7 @@ static int begin_packet_write_state(NET *net, uchar command,
}
int3store(buf, comp_packet_len);
buf[3] = (uchar)net->compress_pkt_nr++;
/*
The bytes in COMP_HEADER_SIZE are implicitly zero because they were
zero filled. A zero length means that the contents are uncompressed.
*/
memset(buf + NET_HEADER_SIZE, 0, COMP_HEADER_SIZE);
buf += NET_HEADER_SIZE + COMP_HEADER_SIZE;
}

Expand All @@ -653,65 +657,82 @@ static int begin_packet_write_state(NET *net, uchar command,
++bytes_queued;
}

++vec;

/* Second iovec (if any), our optional prefix. */
if (packet_num == 0 && optional_prefix != nullptr) {
(*vec).iov_base = const_cast<uchar *>(optional_prefix);
(*vec).iov_len = prefix_len;
if (try_coalesce &&
vec->iov_len + prefix_len <= ASYNC_COALESCE_BUFFER_SIZE) {
memcpy((uchar *)vec->iov_base + vec->iov_len, optional_prefix,
prefix_len);
vec->iov_len += prefix_len;
} else {
++vec;
(*vec).iov_base = const_cast<uchar *>(optional_prefix);
(*vec).iov_len = prefix_len;
try_coalesce = false;
}
bytes_queued += prefix_len;
++vec;
}
/*
Final iovec, the payload itself. Send however many bytes from
packet we have left, and advance our packet pointer.
*/
const size_t remaining_bytes = packet_size - bytes_queued;
(*vec).iov_base = const_cast<uchar *>(packet);
(*vec).iov_len = remaining_bytes;

bytes_queued += remaining_bytes;
if (remaining_bytes != 0) {
// Turn off coalescing if compressing
try_coalesce = try_coalesce && !net->compress;

if (try_coalesce &&
vec->iov_len + remaining_bytes <= ASYNC_COALESCE_BUFFER_SIZE) {
memcpy((uchar *)vec->iov_base + vec->iov_len, packet, remaining_bytes);
vec->iov_len += remaining_bytes;
} else {
++vec;
(*vec).iov_base = const_cast<uchar *>(packet);
(*vec).iov_len = remaining_bytes;
}

packet += remaining_bytes;
total_len -= bytes_queued;
/* clang-format off */
/*
If we have a payload to compress, then compress_packet will add
compression headers for us. This is what we have at this point where
each line is an iovec.
| len |cpn|uncompress len| len | pn | command |
| prefix + command | 0 |0 | total_len = command + prefix + payload | 0 |COM_* |
bytes_queued += remaining_bytes;

| prefix |
|... |
packet += remaining_bytes;
/* clang-format off */
/*
If we have a payload to compress, then compress_packet will
add compression headers for us. This is what we have at this point where
each line is an iovec.
| len |cpn|uncompress len| len | pn | command |
| prefix + command | 0| 0| total_len = command + prefix + payload | 0 | COM_* |
|payload |
|... |
| prefix |
| ... |
We want to transform into this:
| payload |
| ... |
| len |cpn|uncompress len| len | pn | command |
| prefix +command | 0 |0 | total_len = command + prefix + payload | 0 |COM_* |
We want to transform into this:
| len |cpn|uncompress len| len | pn | command |
| prefix + command | 0| 0| total_len = command + prefix + payload | 0 | COM_* |
| prefix |
|... |
| prefix |
| ... |
| len |cpn|uncompress len| compressed payload |
| len(compressed payload) | 1| len(payload)| compress(payload) |
*/
/* clang-format on */

if (net->compress && remaining_bytes) {
(*vec).iov_base =
compress_packet(net, (uchar *)(*vec).iov_base, &(*vec).iov_len);
if (!(*vec).iov_base) {
reset_packet_write_state(net);
return 0;
| len |cpn|uncompress len| compressed payload |
| len(compressed payload) | 1| len(payload)| compress(payload) |
*/
/* clang-format on */
if (net->compress) {
(*vec).iov_base = compress_packet(
net, static_cast<uchar *>((*vec).iov_base), &(*vec).iov_len);
if (!(*vec).iov_base) {
reset_packet_write_state(net);
return 0;
}
compressed_buffers[net_async->compressed_buffers_size++] =
static_cast<uchar *>((*vec).iov_base);
}
compressed_buffers[net_async->compressed_buffers_size++] =
(uchar *)(*vec).iov_base;
}
++vec;
total_len -= bytes_queued;
try_coalesce = false;

/* Make sure we sent entire packets. */
if (total_len > 0) {
Expand Down

0 comments on commit d75beb2

Please sign in to comment.