diff --git a/pkg/sql/pgwire/command_result.go b/pkg/sql/pgwire/command_result.go index ef6bfc17bca5..40c4ba19ce4e 100644 --- a/pkg/sql/pgwire/command_result.go +++ b/pkg/sql/pgwire/command_result.go @@ -161,11 +161,13 @@ func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndica r.conn.bufferReadyForQuery(byte(t)) // The error is saved on conn.err. _ /* err */ = r.conn.Flush(r.pos) + r.conn.maybeReallocate() case emptyQueryResponse: r.conn.bufferEmptyQueryResponse() case flush: // The error is saved on conn.err. _ /* err */ = r.conn.Flush(r.pos) + r.conn.maybeReallocate() case noCompletionMsg: // nothing to do default: @@ -435,7 +437,7 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro return r.moreResultsNeeded(ctx) } - return r.conn.maybeFlush(r.pos, r.bufferingDisabled) + return nil } // SupportsAddBatch is part of the sql.RestrictedCommandResult interface. diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index a50a722c9a4b..9b721fa10454 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1593,12 +1593,30 @@ func (c *conn) Flush(pos sql.CmdPos) error { // maybeFlush flushes the buffer to the network connection if it exceeded // sessionArgs.ConnResultsBufferSize or if buffering is disabled. func (c *conn) maybeFlush(pos sql.CmdPos, bufferingDisabled bool) error { + // Note that ConnResultsBufferSize cannot be changed during a session, so it + // is safe to use the value stored on sessionArgs. if !bufferingDisabled && int64(c.writerState.buf.Len()) <= c.sessionArgs.ConnResultsBufferSize { return nil } return c.Flush(pos) } +// maybeReallocate checks whether the internal slices used to buffer data have +// overflowed their limits. If so, they will be reallocated to a smaller size. +// maybeReallocate should only be called after the connection has been flushed +// and a query has just been processed. +func (c *conn) maybeReallocate() { + // Note that ConnResultsBufferSize cannot be changed during a session, so it + // is safe to use the value stored on sessionArgs. + limit := int(c.sessionArgs.ConnResultsBufferSize) + if c.msgBuilder.wrapped.Len() == 0 && c.msgBuilder.wrapped.Cap() > limit { + c.msgBuilder.wrapped = bytes.Buffer{} + } + if c.writerState.buf.Len() == 0 && c.writerState.buf.Cap() > limit { + c.writerState.buf = bytes.Buffer{} + } +} + // LockCommunication is part of the ClientComm interface. // // The current implementation of conn writes results to the network