Skip to content

Commit

Permalink
Merge pull request #106698 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-106503

release-23.1: userfile: fix transaction retry upload bug
  • Loading branch information
stevendanna authored Jul 14, 2023
2 parents 2463cca + 9ecd132 commit a50dd18
Showing 1 changed file with 19 additions and 29 deletions.
48 changes: 19 additions & 29 deletions pkg/cloud/userfile/filetable/file_table_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,19 @@ type payloadWriter struct {
payloadTableName string
}

// WriteChunk inserts a single row into the Payload table as an operation in the
// transaction txn.
// TODO(janexing): can the insert happen with a nil txn?
func (p *payloadWriter) WriteChunk(buf []byte, txn isql.Txn) (int, error) {
insertChunkQuery := fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2, $3)`, p.payloadTableName)
_, err := txn.ExecEx(p.ctx, "insert-file-chunk", txn.KV(), p.execSessionDataOverride,
insertChunkQuery, p.fileID, p.byteOffset, buf)
if err != nil {
// WriteChunkInTxn inserts a single row into the Payload table as an
// operation in the transaction txn.
//
// TODO(janexing): Is it necessary to run the insert in a txn?
func (p *payloadWriter) WriteChunkInTxn(buf []byte) (int, error) {
if err := p.ief.Txn(p.ctx, func(
ctx context.Context, txn isql.Txn,
) error {
insertChunkQuery := fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2, $3)`, p.payloadTableName)
_, err := txn.ExecEx(p.ctx, "insert-file-chunk", txn.KV(), p.execSessionDataOverride,
insertChunkQuery, p.fileID, p.byteOffset, buf)
return err
}); err != nil {
return 0, err
}

Expand Down Expand Up @@ -574,18 +579,10 @@ func (w *chunkWriter) Write(buf []byte) (int, error) {
// If the buffer has been filled to capacity, write the chunk inside a txn
// retry loop.
if w.buf.Len() == w.buf.Cap() {
// TODO(janexing): Is it necessary to run the following within a txn?
if err := w.pw.ief.Txn(w.pw.ctx, func(
ctx context.Context, txn isql.Txn,
) error {
if n, err := w.pw.WriteChunk(w.buf.Bytes(), txn); err != nil {
return err
} else if n != w.buf.Len() {
return errors.Wrap(io.ErrShortWrite, "error when writing in chunkWriter")
}
return nil
}); err != nil {
if n, err := w.pw.WriteChunkInTxn(w.buf.Bytes()); err != nil {
return 0, err
} else if n != w.buf.Len() {
return n, errors.Wrap(io.ErrShortWrite, "error when writing in chunkWriter")
}
w.buf.Reset()
}
Expand All @@ -606,17 +603,10 @@ func (w *chunkWriter) Close() error {
// payloadWriter Write() method, then the txn is aborted and the error is
// propagated here.
if w.buf.Len() > 0 {
if err := w.pw.ief.Txn(w.pw.ctx, func(
ctx context.Context, txn isql.Txn,
) error {
if n, err := w.pw.WriteChunk(w.buf.Bytes(), txn); err != nil {
return err
} else if n != w.buf.Len() {
return errors.Wrap(io.ErrShortWrite, "error when closing chunkWriter")
}
return nil
}); err != nil {
if n, err := w.pw.WriteChunkInTxn(w.buf.Bytes()); err != nil {
return err
} else if n != w.buf.Len() {
return errors.Wrap(io.ErrShortWrite, "error when closing chunkWriter")
}
}

Expand Down

0 comments on commit a50dd18

Please sign in to comment.