diff --git a/pkg/ccl/telemetryccl/telemetry_logging_test.go b/pkg/ccl/telemetryccl/telemetry_logging_test.go
index 483d0a9257d2..59b4d6560b60 100644
--- a/pkg/ccl/telemetryccl/telemetry_logging_test.go
+++ b/pkg/ccl/telemetryccl/telemetry_logging_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
-	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -156,17 +155,60 @@ type expectedSampleQueryEvent struct {
 	stmt string
 }
 
+type telemetrySpy struct {
+	t *testing.T
+
+	sampledQueries    []eventpb.SampledQuery
+	sampledQueriesRaw []logpb.Entry
+	recoveryEvents    []eventpb.RecoveryEvent
+}
+
+func (l *telemetrySpy) Intercept(entry []byte) {
+	var rawLog logpb.Entry
+	if err := json.Unmarshal(entry, &rawLog); err != nil {
+		l.t.Errorf("failed unmarshaling %s: %s", entry, err)
+	}
+
+	if rawLog.Channel != logpb.Channel_TELEMETRY {
+		return
+	}
+
+	var sq eventpb.SampledQuery
+	if strings.Contains(rawLog.Message, "IMPORT") ||
+		strings.Contains(rawLog.Message, "RESTORE") ||
+		strings.Contains(rawLog.Message, "BACKUP") {
+		if err := json.Unmarshal([]byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd]), &sq); err == nil {
+			l.sampledQueries = append(l.sampledQueries, sq)
+			l.sampledQueriesRaw = append(l.sampledQueriesRaw, rawLog)
+			return
+		} else {
+			l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
+		}
+	}
+
+	var re eventpb.RecoveryEvent
+	if err := json.Unmarshal([]byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd]), &re); err == nil {
+		l.recoveryEvents = append(l.recoveryEvents, re)
+		return
+	} else {
+		l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
+	}
+}
+
+var _ log.Interceptor = &telemetrySpy{}
+
 // TODO(janexing): add event telemetry tests for failed or canceled bulk jobs.
 func TestBulkJobTelemetryLogging(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	sc := log.ScopeWithoutShowLogs(t)
 	defer sc.Close(t)
 
-	skip.WithIssue(t, 120115)
-
 	ctx := context.Background()
 
-	cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
+	spy := &telemetrySpy{
+		t: t,
+	}
+	cleanup := log.InterceptWith(ctx, spy)
 	defer cleanup()
 
 	st := logtestutils.StubTime{}
@@ -325,64 +367,20 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
 		execTimestamp++
 	}
 
-	log.FlushFiles()
-
-	var filteredSampleQueries []logpb.Entry
-	testutils.SucceedsSoon(t, func() error {
-		filteredSampleQueries = []logpb.Entry{}
-		sampleQueryEntries, err := log.FetchEntriesFromFiles(
-			0,
-			math.MaxInt64,
-			10000,
-			regexp.MustCompile(`"EventType":"sampled_query"`),
-			log.WithMarkedSensitiveData,
-		)
-		require.NoError(t, err)
-
-		for _, sq := range sampleQueryEntries {
-			if !(strings.Contains(sq.Message, "IMPORT") || strings.Contains(sq.Message, "RESTORE") || strings.Contains(sq.Message, "BACKUP")) {
-				continue
-			}
-			filteredSampleQueries = append(filteredSampleQueries, sq)
-		}
-		if len(filteredSampleQueries) < len(testData) {
-			return errors.New("not enough sample query events fetched")
-		}
-		return nil
-	})
-
-	var recoveryEventEntries []logpb.Entry
-	testutils.SucceedsSoon(t, func() error {
-		recoveryEventEntries, err = log.FetchEntriesFromFiles(
-			0,
-			math.MaxInt64,
-			10000,
-			regexp.MustCompile(`"EventType":"recovery_event"`),
-			log.WithMarkedSensitiveData,
-		)
-		require.NoError(t, err)
-		if len(recoveryEventEntries) < len(testData) {
-			return errors.New("not enough recovery events fetched")
-		}
-		return nil
-	})
+	log.FlushAllSync()
 
 	for _, tc := range testData {
 		t.Run(tc.name, func(t *testing.T) {
 			var foundSampleQuery bool
-			for i := len(filteredSampleQueries) - 1; i >= 0; i-- {
-				e := filteredSampleQueries[i]
-				var sq eventpb.SampledQuery
-				jsonPayload := []byte(e.Message)
-				if err := json.Unmarshal(jsonPayload, &sq); err != nil {
-					t.Errorf("unmarshalling %q: %v", e.Message, err)
-				}
+			for i := len(spy.sampledQueries) - 1; i >= 0; i-- {
+				sq := spy.sampledQueries[i]
 				if sq.Statement.StripMarkers() == tc.sampleQueryEvent.stmt {
 					foundSampleQuery = true
-					if strings.Contains(e.Message, "NumRows:") {
+					rawEvent := spy.sampledQueriesRaw[i]
+					if strings.Contains(rawEvent.Message, "NumRows:") {
 						t.Errorf("for bulk jobs (IMPORT/BACKUP/RESTORE), "+
 							"there shouldn't be NumRows entry in the event message: %s",
-							e.Message)
+							rawEvent.Message)
 					}
 					require.Greater(t, sq.BulkJobId, uint64(0))
 					tc.recoveryEvent.bulkJobId = sq.BulkJobId
@@ -394,18 +392,13 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
 			}
 
 			var foundRecoveryEvent bool
-			for i := len(recoveryEventEntries) - 1; i >= 0; i-- {
-				e := recoveryEventEntries[i]
-				var re eventpb.RecoveryEvent
-				jsonPayload := []byte(e.Message)
-				if err := json.Unmarshal(jsonPayload, &re); err != nil {
-					t.Errorf("unmarshalling %q: %v", e.Message, err)
-				}
-				if string(re.RecoveryType) == tc.recoveryEvent.recoveryType &&
-					tc.recoveryEvent.bulkJobId == re.JobID &&
-					re.ResultStatus == "succeeded" {
+			for i := len(spy.recoveryEvents) - 1; i >= 0; i-- {
+				e := spy.recoveryEvents[i]
+				if string(e.RecoveryType) == tc.recoveryEvent.recoveryType &&
+					tc.recoveryEvent.bulkJobId == e.JobID &&
+					e.ResultStatus == "succeeded" {
 					foundRecoveryEvent = true
-					require.Equal(t, tc.recoveryEvent.numRows, e.NumRows)
+					require.Equal(t, tc.recoveryEvent.numRows, e.NumRows)
 					break
 				}
 			}