From f8f74599feeecc98f2b2e83554c37c4b938dc83e Mon Sep 17 00:00:00 2001
From: Thomas Hardy
Date: Wed, 29 Jun 2022 10:28:50 -0400
Subject: [PATCH 01/10] sql: fix and rename sql stats session transaction
 received time

Resolves: #82894

Due to a change from #76792, implicit transactions can start before the
`SessionQueryReceived` session phase time is set by the sqlstats system.
In turn, this caused the `SessionTransactionReceived` (now renamed to
`SessionTransactionStarted`) session phase time to be recorded
incorrectly, causing extremely large transaction times to appear on the
UI.

This change fixes the mistake by recording the actual transaction start
time as the `SessionTransactionStarted` session phase time, instead of
the `SessionQueryReceived` time.

Release note (bug fix): The `SessionTransactionReceived` session phase
time, now renamed to `SessionTransactionStarted`, is no longer recorded
incorrectly, so spuriously large transaction times no longer appear on
the UI.
---
 pkg/sql/conn_executor_exec.go              |  9 ++--
 pkg/sql/exec_util.go                       |  5 ++
 pkg/sql/sessionphase/session_phase.go      |  8 +--
 pkg/sql/sqlstats/sslocal/BUILD.bazel       |  1 +
 pkg/sql/sqlstats/sslocal/sql_stats_test.go | 61 ++++++++++++++++++++++
 5 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 986162b7be74..5a90906e9794 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -2182,10 +2182,7 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
 	ex.state.mu.RUnlock()
 	implicit := ex.implicitTxn()
 
-	// Transaction received time is the time at which the statement that prompted
-	// the creation of this transaction was received.
-	ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived,
-		ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
+	ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionStarted, txnStart)
 	ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now())
 	ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction,
 		ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction))
@@ -2281,6 +2278,10 @@ func (ex *connExecutor) recordTransactionFinish(
 		BytesRead: ex.extraTxnState.bytesRead,
 	}
 
+	if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil {
+		ex.server.cfg.TestingKnobs.OnRecordTxnFinish(ex.executorType == executorTypeInternal, ex.phaseTimes, ex.planner.stmt.SQL)
+	}
+
 	return ex.statsCollector.RecordTransaction(
 		ctx,
 		transactionFingerprintID,
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 485fa7a0306a..fc519e953a0c 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -86,6 +86,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
@@ -1462,6 +1463,10 @@ type ExecutorTestingKnobs struct {
 	// AfterBackupCheckpoint if set will be called after a BACKUP-CHECKPOINT
 	// is written.
 	AfterBackupCheckpoint func()
+
+	// OnRecordTxnFinish, if set, will be called as we record a transaction
+	// finishing.
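+	// It receives whether the executor is internal, the session phase times
+	// recorded for the transaction, and the SQL of the finishing statement,
+	// matching the call site in recordTransactionFinish above.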
+ OnRecordTxnFinish func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/sessionphase/session_phase.go b/pkg/sql/sessionphase/session_phase.go index daf5d78e3920..0218003d164a 100644 --- a/pkg/sql/sessionphase/session_phase.go +++ b/pkg/sql/sessionphase/session_phase.go @@ -59,9 +59,9 @@ const ( // have no execution, like SHOW TRANSACTION STATUS. SessionQueryServiced - // SessionTransactionReceived is the SessionPhase when a transaction is - // received. - SessionTransactionReceived + // SessionTransactionStarted is the SessionPhase when a transaction is + // started. + SessionTransactionStarted // SessionFirstStartExecTransaction is the SessionPhase when a transaction // is started for the first time. @@ -197,7 +197,7 @@ func (t *Times) GetTransactionRetryLatency() time.Duration { // GetTransactionServiceLatency returns the total time to service the // transaction. func (t *Times) GetTransactionServiceLatency() time.Duration { - return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionReceived]) + return t.times[SessionEndExecTransaction].Sub(t.times[SessionTransactionStarted]) } // GetCommitLatency returns the total time spent for the transaction to diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 7e8144f61697..7d3fffa5b144 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -69,6 +69,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", + "@com_github_jackc_pgx_v4//:pgx", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index cf6fd11d8b89..0617e8582d63 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -13,7 +13,9 @@ package sslocal_test import ( "context" "math" + "net/url" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -37,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/jackc/pgx/v4" "github.com/stretchr/testify/require" ) @@ -672,3 +675,61 @@ func TestUnprivilegedUserReset(t *testing.T) { require.Contains(t, err.Error(), "requires admin privilege") } + +func TestTransactionServiceLatencyOnExtendedProtocol(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + testData := []*struct { + query string + placeholders []interface{} + phaseTimes *sessionphase.Times + }{ + { + query: "SELECT $1::INT8", + placeholders: []interface{}{1}, + phaseTimes: nil, + }, + } + + waitTxnFinish := make(chan struct{}) + currentTestCaseIdx := 0 + const latencyThreshold = time.Second * 5 + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + OnRecordTxnFinish: func(isInternal bool, phaseTimes *sessionphase.Times, stmt string) { + if !isInternal && testData[currentTestCaseIdx].query == stmt { + testData[currentTestCaseIdx].phaseTimes = phaseTimes.Clone() + go func() { + waitTxnFinish <- struct{}{} + }() + } + }, + } + s, _, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + pgURL, cleanupGoDB := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "StartServer", 
url.User(username.RootUser)) + defer cleanupGoDB() + c, err := pgx.Connect(ctx, pgURL.String()) + require.NoError(t, err, "error connecting with pg url") + + for currentTestCaseIdx < len(testData) { + tc := testData[currentTestCaseIdx] + // Make extended protocol query + _ = c.QueryRow(ctx, tc.query, tc.placeholders...) + require.NoError(t, err, "error scanning row") + <-waitTxnFinish + + // Ensure test case phase times are populated by query txn. + require.True(t, tc.phaseTimes != nil) + // Ensure SessionTransactionStarted variable is populated. + require.True(t, !tc.phaseTimes.GetSessionPhaseTime(sessionphase.SessionTransactionStarted).IsZero()) + // Ensure compute transaction service latency is within a reasonable threshold. + require.True(t, tc.phaseTimes.GetTransactionServiceLatency() < latencyThreshold) + currentTestCaseIdx++ + } +} From 29b30624cc469a5fbaf377457de07d4e08a0a9c3 Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Wed, 8 Jun 2022 14:00:00 -0400 Subject: [PATCH 02/10] outliers: un-nest protobuf messages. I started off nesting them, but the generated Go code felt unnecessarily noisy. Note that these are safe moves because these messages are neither persisted nor sent over the wire yet. Release note: None --- pkg/sql/sqlstats/outliers/detector.go | 10 ++++---- pkg/sql/sqlstats/outliers/detector_test.go | 24 +++++++++---------- pkg/sql/sqlstats/outliers/outliers.proto | 28 +++++++++++----------- pkg/sql/sqlstats/outliers/outliers_test.go | 18 +++++++------- pkg/sql/sqlstats/outliers/registry.go | 10 ++++---- 5 files changed, 45 insertions(+), 45 deletions(-) diff --git a/pkg/sql/sqlstats/outliers/detector.go b/pkg/sql/sqlstats/outliers/detector.go index ed304b9b912a..e61e7c3ac778 100644 --- a/pkg/sql/sqlstats/outliers/detector.go +++ b/pkg/sql/sqlstats/outliers/detector.go @@ -20,7 +20,7 @@ import ( type detector interface { enabled() bool - isOutlier(*Outlier_Statement) bool + isOutlier(*Statement) bool } var _ detector = &anyDetector{} @@ -40,7 +40,7 @@ func (a anyDetector) enabled() bool { return false } -func (a anyDetector) isOutlier(statement *Outlier_Statement) bool { +func (a anyDetector) isOutlier(statement *Statement) bool { // Because some detectors may need to observe all statements to build up // their baseline sense of what "normal" is, we avoid short-circuiting. 
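 	// Instead, every registered detector sees the statement, and the
 	// individual verdicts are OR-ed together.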
result := false @@ -68,7 +68,7 @@ func (d latencyQuantileDetector) enabled() bool { return LatencyQuantileDetectorEnabled.Get(&d.settings.SV) } -func (d *latencyQuantileDetector) isOutlier(stmt *Outlier_Statement) (decision bool) { +func (d *latencyQuantileDetector) isOutlier(stmt *Statement) (decision bool) { if !d.enabled() { return false } @@ -86,7 +86,7 @@ func (d *latencyQuantileDetector) isOutlier(stmt *Outlier_Statement) (decision b } func (d *latencyQuantileDetector) withFingerprintLatencySummary( - stmt *Outlier_Statement, consumer func(latencySummary *quantile.Stream), + stmt *Statement, consumer func(latencySummary *quantile.Stream), ) { var latencySummary *quantile.Stream @@ -138,6 +138,6 @@ func (l latencyThresholdDetector) enabled() bool { return LatencyThreshold.Get(&l.st.SV) > 0 } -func (l latencyThresholdDetector) isOutlier(s *Outlier_Statement) bool { +func (l latencyThresholdDetector) isOutlier(s *Statement) bool { return l.enabled() && s.LatencyInSeconds >= LatencyThreshold.Get(&l.st.SV).Seconds() } diff --git a/pkg/sql/sqlstats/outliers/detector_test.go b/pkg/sql/sqlstats/outliers/detector_test.go index c4e1b0234916..cbc72919f5cc 100644 --- a/pkg/sql/sqlstats/outliers/detector_test.go +++ b/pkg/sql/sqlstats/outliers/detector_test.go @@ -39,17 +39,17 @@ func TestAnyDetector(t *testing.T) { t.Run("isOutlier is false without any detectors", func(t *testing.T) { detector := &anyDetector{} - require.False(t, detector.isOutlier(&Outlier_Statement{})) + require.False(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier is false without any concerned detectors", func(t *testing.T) { detector := &anyDetector{[]detector{&fakeDetector{}, &fakeDetector{}}} - require.False(t, detector.isOutlier(&Outlier_Statement{})) + require.False(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier is true with at least one concerned detector", func(t *testing.T) { detector := &anyDetector{[]detector{&fakeDetector{stubIsOutlier: true}, &fakeDetector{}}} - require.True(t, detector.isOutlier(&Outlier_Statement{})) + require.True(t, detector.isOutlier(&Statement{})) }) t.Run("isOutlier consults all detectors without short-circuiting", func(t *testing.T) { @@ -59,7 +59,7 @@ func TestAnyDetector(t *testing.T) { d1 := &fakeDetector{stubIsOutlier: true} d2 := &fakeDetector{stubIsOutlier: true} detector := &anyDetector{[]detector{d1, d2}} - detector.isOutlier(&Outlier_Statement{}) + detector.isOutlier(&Statement{}) require.True(t, d1.isOutlierCalled, "the first detector should be consulted") require.True(t, d2.isOutlierCalled, "the second detector should be consulted") }) @@ -117,9 +117,9 @@ func TestLatencyQuantileDetector(t *testing.T) { t.Run(test.name, func(t *testing.T) { d := newLatencyQuantileDetector(st, NewMetrics()) for i := 0; i < 1000; i++ { - d.isOutlier(&Outlier_Statement{LatencyInSeconds: test.seedLatency.Seconds()}) + d.isOutlier(&Statement{LatencyInSeconds: test.seedLatency.Seconds()}) } - require.Equal(t, test.isOutlier, d.isOutlier(&Outlier_Statement{LatencyInSeconds: test.candidateLatency.Seconds()})) + require.Equal(t, test.isOutlier, d.isOutlier(&Statement{LatencyInSeconds: test.candidateLatency.Seconds()})) }) } }) @@ -172,7 +172,7 @@ func TestLatencyQuantileDetector(t *testing.T) { d := newLatencyQuantileDetector(st, metrics) // Show the detector `test.fingerprints` distinct fingerprints. 
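 	// Each statement carries a distinct FingerprintID, so each one gets its
 	// own per-fingerprint latency summary.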
for i := 0; i < test.fingerprints; i++ { - d.isOutlier(&Outlier_Statement{ + d.isOutlier(&Statement{ LatencyInSeconds: LatencyQuantileDetectorInterestingThreshold.Get(&st.SV).Seconds(), FingerprintID: roachpb.StmtFingerprintID(i), }) @@ -191,7 +191,7 @@ func BenchmarkLatencyQuantileDetector(b *testing.B) { LatencyQuantileDetectorEnabled.Override(context.Background(), &settings.SV, true) d := newLatencyQuantileDetector(settings, NewMetrics()) for i := 0; i < b.N; i++ { - d.isOutlier(&Outlier_Statement{ + d.isOutlier(&Statement{ LatencyInSeconds: random.Float64(), }) } @@ -212,21 +212,21 @@ func TestLatencyThresholdDetector(t *testing.T) { t.Run("isOutlier false when disabled", func(t *testing.T) { detector := latencyThresholdDetector{st: cluster.MakeTestingClusterSettings()} - require.False(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 1})) + require.False(t, detector.isOutlier(&Statement{LatencyInSeconds: 1})) }) t.Run("isOutlier false when fast enough", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second) detector := latencyThresholdDetector{st: st} - require.False(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 0.5})) + require.False(t, detector.isOutlier(&Statement{LatencyInSeconds: 0.5})) }) t.Run("isOutlier true beyond threshold", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(context.Background(), &st.SV, 1*time.Second) detector := latencyThresholdDetector{st: st} - require.True(t, detector.isOutlier(&Outlier_Statement{LatencyInSeconds: 1})) + require.True(t, detector.isOutlier(&Statement{LatencyInSeconds: 1})) }) } @@ -240,7 +240,7 @@ func (f fakeDetector) enabled() bool { return f.stubEnabled } -func (f *fakeDetector) isOutlier(_ *Outlier_Statement) bool { +func (f *fakeDetector) isOutlier(_ *Statement) bool { f.isOutlierCalled = true return f.stubIsOutlier } diff --git a/pkg/sql/sqlstats/outliers/outliers.proto b/pkg/sql/sqlstats/outliers/outliers.proto index 3bf539c67d89..d3e0d4907c28 100644 --- a/pkg/sql/sqlstats/outliers/outliers.proto +++ b/pkg/sql/sqlstats/outliers/outliers.proto @@ -14,23 +14,23 @@ option go_package = "outliers"; import "gogoproto/gogo.proto"; -message Outlier { - message Session { - bytes id = 1 [(gogoproto.customname) = "ID"]; - } +message Session { + bytes id = 1 [(gogoproto.customname) = "ID"]; +} - message Transaction { - bytes id = 1 [(gogoproto.customname) = "ID", - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; - } +message Transaction { + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; +} - message Statement { - bytes id = 1 [(gogoproto.customname) = "ID"]; - uint64 fingerprint_id = 2 [(gogoproto.customname) = "FingerprintID", - (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; - double latency_in_seconds = 3; - } +message Statement { + bytes id = 1 [(gogoproto.customname) = "ID"]; + uint64 fingerprint_id = 2 [(gogoproto.customname) = "FingerprintID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; + double latency_in_seconds = 3; +} +message Outlier { Session session = 1; Transaction transaction = 2; Statement statement = 3; diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 87b2c3ef5090..6a76dad8bb81 100644 --- 
a/pkg/sql/sqlstats/outliers/outliers_test.go +++ b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -41,13 +41,13 @@ func TestOutliers(t *testing.T) { registry.ObserveTransaction(sessionID, txnID) expected := []*outliers.Outlier{{ - Session: &outliers.Outlier_Session{ + Session: &outliers.Session{ ID: sessionID.GetBytes(), }, - Transaction: &outliers.Outlier_Transaction{ + Transaction: &outliers.Transaction{ ID: &txnID, }, - Statement: &outliers.Outlier_Statement{ + Statement: &outliers.Statement{ ID: stmtID.GetBytes(), FingerprintID: stmtFptID, LatencyInSeconds: 2, @@ -114,25 +114,25 @@ func TestOutliers(t *testing.T) { registry.ObserveTransaction(otherSessionID, otherTxnID) expected := []*outliers.Outlier{{ - Session: &outliers.Outlier_Session{ + Session: &outliers.Session{ ID: sessionID.GetBytes(), }, - Transaction: &outliers.Outlier_Transaction{ + Transaction: &outliers.Transaction{ ID: &txnID, }, - Statement: &outliers.Outlier_Statement{ + Statement: &outliers.Statement{ ID: stmtID.GetBytes(), FingerprintID: stmtFptID, LatencyInSeconds: 2, }, }, { - Session: &outliers.Outlier_Session{ + Session: &outliers.Session{ ID: otherSessionID.GetBytes(), }, - Transaction: &outliers.Outlier_Transaction{ + Transaction: &outliers.Transaction{ ID: &otherTxnID, }, - Statement: &outliers.Outlier_Statement{ + Statement: &outliers.Statement{ ID: otherStmtID.GetBytes(), FingerprintID: otherStmtFptID, LatencyInSeconds: 3, diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index c3ece56bbbf1..3f2092097759 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -40,7 +40,7 @@ type registry struct { // before enabling the outliers subsystem by default. mu struct { syncutil.RWMutex - statements map[clusterunique.ID][]*Outlier_Statement + statements map[clusterunique.ID][]*Statement outliers *cache.UnorderedCache } } @@ -59,7 +59,7 @@ func newRegistry(st *cluster.Settings, metrics Metrics) Registry { latencyThresholdDetector{st: st}, newLatencyQuantileDetector(st, metrics), }}} - r.mu.statements = make(map[clusterunique.ID][]*Outlier_Statement) + r.mu.statements = make(map[clusterunique.ID][]*Statement) r.mu.outliers = cache.NewUnorderedCache(config) return r } @@ -75,7 +75,7 @@ func (r *registry) ObserveStatement( } r.mu.Lock() defer r.mu.Unlock() - r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Outlier_Statement{ + r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Statement{ ID: statementID.GetBytes(), FingerprintID: statementFingerprintID, LatencyInSeconds: latencyInSeconds, @@ -101,8 +101,8 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUI if hasOutlier { for _, s := range statements { r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ - Session: &Outlier_Session{ID: sessionID.GetBytes()}, - Transaction: &Outlier_Transaction{ID: &txnID}, + Session: &Session{ID: sessionID.GetBytes()}, + Transaction: &Transaction{ID: &txnID}, Statement: s, }) } From 561fd97a04a72804bc39d07c7c4c1f0882861ba7 Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Tue, 5 Jul 2022 19:18:23 -0400 Subject: [PATCH 03/10] outliers: pass stmt and txn data as structs This helps set us up for the asynchronous work in #81021, making these objects earlier so we can keep them around in buffers until we're ready to process them. 
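
As a sketch of the new call shape (taken from the ss_mem_writer.go hunk
below), callers now build the struct up front and hand it to the registry:

    s.outliersRegistry.ObserveStatement(value.SessionID, &outliers.Statement{
        ID:               value.StatementID.GetBytes(),
        FingerprintID:    stmtFingerprintID,
        LatencyInSeconds: value.ServiceLatency,
    })
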
Release note: None --- pkg/sql/sqlstats/outliers/BUILD.bazel | 6 +- pkg/sql/sqlstats/outliers/outliers.go | 11 +-- pkg/sql/sqlstats/outliers/outliers_test.go | 87 +++++++++---------- pkg/sql/sqlstats/outliers/registry.go | 19 +--- .../sqlstats/ssmemstorage/ss_mem_writer.go | 9 +- 5 files changed, 57 insertions(+), 75 deletions(-) diff --git a/pkg/sql/sqlstats/outliers/BUILD.bazel b/pkg/sql/sqlstats/outliers/BUILD.bazel index c4831ea1c17e..128ca3ebe40c 100644 --- a/pkg/sql/sqlstats/outliers/BUILD.bazel +++ b/pkg/sql/sqlstats/outliers/BUILD.bazel @@ -22,7 +22,6 @@ go_library( "//pkg/util/quantile", "//pkg/util/syncutil", "//pkg/util/uint128", - "//pkg/util/uuid", "@com_github_prometheus_client_model//go", ], ) @@ -57,5 +56,8 @@ go_proto_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers", proto = ":outliers_proto", visibility = ["//visibility:public"], - deps = ["@com_github_gogo_protobuf//gogoproto"], + deps = [ + "//pkg/util/uuid", # keep + "@com_github_gogo_protobuf//gogoproto", + ], ) diff --git a/pkg/sql/sqlstats/outliers/outliers.go b/pkg/sql/sqlstats/outliers/outliers.go index 0f8c8a557843..ceac29c13c4a 100644 --- a/pkg/sql/sqlstats/outliers/outliers.go +++ b/pkg/sql/sqlstats/outliers/outliers.go @@ -14,12 +14,10 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/uuid" prometheus "github.com/prometheus/client_model/go" ) @@ -129,15 +127,10 @@ type Reader interface { // exposes the set of currently retained outliers. type Registry interface { // ObserveStatement notifies the registry of a statement execution. - ObserveStatement( - sessionID clusterunique.ID, - statementID clusterunique.ID, - statementFingerprintID roachpb.StmtFingerprintID, - latencyInSeconds float64, - ) + ObserveStatement(sessionID clusterunique.ID, statement *Statement) // ObserveTransaction notifies the registry of the end of a transaction. 
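 	// Any statements buffered for the session are checked for outliers at
 	// this point.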
- ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) + ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) Reader } diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 6a76dad8bb81..1b73b8467139 100644 --- a/pkg/sql/sqlstats/outliers/outliers_test.go +++ b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -29,29 +29,26 @@ func TestOutliers(t *testing.T) { ctx := context.Background() sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) + session := &outliers.Session{ID: sessionID.GetBytes()} txnID := uuid.FastMakeV4() - stmtID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) - stmtFptID := roachpb.StmtFingerprintID(100) + transaction := &outliers.Transaction{ID: &txnID} + statement := &outliers.Statement{ + ID: []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 2, + } t.Run("detection", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(sessionID, statement) + registry.ObserveTransaction(sessionID, transaction) expected := []*outliers.Outlier{{ - Session: &outliers.Session{ - ID: sessionID.GetBytes(), - }, - Transaction: &outliers.Transaction{ - ID: &txnID, - }, - Statement: &outliers.Statement{ - ID: stmtID.GetBytes(), - FingerprintID: stmtFptID, - LatencyInSeconds: 2, - }, + Session: session, + Transaction: transaction, + Statement: statement, }} var actual []*outliers.Outlier @@ -69,8 +66,8 @@ func TestOutliers(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 0) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(sessionID, statement) + registry.ObserveTransaction(sessionID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -85,9 +82,14 @@ func TestOutliers(t *testing.T) { t.Run("too fast", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) + statement2 := &outliers.Statement{ + ID: []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 0.5, + } registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 0.5) - registry.ObserveTransaction(sessionID, txnID) + registry.ObserveStatement(sessionID, statement2) + registry.ObserveTransaction(sessionID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -101,42 +103,33 @@ func TestOutliers(t *testing.T) { t.Run("buffering statements per session", func(t *testing.T) { otherSessionID := clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc")) + otherSession := &outliers.Session{ + ID: otherSessionID.GetBytes(), + } otherTxnID := uuid.FastMakeV4() - otherStmtID := clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")) - otherStmtFptID := roachpb.StmtFingerprintID(101) + otherTransaction := &outliers.Transaction{ID: &otherTxnID} + otherStatement := &outliers.Statement{ + ID: []byte("dddddddddddddddddddddddddddddddd"), + FingerprintID: 
roachpb.StmtFingerprintID(101), + LatencyInSeconds: 3, + } st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, stmtID, stmtFptID, 2) - registry.ObserveStatement(otherSessionID, otherStmtID, otherStmtFptID, 3) - registry.ObserveTransaction(sessionID, txnID) - registry.ObserveTransaction(otherSessionID, otherTxnID) + registry.ObserveStatement(sessionID, statement) + registry.ObserveStatement(otherSessionID, otherStatement) + registry.ObserveTransaction(sessionID, transaction) + registry.ObserveTransaction(otherSessionID, otherTransaction) expected := []*outliers.Outlier{{ - Session: &outliers.Session{ - ID: sessionID.GetBytes(), - }, - Transaction: &outliers.Transaction{ - ID: &txnID, - }, - Statement: &outliers.Statement{ - ID: stmtID.GetBytes(), - FingerprintID: stmtFptID, - LatencyInSeconds: 2, - }, + Session: session, + Transaction: transaction, + Statement: statement, }, { - Session: &outliers.Session{ - ID: otherSessionID.GetBytes(), - }, - Transaction: &outliers.Transaction{ - ID: &otherTxnID, - }, - Statement: &outliers.Statement{ - ID: otherStmtID.GetBytes(), - FingerprintID: otherStmtFptID, - LatencyInSeconds: 3, - }, + Session: otherSession, + Transaction: otherTransaction, + Statement: otherStatement, }} var actual []*outliers.Outlier registry.IterateOutliers( diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index 3f2092097759..a471ae3382bb 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -13,13 +13,11 @@ package outliers import ( "context" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uint128" - "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // maxCacheSize is the number of detected outliers we will retain in memory. 
@@ -64,25 +62,16 @@ func newRegistry(st *cluster.Settings, metrics Metrics) Registry { return r } -func (r *registry) ObserveStatement( - sessionID clusterunique.ID, - statementID clusterunique.ID, - statementFingerprintID roachpb.StmtFingerprintID, - latencyInSeconds float64, -) { +func (r *registry) ObserveStatement(sessionID clusterunique.ID, statement *Statement) { if !r.enabled() { return } r.mu.Lock() defer r.mu.Unlock() - r.mu.statements[sessionID] = append(r.mu.statements[sessionID], &Statement{ - ID: statementID.GetBytes(), - FingerprintID: statementFingerprintID, - LatencyInSeconds: latencyInSeconds, - }) + r.mu.statements[sessionID] = append(r.mu.statements[sessionID], statement) } -func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUID) { +func (r *registry) ObserveTransaction(sessionID clusterunique.ID, transaction *Transaction) { if !r.enabled() { return } @@ -102,7 +91,7 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, txnID uuid.UUI for _, s := range statements { r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ Session: &Session{ID: sessionID.GetBytes()}, - Transaction: &Transaction{ID: &txnID}, + Transaction: transaction, Statement: s, }) } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 5ef4569ffcad..33ed02629f11 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/outliers" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" ) @@ -159,7 +160,11 @@ func (s *Container) RecordStatement( } } - s.outliersRegistry.ObserveStatement(value.SessionID, value.StatementID, stmtFingerprintID, value.ServiceLatency) + s.outliersRegistry.ObserveStatement(value.SessionID, &outliers.Statement{ + ID: value.StatementID.GetBytes(), + FingerprintID: stmtFingerprintID, + LatencyInSeconds: value.ServiceLatency, + }) return stats.ID, nil } @@ -269,7 +274,7 @@ func (s *Container) RecordTransaction( stats.mu.data.ExecStats.MaxDiskUsage.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MaxDiskUsage)) } - s.outliersRegistry.ObserveTransaction(value.SessionID, value.TransactionID) + s.outliersRegistry.ObserveTransaction(value.SessionID, &outliers.Transaction{ID: &value.TransactionID}) return nil } From 67a8cc1654faccb38c8b1264e5d225834564c6a2 Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Tue, 5 Jul 2022 19:57:42 -0400 Subject: [PATCH 04/10] outliers: use a custom type for Session.ID It's nicer having a type instead of those raw bytes. 
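
On the wire nothing changes: the field stays bytes, and only the generated Go
type differs (a sketch of the annotation, matching the outliers.proto hunk
below):

    message Session {
      bytes id = 1 [(gogoproto.customname) = "ID",
          (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID"];
    }

To satisfy gogoproto's customtype contract, clusterunique.ID also gains Size,
MarshalTo, and Unmarshal methods in the id.go hunk.
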
Release note: None --- pkg/sql/clusterunique/BUILD.bazel | 1 + pkg/sql/clusterunique/id.go | 20 ++++++++++++++++++++ pkg/sql/crdb_internal.go | 2 +- pkg/sql/sqlstats/outliers/outliers.proto | 3 ++- pkg/sql/sqlstats/outliers/outliers_test.go | 8 +++----- pkg/sql/sqlstats/outliers/registry.go | 2 +- 6 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/sql/clusterunique/BUILD.bazel b/pkg/sql/clusterunique/BUILD.bazel index a7a8b1af16d8..121225d5c575 100644 --- a/pkg/sql/clusterunique/BUILD.bazel +++ b/pkg/sql/clusterunique/BUILD.bazel @@ -9,5 +9,6 @@ go_library( "//pkg/base", "//pkg/util/hlc", "//pkg/util/uint128", + "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/clusterunique/id.go b/pkg/sql/clusterunique/id.go index f8501a393db0..eb2ef2b6e3c2 100644 --- a/pkg/sql/clusterunique/id.go +++ b/pkg/sql/clusterunique/id.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/errors" ) // ID represents an identifier that is guaranteed to be unique across @@ -53,3 +54,22 @@ func IDFromBytes(b []byte) ID { func (id ID) GetNodeID() int32 { return int32(0xFFFFFFFF & id.Lo) } + +// Size returns the marshalled size of id in bytes. +func (id ID) Size() int { + return len(id.GetBytes()) +} + +// MarshalTo marshals id to data. +func (id ID) MarshalTo(data []byte) (int, error) { + return copy(data, id.GetBytes()), nil +} + +// Unmarshal unmarshals data to id. +func (id *ID) Unmarshal(data []byte) error { + if len(data) != 16 { + return errors.Errorf("input data %s for uint128 must be 16 bytes", data) + } + id.Uint128 = uint128.FromBytes(data) + return nil +} diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 4b62d98e72f9..f6081b715cca 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6203,7 +6203,7 @@ CREATE TABLE crdb_internal.node_execution_outliers ( ctx context.Context, o *outliers.Outlier, ) { err = errors.CombineErrors(err, addRow( - tree.NewDString(hex.EncodeToString(o.Session.ID)), + tree.NewDString(hex.EncodeToString(o.Session.ID.GetBytes())), tree.NewDUuid(tree.DUuid{UUID: *o.Transaction.ID}), tree.NewDString(hex.EncodeToString(o.Statement.ID)), tree.NewDBytes(tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(o.Statement.FingerprintID)))), diff --git a/pkg/sql/sqlstats/outliers/outliers.proto b/pkg/sql/sqlstats/outliers/outliers.proto index d3e0d4907c28..0311851176e1 100644 --- a/pkg/sql/sqlstats/outliers/outliers.proto +++ b/pkg/sql/sqlstats/outliers/outliers.proto @@ -15,7 +15,8 @@ option go_package = "outliers"; import "gogoproto/gogo.proto"; message Session { - bytes id = 1 [(gogoproto.customname) = "ID"]; + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID"]; } message Transaction { diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 1b73b8467139..2ae31902b251 100644 --- a/pkg/sql/sqlstats/outliers/outliers_test.go +++ b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -29,7 +29,7 @@ func TestOutliers(t *testing.T) { ctx := context.Background() sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) - session := &outliers.Session{ID: sessionID.GetBytes()} + session := &outliers.Session{ID: &sessionID} txnID := uuid.FastMakeV4() transaction := &outliers.Transaction{ID: &txnID} 
statement := &outliers.Statement{ @@ -103,9 +103,7 @@ func TestOutliers(t *testing.T) { t.Run("buffering statements per session", func(t *testing.T) { otherSessionID := clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc")) - otherSession := &outliers.Session{ - ID: otherSessionID.GetBytes(), - } + otherSession := &outliers.Session{ID: &otherSessionID} otherTxnID := uuid.FastMakeV4() otherTransaction := &outliers.Transaction{ID: &otherTxnID} otherStatement := &outliers.Statement{ @@ -141,7 +139,7 @@ func TestOutliers(t *testing.T) { // IterateOutliers doesn't specify its iteration order, so we sort here for a stable test. sort.Slice(actual, func(i, j int) bool { - return bytes.Compare(actual[i].Session.ID, actual[j].Session.ID) < 0 + return bytes.Compare(actual[i].Session.ID.GetBytes(), actual[j].Session.ID.GetBytes()) < 0 }) require.Equal(t, expected, actual) diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index a471ae3382bb..642bf4fddef0 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -90,7 +90,7 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, transaction *T if hasOutlier { for _, s := range statements { r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ - Session: &Session{ID: sessionID.GetBytes()}, + Session: &Session{ID: &sessionID}, Transaction: transaction, Statement: s, }) From c393aac489f06986a4f43bcab68600fad2ef865b Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Tue, 5 Jul 2022 20:30:07 -0400 Subject: [PATCH 05/10] outliers: use a custom type for Statement.ID It's nicer having a type instead of those raw bytes. Release note: None --- pkg/sql/crdb_internal.go | 2 +- pkg/sql/sqlstats/outliers/BUILD.bazel | 1 - pkg/sql/sqlstats/outliers/outliers.proto | 3 ++- pkg/sql/sqlstats/outliers/outliers_test.go | 9 ++++++--- pkg/sql/sqlstats/outliers/registry.go | 3 +-- pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go | 2 +- 6 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index f6081b715cca..675f1b058b0c 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6205,7 +6205,7 @@ CREATE TABLE crdb_internal.node_execution_outliers ( err = errors.CombineErrors(err, addRow( tree.NewDString(hex.EncodeToString(o.Session.ID.GetBytes())), tree.NewDUuid(tree.DUuid{UUID: *o.Transaction.ID}), - tree.NewDString(hex.EncodeToString(o.Statement.ID)), + tree.NewDString(hex.EncodeToString(o.Statement.ID.GetBytes())), tree.NewDBytes(tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(o.Statement.FingerprintID)))), )) }) diff --git a/pkg/sql/sqlstats/outliers/BUILD.bazel b/pkg/sql/sqlstats/outliers/BUILD.bazel index 128ca3ebe40c..20d6b3991452 100644 --- a/pkg/sql/sqlstats/outliers/BUILD.bazel +++ b/pkg/sql/sqlstats/outliers/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//pkg/util/metric", "//pkg/util/quantile", "//pkg/util/syncutil", - "//pkg/util/uint128", "@com_github_prometheus_client_model//go", ], ) diff --git a/pkg/sql/sqlstats/outliers/outliers.proto b/pkg/sql/sqlstats/outliers/outliers.proto index 0311851176e1..35c149733a6a 100644 --- a/pkg/sql/sqlstats/outliers/outliers.proto +++ b/pkg/sql/sqlstats/outliers/outliers.proto @@ -25,7 +25,8 @@ message Transaction { } message Statement { - bytes id = 1 [(gogoproto.customname) = "ID"]; + bytes id = 1 [(gogoproto.customname) = "ID", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID"]; uint64 fingerprint_id = 
2 [(gogoproto.customname) = "FingerprintID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; double latency_in_seconds = 3; diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 2ae31902b251..094149c9502f 100644 --- a/pkg/sql/sqlstats/outliers/outliers_test.go +++ b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -32,8 +32,9 @@ func TestOutliers(t *testing.T) { session := &outliers.Session{ID: &sessionID} txnID := uuid.FastMakeV4() transaction := &outliers.Transaction{ID: &txnID} + statementID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) statement := &outliers.Statement{ - ID: []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + ID: &statementID, FingerprintID: roachpb.StmtFingerprintID(100), LatencyInSeconds: 2, } @@ -82,8 +83,9 @@ func TestOutliers(t *testing.T) { t.Run("too fast", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) + statement2ID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) statement2 := &outliers.Statement{ - ID: []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), + ID: &statement2ID, FingerprintID: roachpb.StmtFingerprintID(100), LatencyInSeconds: 0.5, } @@ -106,8 +108,9 @@ func TestOutliers(t *testing.T) { otherSession := &outliers.Session{ID: &otherSessionID} otherTxnID := uuid.FastMakeV4() otherTransaction := &outliers.Transaction{ID: &otherTxnID} + otherStatementID := clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")) otherStatement := &outliers.Statement{ - ID: []byte("dddddddddddddddddddddddddddddddd"), + ID: &otherStatementID, FingerprintID: roachpb.StmtFingerprintID(101), LatencyInSeconds: 3, } diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index 642bf4fddef0..8798e662f7a3 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/cockroach/pkg/util/uint128" ) // maxCacheSize is the number of detected outliers we will retain in memory. 
@@ -89,7 +88,7 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, transaction *T if hasOutlier { for _, s := range statements { - r.mu.outliers.Add(uint128.FromBytes(s.ID), &Outlier{ + r.mu.outliers.Add(s.ID, &Outlier{ Session: &Session{ID: &sessionID}, Transaction: transaction, Statement: s, diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 33ed02629f11..6b1a13f737de 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -161,7 +161,7 @@ func (s *Container) RecordStatement( } s.outliersRegistry.ObserveStatement(value.SessionID, &outliers.Statement{ - ID: value.StatementID.GetBytes(), + ID: &value.StatementID, FingerprintID: stmtFingerprintID, LatencyInSeconds: value.ServiceLatency, }) From ccb1eacfaba2f48f1e4d6c868459c76f3bfb69c8 Mon Sep 17 00:00:00 2001 From: Matthew Todd Date: Tue, 5 Jul 2022 20:44:14 -0400 Subject: [PATCH 06/10] outliers: require session, statement, and transaction ids Release note: None --- pkg/sql/crdb_internal.go | 2 +- pkg/sql/sqlstats/outliers/outliers.proto | 9 ++-- pkg/sql/sqlstats/outliers/outliers_test.go | 41 ++++++++----------- pkg/sql/sqlstats/outliers/registry.go | 2 +- .../sqlstats/ssmemstorage/ss_mem_writer.go | 4 +- 5 files changed, 27 insertions(+), 31 deletions(-) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 675f1b058b0c..088e571f8251 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6204,7 +6204,7 @@ CREATE TABLE crdb_internal.node_execution_outliers ( ) { err = errors.CombineErrors(err, addRow( tree.NewDString(hex.EncodeToString(o.Session.ID.GetBytes())), - tree.NewDUuid(tree.DUuid{UUID: *o.Transaction.ID}), + tree.NewDUuid(tree.DUuid{UUID: o.Transaction.ID}), tree.NewDString(hex.EncodeToString(o.Statement.ID.GetBytes())), tree.NewDBytes(tree.DBytes(sqlstatsutil.EncodeUint64ToBytes(uint64(o.Statement.FingerprintID)))), )) diff --git a/pkg/sql/sqlstats/outliers/outliers.proto b/pkg/sql/sqlstats/outliers/outliers.proto index 35c149733a6a..28becb347fb0 100644 --- a/pkg/sql/sqlstats/outliers/outliers.proto +++ b/pkg/sql/sqlstats/outliers/outliers.proto @@ -16,17 +16,20 @@ import "gogoproto/gogo.proto"; message Session { bytes id = 1 [(gogoproto.customname) = "ID", - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID"]; + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID", + (gogoproto.nullable) = false]; } message Transaction { bytes id = 1 [(gogoproto.customname) = "ID", - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"]; + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID", + (gogoproto.nullable) = false]; } message Statement { bytes id = 1 [(gogoproto.customname) = "ID", - (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID"]; + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/clusterunique.ID", + (gogoproto.nullable) = false]; uint64 fingerprint_id = 2 [(gogoproto.customname) = "FingerprintID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StmtFingerprintID"]; double latency_in_seconds = 3; diff --git a/pkg/sql/sqlstats/outliers/outliers_test.go b/pkg/sql/sqlstats/outliers/outliers_test.go index 094149c9502f..2ad09ecde62b 100644 --- a/pkg/sql/sqlstats/outliers/outliers_test.go +++ 
b/pkg/sql/sqlstats/outliers/outliers_test.go @@ -28,13 +28,10 @@ import ( func TestOutliers(t *testing.T) { ctx := context.Background() - sessionID := clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) - session := &outliers.Session{ID: &sessionID} - txnID := uuid.FastMakeV4() - transaction := &outliers.Transaction{ID: &txnID} - statementID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) + session := &outliers.Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))} + transaction := &outliers.Transaction{ID: uuid.FastMakeV4()} statement := &outliers.Statement{ - ID: &statementID, + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), FingerprintID: roachpb.StmtFingerprintID(100), LatencyInSeconds: 2, } @@ -43,8 +40,8 @@ func TestOutliers(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, statement) - registry.ObserveTransaction(sessionID, transaction) + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, transaction) expected := []*outliers.Outlier{{ Session: session, @@ -67,8 +64,8 @@ func TestOutliers(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 0) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, statement) - registry.ObserveTransaction(sessionID, transaction) + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -83,15 +80,14 @@ func TestOutliers(t *testing.T) { t.Run("too fast", func(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) - statement2ID := clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")) statement2 := &outliers.Statement{ - ID: &statement2ID, + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), FingerprintID: roachpb.StmtFingerprintID(100), LatencyInSeconds: 0.5, } registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, statement2) - registry.ObserveTransaction(sessionID, transaction) + registry.ObserveStatement(session.ID, statement2) + registry.ObserveTransaction(session.ID, transaction) var actual []*outliers.Outlier registry.IterateOutliers( @@ -104,13 +100,10 @@ func TestOutliers(t *testing.T) { }) t.Run("buffering statements per session", func(t *testing.T) { - otherSessionID := clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc")) - otherSession := &outliers.Session{ID: &otherSessionID} - otherTxnID := uuid.FastMakeV4() - otherTransaction := &outliers.Transaction{ID: &otherTxnID} - otherStatementID := clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")) + otherSession := &outliers.Session{ID: clusterunique.IDFromBytes([]byte("cccccccccccccccccccccccccccccccc"))} + otherTransaction := &outliers.Transaction{ID: uuid.FastMakeV4()} otherStatement := &outliers.Statement{ - ID: &otherStatementID, + ID: clusterunique.IDFromBytes([]byte("dddddddddddddddddddddddddddddddd")), FingerprintID: roachpb.StmtFingerprintID(101), LatencyInSeconds: 3, } @@ -118,10 +111,10 @@ func TestOutliers(t *testing.T) { st := cluster.MakeTestingClusterSettings() outliers.LatencyThreshold.Override(ctx, &st.SV, 
1*time.Second) registry := outliers.New(st, outliers.NewMetrics()) - registry.ObserveStatement(sessionID, statement) - registry.ObserveStatement(otherSessionID, otherStatement) - registry.ObserveTransaction(sessionID, transaction) - registry.ObserveTransaction(otherSessionID, otherTransaction) + registry.ObserveStatement(session.ID, statement) + registry.ObserveStatement(otherSession.ID, otherStatement) + registry.ObserveTransaction(session.ID, transaction) + registry.ObserveTransaction(otherSession.ID, otherTransaction) expected := []*outliers.Outlier{{ Session: session, diff --git a/pkg/sql/sqlstats/outliers/registry.go b/pkg/sql/sqlstats/outliers/registry.go index 8798e662f7a3..002defbe1405 100644 --- a/pkg/sql/sqlstats/outliers/registry.go +++ b/pkg/sql/sqlstats/outliers/registry.go @@ -89,7 +89,7 @@ func (r *registry) ObserveTransaction(sessionID clusterunique.ID, transaction *T if hasOutlier { for _, s := range statements { r.mu.outliers.Add(s.ID, &Outlier{ - Session: &Session{ID: &sessionID}, + Session: &Session{ID: sessionID}, Transaction: transaction, Statement: s, }) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index 6b1a13f737de..9691c0cc194b 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -161,7 +161,7 @@ func (s *Container) RecordStatement( } s.outliersRegistry.ObserveStatement(value.SessionID, &outliers.Statement{ - ID: &value.StatementID, + ID: value.StatementID, FingerprintID: stmtFingerprintID, LatencyInSeconds: value.ServiceLatency, }) @@ -274,7 +274,7 @@ func (s *Container) RecordTransaction( stats.mu.data.ExecStats.MaxDiskUsage.Record(stats.mu.data.ExecStats.Count, float64(value.ExecStats.MaxDiskUsage)) } - s.outliersRegistry.ObserveTransaction(value.SessionID, &outliers.Transaction{ID: &value.TransactionID}) + s.outliersRegistry.ObserveTransaction(value.SessionID, &outliers.Transaction{ID: value.TransactionID}) return nil } From faa81afb4c0a5160dbaec88f3234b9dca519ef85 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 6 Jul 2022 19:44:00 -0700 Subject: [PATCH 07/10] cmd/dev: add generate execgen subcommand Release note: None --- dev | 2 +- pkg/cmd/dev/generate.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/dev b/dev index b6161ef7bf6f..796d134d2972 100755 --- a/dev +++ b/dev @@ -3,7 +3,7 @@ set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=39 +DEV_VERSION=40 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions diff --git a/pkg/cmd/dev/generate.go b/pkg/cmd/dev/generate.go index c06018148a92..baff06353719 100644 --- a/pkg/cmd/dev/generate.go +++ b/pkg/cmd/dev/generate.go @@ -37,6 +37,7 @@ func makeGenerateCmd(runE func(cmd *cobra.Command, args []string) error) *cobra. dev generate bazel # DEPS.bzl and BUILD.bazel files dev generate cgo # files that help non-Bazel systems (IDEs, go) link to our C dependencies dev generate docs # generates documentation + dev generate execgen # generates execgen go code (subset of 'dev generate go') dev generate go # generates go code (execgen, stringer, protobufs, etc.), plus everything 'cgo' generates dev generate go_nocgo # generates go code (execgen, stringer, protobufs, etc.) 
dev generate protobuf # *.pb.go files (subset of 'dev generate go') @@ -63,6 +64,7 @@ func (d *dev) generate(cmd *cobra.Command, targets []string) error { "bazel": d.generateBazel, "cgo": d.generateCgo, "docs": d.generateDocs, + "execgen": d.generateExecgen, "go": d.generateGo, "go_nocgo": d.generateGoNoCgo, "protobuf": d.generateProtobuf, @@ -154,6 +156,10 @@ func (d *dev) generateDocs(cmd *cobra.Command) error { return d.generateRedactSafe(ctx) } +func (d *dev) generateExecgen(cmd *cobra.Command) error { + return d.generateTarget(cmd.Context(), "//pkg/gen:execgen") +} + func (d *dev) generateGoAndDocs(cmd *cobra.Command) error { ctx := cmd.Context() if err := d.generateTarget(ctx, "//pkg/gen"); err != nil { From 5ba670c86d8b8f3d33518d2fd92fefbc1b057c36 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 7 Jul 2022 08:49:07 +0100 Subject: [PATCH 08/10] streamingccl: unskip TestTenantStreaming This makes a couple of changes aimed at unskipping TestTenantStreaming: - Fix a nil pointer access in our stream status verification function. We changed the name of key that this function was accessing. This NPE was hidden by another panic along the unclean shutdown path in the server. - Lower various intervals so that this test doesn't take 90 seconds. I've run this under stress for a few hundred iterations without error. Release note: None --- .../streamingest/stream_ingestion_job_test.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 259fab7f24e8..982b2b0efa54 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -45,9 +45,10 @@ import ( ) func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, stats string) { - fetchValueKey := func(j json2.JSON, key string) json2.JSON { + fetchRequiredValueKey := func(j json2.JSON, key string) json2.JSON { val, err := j.FetchValKey(key) require.NoError(t, err) + require.NotNilf(t, val, "expected key %q to in json %q", key, j) return val } @@ -60,26 +61,25 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s statsJSON, err := json2.ParseJSON(stats) require.NoError(t, err) - ingestionProgress := fetchValueKey(statsJSON, "ingestion_progress") + ingestionProgress := fetchRequiredValueKey(statsJSON, "ingestion_progress") require.Equal(t, cutoverTime.UnixNano(), - parseInt64(fetchValueKey(fetchValueKey(ingestionProgress, "cutover_time"), "wall_time").String())) + parseInt64(fetchRequiredValueKey(fetchRequiredValueKey(ingestionProgress, "cutover_time"), "wall_time").String())) - partitionProgressIter, err := fetchValueKey(ingestionProgress, "partition_progress").ObjectIter() + partitionProgressIter, err := fetchRequiredValueKey(ingestionProgress, "partition_progress").ObjectIter() require.NoError(t, err) for partitionProgressIter.Next() { - require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchValueKey(fetchValueKey( + require.Less(t, cutoverTime.UnixNano(), parseInt64(fetchRequiredValueKey(fetchRequiredValueKey( partitionProgressIter.Value(), "ingested_timestamp"), "wall_time").String())) } require.Equal(t, strconv.Itoa(int(streampb.StreamReplicationStatus_STREAM_INACTIVE)), - fetchValueKey(fetchValueKey(statsJSON, "stream_replication_status"), "stream_status").String()) + fetchRequiredValueKey(fetchRequiredValueKey(statsJSON, "producer_status"), 
"stream_status").String()) } // TestTenantStreaming tests that tenants can stream changes end-to-end. func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 83697) skip.UnderRace(t, "slow under race") @@ -120,6 +120,7 @@ SET CLUSTER SETTING kv.rangefeed.enabled = true; SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'; SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'; SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s'; +SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '500ms'; `, ";")...) @@ -129,14 +130,19 @@ SET CLUSTER SETTING stream_replication.min_checkpoint_frequency = '1s'; // is required. Tracked with #76378. // TODO(ajstorm): This may be the right course of action here as the // replication is now being run inside a tenant. - base.TestServerArgs{DisableDefaultTestTenant: true}, + base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }, roachpb.MakeTenantID(20)) defer cleanupDest() // destSQL refers to the system tenant as that's the one that's running the // job. destSQL := hDest.SysDB destSQL.ExecMultiple(t, strings.Split(` -SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '2s'; +SET CLUSTER SETTING stream_replication.consumer_heartbeat_frequency = '100ms'; SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '500ms'; SET CLUSTER SETTING bulkio.stream_ingestion.cutover_signal_poll_interval = '100ms'; SET CLUSTER SETTING streaming.partition_progress_frequency = '100ms'; From 75971c3831d2557122c9fafb381a8b6d131d8497 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 7 Jul 2022 12:31:31 +0100 Subject: [PATCH 09/10] streamingccl: small logging and tracing cleanups - Add a tracing span to ingestion job cutover. - Move a particularly noisy log message to VInfo(3). - Prefer log.VInfo to `if log.V(n) {}` in cases where we aren't doing expensive argument construction. 
Release note: None --- .../streamingest/stream_ingestion_job.go | 4 ++++ .../streamingest/stream_ingestion_processor.go | 2 +- .../streamingccl/streamproducer/event_stream.go | 16 ++++------------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 4fc803389f89..b40d956b7a62 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" ) @@ -191,6 +192,9 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter func revertToCutoverTimestamp( ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID, ) error { + ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp") + defer span.Finish() + p := execCtx.(sql.JobExecContext) db := p.ExecCfg().DB j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 80c76c76966c..4ff778c33684 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -591,7 +591,7 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error { } func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error { - log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved()) + log.VInfof(sip.Ctx, 3, "got checkpoint %v", event.GetResolved()) resolvedTimePtr := event.GetResolved() if resolvedTimePtr == nil { return errors.New("checkpoint event expected to have a resolved timestamp") diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index fda74cf22210..c72fc9aeb435 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -223,9 +223,7 @@ func (s *eventStream) onValue(ctx context.Context, value *roachpb.RangeFeedValue select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{Val: value}: - if log.V(1) { - log.Infof(ctx, "onValue: %s@%s", value.Key, value.Value.Timestamp) - } + log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp) } } @@ -233,9 +231,7 @@ func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *roachpb.Rang select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: checkpoint}: - if log.V(1) { - log.Infof(ctx, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } + log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) } } @@ -248,9 +244,7 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro case <-ctx.Done(): return ctx.Err() case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: &checkpoint}: - if log.V(1) { - log.Infof(ctx, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) - } + log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) return nil } } @@ -259,9 +253,7 @@ func (s *eventStream) onSSTable(ctx context.Context, 
sst *roachpb.RangeFeedSSTab select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}: - if log.V(1) { - log.Infof(ctx, "onSSTable: %s@%s", sst.Span, sst.WriteTS) - } + log.VInfof(ctx, 1, "onSSTable: %s@%s", sst.Span, sst.WriteTS) } } From fdc636b5573ee7777add94777885b2c6a034b0c9 Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Mon, 20 Jun 2022 15:57:03 -0700 Subject: [PATCH 10/10] sql/stats: conversion of datums to and from quantile function values To predict histograms in statistics forecasts, we will use linear regression over quantile functions. (Quantile functions are another representation of histogram data, in a form more amenable to statistical manipulation.) The conversion of histograms to quantile functions will require conversion of histogram bounds (datums) to quantile values (float64s). And likewise, the inverse conversion from quantile functions back to histograms will require the inverse conversion of float64 quantile values back to datums. These conversions are a little different from our usual SQL conversions in `eval.PerformCast`, so we add them to a new quantile file in the `sql/stats` module. This code was originally part of #77070 but has been pulled out to simplify that PR. A few changes have been made: - `histogramValue` has been renamed to `FromQuantileValue`. - Support for `DECIMAL`, `TIME`, `TIMETZ`, and `INTERVAL` has been dropped. Clamping these types in `FromQuantileValue` was too complex for the first iteration of statistics forecasting. We expect the overwhelming majority of ascending keys to use `INT` or `TIMESTAMP` types. - Bugs in `FLOAT4`, `TIMESTAMP`, and `TIMESTAMPTZ` conversions have been fixed. - We're now clamping timestamps to slightly tighter bounds to avoid the problems with infinite timestamps (see #41564). Assists: #79872 Release note: None --- pkg/sql/sem/tree/datum.go | 36 +- pkg/sql/stats/BUILD.bazel | 4 + pkg/sql/stats/quantile.go | 173 +++++++++ pkg/sql/stats/quantile_test.go | 573 +++++++++++++++++++++++++++++ pkg/util/timeutil/pgdate/pgdate.go | 11 + 5 files changed, 783 insertions(+), 14 deletions(-) create mode 100644 pkg/sql/stats/quantile.go create mode 100644 pkg/sql/stats/quantile_test.go diff --git a/pkg/sql/sem/tree/datum.go b/pkg/sql/sem/tree/datum.go index be05c210f181..527a311427f6 100644 --- a/pkg/sql/sem/tree/datum.go +++ b/pkg/sql/sem/tree/datum.go @@ -924,7 +924,7 @@ func (d *DFloat) Prev(ctx CompareContext) (Datum, bool) { return nil, false } if f == math.Inf(-1) { - return dNaNFloat, true + return DNaNFloat, true } return NewDFloat(DFloat(math.Nextafter(f, math.Inf(-1)))), true } @@ -933,7 +933,7 @@ func (d *DFloat) Next(ctx CompareContext) (Datum, bool) { f := float64(*d) if math.IsNaN(f) { - return dNegInfFloat, true + return DNegInfFloat, true } if f == math.Inf(+1) { return nil, false @@ -941,14 +941,20 @@ func (d *DFloat) Next(ctx CompareContext) (Datum, bool) { return NewDFloat(DFloat(math.Nextafter(f, math.Inf(+1)))), true } -var dZeroFloat = NewDFloat(0.0) -var dPosInfFloat = NewDFloat(DFloat(math.Inf(+1))) -var dNegInfFloat = NewDFloat(DFloat(math.Inf(-1))) -var dNaNFloat = NewDFloat(DFloat(math.NaN())) +var ( + // DZeroFloat is the DFloat for zero. + DZeroFloat = NewDFloat(0) + // DPosInfFloat is the DFloat for positive infinity. + DPosInfFloat = NewDFloat(DFloat(math.Inf(+1))) + // DNegInfFloat is the DFloat for negative infinity. + DNegInfFloat = NewDFloat(DFloat(math.Inf(-1))) + // DNaNFloat is the DFloat for NaN.
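+ // (Note: in CockroachDB's datum ordering NaN sorts below all other
+ // floats, including negative infinity. That is why Prev at -Inf returns
+ // DNaNFloat above, Next at NaN returns DNegInfFloat, and DFloat's Min
+ // below returns DNaNFloat rather than DNegInfFloat.)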
+ DNaNFloat = NewDFloat(DFloat(math.NaN())) +) // IsMax implements the Datum interface. func (d *DFloat) IsMax(ctx CompareContext) bool { - return *d == *dPosInfFloat + return *d == *DPosInfFloat } // IsMin implements the Datum interface. @@ -958,12 +964,12 @@ func (d *DFloat) IsMin(ctx CompareContext) bool { // Max implements the Datum interface. func (d *DFloat) Max(ctx CompareContext) (Datum, bool) { - return dPosInfFloat, true + return DPosInfFloat, true } // Min implements the Datum interface. func (d *DFloat) Min(ctx CompareContext) (Datum, bool) { - return dNaNFloat, true + return DNaNFloat, true } // AmbiguousFormat implements the Datum interface. @@ -2553,7 +2559,8 @@ func MustMakeDTimestamp(t time.Time, precision time.Duration) *DTimestamp { return ret } -var dZeroTimestamp = &DTimestamp{} +// DZeroTimestamp is the zero-valued DTimestamp. +var DZeroTimestamp = &DTimestamp{} // time.Time formats. const ( @@ -2868,7 +2875,8 @@ func ParseDTimestampTZ( return d, dependsOnContext, err } -var dZeroTimestampTZ = &DTimestampTZ{} +// DZeroTimestampTZ is the zero-valued DTimestampTZ. +var DZeroTimestampTZ = &DTimestampTZ{} // AsDTimestampTZ attempts to retrieve a DTimestampTZ from an Expr, returning a // DTimestampTZ and a flag signifying whether the assertion was successful. The @@ -5303,13 +5311,13 @@ func NewDefaultDatum(collationEnv *CollationEnvironment, t *types.T) (d Datum, e case types.IntFamily: return DZero, nil case types.FloatFamily: - return dZeroFloat, nil + return DZeroFloat, nil case types.DecimalFamily: return dZeroDecimal, nil case types.DateFamily: return dEpochDate, nil case types.TimestampFamily: - return dZeroTimestamp, nil + return DZeroTimestamp, nil case types.IntervalFamily: return dZeroInterval, nil case types.StringFamily: @@ -5317,7 +5325,7 @@ func NewDefaultDatum(collationEnv *CollationEnvironment, t *types.T) (d Datum, e case types.BytesFamily: return dEmptyBytes, nil case types.TimestampTZFamily: - return dZeroTimestampTZ, nil + return DZeroTimestampTZ, nil case types.CollatedStringFamily: return NewDCollatedString("", t.Locale(), collationEnv) case types.OidFamily: diff --git a/pkg/sql/stats/BUILD.bazel b/pkg/sql/stats/BUILD.bazel index 7d818e5ac120..fac5b36b18df 100644 --- a/pkg/sql/stats/BUILD.bazel +++ b/pkg/sql/stats/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "histogram.go", "json.go", "new_stat.go", + "quantile.go", "row_sampling.go", "stats_cache.go", ], @@ -52,6 +53,7 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "//pkg/util/timeutil/pgdate", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", ], @@ -67,6 +69,7 @@ go_test( "delete_stats_test.go", "histogram_test.go", "main_test.go", + "quantile_test.go", "row_sampling_test.go", "stats_cache_test.go", ], @@ -117,6 +120,7 @@ go_test( "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/timeutil", + "//pkg/util/timeutil/pgdate", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/sql/stats/quantile.go b/pkg/sql/stats/quantile.go new file mode 100644 index 000000000000..93dd8a2b656c --- /dev/null +++ b/pkg/sql/stats/quantile.go @@ -0,0 +1,173 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
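+
+// Rough intuition for this file, with made-up numbers: a histogram on an INT
+// column recording that 25% of rows are <= 10 and 100% of rows are <= 40
+// corresponds to a quantile function through the points (0.25, 10.0) and
+// (1.0, 40.0). ToQuantileValue converts histogram bounds to the float64 value
+// coordinate of such points, and FromQuantileValue maps values produced by
+// regression over those points (the regression lands in a later patch) back
+// to histogram bounds:
+//
+//	ToQuantileValue(tree.NewDInt(tree.DInt(10))) -> 10.0, nil
+//	FromQuantileValue(types.Int, 10.4)           -> DInt(10), nil (rounds)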
+ +package stats + +import ( + "math" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" + "github.com/cockroachdb/errors" +) + +// CanMakeQuantile returns true if a quantile function can be created for a +// histogram of the given type. +// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. +func CanMakeQuantile(colType *types.T) bool { + if colType.UserDefined() { + return false + } + switch colType.Family() { + case types.IntFamily, + types.FloatFamily, + types.DateFamily, + types.TimestampFamily, + types.TimestampTZFamily: + return true + default: + return false + } +} + +// ToQuantileValue converts from a datum to a float suitable for use in a quantile +// function. It differs from eval.PerformCast in a few ways: +// 1. It supports conversions that are not legal casts (e.g. DATE to FLOAT). +// 2. It errors on NaN and infinite values because they will break our model. +// FromQuantileValue is the inverse of this function, and together they should +// support round-trip conversions. +// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. +func ToQuantileValue(d tree.Datum) (float64, error) { + switch v := d.(type) { + case *tree.DInt: + return float64(*v), nil + case *tree.DFloat: + if math.IsNaN(float64(*v)) || math.IsInf(float64(*v), 0) { + return 0, tree.ErrFloatOutOfRange + } + return float64(*v), nil + case *tree.DDate: + if !v.IsFinite() { + return 0, tree.ErrFloatOutOfRange + } + // We use PG epoch instead of Unix epoch to simplify clamping when + // converting back. + return float64(v.PGEpochDays()), nil + case *tree.DTimestamp: + if v.Equal(pgdate.TimeInfinity) || v.Equal(pgdate.TimeNegativeInfinity) { + return 0, tree.ErrFloatOutOfRange + } + return float64(v.Unix()) + float64(v.Nanosecond())*1e-9, nil + case *tree.DTimestampTZ: + // TIMESTAMPTZ doesn't store a timezone, so this is the same as TIMESTAMP. + if v.Equal(pgdate.TimeInfinity) || v.Equal(pgdate.TimeNegativeInfinity) { + return 0, tree.ErrFloatOutOfRange + } + return float64(v.Unix()) + float64(v.Nanosecond())*1e-9, nil + default: + return 0, errors.Errorf("cannot make quantile value from %v", d) + } +} + +var ( + // quantileMinTimestamp is an alternative minimum finite DTimestamp value to + // avoid the problems around TimeNegativeInfinity, see #41564. + quantileMinTimestamp = tree.MinSupportedTime.Add(time.Second) + quantileMinTimestampSec = float64(quantileMinTimestamp.Unix()) + // quantileMaxTimestamp is an alternative maximum finite DTimestamp value to + // avoid the problems around TimeInfinity, see #41564. + quantileMaxTimestamp = tree.MaxSupportedTime.Add(-1 * time.Second).Truncate(time.Second) + quantileMaxTimestampSec = float64(quantileMaxTimestamp.Unix()) +) + +// FromQuantileValue converts from a quantile value back to a datum suitable for +// use in a histogram. It is the inverse of ToQuantileValue. It differs from +// eval.PerformCast in a few ways: +// 1. It supports conversions that are not legal casts (e.g. FLOAT to DATE). +// 2. It errors on NaN and infinite values because they indicate a problem with +// the regression model rather than valid values. +// 3. On overflow or underflow it clamps to maximum or minimum finite values +// rather than failing the conversion (and thus the entire histogram). 
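+// For example (illustrative values): a quantile value of 40000.0 for an INT2
+// column clamps to 32767 (math.MaxInt16) instead of erroring, and a quantile
+// value of math.MaxFloat64 for a DATE column clamps to pgdate.HighDate.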
+// TODO(michae2): Add support for DECIMAL, TIME, TIMETZ, and INTERVAL. +func FromQuantileValue(colType *types.T, val float64) (tree.Datum, error) { + if math.IsNaN(val) || math.IsInf(val, 0) { + return nil, tree.ErrFloatOutOfRange + } + switch colType.Family() { + case types.IntFamily: + i := math.Round(val) + // Clamp instead of truncating. + switch colType.Width() { + case 16: + if i < math.MinInt16 { + i = math.MinInt16 + } else if i > math.MaxInt16 { + i = math.MaxInt16 + } + case 32: + if i < math.MinInt32 { + i = math.MinInt32 + } else if i > math.MaxInt32 { + i = math.MaxInt32 + } + default: + if i < math.MinInt64 { + i = math.MinInt64 + } else if i >= math.MaxInt64 { + // float64 cannot represent 2^63 - 1 exactly, so cast directly to DInt. + return tree.NewDInt(tree.DInt(math.MaxInt64)), nil + } + } + return tree.NewDInt(tree.DInt(i)), nil + case types.FloatFamily: + switch colType.Width() { + case 32: + if val <= -math.MaxFloat32 { + val = -math.MaxFloat32 + } else if val >= math.MaxFloat32 { + val = math.MaxFloat32 + } else { + val = float64(float32(val)) + } + } + return tree.NewDFloat(tree.DFloat(val)), nil + case types.DateFamily: + days := math.Round(val) + // First clamp to int32. + if days < math.MinInt32 { + days = math.MinInt32 + } else if days > math.MaxInt32 { + days = math.MaxInt32 + } + // Then clamp to pgdate.Date. + return tree.NewDDate(pgdate.MakeDateFromPGEpochClampFinite(int32(days))), nil + case types.TimestampFamily, types.TimestampTZFamily: + sec, frac := math.Modf(val) + var t time.Time + // Clamp to (our alternative finite) DTimestamp bounds. + if sec <= quantileMinTimestampSec { + t = quantileMinTimestamp + } else if sec >= quantileMaxTimestampSec { + t = quantileMaxTimestamp + } else { + t = timeutil.Unix(int64(sec), int64(frac*1e9)) + } + roundTo := tree.TimeFamilyPrecisionToRoundDuration(colType.Precision()) + if colType.Family() == types.TimestampFamily { + return tree.MakeDTimestamp(t, roundTo) + } + return tree.MakeDTimestampTZ(t, roundTo) + default: + return nil, errors.Errorf("cannot convert quantile value to type %s", colType.Name()) + } +} diff --git a/pkg/sql/stats/quantile_test.go b/pkg/sql/stats/quantile_test.go new file mode 100644 index 000000000000..ea4027fe7c70 --- /dev/null +++ b/pkg/sql/stats/quantile_test.go @@ -0,0 +1,573 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package stats + +import ( + "math" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate" +) + +// TODO(michae2): Test that a random histogram can round-trip to quantile +// and back. + +// Test conversions from datum to quantile value and back. +func TestQuantileValueRoundTrip(t *testing.T) { + testCases := []struct { + typ *types.T + dat tree.Datum + val float64 + err bool + }{ + // Integer cases. 
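+ // (In the MinInt64/MaxInt64 cases below: math.MinInt64 round-trips
+ // exactly because -2^63 is representable as a float64, while
+ // math.MaxInt64 relies on the clamp in FromQuantileValue, since
+ // 2^63 - 1 rounds up to 2^63 when converted to float64.)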
+ { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(0)), + val: 0, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(42)), + val: 42, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MinInt32 - 1)), + val: math.MinInt32 - 1, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MaxInt32 + 1)), + val: math.MaxInt32 + 1, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + val: math.MinInt64, + }, + { + typ: types.Int, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + val: math.MaxInt64, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MinInt16 - 1)), + val: math.MinInt16 - 1, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MaxInt16 + 1)), + val: math.MaxInt16 + 1, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + val: math.MinInt32, + }, + { + typ: types.Int4, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + val: math.MaxInt32, + }, + { + typ: types.Int2, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + val: math.MinInt16, + }, + { + typ: types.Int2, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + val: math.MaxInt16, + }, + // Float cases. + { + typ: types.Float, + dat: tree.DZeroFloat, + val: 0, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32 - 1)), + val: -math.MaxFloat32 - 1, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32 + 1)), + val: math.MaxFloat32 + 1, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat64)), + val: -math.MaxFloat64, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat64)), + val: math.MaxFloat64, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.Pi)), + val: math.Pi, + }, + { + typ: types.Float, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat64)), + val: math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + dat: tree.DNaNFloat, + err: true, + }, + { + typ: types.Float, + dat: tree.DNegInfFloat, + err: true, + }, + { + typ: types.Float, + dat: tree.DPosInfFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + val: -math.MaxFloat32, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + val: math.MaxFloat32, + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(float32(math.Pi))), + val: float64(float32(math.Pi)), + }, + { + typ: types.Float4, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat32)), + val: math.SmallestNonzeroFloat32, + }, + { + typ: types.Float4, + dat: tree.DNaNFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.DNegInfFloat, + err: true, + }, + { + typ: types.Float4, + dat: tree.DPosInfFloat, + err: true, + }, + // Date cases. + { + typ: types.Date, + dat: tree.NewDDate(pgdate.MakeDateFromPGEpochClampFinite(0)), + val: 0, + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.LowDate), + val: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.HighDate), + val: float64(pgdate.HighDate.PGEpochDays()), + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.PosInfDate), + err: true, + }, + { + typ: types.Date, + dat: tree.NewDDate(pgdate.NegInfDate), + err: true, + }, + // Timestamp cases. 
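+ // (The quantileMinTimestamp/quantileMaxTimestamp cases below use the
+ // slightly tightened bounds defined in quantile.go rather than
+ // tree.MinSupportedTime/tree.MaxSupportedTime, staying clear of
+ // pgdate.TimeInfinity and pgdate.TimeNegativeInfinity; see #41564.)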
+ { + typ: types.Timestamp, + dat: tree.DZeroTimestamp, + val: float64(tree.DZeroTimestamp.Unix()), + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + val: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + val: quantileMaxTimestampSec, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: pgdate.TimeNegativeInfinity}, + err: true, + }, + { + typ: types.Timestamp, + dat: &tree.DTimestamp{Time: pgdate.TimeInfinity}, + err: true, + }, + { + typ: types.TimestampTZ, + dat: tree.DZeroTimestampTZ, + val: float64(tree.DZeroTimestampTZ.Unix()), + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + val: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + val: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: pgdate.TimeNegativeInfinity}, + err: true, + }, + { + typ: types.TimestampTZ, + dat: &tree.DTimestampTZ{Time: pgdate.TimeInfinity}, + err: true, + }, + } + evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) + for i, tc := range testCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + val, err := ToQuantileValue(tc.dat) + if err != nil { + if !tc.err { + t.Errorf("test case %d (%v) unexpected ToQuantileValue err: %v", i, tc.typ.Name(), err) + } + return + } + if tc.err { + t.Errorf("test case %d (%v) expected ToQuantileValue err", i, tc.typ.Name()) + return + } + if val != tc.val { + t.Errorf("test case %d (%v) incorrect val %v expected %v", i, tc.typ.Name(), val, tc.val) + return + } + // Check that we can make the round trip. + res, err := FromQuantileValue(tc.typ, val) + if err != nil { + t.Errorf("test case %d (%v) unexpected FromQuantileValue err: %v", i, tc.typ.Name(), err) + return + } + cmp, err := res.CompareError(evalCtx, tc.dat) + if err != nil { + t.Errorf("test case %d (%v) unexpected CompareError err: %v", i, tc.typ.Name(), err) + return + } + if cmp != 0 { + t.Errorf("test case %d (%v) incorrect datum %v expected %v", i, tc.typ.Name(), res, tc.dat) + } + }) + } +} + +// Test conversions from quantile value to datum and back. +// TestQuantileValueRoundTrip covers similar ground, so here we focus on cases +// that overflow or underflow and have to clamp. +func TestQuantileValueRoundTripOverflow(t *testing.T) { + testCases := []struct { + typ *types.T + val float64 + dat tree.Datum + err bool + res float64 + }{ + // Integer cases. 
+ { + typ: types.Int, + val: math.MinInt64 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + res: math.MinInt64, + }, + { + typ: types.Int, + val: math.MaxInt64 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + res: math.MaxInt64, + }, + { + typ: types.Int, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt64)), + res: math.MinInt64, + }, + { + typ: types.Int, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt64)), + res: math.MaxInt64, + }, + { + typ: types.Int4, + val: math.MinInt32 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + res: math.MinInt32, + }, + { + typ: types.Int4, + val: math.MaxInt32 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + res: math.MaxInt32, + }, + { + typ: types.Int4, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt32)), + res: math.MinInt32, + }, + { + typ: types.Int4, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt32)), + res: math.MaxInt32, + }, + { + typ: types.Int2, + val: math.MinInt16 - 1, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + res: math.MinInt16, + }, + { + typ: types.Int2, + val: math.MaxInt16 + 1, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + res: math.MaxInt16, + }, + { + typ: types.Int2, + val: -math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MinInt16)), + res: math.MinInt16, + }, + { + typ: types.Int2, + val: math.MaxFloat64, + dat: tree.NewDInt(tree.DInt(math.MaxInt16)), + res: math.MaxInt16, + }, + // Float cases. + { + typ: types.Float, + val: -math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat64)), + res: -math.MaxFloat64, + }, + { + typ: types.Float, + val: math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat64)), + res: math.MaxFloat64, + }, + { + typ: types.Float, + val: -math.SmallestNonzeroFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.SmallestNonzeroFloat64)), + res: -math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + val: math.SmallestNonzeroFloat64, + dat: tree.NewDFloat(tree.DFloat(math.SmallestNonzeroFloat64)), + res: math.SmallestNonzeroFloat64, + }, + { + typ: types.Float, + val: math.NaN(), + err: true, + }, + { + typ: types.Float, + val: math.Inf(-1), + err: true, + }, + { + typ: types.Float, + val: math.Inf(+1), + err: true, + }, + { + typ: types.Float4, + val: -math.MaxFloat32 - 1, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + res: -math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.MaxFloat32 + 1, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + res: math.MaxFloat32, + }, + { + typ: types.Float4, + val: -math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(-math.MaxFloat32)), + res: -math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.MaxFloat64, + dat: tree.NewDFloat(tree.DFloat(math.MaxFloat32)), + res: math.MaxFloat32, + }, + { + typ: types.Float4, + val: math.Pi, + dat: tree.NewDFloat(tree.DFloat(float32(math.Pi))), + res: float64(float32(math.Pi)), + }, + { + typ: types.Float4, + val: -math.SmallestNonzeroFloat64, + dat: tree.DZeroFloat, + res: 0, + }, + { + typ: types.Float4, + val: math.SmallestNonzeroFloat64, + dat: tree.DZeroFloat, + res: 0, + }, + // Date cases. 
+ { + typ: types.Date, + val: float64(pgdate.LowDate.PGEpochDays()) - 1, + dat: tree.NewDDate(pgdate.LowDate), + res: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + val: float64(pgdate.HighDate.PGEpochDays()) + 1, + dat: tree.NewDDate(pgdate.HighDate), + res: float64(pgdate.HighDate.PGEpochDays()), + }, + { + typ: types.Date, + val: -math.MaxFloat64, + dat: tree.NewDDate(pgdate.LowDate), + res: float64(pgdate.LowDate.PGEpochDays()), + }, + { + typ: types.Date, + val: math.MaxFloat64, + dat: tree.NewDDate(pgdate.HighDate), + res: float64(pgdate.HighDate.PGEpochDays()), + }, + // Timestamp cases. + { + typ: types.Timestamp, + val: float64(pgdate.TimeNegativeInfinity.Unix()), + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + val: float64(pgdate.TimeInfinity.Unix()), + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.Timestamp, + val: -math.MaxFloat64, + dat: &tree.DTimestamp{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.Timestamp, + val: math.MaxFloat64, + dat: &tree.DTimestamp{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + val: float64(pgdate.TimeNegativeInfinity.Unix()), + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + val: float64(pgdate.TimeInfinity.Unix()), + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + { + typ: types.TimestampTZ, + val: -math.MaxFloat64, + dat: &tree.DTimestampTZ{Time: quantileMinTimestamp}, + res: quantileMinTimestampSec, + }, + { + typ: types.TimestampTZ, + val: math.MaxFloat64, + dat: &tree.DTimestampTZ{Time: quantileMaxTimestamp}, + res: quantileMaxTimestampSec, + }, + } + evalCtx := eval.NewTestingEvalContext(cluster.MakeTestingClusterSettings()) + for i, tc := range testCases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + d, err := FromQuantileValue(tc.typ, tc.val) + if err != nil { + if !tc.err { + t.Errorf("test case %d (%v) unexpected FromQuantileValue err: %v", i, tc.typ.Name(), err) + } + return + } + if tc.err { + t.Errorf("test case %d (%v) expected FromQuantileValue err", i, tc.typ.Name()) + return + } + cmp, err := d.CompareError(evalCtx, tc.dat) + if err != nil { + t.Errorf("test case %d (%v) unexpected CompareError err: %v", i, tc.typ.Name(), err) + return + } + if cmp != 0 { + t.Errorf("test case %d (%v) incorrect datum %v expected %v", i, tc.typ.Name(), d, tc.dat) + return + } + // Check that we can make the round trip with the clamped value. + res, err := ToQuantileValue(d) + if err != nil { + t.Errorf("test case %d (%v) unexpected ToQuantileValue err: %v", i, tc.typ.Name(), err) + return + } + if res != tc.res { + t.Errorf("test case %d (%v) incorrect val %v expected %v", i, tc.typ.Name(), res, tc.res) + return + } + }) + } +} diff --git a/pkg/util/timeutil/pgdate/pgdate.go b/pkg/util/timeutil/pgdate/pgdate.go index 9260c1d91696..34f92318a2e1 100644 --- a/pkg/util/timeutil/pgdate/pgdate.go +++ b/pkg/util/timeutil/pgdate/pgdate.go @@ -141,6 +141,17 @@ func MakeDateFromPGEpoch(days int32) (Date, error) { return Date{days: days}, nil } +// MakeDateFromPGEpochClampFinite creates a Date from the number of days since +// 2000-01-01, clamping to LowDate or HighDate if outside those bounds. 
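+// For example, MakeDateFromPGEpochClampFinite(0) is 2000-01-01, and
+// MakeDateFromPGEpochClampFinite(math.MaxInt32) is clamped to HighDate.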
+func MakeDateFromPGEpochClampFinite(days int32) Date { + if days < lowDays { + return LowDate + } else if days > highDays { + return HighDate + } + return Date{days: days} +} + // ToTime returns d as a time.Time. Non-finite dates return an error. func (d Date) ToTime() (time.Time, error) { if d.days == math.MinInt32 || d.days == math.MaxInt32 {