Skip to content

Commit

Permalink
Merge pull request cockroachdb#84921 from DrewKimball/backport21.2-83870
Browse files Browse the repository at this point in the history
release-21.2: pgwire: fix buffer size limiting behavior for addBatch
  • Loading branch information
yuzefovich authored Jul 25, 2022
2 parents f3b584d + cb9d20d commit 8f4dc94
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
28 changes: 12 additions & 16 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *commandResult) SetError(err error) {

// addInternal is the skeleton of AddRow and AddBatch implementations.
// bufferData should update rowsAffected and buffer the data accordingly.
func (r *commandResult) addInternal(bufferData func()) error {
func (r *commandResult) addInternal(bufferData func() error) error {
r.assertNotReleased()
if r.err != nil {
panic(errors.AssertionFailedf("can't call AddRow after having set error: %s",
Expand All @@ -205,30 +205,29 @@ func (r *commandResult) addInternal(bufferData func()) error {
panic("can't send row after error")
}

bufferData()

var err error
if r.bufferingDisabled {
err = r.conn.Flush(r.pos)
} else {
_ /* flushed */, err = r.conn.maybeFlush(r.pos)
if err := bufferData(); err != nil {
return err
}
return err

return r.conn.maybeFlush(r.pos, r.bufferingDisabled)
}

// AddRow is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) AddRow(ctx context.Context, row tree.Datums) error {
return r.addInternal(func() {
return r.addInternal(func() error {
r.rowsAffected++
r.conn.bufferRow(ctx, row, r.formatCodes, r.conv, r.location, r.types)
return nil
})
}

// AddBatch is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) AddBatch(ctx context.Context, batch coldata.Batch) error {
return r.addInternal(func() {
return r.addInternal(func() error {
r.rowsAffected += batch.Length()
r.conn.bufferBatch(ctx, batch, r.formatCodes, r.conv, r.location)
return r.conn.bufferBatch(
ctx, batch, r.formatCodes, r.conv, r.location, r.pos, r.bufferingDisabled,
)
})
}

Expand Down Expand Up @@ -439,10 +438,7 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro

return r.moreResultsNeeded(ctx)
}
if _ /* flushed */, err := r.conn.maybeFlush(r.pos); err != nil {
return err
}
return nil
return r.conn.maybeFlush(r.pos, r.bufferingDisabled)
}

// SupportsAddBatch is part of the sql.RestrictedCommandResult interface.
Expand Down
21 changes: 14 additions & 7 deletions pkg/sql/pgwire/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,8 @@ func (c *conn) bufferRow(
}

// bufferBatch serializes a batch and adds all the rows from it to the buffer.
// It is a noop for zero-length batch.
// It is a noop for zero-length batch. Depending on the buffer size limit,
// bufferBatch may flush the buffered data to the connection.
//
// formatCodes describes the desired encoding for each column. It can be nil, in
// which case all columns are encoded using the text encoding. Otherwise, it
Expand All @@ -1292,7 +1293,9 @@ func (c *conn) bufferBatch(
formatCodes []pgwirebase.FormatCode,
conv sessiondatapb.DataConversionConfig,
sessionLoc *time.Location,
) {
pos sql.CmdPos,
bufferingDisabled bool,
) error {
sel := batch.Selection()
n := batch.Length()
vecs := batch.ColVecs()
Expand Down Expand Up @@ -1321,7 +1324,11 @@ func (c *conn) bufferBatch(
if err := c.msgBuilder.finishMsg(&c.writerState.buf); err != nil {
panic(fmt.Sprintf("unexpected err from buffer: %s", err))
}
if err := c.maybeFlush(pos, bufferingDisabled); err != nil {
return err
}
}
return nil
}

func (c *conn) bufferReadyForQuery(txnStatus byte) {
Expand Down Expand Up @@ -1524,12 +1531,12 @@ func (c *conn) Flush(pos sql.CmdPos) error {
}

// maybeFlush flushes the buffer to the network connection if it exceeded
// sessionArgs.ConnResultsBufferSize.
func (c *conn) maybeFlush(pos sql.CmdPos) (bool, error) {
if int64(c.writerState.buf.Len()) <= c.sessionArgs.ConnResultsBufferSize {
return false, nil
// sessionArgs.ConnResultsBufferSize or if buffering is disabled.
func (c *conn) maybeFlush(pos sql.CmdPos, bufferingDisabled bool) error {
if !bufferingDisabled && int64(c.writerState.buf.Len()) <= c.sessionArgs.ConnResultsBufferSize {
return nil
}
return true, c.Flush(pos)
return c.Flush(pos)
}

// LockCommunication is part of the ClientComm interface.
Expand Down

0 comments on commit 8f4dc94

Please sign in to comment.