Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
86891: bazci: enable --flaky_test_attempts=3 for Unit tests on release branches r=rickystewart,healthy-pod,renatolabs a=srosenberg

When a test fails, --flaky_test_attempts=3 will cause up to two retry
attempts by executing the corresponding test binary (shard).
If a test passes on either attempt, it is reported 'FLAKY' in the
test result summary. In addition, each attempt yields an (XML) log
under 'test_attempts' subdirectory. The attempts are now copied
into the artifactsDir.

Note, while the test result summary may say the test is 'FLAKY', it's
reported as 'PASS' as per main test.xml. Thus, a 'FLAKY' test will
not fail the CI; i.e., a test succeeding after up to two
retries is assumed to have _passed_ whereas three consecutive
failures result in a failed test. Consequently, CI now has a higher
probability of passing than before this change. However, the two additional
attempts per test binary (shard) could slow down the build. Hence, initially
the plan is to enable only on release branches and not staging (i.e., bors).

This change supports [1]. In a follow-up PR, a test reporter could be
augmented to parse attempt logs thereby obtaining a set of FLAKY tests
which should be skipped on subsequent builds.

[1] #81293

Release justification: CI improvement
Release note: None
Epic: None

92955: opt: inline UDFs as subqueries r=mgartner a=mgartner

UDFs are now inlined as subqueries by a normalization rule when possible, speeding up their evaluation.

Epic: CRDB-20370

Release note (performance improvement): Some types of user-defined functions are now inlined in query plans as relation expressions, which speeds up their evaluation. UDFs must be non-volatile and have a single statement in the function body to be inlined.

94660: colserde: perform memory accounting for scratch slices in the converter r=yuzefovich a=yuzefovich

This commit introduces the memory accounting for values and offsets
scratch slices that are being reused across `BatchToArrow` calls since
recently. This required a lot of plumbing to propagate the memory
account from all places. The converter is careful to only grow and
shrink the account by its own usage, so the same memory account can be
reused across multiple converters (as long as there is no concurrency
going on, and we only can have concurrency for hash router outputs, so
there we give each output a separate account).

An additional minor change is that now in `diskQueue.Enqueue` we
properly `Close` the `FileSerializer` before `nil`-ing it out. This
isn't a problem per se since it is the caller's responsibility to close
the account used by the serializer, but it's nice to properly release
the accounted for bytes.

Epic: None

Release note: None

94702: workload: fix non-determinism in TPC-H data generation r=rytaft a=rytaft

This commit fixes the non-determinism in the TPC-H data generation by using a local slice for the random permutation of indexes into `randPartNames`. It also fixes the permutation algorithm to use a modified Fisher–Yates shuffle.

Fixes #93958

Release note: None

94713: sql: add user-facing error for invalid job type in job_payload_type builtin r=jayshrivastava a=jayshrivastava

### sql: add user-facing error for invalid job type in job_payload_type builtin

Previously, the `job_payload_type` builtin would return an internal error
if the payload could be unmarshalled, but the type could not be determined.
This changes ensures the builtin returns an appropriate user-facing error
in this case.

Epic: none
Fixes: #94680

Release note: None

94723: kvserver: check lease rebalance target exists in cached storelist r=erikgrinaker a=kvoli

From #91593 it was possible for a new lease target to have its store descriptor dereferenced despite not existing in a list of candidate stores earlier.

This patch resolves the dereference.

fixes: #94688

Release note: None

Co-authored-by: Stan Rosenberg <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
7 people committed Jan 4, 2023
7 parents e3abcd5 + c85a03f + 4370c65 + 2922777 + 06bc01c + ef5ae32 + 39e86ad commit cb3d6db
Show file tree
Hide file tree
Showing 74 changed files with 843 additions and 260 deletions.
12 changes: 10 additions & 2 deletions build/teamcity/cockroach/ci/tests/unit_tests_impl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
set -xeuo pipefail

bazel build //pkg/cmd/bazci --config=ci

EXTRA_PARAMS=""

if tc_release_branch; then
# enable up to 2 retries (3 attempts, worst-case) per test executable to report flakes but only on release branches (i.e., not staging)
EXTRA_PARAMS=" --flaky_test_attempts=3"
fi

$(bazel info bazel-bin --config=ci)/pkg/cmd/bazci/bazci_/bazci -- test --config=cinolint --config=simplestamp -c fastbuild \
//pkg:small_tests //pkg:medium_tests //pkg:large_tests //pkg:enormous_tests \
--profile=/artifacts/profile.gz
//pkg:small_tests //pkg:medium_tests //pkg:large_tests //pkg:enormous_tests \
--profile=/artifacts/profile.gz $EXTRA_PARAMS
4 changes: 1 addition & 3 deletions pkg/ccl/workloadccl/allccl/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ func hashTableInitialData(
func TestDeterministicInitialData(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 93958, "flaky test")

// There are other tests that run initial data generation under race, so we
// don't get anything from running this one under race as well.
skip.UnderRace(t, "uninteresting under race")
Expand All @@ -280,7 +278,7 @@ func TestDeterministicInitialData(t *testing.T) {
`sqlsmith`: 0xcbf29ce484222325,
`startrek`: 0xa0249fbdf612734c,
`tpcc`: 0xab32e4f5e899eb2f,
`tpch`: 0xe013881749bb67e8,
`tpch`: 0xe4fd28db230b9149,
`ycsb`: 0x1244ea1c29ef67f6,
}

Expand Down
1 change: 1 addition & 0 deletions pkg/col/colserde/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/duration",
"//pkg/util/mon",
"@com_github_apache_arrow_go_arrow//array",
"@com_github_apache_arrow_go_arrow//memory",
"@com_github_cockroachdb_errors//:errors",
Expand Down
58 changes: 51 additions & 7 deletions pkg/col/colserde/arrowbatchconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package colserde

import (
"context"
"encoding/binary"
"fmt"
"reflect"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand All @@ -47,8 +49,14 @@ const (
type ArrowBatchConverter struct {
typs []*types.T

// TODO(yuzefovich): perform memory accounting for these slices.
scratch struct {
// Fields below are only used during batch-to-arrow conversion.

// usesOffsets indicates whether at least one vector needs values and
// offsets scratch slices (in other words, it needs three buffers).
usesOffsets bool
acc *mon.BoundAccount
accountedFor int64
scratch struct {
// arrowData is used as scratch space returned as the corresponding
// conversion result.
arrowData []array.Data
Expand All @@ -68,13 +76,23 @@ type ArrowBatchConverter struct {
// NewArrowBatchConverter converts coldata.Batches to []array.Data and back
// again according to the schema specified by typs. Converting data that does
// not conform to typs results in undefined behavior.
func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchConverter, error) {
c := &ArrowBatchConverter{typs: typs}
//
// acc must be a non-nil unlimited memory account unless ArrowToBatchOnly mode
// is used. The account will be shrunk in Release, however, it is the caller's
// responsibility to close the account.
// NOTE: Release can only be called before the account is closed.
func NewArrowBatchConverter(
typs []*types.T, mode ConversionMode, acc *mon.BoundAccount,
) (*ArrowBatchConverter, error) {
c := &ArrowBatchConverter{typs: typs, acc: acc}
if mode == ArrowToBatchOnly {
// All the allocations below are only used in BatchToArrow, so we don't
// need to allocate them.
return c, nil
}
if acc == nil {
return nil, errors.AssertionFailedf("nil account is given with the conversion mode other than ArrowToBatchOnly")
}
c.scratch.arrowData = make([]array.Data, len(typs))
c.scratch.buffers = make([][]*memory.Buffer, len(typs))
// Calculate the number of buffers needed for all types to be able to batch
Expand All @@ -99,6 +117,7 @@ func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchCo
}
}
if needOffsets {
c.usesOffsets = true
c.scratch.values = make([][]byte, len(typs))
c.scratch.offsets = make([][]int32, len(typs))
}
Expand All @@ -109,7 +128,9 @@ func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchCo
// arrow []array.Data. It is assumed that the batch is not larger than
// coldata.BatchSize(). The returned []array.Data may only be used until the
// next call to BatchToArrow.
func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]array.Data, error) {
func (c *ArrowBatchConverter) BatchToArrow(
ctx context.Context, batch coldata.Batch,
) ([]array.Data, error) {
if batch.Width() != len(c.typs) {
return nil, errors.AssertionFailedf("mismatched batch width and schema length: %d != %d", batch.Width(), len(c.typs))
}
Expand Down Expand Up @@ -314,7 +335,7 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]array.Data, e
nil /* dtype */, n, c.scratch.buffers[vecIdx], nil /* childData */, 0 /* nulls */, 0, /* offset */
)
}
return c.scratch.arrowData, nil
return c.scratch.arrowData, c.accountForScratch(ctx)
}

// unsafeCastOffsetsArray unsafe-casts the input offsetsBytes slice to point at
Expand All @@ -328,6 +349,26 @@ func unsafeCastOffsetsArray(offsetsInt32 []int32, offsetsBytes *[]byte) {
bytesHeader.Cap = int32Header.Cap * int(memsize.Int32)
}

// accountForScratch performs memory accounting for values and offsets scratch
// slices. Note that we ignore the overhead of the slices themselves as well as
// of other scratch objects since those should be negligible in footprint.
func (c *ArrowBatchConverter) accountForScratch(ctx context.Context) error {
if !c.usesOffsets {
return nil
}
var footprint int64
for i := range c.scratch.values {
// Some of these might be nil in which case footprint won't change.
footprint += int64(cap(c.scratch.values[i]))
footprint += int64(cap(c.scratch.offsets[i])) * memsize.Int32
}
if err := c.acc.Resize(ctx, c.accountedFor, footprint); err != nil {
return err
}
c.accountedFor = footprint
return nil
}

// ArrowToBatch converts []array.Data to a coldata.Batch. There must not be
// more than coldata.BatchSize() elements in data.
//
Expand Down Expand Up @@ -528,6 +569,9 @@ func getValueBytesAndOffsets(

// Release should be called once the converter is no longer needed so that its
// memory could be GCed.
func (c *ArrowBatchConverter) Release() {
func (c *ArrowBatchConverter) Release(ctx context.Context) {
if c.acc != nil {
c.acc.Shrink(ctx, c.accountedFor)
}
*c = ArrowBatchConverter{}
}
22 changes: 14 additions & 8 deletions pkg/col/colserde/arrowbatchconverter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colserde_test

import (
"bytes"
"context"
"fmt"
"testing"

Expand Down Expand Up @@ -47,14 +48,15 @@ func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

typs, b := randomBatch(testAllocator)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional, testMemAcc)
require.NoError(t, err)
defer c.Release(context.Background())

// Make a copy of the original batch because the converter modifies and casts
// data without copying for performance reasons.
expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory)

arrowData, err := c.BatchToArrow(b)
arrowData, err := c.BatchToArrow(context.Background(), b)
require.NoError(t, err)
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
require.NoError(t, c.ArrowToBatch(arrowData, b.Length(), actual))
Expand All @@ -68,7 +70,7 @@ func roundTripBatch(
src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer,
) error {
var buf bytes.Buffer
arrowDataIn, err := c.BatchToArrow(src)
arrowDataIn, err := c.BatchToArrow(context.Background(), src)
if err != nil {
return err
}
Expand Down Expand Up @@ -100,8 +102,9 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
typs, src = randomBatch(testAllocator)
}
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional, testMemAcc)
require.NoError(t, err)
defer c.Release(context.Background())
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)

Expand Down Expand Up @@ -215,16 +218,18 @@ func runConversionBenchmarks(
}

func BenchmarkArrowBatchConverter(b *testing.B) {
ctx := context.Background()
runConversionBenchmarks(
b,
"BatchToArrow",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional, testMemAcc)
require.NoError(b, err)
defer c.Release(ctx)
var data []array.Data
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, _ = c.BatchToArrow(batch)
data, _ = c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
Expand All @@ -235,9 +240,10 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
},
"ArrowToBatch",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional, testMemAcc)
require.NoError(b, err)
data, err := c.BatchToArrow(batch)
defer c.Release(ctx)
data, err := c.BatchToArrow(ctx, batch)
dataCopy := make([]array.Data, len(data))
require.NoError(b, err)
result := testAllocator.NewMemBatchWithMaxCapacity([]*types.T{typ})
Expand Down
12 changes: 8 additions & 4 deletions pkg/col/colserde/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colserde_test

import (
"bytes"
"context"
"testing"

"github.com/apache/arrow/go/arrow/array"
Expand All @@ -22,8 +23,9 @@ import (
)

func BenchmarkConversion(b *testing.B) {
ctx := context.Background()
createSerializer := func(typ *types.T) (*colserde.ArrowBatchConverter, *colserde.RecordBatchSerializer) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BatchToArrowOnly)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BatchToArrowOnly, testMemAcc)
require.NoError(b, err)
s, err := colserde.NewRecordBatchSerializer([]*types.T{typ})
require.NoError(b, err)
Expand All @@ -34,10 +36,11 @@ func BenchmarkConversion(b *testing.B) {
"Serialize",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, s := createSerializer(typ)
defer c.Release(ctx)
var buf bytes.Buffer
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, _ := c.BatchToArrow(batch)
data, _ := c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
Expand All @@ -56,8 +59,9 @@ func BenchmarkConversion(b *testing.B) {
var serialized []byte
{
c, s := createSerializer(typ)
defer c.Release(ctx)
var buf bytes.Buffer
data, _ := c.BatchToArrow(batch)
data, _ := c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
Expand All @@ -70,7 +74,7 @@ func BenchmarkConversion(b *testing.B) {
}
serialized = buf.Bytes()
}
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.ArrowToBatchOnly)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.ArrowToBatchOnly, nil /* acc */)
require.NoError(b, err)
s, err := colserde.NewRecordBatchSerializer([]*types.T{typ})
require.NoError(b, err)
Expand Down
24 changes: 14 additions & 10 deletions pkg/col/colserde/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colserde

import (
"bytes"
"context"
"encoding/binary"
"io"
"os"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
mmap "github.com/edsrzf/mmap-go"
flatbuffers "github.com/google/flatbuffers/go"
Expand Down Expand Up @@ -54,9 +56,11 @@ type FileSerializer struct {
}

// NewFileSerializer creates a FileSerializer for the given types. The caller is
// responsible for closing the given writer.
func NewFileSerializer(w io.Writer, typs []*types.T) (*FileSerializer, error) {
a, err := NewArrowBatchConverter(typs, BatchToArrowOnly)
// responsible for closing the given writer as well as the given memory account.
func NewFileSerializer(
w io.Writer, typs []*types.T, acc *mon.BoundAccount,
) (*FileSerializer, error) {
a, err := NewArrowBatchConverter(typs, BatchToArrowOnly, acc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,10 +108,10 @@ func (s *FileSerializer) Reset(w io.Writer) error {
}

// AppendBatch adds one batch of columnar data to the file.
func (s *FileSerializer) AppendBatch(batch coldata.Batch) error {
func (s *FileSerializer) AppendBatch(ctx context.Context, batch coldata.Batch) error {
offset := int64(s.w.written)

arrow, err := s.a.BatchToArrow(batch)
arrow, err := s.a.BatchToArrow(ctx, batch)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,8 +157,8 @@ func (s *FileSerializer) Finish() error {
}

// Close releases the resources of the serializer.
func (s *FileSerializer) Close() {
s.a.Release()
func (s *FileSerializer) Close(ctx context.Context) {
s.a.Release(ctx)
}

// FileDeserializer decodes columnar data batches from files encoded according
Expand Down Expand Up @@ -213,7 +217,7 @@ func newFileDeserializer(
}
d.typs = typs

if d.a, err = NewArrowBatchConverter(typs, ArrowToBatchOnly); err != nil {
if d.a, err = NewArrowBatchConverter(typs, ArrowToBatchOnly, nil /* acc */); err != nil {
return nil, err
}
if d.rb, err = NewRecordBatchSerializer(typs); err != nil {
Expand All @@ -225,8 +229,8 @@ func newFileDeserializer(
}

// Close releases any resources held by this deserializer.
func (d *FileDeserializer) Close() error {
d.a.Release()
func (d *FileDeserializer) Close(ctx context.Context) error {
d.a.Release(ctx)
return d.bufCloseFn()
}

Expand Down
Loading

0 comments on commit cb3d6db

Please sign in to comment.