Skip to content

Commit

Permalink
sql: reset conn buffers after each query
Browse files Browse the repository at this point in the history
There are two buffers in `pgwire.conn` that are used when sending results
to the client. They have to buffer each row in its entirety before flushing,
so wide rows can cause the capacity of these buffers to exceed the limit.

Previously, the buffers were never reallocated even when they exceeded the
size limit. This could cause a long-running session to hold on to large
amounts of memory until the session was ended. This increased the risk of
OOM, in addition to being inefficient.

This commit adds a method `maybeReallocate` to `pgwire.conn`, which
reallocates each buffer if it has reached the size limit. `maybeReallocate`
is called upon closing `Sync` and `Flush` command results. These are
natural places to reallocate the buffers, since they already flush the
buffers to the client and are called roughly at the statement granularity,
rather than for every row. This allows long-running sessions to bound their
memory usage without causing reallocation on every wide row that is sent
to the client.

Informs cockroachdb#80497

Release note (performance improvement): Long-running sql sessions are
now less likely to maintain large allocations for long periods of time,
which decreases the risk of OOM and improves memory utilization.
  • Loading branch information
DrewKimball committed Aug 15, 2022
1 parent 33b343d commit 1d035c9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndica
r.conn.bufferReadyForQuery(byte(t))
// The error is saved on conn.err.
_ /* err */ = r.conn.Flush(r.pos)
r.conn.maybeReallocate()
case emptyQueryResponse:
r.conn.bufferEmptyQueryResponse()
case flush:
// The error is saved on conn.err.
_ /* err */ = r.conn.Flush(r.pos)
r.conn.maybeReallocate()
case noCompletionMsg:
// nothing to do
default:
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,12 +1593,30 @@ func (c *conn) Flush(pos sql.CmdPos) error {
// maybeFlush flushes the buffer to the network connection if it exceeded
// sessionArgs.ConnResultsBufferSize or if buffering is disabled.
func (c *conn) maybeFlush(pos sql.CmdPos, bufferingDisabled bool) error {
// Note that ConnResultsBufferSize cannot be changed during a session, so it
// is safe to use the value stored on sessionArgs.
if !bufferingDisabled && int64(c.writerState.buf.Len()) <= c.sessionArgs.ConnResultsBufferSize {
return nil
}
return c.Flush(pos)
}

// maybeReallocate checks whether the internal slices used to buffer data have
// overflowed their limits. If so, they will be reallocated to a smaller size.
// maybeReallocate should only be called after the connection has been flushed
// and a query has just been processed.
func (c *conn) maybeReallocate() {
// Note that ConnResultsBufferSize cannot be changed during a session, so it
// is safe to use the value stored on sessionArgs.
limit := int(c.sessionArgs.ConnResultsBufferSize)
if c.msgBuilder.wrapped.Len() == 0 && c.msgBuilder.wrapped.Cap() > limit {
c.msgBuilder.wrapped = bytes.Buffer{}
}
if c.writerState.buf.Len() == 0 && c.writerState.buf.Cap() > limit {
c.writerState.buf = bytes.Buffer{}
}
}

// LockCommunication is part of the ClientComm interface.
//
// The current implementation of conn writes results to the network
Expand Down

0 comments on commit 1d035c9

Please sign in to comment.