Skip to content

Commit

Permalink
Merge #85949
Browse files Browse the repository at this point in the history
85949: sql: reset conn buffers after each query r=DrewKimball a=DrewKimball

**sql: reset conn buffers after each query**

There are two buffers in `pgwire.conn` that are used when sending results
to the client. They have to buffer each row in its entirety before flushing,
so wide rows can cause the capacity of these buffers to exceed the limit.

Previously, the buffers were never reallocated even when they exceeded the
size limit. This could cause a long-running session to hold on to large
amounts of memory until the session was ended. This increased the risk of
OOM, in addition to being inefficient.

This commit adds a method `maybeReallocate` to `pgwire.conn`, which
reallocates each buffer if it has reached the size limit. `maybeReallocate`
is called upon closing `Sync` and `Flush` command results. These are
natural places to reallocate the buffers, since they already flush the
buffers to the client and are called roughly at the statement granularity,
rather than for every row. This allows long-running sessions to bound their
memory usage without causing reallocation on every wide row that is sent
to the client.

Informs #80497

Release note (performance improvement): Long-running sql sessions are
now less likely to maintain large allocations for long periods of time,
which decreases the risk of OOM and improves memory utilization.

Release justification: bug fix to reduce likelihood of OOM

**sql: remove unnecessary maybeFlush call**

This commit fixes an oversight of #83870 that added a redundant call to
`maybeAppend` in `limitedCommandResult.AddRow`.

Release note: None

Release justification: low-risk fix for oversight in previous bug fix

Co-authored-by: DrewKimball <[email protected]>
  • Loading branch information
craig[bot] and DrewKimball committed Aug 18, 2022
2 parents 5c2c62e + 66a2b01 commit 04c6a1a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,13 @@ func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndica
r.conn.bufferReadyForQuery(byte(t))
// The error is saved on conn.err.
_ /* err */ = r.conn.Flush(r.pos)
r.conn.maybeReallocate()
case emptyQueryResponse:
r.conn.bufferEmptyQueryResponse()
case flush:
// The error is saved on conn.err.
_ /* err */ = r.conn.Flush(r.pos)
r.conn.maybeReallocate()
case noCompletionMsg:
// nothing to do
default:
Expand Down Expand Up @@ -435,7 +437,7 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro

return r.moreResultsNeeded(ctx)
}
return r.conn.maybeFlush(r.pos, r.bufferingDisabled)
return nil
}

// SupportsAddBatch is part of the sql.RestrictedCommandResult interface.
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,12 +1593,30 @@ func (c *conn) Flush(pos sql.CmdPos) error {
// maybeFlush flushes the buffer to the network connection if it exceeded
// sessionArgs.ConnResultsBufferSize or if buffering is disabled.
func (c *conn) maybeFlush(pos sql.CmdPos, bufferingDisabled bool) error {
// Note that ConnResultsBufferSize cannot be changed during a session, so it
// is safe to use the value stored on sessionArgs.
if !bufferingDisabled && int64(c.writerState.buf.Len()) <= c.sessionArgs.ConnResultsBufferSize {
return nil
}
return c.Flush(pos)
}

// maybeReallocate checks whether the internal slices used to buffer data have
// overflowed their limits. If so, they will be reallocated to a smaller size.
// maybeReallocate should only be called after the connection has been flushed
// and a query has just been processed.
func (c *conn) maybeReallocate() {
// Note that ConnResultsBufferSize cannot be changed during a session, so it
// is safe to use the value stored on sessionArgs.
limit := int(c.sessionArgs.ConnResultsBufferSize)
if c.msgBuilder.wrapped.Len() == 0 && c.msgBuilder.wrapped.Cap() > limit {
c.msgBuilder.wrapped = bytes.Buffer{}
}
if c.writerState.buf.Len() == 0 && c.writerState.buf.Cap() > limit {
c.writerState.buf = bytes.Buffer{}
}
}

// LockCommunication is part of the ClientComm interface.
//
// The current implementation of conn writes results to the network
Expand Down

0 comments on commit 04c6a1a

Please sign in to comment.