diff --git a/build/bazelutil/bazelbuild.sh b/build/bazelutil/bazelbuild.sh
index 79e2497785e1..58ddf97d5bd4 100755
--- a/build/bazelutil/bazelbuild.sh
+++ b/build/bazelutil/bazelbuild.sh
@@ -11,4 +11,4 @@ fi
 bazel build //pkg/cmd/bazci --config=ci
 $(bazel info bazel-bin)/pkg/cmd/bazci/bazci_/bazci --compilation_mode opt \
                        --config "$1" \
-                       build //pkg/cmd/cockroach-short
+                       build //pkg/cmd/cockroach-short //c-deps:libgeos
diff --git a/build/toolchains/crosstool-ng/BUILD.tmpl b/build/toolchains/crosstool-ng/BUILD.tmpl
index 7ad923275e13..487e09f3c8e0 100644
--- a/build/toolchains/crosstool-ng/BUILD.tmpl
+++ b/build/toolchains/crosstool-ng/BUILD.tmpl
@@ -42,7 +42,7 @@ filegroup(
 filegroup(
     name = "linker_files",
     srcs = [
-        "bin/%{target}-gcc",
+        "bin/%{target}-g++",
    ],
 )
diff --git a/build/toolchains/crosstool-ng/cc_toolchain_config.bzl.tmpl b/build/toolchains/crosstool-ng/cc_toolchain_config.bzl.tmpl
index bf70eab6b8db..2355086d9f6e 100644
--- a/build/toolchains/crosstool-ng/cc_toolchain_config.bzl.tmpl
+++ b/build/toolchains/crosstool-ng/cc_toolchain_config.bzl.tmpl
@@ -1,63 +1,43 @@
 load("@bazel_tools//tools/build_defs/cc:action_names.bzl", "ACTION_NAMES")
 load("@bazel_tools//tools/cpp:cc_toolchain_config_lib.bzl",
+     "action_config",
      "feature",
      "flag_group",
      "flag_set",
-     "tool_path")
+     "tool")

 all_compile_actions = [
     ACTION_NAMES.c_compile,
     ACTION_NAMES.cpp_compile,
-    ACTION_NAMES.linkstamp_compile,
-    ACTION_NAMES.assemble,
-    ACTION_NAMES.preprocess_assemble,
-    ACTION_NAMES.cpp_header_parsing,
-    ACTION_NAMES.cpp_module_compile,
-    ACTION_NAMES.cpp_module_codegen,
-    ACTION_NAMES.clif_match,
-    ACTION_NAMES.lto_backend,
 ]

 all_link_actions = [
     ACTION_NAMES.cpp_link_executable,
-    ACTION_NAMES.cpp_link_dynamic_library,
-    ACTION_NAMES.cpp_link_nodeps_dynamic_library,
+]
+
+all_archive_actions = [
+    ACTION_NAMES.cpp_link_static_library,
 ]

 def _impl(ctx):
-    tool_paths = [
-        tool_path(
-            name = "gcc",
-            path = "bin/%{target}-gcc",
-        ),
-        tool_path(
-            name = "ld",
-            path = "bin/%{target}-ld",
-        ),
-        tool_path(
-            name = "cpp",
-            path = "bin/%{target}-g++",
-        ),
-        tool_path(
-            name = "gcov",
-            path = "bin/%{target}-gcov",
-        ),
-        tool_path(
-            name = "nm",
-            path = "bin/%{target}-nm",
+    action_configs = [
+        action_config(
+            action_name = ACTION_NAMES.c_compile,
+            tools = [tool(path="bin/%{target}-gcc")],
         ),
-        tool_path(
-            name = "objdump",
-            path = "bin/%{target}-objdump",
+        action_config(
+            action_name = ACTION_NAMES.cpp_compile,
+            tools = [tool(path="bin/%{target}-g++")],
         ),
-        tool_path(
-            name = "strip",
-            path = "bin/%{target}-strip",
+        action_config(
+            action_name = ACTION_NAMES.cpp_link_executable,
+            tools = [tool(path="bin/%{target}-g++")],
         ),
-        tool_path(
-            name = "ar",
-            path = "bin/%{target}-ar",
+        action_config(
+            action_name = ACTION_NAMES.cpp_link_static_library,
+            tools = [tool(path="bin/%{target}-ar")],
         ),
+    ]

     opt_feature = feature(
@@ -95,6 +75,38 @@ def _impl(ctx):
     supports_pic_feature = feature(name = "supports_pic", enabled = True)
     supports_dynamic_linker_feature = feature(name = "supports_dynamic_linker", enabled = False)

+    default_archiver_flags = feature(
+        name = "archiver_flags",
+        enabled = True,
+        flag_sets = [
+            flag_set(
+                actions = all_archive_actions,
+                flag_groups = [
+                    flag_group(flags = ["rcsD"]),
+                    flag_group(
+                        flags = ["%{output_execpath}"],
+                        expand_if_available = "output_execpath",
+                    ),
+                ],
+            ),
+            flag_set(
+                actions = all_archive_actions,
+                flag_groups = [
+                    flag_group(
+                        iterate_over = "libraries_to_link",
+                        flag_groups = [
+                            flag_group(
+                                flags = ["%{libraries_to_link.name}"],
+                            ),
+                        ],
+                        expand_if_available = "libraries_to_link",
+                    ),
+                ],
+            ),
+        ],
+    )
+
+
     default_compile_flags = feature(
         name = "default_compile_flags",
         enabled = True,
@@ -137,6 +149,7 @@ def _impl(ctx):
         supports_dynamic_linker_feature,
         default_compile_flags,
         default_linker_flags,
+        default_archiver_flags,
     ]

     return cc_common.create_cc_toolchain_config_info(
@@ -150,7 +163,7 @@ def _impl(ctx):
         compiler = "clang",
         abi_version = "clang-10.0.0",
         abi_libc_version = "%{target}",
-        tool_paths = tool_paths,
+        action_configs = action_configs,
         cxx_builtin_include_directories = [
             "%sysroot%/usr/include",
             "%{repo_path}/%{target}/include/c++/6.5.0",
diff --git a/c-deps/BUILD.bazel b/c-deps/BUILD.bazel
index 0acb956801a5..8e287fbe670d 100644
--- a/c-deps/BUILD.bazel
+++ b/c-deps/BUILD.bazel
@@ -66,39 +66,47 @@ cmake(
     visibility = ["//visibility:public"],
 )

-# TODO(ricky): Still broken on Windows.
 # Define the targets for libgeos.
 cmake(
     name = "libgeos",
-    cache_entries = {
-        "CMAKE_BUILD_TYPE": "Release",
-        "CMAKE_C_FLAGS": "-fPIC",
-        "CMAKE_CXX_FLAGS": "-fPIC",
-    },
-    # As of this writing (2021-05-05), foreign_cc
-    # only knows about windows, darwin and linux.
-    cmake_options = select({
-        "@io_bazel_rules_go//go/platform:windows": ["-GNinja"],
-        "//conditions:default": ["-GUnix Makefiles"],
+    cache_entries = select({
+        "@io_bazel_rules_go//go/platform:windows": {
+            "CMAKE_BUILD_TYPE": "Release",
+            "CMAKE_C_FLAGS": "-fPIC",
+            "CMAKE_CXX_FLAGS": "-fPIC",
+            "CMAKE_SYSTEM_NAME": "Windows",
+        },
+        "@io_bazel_rules_go//go/platform:darwin": {
+            "CMAKE_BUILD_TYPE": "Release",
+            "CMAKE_C_FLAGS": "-fPIC",
+            "CMAKE_CXX_FLAGS": "-fPIC",
+            "CMAKE_SYSTEM_NAME": "Darwin",
+        },
+        "//conditions:default": {
+            "CMAKE_BUILD_TYPE": "Release",
+            "CMAKE_C_FLAGS": "-fPIC",
+            "CMAKE_CXX_FLAGS": "-fPIC",
+        },
     }),
+    cmake_options = ["-GUnix Makefiles"],
     lib_source = "@geos//:all",
     make_commands = [
         "mkdir -p libgeos/lib",
         "make --no-print-directory geos_c",
     ] + select({
         "@io_bazel_rules_go//go/platform:darwin": [
-            "cp -L $BUILD_TMPDIR/lib/libgeos.dylib libgeos/lib",
-            "cp -L $BUILD_TMPDIR/lib/libgeos_c.dylib libgeos/lib",
+            "cp -L lib/libgeos.dylib libgeos/lib",
+            "cp -L lib/libgeos_c.dylib libgeos/lib",
             # TODO(#bazel): install_name_tool is also required here for release.
         ],
         "@io_bazel_rules_go//go/platform:windows": [
             # NOTE: Windows ends up in bin/ on the BUILD_TMPDIR.
-            "cp -L $BUILD_TMPDIR/bin/libgeos.dll libgeos/lib",
-            "cp -L $BUILD_TMPDIR/bin/libgeos_c.dll libgeos/lib",
+            "cp -L bin/libgeos.dll libgeos/lib",
+            "cp -L bin/libgeos_c.dll libgeos/lib",
         ],
         "//conditions:default": [
-            "cp -L $BUILD_TMPDIR/lib/libgeos.so libgeos/lib",
-            "cp -L $BUILD_TMPDIR/lib/libgeos_c.so libgeos/lib",
+            "cp -L lib/libgeos.so libgeos/lib",
+            "cp -L lib/libgeos_c.so libgeos/lib",
             # TODO(#bazel): patchelf is also required here for release.
         ],
     }),
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index f5bc0915ef40..cdd166e29b7c 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -4011,7 +4011,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

 	rnd, _ := randutil.NewPseudoRand()

-	var maxCheckopointSize int64
+	var maxCheckpointSize int64
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, `CREATE TABLE foo(key INT PRIMARY KEY DEFAULT unique_rowid(), val INT)`)
@@ -4055,7 +4055,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
 		changefeedbase.FrontierCheckpointFrequency.Override(
 			context.Background(), &f.Server().ClusterSettings().SV, 10*time.Millisecond)
 		changefeedbase.FrontierCheckpointMaxBytes.Override(
-			context.Background(), &f.Server().ClusterSettings().SV, maxCheckopointSize)
+			context.Background(), &f.Server().ClusterSettings().SV, maxCheckpointSize)

 		registry := f.Server().JobRegistry().(*jobs.Registry)
 		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved='100ms'`)
@@ -4138,7 +4138,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

 	// TODO(ssd): Tenant testing disabled because of use of DB()
 	for _, sz := range []int64{100 << 20, 100} {
-		maxCheckopointSize = sz
+		maxCheckpointSize = sz
 		t.Run(fmt.Sprintf("enterprise-limit=%s", humanize.Bytes(uint64(sz))), enterpriseTest(testFn, feedTestNoTenants))
 		t.Run(fmt.Sprintf("cloudstorage-limit=%s", humanize.Bytes(uint64(sz))), cloudStorageTest(testFn, feedTestNoTenants))
 		t.Run(fmt.Sprintf("kafka-limit=%s", humanize.Bytes(uint64(sz))), kafkaTest(testFn, feedTestNoTenants))
@@ -4196,3 +4196,35 @@ func TestCheckpointFrequency(t *testing.T) {
 	require.Equal(t, completionTime, js.lastProgressUpdate)
 	require.False(t, js.progressUpdatesSkipped)
 }
+
+func TestChangefeedOrderingWithErrors(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(db)
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
+
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH updated`)
+		webhookFoo := foo.(*webhookFeed)
+		// Retry, then fail, then restart the changefeed and successfully send messages.
+		webhookFoo.mockSink.SetStatusCodes(append(repeatStatusCode(
+			http.StatusInternalServerError,
+			defaultRetryConfig().MaxRetries+1),
+			[]int{http.StatusOK, http.StatusOK, http.StatusOK}...))
+		defer closeFeed(t, foo)
+
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
+		sqlDB.Exec(t, `UPSERT INTO foo VALUES (1, 'b')`)
+		sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
+		assertPayloadsPerKeyOrderedStripTs(t, foo, []string{
+			`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
+			`foo: [1]->{"after": {"a": 1, "b": "b"}}`,
+			`foo: [1]->{"after": null}`,
+		})
+	}
+
+	// Only the webhook sink is tested for now, since it's the only testfeed
+	// where we can control the ordering of errors.
+	t.Run(`webhook`, webhookTest(testFn))
+}
diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go
index 388a9f7057cb..7c9f117bf5c7 100644
--- a/pkg/ccl/changefeedccl/sink_webhook.go
+++ b/pkg/ccl/changefeedccl/sink_webhook.go
@@ -71,13 +71,13 @@ func encodePayloadWebhook(value []byte) ([]byte, error) {
 }

 type webhookSink struct {
-	ctx         context.Context
+	workerCtx   context.Context
 	url         sinkURL
 	authHeader  string
 	parallelism int
 	client      *httputil.Client
 	workerGroup ctxgroup.Group
-	cancelFunc  func()
+	exitWorkers func()
 	eventsChans []chan []byte
 	inflight    *inflightTracker
 	retryCfg    retry.Options
@@ -135,9 +135,9 @@ func makeWebhookSink(
 	ctx, cancel := context.WithCancel(ctx)

 	sink := &webhookSink{
-		ctx:         ctx,
+		workerCtx:   ctx,
 		authHeader:  opts[changefeedbase.OptWebhookAuthHeader],
-		cancelFunc:  cancel,
+		exitWorkers: cancel,
 		parallelism: parallelism,
 		retryCfg:    retryOptions,
 	}
@@ -229,7 +229,7 @@ func defaultWorkerCount() int {

 func (s *webhookSink) setupWorkers() {
 	s.eventsChans = make([]chan []byte, s.parallelism)
-	s.workerGroup = ctxgroup.WithContext(s.ctx)
+	s.workerGroup = ctxgroup.WithContext(s.workerCtx)
 	for i := 0; i < s.parallelism; i++ {
 		s.eventsChans[i] = make(chan []byte)
 		j := i
@@ -240,19 +240,21 @@ func (s *webhookSink) setupWorkers() {
 	}
 }

-// TODO (ryan min): Address potential ordering issue where errored message can
-// be followed by successful messages. Solution is to immediately stop sending
-// messages upon receiving a single error.
 func (s *webhookSink) workerLoop(workerCh chan []byte) {
 	for {
 		select {
-		case <-s.ctx.Done():
+		case <-s.workerCtx.Done():
 			return
 		case msg := <-workerCh:
-			err := s.sendMessageWithRetries(s.ctx, msg)
+			err := s.sendMessageWithRetries(s.workerCtx, msg)
 			s.inflight.maybeSetError(err)
 			// reduce inflight count by one and reduce memory counter
-			s.inflight.FinishRequest(s.ctx, int64(len(msg)))
+			s.inflight.FinishRequest(s.workerCtx, int64(len(msg)))
+			// shut down all other workers immediately if error encountered
+			if err != nil {
+				s.exitWorkers()
+				return
+			}
 		}
 	}
 }
@@ -347,6 +349,17 @@ func (i *inflightTracker) maybeSetError(err error) {
 	}
 }

+// hasError does a non-blocking check for an error on the tracker's error
+// channel and returns the error if one exists.
+func (i *inflightTracker) hasError() error {
+	var err error
+	select {
+	case err = <-i.errChan:
+	default:
+	}
+	return err
+}
+
 // StartRequest enqueues one inflight message to be flushed.
 func (i *inflightTracker) StartRequest(ctx context.Context, bytes int64) error {
 	i.flushMu.Lock()
@@ -398,15 +411,23 @@ func (s *webhookSink) EmitRow(
 		return err
 	}

+	// check if error has been encountered and exit if needed
+	err = s.inflight.hasError()
+	if err != nil {
+		return err
+	}
+
 	err = s.inflight.StartRequest(ctx, int64(len(j)))
 	if err != nil {
 		return err
 	}

 	select {
+	// check the webhook sink context in case workers have been terminated
+	case <-s.workerCtx.Done():
+		return s.workerCtx.Err()
 	case <-ctx.Done():
 		return ctx.Err()
-	// Errors resulting from sending the message will be expressed in Flush.
 	case s.eventsChans[s.workerIndex(key)] <- j:
 	}
 	return nil
 }
@@ -420,6 +441,16 @@ func (s *webhookSink) EmitResolvedTimestamp(
 		return err
 	}

+	select {
+	// check the webhook sink context in case workers have been terminated
+	case <-s.workerCtx.Done():
+		return s.workerCtx.Err()
+	// non-blocking check for error, restart changefeed if encountered
+	case err = <-s.inflight.errChan:
+		return err
+	default:
+	}
+
 	err = s.inflight.StartRequest(ctx, int64(len(j)))
 	if err != nil {
 		return err
@@ -431,7 +462,7 @@ func (s *webhookSink) EmitResolvedTimestamp(
 	}

 	err = s.sendMessageWithRetries(ctx, j)
 	s.inflight.maybeSetError(err)
 	s.inflight.FinishRequest(ctx, int64(len(j)))
-	return nil
+	return err
 }
@@ -439,10 +470,10 @@ func (s *webhookSink) Flush(ctx context.Context) error {
 }

 func (s *webhookSink) Close() error {
-	s.cancelFunc()
+	s.exitWorkers()
 	// ignore errors here since we're closing the sink anyway
 	_ = s.workerGroup.Wait()
-	s.inflight.Close(s.ctx)
+	s.inflight.Close(s.workerCtx)
 	for _, eventsChan := range s.eventsChans {
 		close(eventsChan)
 	}
diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go
index 4f0c0ea99c1d..d083f9c049d4 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_test.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -36,7 +36,20 @@ func getGenericWebhookSinkOptions() map[string]string {
 	return opts
 }

-func setupWebhookSinkWithDetails(details jobspb.ChangefeedDetails, parallelism int) (Sink, error) {
+// repeatStatusCode returns a slice of status codes that the mock
+// webhook sink should return for subsequent requests to ensure that an error
+// is reached even with the default retry behavior.
+func repeatStatusCode(code int, count int) []int {
+	arr := make([]int, count)
+	for i := 0; i < count; i++ {
+		arr[i] = code
+	}
+	return arr
+}
+
+func setupWebhookSinkWithDetails(
+	ctx context.Context, details jobspb.ChangefeedDetails, parallelism int,
+) (Sink, error) {
 	u, err := url.Parse(details.SinkURI)
 	if err != nil {
 		return nil, err
@@ -59,7 +72,7 @@ func setupWebhookSinkWithDetails(details jobspb.ChangefeedDetails, parallelism i
 		InitialBackoff: 5 * time.Millisecond,
 	}

-	sinkSrc, err := makeWebhookSink(context.Background(), sinkURL{URL: u}, details.Opts, parallelism, memMon.MakeBoundAccount(), shortRetryCfg)
+	sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, details.Opts, parallelism, memMon.MakeBoundAccount(), shortRetryCfg)
 	if err != nil {
 		return nil, err
 	}
@@ -132,7 +145,7 @@ func TestWebhookSink(t *testing.T) {
 		Opts:    opts,
 	}

-	sinkSrc, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	// sink with client accepting server cert should pass
@@ -141,33 +154,25 @@ func TestWebhookSink(t *testing.T) {
 	params.Del(changefeedbase.SinkParamCACert)
 	sinkDestHost.RawQuery = params.Encode()
 	details.SinkURI = fmt.Sprintf("webhook-%s", sinkDestHost.String())
-	sinkSrcNoCert, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrcNoCert, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	// now sink's client accepts no custom certs, should reject the server's cert and fail
 	require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"),
 		[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))

-	err = sinkSrcNoCert.Flush(context.Background())
-	require.EqualError(t, err, fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL()))
+	require.EqualError(t, sinkSrcNoCert.Flush(context.Background()),
+		fmt.Sprintf(`Post "%s": x509: certificate signed by unknown authority`, sinkDest.URL()))

 	params.Set(changefeedbase.SinkParamSkipTLSVerify, "true")
 	sinkDestHost.RawQuery = params.Encode()
 	details.SinkURI = fmt.Sprintf("webhook-%s", sinkDestHost.String())
-	sinkSrcInsecure, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrcInsecure, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	// client should allow unrecognized certs and pass
 	testSendAndReceiveRows(t, sinkSrcInsecure, sinkDest)

-	// sink should throw an error if a non-2XX status code is returned
-	sinkDest.SetStatusCodes([]int{http.StatusBadGateway})
-	require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"),
-		[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))
-
-	err = sinkSrc.Flush(context.Background())
-	require.EqualError(t, err, "502 Bad Gateway: ")
-
 	// sink should throw an error if server is unreachable
 	sinkDest.Close()
 	require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"),
@@ -218,33 +223,31 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
 		Opts:    opts,
 	}

-	sinkSrc, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	testSendAndReceiveRows(t, sinkSrc, sinkDest)

 	// no credentials should result in a 401
 	delete(opts, changefeedbase.OptWebhookAuthHeader)
-	sinkSrcNoCreds, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrcNoCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"),
 		[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))

-	err = sinkSrcNoCreds.Flush(context.Background())
-	require.EqualError(t, err, "401 Unauthorized: ")
+	require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ")

 	// wrong credentials should result in a 401 as well
 	var wrongAuthHeader string
 	cdctest.EncodeBase64ToString([]byte(fmt.Sprintf("%s:%s", username, "wrong-password")), &wrongAuthHeader)
 	opts[changefeedbase.OptWebhookAuthHeader] = fmt.Sprintf("Basic %s", wrongAuthHeader)
-	sinkSrcWrongCreds, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrcWrongCreds, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"),
 		[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))

-	err = sinkSrcWrongCreds.Flush(context.Background())
-	require.EqualError(t, err, "401 Unauthorized: ")
+	require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ")

 	require.NoError(t, sinkSrc.Close())
 	require.NoError(t, sinkSrcNoCreds.Close())
@@ -285,7 +288,7 @@ func TestWebhookSinkRetriesRequests(t *testing.T) {
 		Opts:    opts,
 	}

-	sinkSrc, err := setupWebhookSinkWithDetails(details, parallelism)
+	sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 	require.NoError(t, err)

 	testSendAndReceiveRows(t, sinkSrc, sinkDest)
@@ -319,14 +322,13 @@ func TestWebhookSinkRetriesRequests(t *testing.T) {
 			Opts:    opts,
 		}

-		sinkSrc, err := setupWebhookSinkWithDetails(details, parallelism)
+		sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism)
 		require.NoError(t, err)

 		require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"),
 			[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))

-		err = sinkSrc.Flush(context.Background())
-		require.EqualError(t, err, "500 Internal Server Error: ")
+		require.EqualError(t, sinkSrc.Flush(context.Background()), "500 Internal Server Error: ")

 		// ensure that failures are retried the default maximum number of times
 		// before returning error
@@ -342,3 +344,51 @@ func TestWebhookSinkRetriesRequests(t *testing.T) {
 		retryThenFailureFn(i)
 	}
 }
+
+func TestWebhookSinkShutsDownOnError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	webhookSinkTestfn := func(parallelism int) {
+		ctx := context.Background()
+		cert, certEncoded, err := cdctest.NewCACertBase64Encoded()
+		require.NoError(t, err)
+		sinkDest, err := cdctest.StartMockWebhookSink(cert)
+		require.NoError(t, err)
+
+		opts := getGenericWebhookSinkOptions()
+
+		// retry and fail, then return OK
+		sinkDest.SetStatusCodes(repeatStatusCode(http.StatusInternalServerError,
+			defaultRetryConfig().MaxRetries+1))
+		sinkDestHost, err := url.Parse(sinkDest.URL())
+		require.NoError(t, err)
+
+		params := sinkDestHost.Query()
+		params.Set(changefeedbase.SinkParamCACert, certEncoded)
+		sinkDestHost.RawQuery = params.Encode()
+
+		details := jobspb.ChangefeedDetails{
+			SinkURI: fmt.Sprintf("webhook-%s", sinkDestHost.String()),
+			Opts:    opts,
+		}
+
+		sinkSrc, err := setupWebhookSinkWithDetails(ctx, details, parallelism)
+		require.NoError(t, err)
+
+		require.NoError(t, sinkSrc.EmitRow(ctx, nil, []byte("[1001]"),
+			[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), hlc.Timestamp{}))
+		// error should be propagated immediately in the next call
+		require.EqualError(t, sinkSrc.Flush(ctx), "500 Internal Server Error: ")
+
+		// check that no messages have been delivered
+		require.Equal(t, "", sinkDest.Pop())
+
+		sinkDest.Close()
+		require.NoError(t, sinkSrc.Close())
+	}
+
+	// run tests with parallelism from 1-16 (1,2,4,8,16)
+	for i := 1; i <= 16; i *= 2 {
+		webhookSinkTestfn(i)
+	}
+}
diff --git a/pkg/cli/debug_test.go b/pkg/cli/debug_test.go
index 4fad2e005ce0..b680922e4226 100644
--- a/pkg/cli/debug_test.go
+++ b/pkg/cli/debug_test.go
@@ -140,6 +140,7 @@ func TestOpenReadOnlyStore(t *testing.T) {

 func TestRemoveDeadReplicas(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	skip.WithIssue(t, 50977, "flaky test")
 	defer log.Scope(t).Close(t)

 	// This test is pretty slow under race (200+ cpu-seconds) because it
diff --git a/pkg/cmd/bazci/bazci.go b/pkg/cmd/bazci/bazci.go
index 579d01530b67..1c220b4953d2 100644
--- a/pkg/cmd/bazci/bazci.go
+++ b/pkg/cmd/bazci/bazci.go
@@ -122,6 +122,8 @@ type buildInfo struct {
 	testlogsDir string
 	// Expanded list of Go binary targets to be built.
 	goBinaries []string
+	// Expanded list of cmake targets to be built.
+	cmakeTargets []string
 	// Expanded list of Go test targets to be run. Test suites are split up
 	// into their component tests and all put in this list, so this may be
 	// considerably longer than the argument list.
@@ -171,6 +173,8 @@ func getBuildInfo(args parsedArgs) (buildInfo, error) {
 		fullTarget := outputSplit[2]

 		switch targetKind {
+		case "cmake":
+			ret.cmakeTargets = append(ret.cmakeTargets, fullTarget)
 		case "go_binary":
 			ret.goBinaries = append(ret.goBinaries, fullTarget)
 		case "go_test":
@@ -241,3 +245,12 @@ func usingCrossWindowsConfig() bool {
 	}
 	return false
 }
+
+func usingCrossDarwinConfig() bool {
+	for _, config := range configs {
+		if config == "crossmacos" {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/cmd/bazci/watch.go b/pkg/cmd/bazci/watch.go
index 3d4206262b84..1cfbdeb3d45b 100644
--- a/pkg/cmd/bazci/watch.go
+++ b/pkg/cmd/bazci/watch.go
@@ -12,6 +12,7 @@ package main
 import (
 	"bytes"
 	"encoding/xml"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"log"
@@ -20,6 +21,8 @@ import (
 	"path/filepath"
 	"strings"
 	"time"
+
+	"github.com/cockroachdb/errors"
 )

 // SourceDir is an enumeration of possible output locations.
@@ -206,6 +209,32 @@ func (w watcher) stageBinaryArtifacts() error {
 			return err
 		}
 	}
+	for _, bin := range w.info.cmakeTargets {
+		// These targets don't have stable, predictable locations, so
+		// they have to be hardcoded.
+		var ext string
+		if usingCrossWindowsConfig() {
+			ext = "dll"
+		} else if usingCrossDarwinConfig() {
+			ext = "dylib"
+		} else {
+			ext = "so"
+		}
+		switch bin {
+		case "//c-deps:libgeos":
+			for _, relBinPath := range []string{
+				fmt.Sprintf("c-deps/libgeos/lib/libgeos_c.%s", ext),
+				fmt.Sprintf("c-deps/libgeos/lib/libgeos.%s", ext),
+			} {
+				err := w.maybeStageArtifact(binSourceDir, relBinPath, 0666, finalizePhase, copyContentTo)
+				if err != nil {
+					return err
+				}
+			}
+		default:
+			return errors.Newf("unrecognized cmake target %s", bin)
+		}
+	}
 	return nil
 }
diff --git a/pkg/cmd/dev/build.go b/pkg/cmd/dev/build.go
index 077d1af02c24..42ab7f40ffe7 100644
--- a/pkg/cmd/dev/build.go
+++ b/pkg/cmd/dev/build.go
@@ -13,6 +13,7 @@ package main
 import (
 	"context"
 	"fmt"
+	"log"
 	"path"
 	"path/filepath"
 	"strings"
@@ -22,20 +23,30 @@ import (
 	"github.com/spf13/cobra"
 )

+const crossFlag = "cross"
+
 // makeBuildCmd constructs the subcommand used to build the specified binaries.
 func makeBuildCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command {
-	return &cobra.Command{
+	buildCmd := &cobra.Command{
 		Use:   "build <binary>",
 		Short: "Build the specified binaries",
 		Long:  "Build the specified binaries.",
 		// TODO(irfansharif): Flesh out the example usage patterns.
 		Example: `
-	dev build cockroach --tags=deadlock
+	dev build cockroach
 	dev build cockroach-{short,oss}
 	dev build {opt,exec}gen`,
 		Args: cobra.MinimumNArgs(0),
 		RunE: runE,
 	}
+	buildCmd.Flags().String(volumeFlag, "bzlcache", "the Docker volume to use as the Bazel cache (only used for cross builds)")
+	buildCmd.Flags().String(crossFlag, "", `
+        Turns on cross-compilation. Builds the binary using the builder image w/ Docker.
+        You can optionally set a config, as in --cross=windows.
+        Defaults to linux if not specified. The config should be the name of a
+        build configuration specified in .bazelrc, minus the "cross" prefix.`)
+	buildCmd.Flags().Lookup(crossFlag).NoOptDefVal = "linux"
+	return buildCmd
 }

 // TODO(irfansharif): Add grouping shorthands like "all" or "bins", etc.
@@ -59,49 +70,50 @@ var buildTargetMapping = map[string]string{
 	"roachtest": "//pkg/cmd/roachtest",
 }

-func (d *dev) build(cmd *cobra.Command, targets []string) (err error) {
+func (d *dev) build(cmd *cobra.Command, targets []string) error {
 	ctx := cmd.Context()
+	cross := mustGetFlagString(cmd, crossFlag)

-	if len(targets) == 0 {
-		// Default to building the cockroach binary.
-		targets = append(targets, "cockroach")
-	}
-
-	var args []string
-	args = append(args, "build")
-	args = append(args, "--color=yes")
-	// Don't let bazel generate any convenience symlinks, we'll create them
-	// ourself.
-	args = append(args, "--experimental_convenience_symlinks=ignore")
-	args = append(args, getConfigFlags()...)
-	args = append(args, mustGetRemoteCacheArgs(remoteCacheAddr)...)
-	if numCPUs != 0 {
-		args = append(args, fmt.Sprintf("--local_cpu_resources=%d", numCPUs))
+	args, fullTargets, err := getBasicBuildArgs(targets)
+	if err != nil {
+		return err
 	}

-	var fullTargets []string
-	for _, target := range targets {
-		// Assume that targets beginning with `//` or containing `/`
-		// don't need to be munged.
-		if strings.HasPrefix(target, "//") || strings.Contains(target, "/") {
-			args = append(args, target)
-			fullTargets = append(fullTargets, target)
-			continue
-		}
-		buildTarget, ok := buildTargetMapping[target]
-		if !ok {
-			return errors.Newf("unrecognized target: %s", target)
+	if cross == "" {
+		args = append(args, getConfigFlags()...)
+		if err := d.exec.CommandContextNoRecord(ctx, "bazel", args...); err != nil {
+			return err
 		}
-
-		fullTargets = append(fullTargets, buildTarget)
-		args = append(args, buildTarget)
+		return d.symlinkBinaries(ctx, fullTargets)
 	}
-
-	if err := d.exec.CommandContextNoRecord(ctx, "bazel", args...); err != nil {
+	// Cross-compilation case.
+	cross = "cross" + cross
+	volume := mustGetFlagString(cmd, volumeFlag)
+	args = append(args, fmt.Sprintf("--config=%s", cross))
+	dockerArgs, err := d.getDockerRunArgs(ctx, volume, false)
+	if err != nil {
 		return err
 	}
-
-	return d.symlinkBinaries(ctx, fullTargets)
+	// Construct a script that builds the binaries and copies them
+	// to the appropriate location in /artifacts.
+	var script strings.Builder
+	script.WriteString("set -euxo pipefail\n")
+	// TODO(ricky): Actually, we need to shell-quote the arguments,
+	// but that's hard and I don't think it's necessary for now.
+	script.WriteString(fmt.Sprintf("bazel %s\n", strings.Join(args, " ")))
+	script.WriteString(fmt.Sprintf("BAZELBIN=`bazel info bazel-bin --color=no --config=%s`\n", cross))
+	for _, target := range fullTargets {
+		script.WriteString(fmt.Sprintf("cp $BAZELBIN/%s /artifacts\n", targetToRelativeBinPath(target)))
+		script.WriteString(fmt.Sprintf("chmod +w /artifacts/%s\n", targetToBinBasename(target)))
+	}
+	_, err = d.exec.CommandContextWithInput(ctx, script.String(), "docker", dockerArgs...)
+	if err != nil {
+		return err
+	}
+	for _, target := range fullTargets {
+		logSuccessfulBuild(target, filepath.Join("artifacts", targetToBinBasename(target)))
+	}
+	return nil
 }

 func (d *dev) symlinkBinaries(ctx context.Context, targets []string) error {
@@ -119,8 +131,7 @@ func (d *dev) symlinkBinaries(ctx context.Context, targets []string) error {
 		if err != nil {
 			return err
 		}
-		base := filepath.Base(strings.TrimPrefix(target, "//"))
-
+		base := targetToBinBasename(target)
 		var symlinkPath string
 		// Binaries beginning with the string "cockroach" go right at
 		// the top of the workspace; others go in the `bin` directory.
@@ -137,19 +148,21 @@ func (d *dev) symlinkBinaries(ctx context.Context, targets []string) error {
 		if err := d.os.Symlink(binaryPath, symlinkPath); err != nil {
 			return err
 		}
+		rel, err := filepath.Rel(workspace, symlinkPath)
+		if err != nil {
+			rel = symlinkPath
+		}
+		logSuccessfulBuild(target, rel)
 	}

 	return nil
 }

-func (d *dev) getPathToBin(ctx context.Context, target string) (string, error) {
-	args := []string{"info", "bazel-bin", "--color=no"}
-	args = append(args, getConfigFlags()...)
-	out, err := d.exec.CommandContextSilent(ctx, "bazel", args...)
-	if err != nil {
-		return "", err
-	}
-	bazelBin := strings.TrimSpace(string(out))
+// targetToRelativeBinPath returns the path of the binary produced by this build
+// target relative to bazel-bin. That is,
+// filepath.Join(bazelBin, targetToRelativeBinPath(target)) is the absolute
+// path to the built binary for the target.
+func targetToRelativeBinPath(target string) string {
 	var head string
 	if strings.HasPrefix(target, "@") {
 		doubleSlash := strings.Index(target, "//")
@@ -164,5 +177,73 @@ func (d *dev) getPathToBin(ctx context.Context, target string) (string, error) {
 	} else {
 		bin = target[strings.LastIndex(target, "/")+1:]
 	}
-	return filepath.Join(bazelBin, head, bin+"_", bin), nil
+	return filepath.Join(head, bin+"_", bin)
+}
+
+func targetToBinBasename(target string) string {
+	base := filepath.Base(strings.TrimPrefix(target, "//"))
+	// If there's a colon, the actual name of the executable is
+	// after it.
+	colon := strings.LastIndex(base, ":")
+	if colon >= 0 {
+		base = base[colon+1:]
+	}
+	return base
+}
+
+func (d *dev) getPathToBin(ctx context.Context, target string) (string, error) {
+	args := []string{"info", "bazel-bin", "--color=no"}
+	args = append(args, getConfigFlags()...)
+	out, err := d.exec.CommandContextSilent(ctx, "bazel", args...)
+	if err != nil {
+		return "", err
+	}
+	bazelBin := strings.TrimSpace(string(out))
+	rel := targetToRelativeBinPath(target)
+	return filepath.Join(bazelBin, rel), nil
+}
+
+// getBasicBuildArgs is for enumerating the arguments to pass to `bazel` in
+// order to build the given high-level targets.
+// The first string slice returned is the list of arguments (i.e. to pass to
+// `CommandContext`), and the second is the full list of targets to be built
+// (e.g. after translation, so short -> "//pkg/cmd/cockroach-short").
+func getBasicBuildArgs(targets []string) (args, fullTargets []string, err error) {
+	if len(targets) == 0 {
+		// Default to building the cockroach binary.
+		targets = append(targets, "cockroach")
+	}
+
+	args = append(args, "build")
+	args = append(args, "--color=yes")
+	// Don't let bazel generate any convenience symlinks, we'll create them
+	// ourself.
+	args = append(args, "--experimental_convenience_symlinks=ignore")
+	args = append(args, mustGetRemoteCacheArgs(remoteCacheAddr)...)
+	if numCPUs != 0 {
+		args = append(args, fmt.Sprintf("--local_cpu_resources=%d", numCPUs))
+	}
+
+	for _, target := range targets {
+		// Assume that targets beginning with `//` or containing `/`
+		// don't need to be munged.
+		if strings.HasPrefix(target, "//") || strings.Contains(target, "/") {
+			args = append(args, target)
+			fullTargets = append(fullTargets, target)
+			continue
+		}
+		buildTarget, ok := buildTargetMapping[target]
+		if !ok {
+			err = errors.Newf("unrecognized target: %s", target)
+			return
+		}
+
+		fullTargets = append(fullTargets, buildTarget)
+		args = append(args, buildTarget)
+	}
+	return
+}
+
+func logSuccessfulBuild(target, rel string) {
+	log.Printf("Successfully built binary for target %s at %s", target, rel)
 }
diff --git a/pkg/cmd/dev/builder.go b/pkg/cmd/dev/builder.go
index 08b96bbcecb1..7010f51a3f6f 100644
--- a/pkg/cmd/dev/builder.go
+++ b/pkg/cmd/dev/builder.go
@@ -11,8 +11,8 @@
 package main

 import (
+	"context"
 	"log"
-	"os/exec"
 	"path/filepath"
 	"strings"
@@ -38,15 +38,15 @@ func makeBuilderCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command {
 func (d *dev) builder(cmd *cobra.Command, _ []string) error {
 	ctx := cmd.Context()
-
 	volume := mustGetFlagString(cmd, volumeFlag)
-	if !isTesting {
-		if _, err := exec.LookPath("docker"); err != nil {
-			return errors.New("Could not find docker in PATH")
-		}
+	args, err := d.getDockerRunArgs(ctx, volume, true)
+	if err != nil {
+		return err
 	}
+	return d.exec.CommandContextNoRecord(ctx, "docker", args...)
+}

-	// Ensure the volume to use exists.
+func (d *dev) ensureDockerVolume(ctx context.Context, volume string) error {
 	_, err := d.exec.CommandContextSilent(ctx, "docker", "volume", "inspect", volume)
 	if err != nil {
 		log.Printf("Creating volume %s with Docker...", volume)
@@ -55,19 +55,38 @@ func (d *dev) builder(cmd *cobra.Command, _ []string) error {
 			return err
 		}
 	}
+	return nil
+}

-	var args []string
-	args = append(args, "run", "--rm", "-it")
+func (d *dev) getDockerRunArgs(
+	ctx context.Context, volume string, tty bool,
+) (args []string, err error) {
+	err = ensureBinaryInPath("docker")
+	if err != nil {
+		return
+	}
+	err = d.ensureDockerVolume(ctx, volume)
+	if err != nil {
+		return
+	}
+
+	args = append(args, "run", "--rm")
+	if tty {
+		args = append(args, "-it")
+	} else {
+		args = append(args, "-i")
+	}
 	workspace, err := d.getWorkspace(ctx)
 	if err != nil {
-		return err
+		return
 	}
 	args = append(args, "-v", workspace+":/cockroach:ro")
 	args = append(args, "--workdir=/cockroach")
 	// Create the artifacts directory.
 	artifacts := filepath.Join(workspace, "artifacts")
-	if err = d.os.MkdirAll(artifacts); err != nil {
-		return err
+	err = d.os.MkdirAll(artifacts)
+	if err != nil {
+		return
 	}
 	args = append(args, "-v", artifacts+":/artifacts")
 	// The `delegated` switch ensures that the container's view of the cache
@@ -77,7 +96,7 @@ func (d *dev) builder(cmd *cobra.Command, _ []string) error {
 	// Read the Docker image from build/teamcity-bazel-support.sh.
 	buf, err := d.os.ReadFile(filepath.Join(workspace, "build/teamcity-bazel-support.sh"))
 	if err != nil {
-		return err
+		return
 	}
 	var bazelImage string
 	for _, line := range strings.Split(buf, "\n") {
@@ -86,9 +105,9 @@ func (d *dev) builder(cmd *cobra.Command, _ []string) error {
 		}
 	}
 	if bazelImage == "" {
-		return errors.New("Could not find BAZEL_IMAGE in build/teamcity-bazel-support.sh")
+		err = errors.New("Could not find BAZEL_IMAGE in build/teamcity-bazel-support.sh")
+		return
 	}
 	args = append(args, bazelImage)
-
-	return d.exec.CommandContextNoRecord(ctx, "docker", args...)
+	return
 }
diff --git a/pkg/cmd/dev/io/exec/exec.go b/pkg/cmd/dev/io/exec/exec.go
index 7a77bf2a6da1..49636a376d07 100644
--- a/pkg/cmd/dev/io/exec/exec.go
+++ b/pkg/cmd/dev/io/exec/exec.go
@@ -100,7 +100,7 @@ func WithWorkingDir(dir string) func(e *Exec) {
 // CommandContext wraps around exec.CommandContext, executing the named program
 // with the given arguments.
 func (e *Exec) CommandContext(ctx context.Context, name string, args ...string) ([]byte, error) {
-	return e.commandContextImpl(ctx, false, name, args...)
+	return e.commandContextImpl(ctx, nil, false, name, args...)
 }

 // CommandContextSilent is like CommandContext, but does not take over
@@ -108,7 +108,16 @@ func (e *Exec) CommandContext(ctx context.Context, name string, args ...string)
 func (e *Exec) CommandContextSilent(
 	ctx context.Context, name string, args ...string,
 ) ([]byte, error) {
-	return e.commandContextImpl(ctx, true, name, args...)
+	return e.commandContextImpl(ctx, nil, true, name, args...)
+}
+
+// CommandContextWithInput is like CommandContext, but stdin is piped from an
+// in-memory string.
+func (e *Exec) CommandContextWithInput(
+	ctx context.Context, stdin, name string, args ...string,
+) ([]byte, error) {
+	r := strings.NewReader(stdin)
+	return e.commandContextImpl(ctx, r, false, name, args...)
 }

 // CommandContextNoRecord is like CommandContext, but doesn't capture stdout.
@@ -150,7 +159,7 @@ func (e *Exec) CommandContextNoRecord(ctx context.Context, name string, args ...string) ([]byte, error) {
 }

 func (e *Exec) commandContextImpl(
-	ctx context.Context, silent bool, name string, args ...string,
+	ctx context.Context, stdin io.Reader, silent bool, name string, args ...string,
 ) ([]byte, error) {
 	var command string
 	if len(args) > 0 {
@@ -171,6 +180,9 @@ func (e *Exec) commandContextImpl(
 		cmd.Stdout = io.MultiWriter(e.stdout, &buffer)
 		cmd.Stderr = e.stderr
 	}
+	if stdin != nil {
+		cmd.Stdin = stdin
+	}
 	cmd.Dir = e.dir

 	if err := cmd.Start(); err != nil {
diff --git a/pkg/cmd/dev/testdata/build.txt b/pkg/cmd/dev/testdata/build.txt
index 47eafd4efb9f..ea6fbc54a53f 100644
--- a/pkg/cmd/dev/testdata/build.txt
+++ b/pkg/cmd/dev/testdata/build.txt
@@ -1,6 +1,6 @@
 dev build cockroach-short
 ----
-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore //pkg/cmd/cockroach-short --config=dev
 bazel info workspace --color=no --config=dev
 mkdir go/src/github.com/cockroachdb/cockroach/bin
 bazel info bazel-bin --color=no --config=dev
@@ -9,7 +9,7 @@ ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short

 dev build cockroach-short --cpus=12
 ----
-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev --local_cpu_resources=12 //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore --local_cpu_resources=12 //pkg/cmd/cockroach-short --config=dev
 bazel info workspace --color=no --config=dev
 mkdir go/src/github.com/cockroachdb/cockroach/bin
 bazel info bazel-bin --color=no --config=dev
@@ -18,7 +18,7 @@ ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short

 dev build --debug cockroach-short
 ----
-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore //pkg/cmd/cockroach-short --config=dev
 bazel info workspace --color=no --config=dev
 mkdir go/src/github.com/cockroachdb/cockroach/bin
 bazel info bazel-bin --color=no --config=dev
@@ -27,7 +27,7 @@ ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short

 dev build cockroach-short --remote-cache 127.0.0.1:9090
 ----
-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev --remote_local_fallback --remote_cache=grpc://127.0.0.1:9090 --experimental_remote_downloader=grpc://127.0.0.1:9090 //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore --remote_local_fallback --remote_cache=grpc://127.0.0.1:9090 --experimental_remote_downloader=grpc://127.0.0.1:9090 //pkg/cmd/cockroach-short --config=dev
 bazel info workspace --color=no --config=dev
 mkdir go/src/github.com/cockroachdb/cockroach/bin
 bazel info bazel-bin --color=no --config=dev
diff --git a/pkg/cmd/dev/testdata/recording/build.txt b/pkg/cmd/dev/testdata/recording/build.txt
index 6dbdf50dae8e..bc46b05e7225 100644
--- a/pkg/cmd/dev/testdata/recording/build.txt
+++ b/pkg/cmd/dev/testdata/recording/build.txt
@@ -1,4 +1,4 @@
-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore //pkg/cmd/cockroach-short --config=dev
 ----

 bazel info workspace --color=no --config=dev
@@ -18,7 +18,7 @@ rm go/src/github.com/cockroachdb/cockroach/cockroach-short
 ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short
 ----

-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev --local_cpu_resources=12 //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore --local_cpu_resources=12 //pkg/cmd/cockroach-short --config=dev
 ----

 bazel info workspace --color=no --config=dev
@@ -38,7 +38,7 @@ rm go/src/github.com/cockroachdb/cockroach/cockroach-short
 ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short
 ----

-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore //pkg/cmd/cockroach-short --config=dev
 ----

 bazel info workspace --color=no --config=dev
@@ -58,7 +58,7 @@ rm go/src/github.com/cockroachdb/cockroach/cockroach-short
 ln -s /private/var/tmp/_bazel/99e666e4e674209ecdb66b46371278df/execroot/cockroach/bazel-out/darwin-fastbuild/bin/pkg/cmd/cockroach-short/cockroach-short_/cockroach-short go/src/github.com/cockroachdb/cockroach/cockroach-short
 ----

-bazel build --color=yes --experimental_convenience_symlinks=ignore --config=dev --remote_local_fallback --remote_cache=grpc://127.0.0.1:9090 --experimental_remote_downloader=grpc://127.0.0.1:9090 //pkg/cmd/cockroach-short
+bazel build --color=yes --experimental_convenience_symlinks=ignore --remote_local_fallback --remote_cache=grpc://127.0.0.1:9090 --experimental_remote_downloader=grpc://127.0.0.1:9090 //pkg/cmd/cockroach-short --config=dev
 ----

 bazel info workspace --color=no --config=dev
diff --git a/pkg/cmd/dev/util.go b/pkg/cmd/dev/util.go
index 606d95e78016..d02e1e848747 100644
--- a/pkg/cmd/dev/util.go
+++ b/pkg/cmd/dev/util.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"log"
 	"net"
+	"os/exec"
 	"runtime"
 	"strings"
 	"time"
@@ -23,7 +24,7 @@ import (
 	"github.com/spf13/cobra"
 )

-// To be turned on for tests.
+// To be turned on for tests. Turns off some deeper checks for reproducibility.
 var isTesting bool

 func mustGetFlagString(cmd *cobra.Command, name string) string {
@@ -103,3 +104,12 @@ func addCommonTestFlags(cmd *cobra.Command) {
 	cmd.Flags().StringP(filterFlag, "f", "", "run unit tests matching this regex")
 	cmd.Flags().Duration(timeoutFlag, 0*time.Minute, "timeout for test")
 }
+
+func ensureBinaryInPath(bin string) error {
+	if !isTesting {
+		if _, err := exec.LookPath(bin); err != nil {
+			return errors.Newf("Could not find %s in PATH", bin)
+		}
+	}
+	return nil
+}
diff --git a/pkg/cmd/roachtest/test_impl.go b/pkg/cmd/roachtest/test_impl.go
index 3dd41a263644..5943a4b8f462 100644
--- a/pkg/cmd/roachtest/test_impl.go
+++ b/pkg/cmd/roachtest/test_impl.go
@@ -69,8 +69,9 @@ type testImpl struct {
 	mu struct {
 		syncutil.RWMutex
-		done   bool
-		failed bool
+		done    bool
+		failed  bool
+		timeout bool // if failed == true, this indicates whether the test timed out

 		// cancel, if set, is called from the t.Fatal() family of functions when the
 		// test is being marked as failed (i.e. when the failed field above is also
 		// set). This is used to cancel the context passed to t.spec.Run(), so async
@@ -96,6 +97,18 @@ type testImpl struct {
 	versionsBinaryOverride map[string]string
 }

+func (t *testImpl) timedOut() bool {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	return t.mu.timeout
+}
+
+func (t *testImpl) setTimedOut() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.mu.timeout = true
+}
+
 // BuildVersion exposes the build version of the cluster
 // in this test.
 func (t *testImpl) BuildVersion() *version.Version {
diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go
index cd20583f1f23..0d981b8cfd39 100644
--- a/pkg/cmd/roachtest/test_runner.go
+++ b/pkg/cmd/roachtest/test_runner.go
@@ -665,7 +665,12 @@ func (r *testRunner) runTest(
 		}
 		shout(ctx, l, stdout, "--- FAIL: %s (%s)\n%s", t.Name(), durationStr, output)
-		r.maybePostGithubIssue(ctx, l, t, stdout, output)
+
+		issueOutput := output
+		if t.timedOut() {
+			issueOutput = "test timed out (see artifacts for details)"
+		}
+		r.maybePostGithubIssue(ctx, l, t, stdout, issueOutput)
 	} else {
 		shout(ctx, l, stdout, "--- PASS: %s (%s)", t.Name(), durationStr)
 		// If `##teamcity[testFailed ...]` is not present before `##teamCity[testFinished ...]`,
@@ -824,6 +829,7 @@ func (r *testRunner) runTest(
 			// reused since we have a runaway test goroutine that's presumably going
 			// to continue using the cluster.
 			t.printfAndFail(0 /* skip */, "test timed out (%s)", timeout)
+			t.setTimedOut()
 			select {
 			case <-done:
 				if success {
diff --git a/pkg/cmd/roachtest/tests/replicagc.go b/pkg/cmd/roachtest/tests/replicagc.go
index 4cab298568de..c2694509693d 100644
--- a/pkg/cmd/roachtest/tests/replicagc.go
+++ b/pkg/cmd/roachtest/tests/replicagc.go
@@ -21,7 +21,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
+	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
 )

 func registerReplicaGC(r registry.Registry) {
@@ -78,7 +80,7 @@ func runReplicaGCChangedPeers(
 	// Start three new nodes that will take over all data.
 	c.Start(ctx, args, c.Range(4, 6))

-	// Recommission n1-3, with n3 in absentia, moving the replicas to n4-6.
+	// Decommission n1-3, with n3 in absentia, moving the replicas to n4-6.
 	if err := h.decommission(ctx, c.Range(1, 3), 2, "--wait=none"); err != nil {
 		t.Fatal(err)
 	}
@@ -89,6 +91,16 @@ func runReplicaGCChangedPeers(
 	t.Status("waiting for zero replicas on n2")
 	h.waitForZeroReplicas(ctx, 2)

+	// Wait for the replica count on n3 to also drop to zero. This makes the test
+	// "test more", but it also prevents the test from failing spuriously, as later
+	// in the test any system ranges still on n3 would have a replication factor
+	// of five applied to them, and they would be unable to move off n3 as n1 and
+	// n2 will be down at that point. For details, see:
+	//
+	// https://github.com/cockroachdb/cockroach/issues/67910#issuecomment-884856356
+	t.Status("waiting for zero replicas on n3")
+	waitForZeroReplicasOnN3(ctx, t, c.Conn(ctx, 1))
+
 	// Stop the remaining two old nodes, no replicas remaining there.
 	c.Stop(ctx, c.Range(1, 2))

@@ -248,3 +260,31 @@ func (h *replicagcTestHelper) isolateDeadNodes(ctx context.Context, runNode int) {
 		}
 	}
 }
+
+func waitForZeroReplicasOnN3(ctx context.Context, t test.Test, db *gosql.DB) {
+	if err := retry.ForDuration(5*time.Minute, func() error {
+		const q = `select range_id, replicas from crdb_internal.ranges_no_leases where replicas @> ARRAY[3];`
+		rows, err := db.QueryContext(ctx, q)
+		if err != nil {
+			return err
+		}
+		m := make(map[int64]string)
+		for rows.Next() {
+			var rangeID int64
+			var replicas string
+			if err := rows.Scan(&rangeID, &replicas); err != nil {
+				return err
+			}
+			m[rangeID] = replicas
+		}
+		if err := rows.Err(); err != nil {
+			return err
+		}
+		if len(m) == 0 {
+			return nil
+		}
+		return errors.Errorf("ranges remained on n3 (according to meta2): %+v", m)
+	}); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/pkg/sql/logictest/testdata/logic_test/expression_index b/pkg/sql/logictest/testdata/logic_test/expression_index
index 9efb707a782a..7a45923d7769 100644
--- a/pkg/sql/logictest/testdata/logic_test/expression_index
+++ b/pkg/sql/logictest/testdata/logic_test/expression_index
@@ -875,3 +875,35 @@ SELECT i, j FROM inv@i_plus_100_j_a WHERE i+100 = 101 AND j->'a' @> '"x"' ORDER
 1  {"a": "x"}
 1  {"a": ["x", "y", "z"]}
 1  {"a": [1, "x"]}
+
+# Unique expression indexes.
+
+statement ok
+CREATE TABLE uniq (
+  k INT PRIMARY KEY,
+  a INT,
+  b INT,
+  UNIQUE INDEX ((a + b))
+)
+
+statement ok
+INSERT INTO uniq VALUES (1, 10, 100), (2, 20, 200)
+
+statement error duplicate key value violates unique constraint \"uniq_idx\"
+CREATE UNIQUE INDEX uniq_idx ON uniq ((a > 0))
+
+statement error duplicate key value violates unique constraint \"uniq_expr_key\"\nDETAIL: Key \(a \+ b\)=\(110\) already exists
+INSERT INTO uniq VALUES (3, 1, 109)
+
+statement ok
+INSERT INTO uniq VALUES (3, 1, 109) ON CONFLICT DO NOTHING
+
+# Expressions as ON CONFLICT targets are not yet allowed.
+# See https://github.com/cockroachdb/cockroach/issues/67893.
+statement error syntax error
+INSERT INTO uniq VALUES (3, 1, 109) ON CONFLICT ((a + b)) DO NOTHING
+
+# Expressions as ON CONFLICT targets are not yet allowed.
+# See https://github.com/cockroachdb/cockroach/issues/67893.
+statement error syntax error
+INSERT INTO uniq VALUES (4, 1, 219) ON CONFLICT ((a + b)) DO UPDATE SET b = 90
diff --git a/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go b/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go
index 10c68646f82c..f21f3ef9d15c 100644
--- a/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go
+++ b/pkg/sql/opt/optbuilder/mutation_builder_arbiter.go
@@ -544,7 +544,7 @@ func (h *arbiterPredicateHelper) partialUniqueConstraintPredicate(

 // arbiterFilters returns a scalar expression representing the arbiter
 // predicate. If the arbiter predicate contains non-immutable operators,
-// ok=true is returned.
+// ok=false is returned.
 func (h *arbiterPredicateHelper) arbiterFilters() (_ memo.FiltersExpr, ok bool) {
 	// The filters have been initialized if they are non-nil or
 	// invalidArbiterPredicate has been set to true.
diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go
index 9af6a11a6264..7d19a6b555c3 100644
--- a/pkg/sql/row/errors.go
+++ b/pkg/sql/row/errors.go
@@ -243,7 +243,11 @@ func DecodeRowInfo(
 	names := make([]string, len(cols))
 	values := make([]string, len(cols))
 	for i := range cols {
-		names[i] = cols[i].GetName()
+		if cols[i].IsExpressionIndexColumn() {
+			names[i] = cols[i].GetComputeExpr()
+		} else {
+			names[i] = cols[i].GetName()
+		}
 		if datums[i] == tree.DNull {
 			continue
 		}