diff --git a/pkg/cloud/userfile/filetable/file_table_read_writer.go b/pkg/cloud/userfile/filetable/file_table_read_writer.go index 51533f14eb8e..bc6f037d2570 100644 --- a/pkg/cloud/userfile/filetable/file_table_read_writer.go +++ b/pkg/cloud/userfile/filetable/file_table_read_writer.go @@ -463,14 +463,19 @@ type payloadWriter struct { payloadTableName string } -// WriteChunk inserts a single row into the Payload table as an operation in the -// transaction txn. -// TODO(janexing): can the insert happen with a nil txn? -func (p *payloadWriter) WriteChunk(buf []byte, txn isql.Txn) (int, error) { - insertChunkQuery := fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2, $3)`, p.payloadTableName) - _, err := txn.ExecEx(p.ctx, "insert-file-chunk", txn.KV(), p.execSessionDataOverride, - insertChunkQuery, p.fileID, p.byteOffset, buf) - if err != nil { +// WriteChunkInTxn inserts a single row into the Payload table as an +// operation in the transaction txn. +// +// TODO(janexing): Is it necessary to run the insert in a txn? +func (p *payloadWriter) WriteChunkInTxn(buf []byte) (int, error) { + if err := p.ief.Txn(p.ctx, func( + ctx context.Context, txn isql.Txn, + ) error { + insertChunkQuery := fmt.Sprintf(`INSERT INTO %s VALUES ($1, $2, $3)`, p.payloadTableName) + _, err := txn.ExecEx(p.ctx, "insert-file-chunk", txn.KV(), p.execSessionDataOverride, + insertChunkQuery, p.fileID, p.byteOffset, buf) + return err + }); err != nil { return 0, err } @@ -574,18 +579,10 @@ func (w *chunkWriter) Write(buf []byte) (int, error) { // If the buffer has been filled to capacity, write the chunk inside a txn // retry loop. if w.buf.Len() == w.buf.Cap() { - // TODO(janexing): Is it necessary to run the following within a txn? - if err := w.pw.ief.Txn(w.pw.ctx, func( - ctx context.Context, txn isql.Txn, - ) error { - if n, err := w.pw.WriteChunk(w.buf.Bytes(), txn); err != nil { - return err - } else if n != w.buf.Len() { - return errors.Wrap(io.ErrShortWrite, "error when writing in chunkWriter") - } - return nil - }); err != nil { + if n, err := w.pw.WriteChunkInTxn(w.buf.Bytes()); err != nil { return 0, err + } else if n != w.buf.Len() { + return n, errors.Wrap(io.ErrShortWrite, "error when writing in chunkWriter") } w.buf.Reset() } @@ -606,17 +603,10 @@ func (w *chunkWriter) Close() error { // payloadWriter Write() method, then the txn is aborted and the error is // propagated here. if w.buf.Len() > 0 { - if err := w.pw.ief.Txn(w.pw.ctx, func( - ctx context.Context, txn isql.Txn, - ) error { - if n, err := w.pw.WriteChunk(w.buf.Bytes(), txn); err != nil { - return err - } else if n != w.buf.Len() { - return errors.Wrap(io.ErrShortWrite, "error when closing chunkWriter") - } - return nil - }); err != nil { + if n, err := w.pw.WriteChunkInTxn(w.buf.Bytes()); err != nil { return err + } else if n != w.buf.Len() { + return errors.Wrap(io.ErrShortWrite, "error when closing chunkWriter") } }