diff --git a/dev b/dev index b6161ef7bf6f..796d134d2972 100755 --- a/dev +++ b/dev @@ -3,7 +3,7 @@ set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=39 +DEV_VERSION=40 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 4fc803389f89..b40d956b7a62 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -191,6 +192,9 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter func revertToCutoverTimestamp( ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID, ) error { + ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp") + defer span.Finish() + p := execCtx.(sql.JobExecContext) db := p.ExecCfg().DB j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 259fab7f24e8..982b2b0efa54 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -45,9 +45,10 @@ import ( ) func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, stats string) { - fetchValueKey := func(j json2.JSON, key string) json2.JSON { + fetchRequiredValueKey := func(j json2.JSON, key string) json2.JSON { val, err := j.FetchValKey(key) require.NoError(t, err) + require.NotNilf(t, val, "expected key %q to be in json %q", key, j) return val } @@ -60,26 +61,25 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s statsJSON, err := json2.ParseJSON(stats) require.NoError(t, err) - ingestionProgress := fetchValueKey(statsJSON, "ingestion_progress") + ingestionProgress := fetchRequiredValueKey(statsJSON, "ingestion_progress") require.Equal(t, cutoverTime.UnixNano(), - parseInt64(fetchValueKey(fetchValueKey(ingestionProgress, "cutover_time"), "wall_time").String())) + parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(ingestionProgress, "cutover_time"), "wall_time").String())) - partitionProgressIter, err := fetchValueKey(ingestionProgress, "partition_progress").ObjectIter() + partitionProgressIter, err := fetchRequiredValueKey(ingestionProgress, "partition_progress").ObjectIter() require.NoError(t, err) for partitionProgressIter.Next() { - require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchValueKey(fetchValueKey( + require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchRequiredValueKey(fetchRequiredValueKey( partitionProgressIter.Value(), "ingested_timestamp"), "wall_time").String())) } require.Equal(t, strconv.Itoa(int(streampb.StreamReplicationStatus_STREAM_INACTIVE)), - fetchValueKey(fetchValueKey(statsJSON, "stream_replication_status"), "stream_status").String()) + fetchRequiredValueKey(fetchRequiredValueKey(statsJSON, "producer_status"), "stream_status").String()) } // TestTenantStreaming tests that tenants can stream changes end-to-end. 
func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 83697) skip.UnderRace(t, "slow under race") @@ -120,6 +120,7 @@ SET CLUSTER SETTING kv.rangefeed.enabled = true; SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s'; +SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '500ms'; `, ";")...) @@ -129,14 +130,19 @@ SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s'; // is required. Tracked with #76378. // TODO(ajstorm): This may be the right course of action here as the // replication is now being run inside a tenant. - base.TestServerArgs{DisableDefaultTestTenant: true}, + base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }, roachpb.MakeTenantID(20)) defer cleanupDest() // destSQL refers to the system tenant as that's the one that's running the // job. destSQL := hDest.SysDB destSQL.ExecMultiple(t, strings.Split(` -SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s'; +SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '100ms'; SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '500ms'; SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms'; SET CLUSTER SETTING streaming.partition_progress_frequency = '100ms'; diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 80c76c76966c..4ff778c33684 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -591,7 +591,7 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error { } func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error { - log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved()) + log.VInfof(sip.Ctx, 3, "got checkpoint %v", event.GetResolved()) resolvedTimePtr := event.GetResolved() if resolvedTimePtr == nil { return errors.New("checkpoint event expected to have a resolved timestamp") diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index fda74cf22210..c72fc9aeb435 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -223,9 +223,7 @@ func (s *eventStream) onValue(ctx context.Context, value *roachpb.RangeFeedValue select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{Val: value}: - if log.V(1) { - log.Infof(ctx, "onValue: %s@%s", value.Key, value.Value.Timestamp) - } + log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp) } } @@ -233,9 +231,7 @@ func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *roachpb.Rang select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: checkpoint}: - if log.V(1) { - log.Infof(ctx, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } + log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) } } @@ -248,9 +244,7 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro case <-ctx.Done(): return ctx.Err() case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: 
&checkpoint}: - if log.V(1) { - log.Infof(ctx, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } + log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) return nil } } @@ -259,9 +253,7 @@ func (s *eventStream) onSSTable(ctx context.Context, sst *roachpb.RangeFeedSSTab select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}: - if log.V(1) { - log.Infof(ctx, "onSSTable: %s@%s", sst.Span, sst.WriteTS) - } + log.VInfof(ctx, 1, "onSSTable: %s@%s", sst.Span, sst.WriteTS) } } diff --git a/pkg/cmd/dev/generate.go b/pkg/cmd/dev/generate.go index c06018148a92..baff06353719 100644 --- a/pkg/cmd/dev/generate.go +++ b/pkg/cmd/dev/generate.go @@ -37,6 +37,7 @@ func makeGenerateCmd(runE func(cmd *cobra.Command, args []string) error) *cobra. dev generate bazel # DEPS.bzl and BUILD.bazel files dev generate cgo # files that help non-Bazel systems (IDEs, go) link to our C dependencies dev generate docs # generates documentation + dev generate execgen # generates execgen go code (subset of 'dev generate go') dev generate go # generates go code (execgen, stringer, protobufs, etc.), plus everything 'cgo' generates dev generate go_nocgo # generates go code (execgen, stringer, protobufs, etc.) dev generate protobuf # *.pb.go files (subset of 'dev generate go') @@ -63,6 +64,7 @@ func (d *dev) generate(cmd *cobra.Command, targets []string) error { "bazel": d.generateBazel, "cgo": d.generateCgo, "docs": d.generateDocs, + "execgen": d.generateExecgen, "go": d.generateGo, "go_nocgo": d.generateGoNoCgo, "protobuf": d.generateProtobuf, @@ -154,6 +156,10 @@ func (d *dev) generateDocs(cmd *cobra.Command) error { return d.generateRedactSafe(ctx) } +func (d *dev) generateExecgen(cmd *cobra.Command) error { + return d.generateTarget(cmd.Context(), "//pkg/gen:execgen") +} + func (d *dev) generateGoAndDocs(cmd *cobra.Command) error { ctx := cmd.Context() if err := d.generateTarget(ctx, "//pkg/gen"); err != nil { diff --git a/pkg/sql/clusterunique/BUILD.bazel b/pkg/sql/clusterunique/BUILD.bazel index a7a8b1af16d8..121225d5c575 100644 --- a/pkg/sql/clusterunique/BUILD.bazel +++ b/pkg/sql/clusterunique/BUILD.bazel @@ -9,5 +9,6 @@ go_library( "//pkg/base", "//pkg/util/hlc", "//pkg/util/uint128", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/clusterunique/id.go b/pkg/sql/clusterunique/id.go index f8501a393db0..eb2ef2b6e3c2 100644 --- a/pkg/sql/clusterunique/id.go +++ b/pkg/sql/clusterunique/id.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/errors" ) // ID represents an identifier that is guaranteed to be unique across @@ -53,3 +54,22 @@ func IDFromBytes(b []byte) ID { func (id ID) GetNodeID() int32 { return int32(0xFFFFFFFF & id.Lo) } + +// Size returns the marshalled size of id in bytes. +func (id ID) Size() int { + return len(id.GetBytes()) +} + +// MarshalTo marshals id to data. +func (id ID) MarshalTo(data []byte) (int, error) { + return copy(data, id.GetBytes()), nil +} + +// Unmarshal unmarshals data to id. 
+func (id *ID) Unmarshal(data []byte) error { + if len(data) != 16 { + return errors.Errorf("input data %s for uint128 must be 16 bytes", data) + } + id.Uint128 = uint128.FromBytes(data) + return nil +} diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index ead8df05aafc..98878f46f316 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2180,10 +2180,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { txnStart := ex.state.mu.txnStart ex.state.mu.RUnlock() - // Transaction received time is the time at which the statement that prompted - // the creation of this transaction was received. - ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, - ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived)) + ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionStarted, txnStart) ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now()) ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction)) @@ -2277,6 +2274,10 @@ func (ex *connExecutor) recordTransactionFinish( BytesRead: ex.extraTxnState.bytesRead, } + if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil { + ex.server.cfg.TestingKnobs.OnRecordTxnFinish(ex.executorType == executorTypeInternal, ex.phaseTimes, ex.planner.stmt.SQL) + } + return ex.statsCollector.RecordTransaction( ctx, transactionFingerprintID, diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index f166a13f1bdc..91d44d1b7949 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6211,9 +6211,9 @@ CREATE TABLE crdb_internal.node_execution_outliers ( ctx context.Context, o *outliers.Outlier, ) { err = errors.CombineErrors(err, addRow( - tree.NewDString(hex.EncodeToString(o.Session.ID)), - tree.NewDUuid(tree.DUuid{UUID: *o.Transaction.ID}), - tree.NewDString(hex.EncodeToString(o.Statement.ID)), + tree.NewDString(hex.EncodeToString(o.Session.ID.GetBytes())), + tree.NewDUuid(tree.DUuid{UUID: o.Transaction.ID}), + tree.NewDString(hex.EncodeToString(o.Statement.ID.GetBytes())), tree.NewDBytes(tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(o.Statement.FingerprintID)))), )) }) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 843402a3f5f7..975dd8ef87ae 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -86,6 +86,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sessioninit" + "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -1462,6 +1463,10 @@ type ExecutorTestingKnobs struct { // AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT // is written. AfterBackupCheckpoint func() + + // OnRecordTxnFinish, if set, will be called as we record a transaction + // finishing. + OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) } // PGWireTestingKnobs contains knobs for the pgwire module. 
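Note (illustrative sketch, not part of the diff): the new Size/MarshalTo/Unmarshal methods on clusterunique.ID above supply the methods that gogoproto's generated marshalling code calls for the customtype fields introduced in outliers.proto later in this diff. A minimal round trip, assuming a 16-byte input as required by the length check in Unmarshal:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
)

func main() {
	// 16 bytes, matching the length check in (*ID).Unmarshal.
	id := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaa"))

	// gogoproto customtype fields encode via Size and MarshalTo...
	buf := make([]byte, id.Size())
	if _, err := id.MarshalTo(buf); err != nil {
		panic(err)
	}

	// ...and decode via Unmarshal, recovering the original ID.
	var decoded clusterunique.ID
	if err := decoded.Unmarshal(buf); err != nil {
		panic(err)
	}
	fmt.Println(decoded == id) // true
}
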
diff --git a/pkg/sql/sem/tree/datum.go b/pkg/sql/sem/tree/datum.go index be05c210f181..527a311427f6 100644 --- a/pkg/sql/sem/tree/datum.go +++ b/pkg/sql/sem/tree/datum.go @@ -924,7 +924,7 @@ func (d *DFloat) Prev(ctx CompareContext) (Datum, bool) { return nil, false } if f == math.Inf(-1) { - return dNaNFloat, true + return DNaNFloat, true } return NewDFloat(DFloat(math.Nextafter(f, math.Inf(-1)))), true } @@ -933,7 +933,7 @@ func (d *DFloat) Prev(ctx CompareContext) (Datum, bool) { func (d *DFloat) Next(ctx CompareContext) (Datum, bool) { f := float64(*d) if math.IsNaN(f) { - return dNegInfFloat, true + return DNegInfFloat, true } if f == math.Inf(+1) { return nil, false @@ -941,14 +941,20 @@ func (d *DFloat) Next(ctx CompareContext) (Datum, bool) { return NewDFloat(DFloat(math.Nextafter(f, math.Inf(+1)))), true } -var dZeroFloat = NewDFloat(0.0) -var dPosInfFloat = NewDFloat(DFloat(math.Inf(+1))) -var dNegInfFloat = NewDFloat(DFloat(math.Inf(-1))) -var dNaNFloat = NewDFloat(DFloat(math.NaN())) +var ( + // DZeroFloat is the DFloat for zero. + DZeroFloat = NewDFloat(0) + // DPosInfFloat is the DFloat for positive infinity. + DPosInfFloat = NewDFloat(DFloat(math.Inf(+1))) + // DNegInfFloat is the DFloat for negative infinity. + DNegInfFloat = NewDFloat(DFloat(math.Inf(-1))) + // DNaNFloat is the DFloat for NaN. + DNaNFloat = NewDFloat(DFloat(math.NaN())) +) // IsMax implements the Datum interface. func (d *DFloat) IsMax(ctx CompareContext) bool { - return *d == *dPosInfFloat + return *d == *DPosInfFloat } // IsMin implements the Datum interface. @@ -958,12 +964,12 @@ func (d *DFloat) IsMin(ctx CompareContext) bool { // Max implements the Datum interface. func (d *DFloat) Max(ctx CompareContext) (Datum, bool) { - return dPosInfFloat, true + return DPosInfFloat, true } // Min implements the Datum interface. func (d *DFloat) Min(ctx CompareContext) (Datum, bool) { - return dNaNFloat, true + return DNaNFloat, true } // AmbiguousFormat implements the Datum interface. @@ -2553,7 +2559,8 @@ func MustMakeDTimestamp(t time.Time, precision time.Duration) *DTimestamp { return ret } -var dZeroTimestamp = &DTimestamp{} +// DZeroTimestamp is the zero-valued DTimestamp. +var DZeroTimestamp = &DTimestamp{} // time.Time formats. const ( @@ -2868,7 +2875,8 @@ func ParseDTimestampTZ( return d, dependsOnContext, err } -var dZeroTimestampTZ = &DTimestampTZ{} +// DZeroTimestampTZ is the zero-valued DTimestampTZ. +var DZeroTimestampTZ = &DTimestampTZ{} // AsDTimestampTZ attempts to retrieve a DTimestampTZ from an Expr, returning a // DTimestampTZ and a flag signifying whether the assertion was successful. 
The @@ -5303,13 +5311,13 @@ func NewDefaultDatum(collationEnv *CollationEnvironment, t *types.T) (d Datum, e case types.IntFamily: return DZero, nil case types.FloatFamily: - return dZeroFloat, nil + return DZeroFloat, nil case types.DecimalFamily: return dZeroDecimal, nil case types.DateFamily: return dEpochDate, nil case types.TimestampFamily: - return dZeroTimestamp, nil + return DZeroTimestamp, nil case types.IntervalFamily: return dZeroInterval, nil case types.StringFamily: @@ -5317,7 +5325,7 @@ func NewDefaultDatum(collationEnv *CollationEnvironment, t *types.T) (d Datum, e case types.BytesFamily: return dEmptyBytes, nil case types.TimestampTZFamily: - return dZeroTimestampTZ, nil + return DZeroTimestampTZ, nil case types.CollatedStringFamily: return NewDCollatedString("", t.Locale(), collationEnv) case types.OidFamily: diff --git a/pkg/sql/sessionphase/session_phase.go b/pkg/sql/sessionphase/session_phase.go index daf5d78e3920..0218003d164a 100644 --- a/pkg/sql/sessionphase/session_phase.go +++ b/pkg/sql/sessionphase/session_phase.go @@ -59,9 +59,9 @@ const ( // have no execution, like SHOW TRANSACTION STATUS. SessionQueryServiced - // SessionTransactionReceived is the SessionPhase when a transaction is - // received. - SessionTransactionReceived + // SessionTransactionStarted is the SessionPhase when a transaction is + // started. + SessionTransactionStarted // SessionFirstStartExecTransaction is the SessionPhase when a transaction // is started for the first time. @@ -197,7 +197,7 @@ func (t *Times) GetTransactionRetryLatency() time.Duration { // GetTransactionServiceLatency returns the total time to service the // transaction. func (t *Times) GetTransactionServiceLatency() time.Duration { - return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionReceived]) + return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionStarted]) } // GetCommitLatency returns the total time spent for the transaction to diff --git a/pkg/sql/sqlstats/outliers/BUILD.bazel b/pkg/sql/sqlstats/outliers/BUILD.bazel index c4831ea1c17e..20d6b3991452 100644 --- a/pkg/sql/sqlstats/outliers/BUILD.bazel +++ b/pkg/sql/sqlstats/outliers/BUILD.bazel @@ -21,8 +21,6 @@ go_library( "//pkg/util/metric", "//pkg/util/quantile", "//pkg/util/syncutil", - "//pkg/util/uint128", - "//pkg/util/uuid", "@com_github_prometheus_client_model//go", ], ) @@ -57,5 +55,8 @@ go_proto_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers", proto = ":outliers_proto", visibility = ["//visibility:public"], - deps = ["@com_github_gogo_protobuf//gogoproto"], + deps = [ + "//pkg/util/uuid", # keep + "@com_github_gogo_protobuf//gogoproto", + ], ) diff --git a/pkg/sql/sqlstats/outliers/detector.go b/pkg/sql/sqlstats/outliers/detector.go index ed304b9b912a..e61e7c3ac778 100644 --- a/pkg/sql/sqlstats/outliers/detector.go +++ b/pkg/sql/sqlstats/outliers/detector.go @@ -20,7 +20,7 @@ import ( type detector interface { enabled() bool - isOutlier(*Outlier_Statement) bool + isOutlier(*Statement) bool } var _ detector = &anyDetector{} @@ -40,7 +40,7 @@ func (a anyDetector) enabled() bool { return false } -func (a anyDetector) isOutlier(statement *Outlier_Statement) bool { +func (a anyDetector) isOutlier(statement *Statement) bool { // Because some detectors may need to observe all statements to build up // their baseline sense of what "normal" is, we avoid short-circuiting. 
result := false @@ -68,7 +68,7 @@ func (d latencyQuantileDetector) enabled() bool { return LatencyQuantileDetectorEnabled.Get(&d.settings.SV) } -func (d *latencyQuantileDetector) isOutlier(stmt *Outlier_Statement) (decision bool) { +func (d *latencyQuantileDetector) isOutlier(stmt *Statement) (decision bool) { if !d.enabled() { return false } @@ -86,7 +86,7 @@ func (d *latencyQuantileDetector) isOutlier(stmt *Outlier_Statement) (decision b } func (d *latencyQuantileDetector) withFingerprintLatencySummary( - stmt *Outlier_Statement, consumer func(latencySummary *quantile.Stream), + stmt *Statement, consumer func(latencySummary *quantile.Stream), ) { var latencySummary *quantile.Stream @@ -138,6 +138,6 @@ func (l latencyThresholdDetector) enabled() bool { return LatencyThreshold.Get(&l.st.SV) > 0 } -func (l latencyThresholdDetector) isOutlier(s *Outlier_Statement) bool { +func (l latencyThresholdDetector) isOutlier(s *Statement) bool { return l.enabled() && s.LatencyInSeconds >= LatencyThreshold.Get(&l.st.SV).Seconds() } diff --git a/pkg/sql/sqlstats/outliers/detector_test.go b/pkg/sql/sqlstats/outliers/detector_test.go index c4e1b0234916..cbc72919f5cc 100644 --- a/pkg/sql/sqlstats/outliers/detector_test.go +++ b/pkg/sql/sqlstats/outliers/detector_test.go @@ -39,17 +39,17 @@ func TestAnyDetector(t *testing.T) { t.Run("isOutlier is false without any detectors", func(t *testing.T) { detector := &anyDetector{} - require.False(t, detector.isOutlier(&Outlier_Statement{})) + require.False(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier is false without any concerned detectors", func(t *testing.T) { detector := &anyDetector{[]detector{&fakeDetector{}, &fakeDetector{}}} - require.False(t, detector.isOutlier(&Outlier_Statement{})) + require.False(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier is true with at least one concerned detector", func(t *testing.T) { detector := &anyDetector{[]detector{&fakeDetector{stubIsOutlier: true}, &fakeDetector{}}} - require.True(t, detector.isOutlier(&Outlier_Statement{})) + require.True(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier consults all detectors without short-circuiting", func(t *testing.T) { @@ -59,7 +59,7 @@ func TestAnyDetector(t *testing.T) { d1 := &fakeDetector{stubIsOutlier: true} d2 := &fakeDetector{stubIsOutlier: true} detector := &anyDetector{[]detector{d1, d2}} - detector.isOutlier(&Outlier_Statement{}) + detector.isOutlier(&Statement{}) require.True(t, d1.isOutlierCalled, "the first detector should be consulted") require.True(t, d2.isOutlierCalled, "the second detector should be consulted") }) @@ -117,9 +117,9 @@ func TestLatencyQuantileDetector(t *testing.T) { t.Run(test.name, func(t *testing.T) { d := newLatencyQuantileDetector(st, NewMetrics()) for i := 0; i < 1000; i++ { - d.isOutlier(&Outlier_Statement{LatencyInSeconds: test.seedLatency.Seconds()}) + d.isOutlier(&Statement{LatencyInSeconds: test.seedLatency.Seconds()}) } - require.Equal(t, test.isOutlier, d.isOutlier(&Outlier_Statement{LatencyInSeconds: test.candidateLatency.Seconds()})) + require.Equal(t, test.isOutlier, d.isOutlier(&Statement{LatencyInSeconds: test.candidateLatency.Seconds()})) }) } }) @@ -172,7 +172,7 @@ func TestLatencyQuantileDetector(t *testing.T) { d := newLatencyQuantileDetector(st, metrics) // Show the detector `test.fingerprints` distinct fingerprints. 
for i := 0; i < test.fingerprints; i++ { - d.isOutlier(&Outlier_Statement{ + d.isOutlier(&Statement{ LatencyInSeconds: LatencyQuantileDetectorInterestingThreshold.Get(&st.SV).Seconds(), FingerprintID: roachpb.StmtFingerprintID(i), }) @@ -191,7 +191,7 @@ func BenchmarkLatencyQuantileDetector(b *testing.B) { LatencyQuantileDetectorEnabled.Override(context.Background(), &settings.SV, true) d := newLatencyQuantileDetector(settings, NewMetrics()) for i := 0; i < b.N; i++ { - d.isOutlier(&Outlier_Statement{ + d.isOutlier(&Statement{ LatencyInSeconds: random.Float64(), }) } @@ -212,21 +212,21 @@ func TestLatencyThresholdDetector(t *testing.T) { t.Run("isOutlier false when disabled", func(t *testing.T) { detector := latencyThresholdDetector{st: cluster.MakeTestingClusterSettings()} - require.False(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 1})) + require.False(t, detector.isOutlier(&Statement{LatencyInSeconds: 1})) }) t.Run("isOutlier false when fast enough", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second) detector := latencyThresholdDetector{st: st} - require.False(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 0.5})) + require.False(t, detector.isOutlier(&Statement{LatencyInSeconds: 0.5})) }) t.Run("isOutlier true beyond threshold", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second) detector := latencyThresholdDetector{st: st} - require.True(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 1})) + require.True(t, detector.isOutlier(&Statement{LatencyInSeconds: 1})) }) } @@ -240,7 +240,7 @@ func (f fakeDetector) enabled() bool { return f.stubEnabled } -func (f *fakeDetector) isOutlier(_ *Outlier_Statement) bool { +func (f *fakeDetector) isOutlier(_ *Statement) bool { f.isOutlierCalled = true return f.stubIsOutlier } diff --git a/pkg/sql/sqlstats/outliers/outliers.go b/pkg/sql/sqlstats/outliers/outliers.go index 0f8c8a557843..ceac29c13c4a 100644 --- a/pkg/sql/sqlstats/outliers/outliers.go +++ b/pkg/sql/sqlstats/outliers/outliers.go @@ -14,12 +14,10 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/uuid" prometheus "github.com/prometheus/client_model/go" ) @@ -129,15 +127,10 @@ type Reader interface { // exposes the set of currently retained outliers. type Registry interface { // ObserveStatement notifies the registry of a statement execution. - ObserveStatement( - sessionID clusterunique.ID, - statementID clusterunique.ID, - statementFingerprintID roachpb.StmtFingerprintID, - latencyInSeconds float64, - ) + ObserveStatement(sessionID clusterunique.ID, statement *Statement) // ObserveTransaction notifies the registry of the end of a transaction. 
- ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) + ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) Reader } diff --git a/pkg/sql/sqlstats/outliers/outliers.proto b/pkg/sql/sqlstats/outliers/outliers.proto index 3bf539c67d89..28becb347fb0 100644 --- a/pkg/sql/sqlstats/outliers/outliers.proto +++ b/pkg/sql/sqlstats/outliers/outliers.proto @@ -14,23 +14,28 @@ option go_package = "outliers"; import "gogoproto/gogo.proto"; -message Outlier { - message Session { - bytes id = 1 [(gogoproto.customname) = "ID"]; - } +message Session { + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID", + (gogoproto.nullable) = false]; +} - message Transaction { - bytes id = 1 [(gogoproto.customname) = "ID", - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; - } +message Transaction { + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false]; +} - message Statement { - bytes id = 1 [(gogoproto.customname) = "ID"]; - uint64 fingerprint_id = 2 [(gogoproto.customname) = "FingerprintID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; - double latency_in_seconds = 3; - } +message Statement { + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID", + (gogoproto.nullable) = false]; + uint64 fingerprint_id = 2 [(gogoproto.customname) = "FingerprintID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; + double latency_in_seconds = 3; +} +message Outlier { Session session = 1; Transaction transaction = 2; Statement statement = 3; diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 87b2c3ef5090..2ad09ecde62b 100644 --- a/pkg/sql/sqlstats/outliers/outliers_test.go +++ b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -28,30 +28,25 @@ import ( func TestOutliers(t *testing.T) { ctx := context.Background() - sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) - txnID := uuid.FastMakeV4() - stmtID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) - stmtFptID := roachpb.StmtFingerprintID(100) + session := &outliers.Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))} + transaction := &outliers.Transaction{ID: uuid.FastMakeV4()} + statement := &outliers.Statement{ + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 2, + } t.Run("detection", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, transaction) expected := []*outliers.Outlier{{ - Session: &outliers.Outlier_Session{ - ID: sessionID.GetBytes(), - }, - Transaction: &outliers.Outlier_Transaction{ - ID: &txnID, - }, - Statement: &outliers.Outlier_Statement{ - ID: stmtID.GetBytes(), - FingerprintID: stmtFptID, - LatencyInSeconds: 2, - }, + Session: session, + 
Transaction: transaction, + Statement: statement, }} var actual []*outliers.Outlier @@ -69,8 +64,8 @@ func TestOutliers(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 0) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -85,9 +80,14 @@ func TestOutliers(t *testing.T) { t.Run("too fast", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) + statement2 := &outliers.Statement{ + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 0.5, + } registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 0.5) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(session.ID, statement2) + registry.ObserveTransaction(session.ID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -100,43 +100,30 @@ func TestOutliers(t *testing.T) { }) t.Run("buffering statements per session", func(t *testing.T) { - otherSessionID := clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc")) - otherTxnID := uuid.FastMakeV4() - otherStmtID := clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")) - otherStmtFptID := roachpb.StmtFingerprintID(101) + otherSession := &outliers.Session{ID: clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc"))} + otherTransaction := &outliers.Transaction{ID: uuid.FastMakeV4()} + otherStatement := &outliers.Statement{ + ID: clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")), + FingerprintID: roachpb.StmtFingerprintID(101), + LatencyInSeconds: 3, + } st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveStatement(otherSessionID, otherStmtID, otherStmtFptID, 3) - registry.ObserveTransaction(sessionID, txnID) - registry.ObserveTransaction(otherSessionID, otherTxnID) + registry.ObserveStatement(session.ID, statement) + registry.ObserveStatement(otherSession.ID, otherStatement) + registry.ObserveTransaction(session.ID, transaction) + registry.ObserveTransaction(otherSession.ID, otherTransaction) expected := []*outliers.Outlier{{ - Session: &outliers.Outlier_Session{ - ID: sessionID.GetBytes(), - }, - Transaction: &outliers.Outlier_Transaction{ - ID: &txnID, - }, - Statement: &outliers.Outlier_Statement{ - ID: stmtID.GetBytes(), - FingerprintID: stmtFptID, - LatencyInSeconds: 2, - }, + Session: session, + Transaction: transaction, + Statement: statement, }, { - Session: &outliers.Outlier_Session{ - ID: otherSessionID.GetBytes(), - }, - Transaction: &outliers.Outlier_Transaction{ - ID: &otherTxnID, - }, - Statement: &outliers.Outlier_Statement{ - ID: otherStmtID.GetBytes(), - FingerprintID: otherStmtFptID, - LatencyInSeconds: 3, - }, + Session: otherSession, + Transaction: otherTransaction, + Statement: otherStatement, }} var actual []*outliers.Outlier registry.IterateOutliers( @@ -148,7 +135,7 @@ func TestOutliers(t *testing.T) { // IterateOutliers doesn't specify its 
iteration order, so we sort here for a stable test. sort.Slice(actual, func(i, j int) bool { - return bytes.Compare(actual[i].Session.ID, actual[j].Session.ID) < 0 + return bytes.Compare(actual[i].Session.ID.GetBytes(), actual[j].Session.ID.GetBytes()) < 0 }) require.Equal(t, expected, actual) diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index c3ece56bbbf1..002defbe1405 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -13,13 +13,10 @@ package outliers import ( "context" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/uint128" - "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // maxCacheSize is the number of detected outliers we will retain in memory. @@ -40,7 +37,7 @@ type registry struct { // before enabling the outliers subsystem by default. mu struct { syncutil.RWMutex - statements map[clusterunique.ID][]*Outlier_Statement + statements map[clusterunique.ID][]*Statement outliers *cache.UnorderedCache } } @@ -59,30 +56,21 @@ func newRegistry(st *cluster.Settings, metrics Metrics) Registry { latencyThresholdDetector{st: st}, newLatencyQuantileDetector(st, metrics), }}} - r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement) + r.mu.statements = make(map[clusterunique.ID][]*Statement) r.mu.outliers = cache.NewUnorderedCache(config) return r } -func (r *registry) ObserveStatement( - sessionID clusterunique.ID, - statementID clusterunique.ID, - statementFingerprintID roachpb.StmtFingerprintID, - latencyInSeconds float64, -) { +func (r *registry) ObserveStatement(sessionID clusterunique.ID, statement *Statement) { if !r.enabled() { return } r.mu.Lock() defer r.mu.Unlock() - r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{ - ID: statementID.GetBytes(), - FingerprintID: statementFingerprintID, - LatencyInSeconds: latencyInSeconds, - }) + r.mu.statements[sessionID] = append(r.mu.statements[sessionID], statement) } -func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) { +func (r *registry) ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) { if !r.enabled() { return } @@ -100,9 +88,9 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUI if hasOutlier { for _, s := range statements { - r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ - Session: &Outlier_Session{ID: sessionID.GetBytes()}, - Transaction: &Outlier_Transaction{ID: &txnID}, + r.mu.outliers.Add(s.ID, &Outlier{ + Session: &Session{ID: sessionID}, + Transaction: transaction, Statement: s, }) } diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 7e8144f61697..7d3fffa5b144 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -69,6 +69,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", + "@com_github_jackc_pgx_v4//:pgx", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index de2cca5ca4ef..1158be2a30f0 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ 
b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -13,7 +13,9 @@ package sslocal_test import ( "context" "math" + "net/url" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -37,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ -668,3 +671,61 @@ func TestUnprivilegedUserReset(t *testing.T) { require.Contains(t, err.Error(), "requires admin privilege") } + +func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + testData := []*struct { + query string + placeholders []interface{} + phaseTimes *sessionphase.Times + }{ + { + query: "SELECT $1::INT8", + placeholders: []interface{}{1}, + phaseTimes: nil, + }, + } + + waitTxnFinish := make(chan struct{}) + currentTestCaseIdx := 0 + const latencyThreshold = time.Second * 5 + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + OnRecordTxnFinish: func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) { + if !isInternal && testData[currentTestCaseIdx].query == stmt { + testData[currentTestCaseIdx].phaseTimes = phaseTimes.Clone() + go func() { + waitTxnFinish <- struct{}{} + }() + } + }, + } + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + pgURL, cleanupGoDB := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "StartServer", url.User(username.RootUser)) + defer cleanupGoDB() + c, err := pgx.Connect(ctx, pgURL.String()) + require.NoError(t, err, "error connecting with pg url") + + for currentTestCaseIdx < len(testData) { + tc := testData[currentTestCaseIdx] + // Make an extended protocol query. + _ = c.QueryRow(ctx, tc.query, tc.placeholders...) + require.NoError(t, err, "error scanning row") + <-waitTxnFinish + + // Ensure test case phase times are populated by query txn. + require.True(t, tc.phaseTimes != nil) + // Ensure SessionTransactionStarted variable is populated. + require.True(t, !tc.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted).IsZero()) + // Ensure the computed transaction service latency is within a reasonable threshold. 
+ require.True(t, tc.phaseTimes.GetTransactionServiceLatency() < latencyThreshold) + currentTestCaseIdx++ + } +} diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 5ef4569ffcad..9691c0cc194b 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" ) @@ -159,7 +160,11 @@ func (s *Container) RecordStatement( } } - s.outliersRegistry.ObserveStatement(value.SessionID, value.StatementID, stmtFingerprintID, value.ServiceLatency) + s.outliersRegistry.ObserveStatement(value.SessionID, &outliers.Statement{ + ID: value.StatementID, + FingerprintID: stmtFingerprintID, + LatencyInSeconds: value.ServiceLatency, + }) return stats.ID, nil } @@ -269,7 +274,7 @@ func (s *Container) RecordTransaction( stats.mu.data.ExecStats.MaxDiskUsage.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MaxDiskUsage)) } - s.outliersRegistry.ObserveTransaction(value.SessionID, value.TransactionID) + s.outliersRegistry.ObserveTransaction(value.SessionID, &outliers.Transaction{ID: value.TransactionID}) return nil } diff --git a/pkg/sql/stats/BUILD.bazel b/pkg/sql/stats/BUILD.bazel index 7d818e5ac120..fac5b36b18df 100644 --- a/pkg/sql/stats/BUILD.bazel +++ b/pkg/sql/stats/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "histogram.go", "json.go", "new_stat.go", + "quantile.go", "row_sampling.go", "stats_cache.go", ], @@ -52,6 +53,7 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "//pkg/util/timeutil/pgdate", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], @@ -67,6 +69,7 @@ go_test( "delete_stats_test.go", "histogram_test.go", "main_test.go", + "quantile_test.go", "row_sampling_test.go", "stats_cache_test.go", ], @@ -117,6 +120,7 @@ go_test( "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/timeutil", + "//pkg/util/timeutil/pgdate", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/stats/quantile.go b/pkg/sql/stats/quantile.go new file mode 100644 index 000000000000..93dd8a2b656c --- /dev/null +++ b/pkg/sql/stats/quantile.go @@ -0,0 +1,173 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package stats + +import ( + "math" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" + "github.com/cockroachdb/errors" +) + +// CanMakeQuantile returns true if a quantile function can be created for a +// histogram of the given type. +// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. 
+func CanMakeQuantile(colType *types.T) bool { + if colType.UserDefined() { + return false + } + switch colType.Family() { + case types.IntFamily, + types.FloatFamily, + types.DateFamily, + types.TimestampFamily, + types.TimestampTZFamily: + return true + default: + return false + } +} + +// ToQuantileValue converts from a datum to a float suitable for use in a quantile +// function. It differs from eval.PerformCast in a few ways: +// 1. It supports conversions that are not legal casts (e.g. DATE to FLOAT). +// 2. It errors on NaN and infinite values because they will break our model. +// FromQuantileValue is the inverse of this function, and together they should +// support round-trip conversions. +// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. +func ToQuantileValue(d tree.Datum) (float64, error) { + switch v := d.(type) { + case *tree.DInt: + return float64(*v), nil + case *tree.DFloat: + if math.IsNaN(float64(*v)) || math.IsInf(float64(*v), 0) { + return 0, tree.ErrFloatOutOfRange + } + return float64(*v), nil + case *tree.DDate: + if !v.IsFinite() { + return 0, tree.ErrFloatOutOfRange + } + // We use PG epoch instead of Unix epoch to simplify clamping when + // converting back. + return float64(v.PGEpochDays()), nil + case *tree.DTimestamp: + if v.Equal(pgdate.TimeInfinity) || v.Equal(pgdate.TimeNegativeInfinity) { + return 0, tree.ErrFloatOutOfRange + } + return float64(v.Unix()) + float64(v.Nanosecond())*1e-9, nil + case *tree.DTimestampTZ: + // TIMESTAMPTZ doesn't store a timezone, so this is the same as TIMESTAMP. + if v.Equal(pgdate.TimeInfinity) || v.Equal(pgdate.TimeNegativeInfinity) { + return 0, tree.ErrFloatOutOfRange + } + return float64(v.Unix()) + float64(v.Nanosecond())*1e-9, nil + default: + return 0, errors.Errorf("cannot make quantile value from %v", d) + } +} + +var ( + // quantileMinTimestamp is an alternative minimum finite DTimestamp value to + // avoid the problems around TimeNegativeInfinity, see #41564. + quantileMinTimestamp = tree.MinSupportedTime.Add(time.Second) + quantileMinTimestampSec = float64(quantileMinTimestamp.Unix()) + // quantileMaxTimestamp is an alternative maximum finite DTimestamp value to + // avoid the problems around TimeInfinity, see #41564. + quantileMaxTimestamp = tree.MaxSupportedTime.Add(-1 * time.Second).Truncate(time.Second) + quantileMaxTimestampSec = float64(quantileMaxTimestamp.Unix()) +) + +// FromQuantileValue converts from a quantile value back to a datum suitable for +// use in a histogram. It is the inverse of ToQuantileValue. It differs from +// eval.PerformCast in a few ways: +// 1. It supports conversions that are not legal casts (e.g. FLOAT to DATE). +// 2. It errors on NaN and infinite values because they indicate a problem with +// the regression model rather than valid values. +// 3. On overflow or underflow it clamps to maximum or minimum finite values +// rather than failing the conversion (and thus the entire histogram). +// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. +func FromQuantileValue(colType *types.T, val float64) (tree.Datum, error) { + if math.IsNaN(val) || math.IsInf(val, 0) { + return nil, tree.ErrFloatOutOfRange + } + switch colType.Family() { + case types.IntFamily: + i := math.Round(val) + // Clamp instead of truncating. 
+ switch colType.Width() { + case 16: + if i < math.MinInt16 { + i = math.MinInt16 + } else if i > math.MaxInt16 { + i = math.MaxInt16 + } + case 32: + if i < math.MinInt32 { + i = math.MinInt32 + } else if i > math.MaxInt32 { + i = math.MaxInt32 + } + default: + if i < math.MinInt64 { + i = math.MinInt64 + } else if i >= math.MaxInt64 { + // float64 cannot represent 2^63 - 1 exactly, so cast directly to DInt. + return tree.NewDInt(tree.DInt(math.MaxInt64)), nil + } + } + return tree.NewDInt(tree.DInt(i)), nil + case types.FloatFamily: + switch colType.Width() { + case 32: + if val <= -math.MaxFloat32 { + val = -math.MaxFloat32 + } else if val >= math.MaxFloat32 { + val = math.MaxFloat32 + } else { + val = float64(float32(val)) + } + } + return tree.NewDFloat(tree.DFloat(val)), nil + case types.DateFamily: + days := math.Round(val) + // First clamp to int32. + if days < math.MinInt32 { + days = math.MinInt32 + } else if days > math.MaxInt32 { + days = math.MaxInt32 + } + // Then clamp to pgdate.Date. + return tree.NewDDate(pgdate.MakeDateFromPGEpochClampFinite(int32(days))), nil + case types.TimestampFamily, types.TimestampTZFamily: + sec, frac := math.Modf(val) + var t time.Time + // Clamp to (our alternative finite) DTimestamp bounds. + if sec <= quantileMinTimestampSec { + t = quantileMinTimestamp + } else if sec >= quantileMaxTimestampSec { + t = quantileMaxTimestamp + } else { + t = timeutil.Unix(int64(sec), int64(frac*1e9)) + } + roundTo := tree.TimeFamilyPrecisionToRoundDuration(colType.Precision()) + if colType.Family() == types.TimestampFamily { + return tree.MakeDTimestamp(t, roundTo) + } + return tree.MakeDTimestampTZ(t, roundTo) + default: + return nil, errors.Errorf("cannot convert quantile value to type %s", colType.Name()) + } +} diff --git a/pkg/sql/stats/quantile_test.go b/pkg/sql/stats/quantile_test.go new file mode 100644 index 000000000000..ea4027fe7c70 --- /dev/null +++ b/pkg/sql/stats/quantile_test.go @@ -0,0 +1,573 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package stats + +import ( + "math" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" +) + +// TODO(michae2): Test that a random histogram can round-trip to quantile +// and back. + +// Test conversions from datum to quantile value and back. +func TestQuantileValueRoundTrip(t *testing.T) { + testCases := []struct { + typ *types.T + dat tree.Datum + val float64 + err bool + }{ + // Integer cases. 
+ { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(0)), + val: 0, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(42)), + val: 42, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MinInt32 - 1)), + val: math.MinInt32 - 1, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MaxInt32 + 1)), + val: math.MaxInt32 + 1, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + val: math.MinInt64, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + val: math.MaxInt64, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MinInt16 - 1)), + val: math.MinInt16 - 1, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MaxInt16 + 1)), + val: math.MaxInt16 + 1, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + val: math.MinInt32, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + val: math.MaxInt32, + }, + { + typ: types.Int2, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + val: math.MinInt16, + }, + { + typ: types.Int2, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + val: math.MaxInt16, + }, + // Float cases. + { + typ: types.Float, + dat: tree.DZeroFloat, + val: 0, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32 - 1)), + val: -math.MaxFloat32 - 1, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32 + 1)), + val: math.MaxFloat32 + 1, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat64)), + val: -math.MaxFloat64, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat64)), + val: math.MaxFloat64, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.Pi)), + val: math.Pi, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat64)), + val: math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + dat: tree.DNaNFloat, + err: true, + }, + { + typ: types.Float, + dat: tree.DNegInfFloat, + err: true, + }, + { + typ: types.Float, + dat: tree.DPosInfFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + val: -math.MaxFloat32, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + val: math.MaxFloat32, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(float32(math.Pi))), + val: float64(float32(math.Pi)), + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat32)), + val: math.SmallestNonzeroFloat32, + }, + { + typ: types.Float4, + dat: tree.DNaNFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.DNegInfFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.DPosInfFloat, + err: true, + }, + // Date cases. + { + typ: types.Date, + dat: tree.NewDDate(pgdate.MakeDateFromPGEpochClampFinite(0)), + val: 0, + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.LowDate), + val: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.HighDate), + val: float64(pgdate.HighDate.PGEpochDays()), + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.PosInfDate), + err: true, + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.NegInfDate), + err: true, + }, + // Timestamp cases. 
+ { + typ: types.Timestamp, + dat: tree.DZeroTimestamp, + val: float64(tree.DZeroTimestamp.Unix()), + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + val: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + val: quantileMaxTimestampSec, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: pgdate.TimeNegativeInfinity}, + err: true, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: pgdate.TimeInfinity}, + err: true, + }, + { + typ: types.TimestampTZ, + dat: tree.DZeroTimestampTZ, + val: float64(tree.DZeroTimestampTZ.Unix()), + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + val: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + val: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: pgdate.TimeNegativeInfinity}, + err: true, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: pgdate.TimeInfinity}, + err: true, + }, + } + evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) + for i, tc := range testCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + val, err := ToQuantileValue(tc.dat) + if err != nil { + if !tc.err { + t.Errorf("test case %d (%v) unexpected ToQuantileValue err: %v", i, tc.typ.Name(), err) + } + return + } + if tc.err { + t.Errorf("test case %d (%v) expected ToQuantileValue err", i, tc.typ.Name()) + return + } + if val != tc.val { + t.Errorf("test case %d (%v) incorrect val %v expected %v", i, tc.typ.Name(), val, tc.val) + return + } + // Check that we can make the round trip. + res, err := FromQuantileValue(tc.typ, val) + if err != nil { + t.Errorf("test case %d (%v) unexpected FromQuantileValue err: %v", i, tc.typ.Name(), err) + return + } + cmp, err := res.CompareError(evalCtx, tc.dat) + if err != nil { + t.Errorf("test case %d (%v) unexpected CompareError err: %v", i, tc.typ.Name(), err) + return + } + if cmp != 0 { + t.Errorf("test case %d (%v) incorrect datum %v expected %v", i, tc.typ.Name(), res, tc.dat) + } + }) + } +} + +// Test conversions from quantile value to datum and back. +// TestQuantileValueRoundTrip covers similar ground, so here we focus on cases +// that overflow or underflow and have to clamp. +func TestQuantileValueRoundTripOverflow(t *testing.T) { + testCases := []struct { + typ *types.T + val float64 + dat tree.Datum + err bool + res float64 + }{ + // Integer cases. 
+ { + typ: types.Int, + val: math.MinInt64 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + res: math.MinInt64, + }, + { + typ: types.Int, + val: math.MaxInt64 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + res: math.MaxInt64, + }, + { + typ: types.Int, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + res: math.MinInt64, + }, + { + typ: types.Int, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + res: math.MaxInt64, + }, + { + typ: types.Int4, + val: math.MinInt32 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + res: math.MinInt32, + }, + { + typ: types.Int4, + val: math.MaxInt32 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + res: math.MaxInt32, + }, + { + typ: types.Int4, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + res: math.MinInt32, + }, + { + typ: types.Int4, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + res: math.MaxInt32, + }, + { + typ: types.Int2, + val: math.MinInt16 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + res: math.MinInt16, + }, + { + typ: types.Int2, + val: math.MaxInt16 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + res: math.MaxInt16, + }, + { + typ: types.Int2, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + res: math.MinInt16, + }, + { + typ: types.Int2, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + res: math.MaxInt16, + }, + // Float cases. + { + typ: types.Float, + val: -math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat64)), + res: -math.MaxFloat64, + }, + { + typ: types.Float, + val: math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat64)), + res: math.MaxFloat64, + }, + { + typ: types.Float, + val: -math.SmallestNonzeroFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.SmallestNonzeroFloat64)), + res: -math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + val: math.SmallestNonzeroFloat64, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat64)), + res: math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + val: math.NaN(), + err: true, + }, + { + typ: types.Float, + val: math.Inf(-1), + err: true, + }, + { + typ: types.Float, + val: math.Inf(+1), + err: true, + }, + { + typ: types.Float4, + val: -math.MaxFloat32 - 1, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + res: -math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.MaxFloat32 + 1, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + res: math.MaxFloat32, + }, + { + typ: types.Float4, + val: -math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + res: -math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + res: math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.Pi, + dat: tree.NewDFloat(tree.DFloat(float32(math.Pi))), + res: float64(float32(math.Pi)), + }, + { + typ: types.Float4, + val: -math.SmallestNonzeroFloat64, + dat: tree.DZeroFloat, + res: 0, + }, + { + typ: types.Float4, + val: math.SmallestNonzeroFloat64, + dat: tree.DZeroFloat, + res: 0, + }, + // Date cases. 
+ { + typ: types.Date, + val: float64(pgdate.LowDate.PGEpochDays()) - 1, + dat: tree.NewDDate(pgdate.LowDate), + res: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + val: float64(pgdate.HighDate.PGEpochDays()) + 1, + dat: tree.NewDDate(pgdate.HighDate), + res: float64(pgdate.HighDate.PGEpochDays()), + }, + { + typ: types.Date, + val: -math.MaxFloat64, + dat: tree.NewDDate(pgdate.LowDate), + res: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + val: math.MaxFloat64, + dat: tree.NewDDate(pgdate.HighDate), + res: float64(pgdate.HighDate.PGEpochDays()), + }, + // Timestamp cases. + { + typ: types.Timestamp, + val: float64(pgdate.TimeNegativeInfinity.Unix()), + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + val: float64(pgdate.TimeInfinity.Unix()), + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.Timestamp, + val: -math.MaxFloat64, + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + val: math.MaxFloat64, + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + val: float64(pgdate.TimeNegativeInfinity.Unix()), + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + val: float64(pgdate.TimeInfinity.Unix()), + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + val: -math.MaxFloat64, + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + val: math.MaxFloat64, + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + } + evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) + for i, tc := range testCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + d, err := FromQuantileValue(tc.typ, tc.val) + if err != nil { + if !tc.err { + t.Errorf("test case %d (%v) unexpected FromQuantileValue err: %v", i, tc.typ.Name(), err) + } + return + } + if tc.err { + t.Errorf("test case %d (%v) expected FromQuantileValue err", i, tc.typ.Name()) + return + } + cmp, err := d.CompareError(evalCtx, tc.dat) + if err != nil { + t.Errorf("test case %d (%v) unexpected CompareError err: %v", i, tc.typ.Name(), err) + return + } + if cmp != 0 { + t.Errorf("test case %d (%v) incorrect datum %v expected %v", i, tc.typ.Name(), d, tc.dat) + return + } + // Check that we can make the round trip with the clamped value. + res, err := ToQuantileValue(d) + if err != nil { + t.Errorf("test case %d (%v) unexpected ToQuantileValue err: %v", i, tc.typ.Name(), err) + return + } + if res != tc.res { + t.Errorf("test case %d (%v) incorrect val %v expected %v", i, tc.typ.Name(), res, tc.res) + return + } + }) + } +} diff --git a/pkg/util/timeutil/pgdate/pgdate.go b/pkg/util/timeutil/pgdate/pgdate.go index 9260c1d91696..34f92318a2e1 100644 --- a/pkg/util/timeutil/pgdate/pgdate.go +++ b/pkg/util/timeutil/pgdate/pgdate.go @@ -141,6 +141,17 @@ func MakeDateFromPGEpoch(days int32) (Date, error) { return Date{days: days}, nil } +// MakeDateFromPGEpochClampFinite creates a Date from the number of days since +// 2000-01-01, clamping to LowDate or HighDate if outside those bounds. 
+func MakeDateFromPGEpochClampFinite(days int32) Date { + if days < lowDays { + return LowDate + } else if days > highDays { + return HighDate + } + return Date{days: days} +} + // ToTime returns d as a time.Time. Non finite dates return an error. func (d Date) ToTime() (time.Time, error) { if d.days == math.MinInt32 || d.days == math.MaxInt32 {
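Note (illustrative sketch, not part of the diff): a short example of how the new quantile conversions in pkg/sql/stats behave, including the clamping that FromQuantileValue performs instead of failing the histogram. Only names introduced in this diff are used:

package main

import (
	"fmt"
	"math"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/stats"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate"
)

func main() {
	// A finite DATE converts to its PG-epoch day count...
	d := tree.NewDDate(pgdate.MakeDateFromPGEpochClampFinite(42))
	v, err := stats.ToQuantileValue(d)
	if err != nil {
		panic(err)
	}
	fmt.Println(v) // 42

	// ...and an out-of-range regression-model output clamps to the
	// maximum finite DATE (pgdate.HighDate) rather than erroring out.
	clamped, err := stats.FromQuantileValue(types.Date, math.MaxFloat64)
	if err != nil {
		panic(err)
	}
	fmt.Println(clamped)
}
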