diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index 6485b413638f..92d6115e80a2 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -316,6 +316,23 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err } if b.sstWriter.DataSize >= ingestFileSize(b.settings) { + // We're at/over size target, so we want to flush, but first check if we are + // at a new row boundary. Having row-aligned boundaries is not actually + // required by anything, but has the nice property of meaning a split will + // fall between files. This is particularly useful mid-IMPORT when we split + // at the beginning of the next file, as that split is at the row boundary + // at *or before* that file's start. If the file boundary is not aligned, + // the prior file overhangs the beginning of the row in which the next file + // starts, so when we split at that row, that overhang into the RHS that we + // just wrote will be rewritten by the subsequent scatter. By waiting for a + // row boundary, we ensure any split is actually between files. + prevRow, prevErr := keys.EnsureSafeSplitKey(b.batchEndKey) + nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey) + if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) { + // An error decoding either key implies it is not a valid row key and thus + // not the same row for our purposes; we don't care what the error is. + return nil // keep going to row boundary. + } if err := b.doFlush(ctx, sizeFlush); err != nil { return err }