Skip to content

Commit

Permalink
telemetryccl: use log spy in backup/restore test
Browse files Browse the repository at this point in the history
Previously, this test used file logging to test the telemetry output,
which can result in flakes on CI. This commit modifies the test to use
a log spy, which is a bit more reliable. Additionally, the
deserialization now happens in the `Intercept()` method, which makes
the test easier to read.

Resolves: #120115
Epic: None
Release note: None
  • Loading branch information
dhartunian committed Mar 19, 2024
1 parent bd34188 commit cffcc30
Showing 1 changed file with 58 additions and 65 deletions.
123 changes: 58 additions & 65 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -156,17 +155,60 @@ type expectedSampleQueryEvent struct {
stmt string
}

// telemetrySpy is a log.Interceptor that captures TELEMETRY-channel log
// entries emitted while the test runs, decoded into their event protos.
type telemetrySpy struct {
	t *testing.T

	// sampledQueries holds decoded sampled_query events whose message
	// mentions IMPORT, RESTORE, or BACKUP; sampledQueriesRaw keeps the
	// corresponding raw entries at the same indexes (parallel slices).
	sampledQueries []eventpb.SampledQuery
	sampledQueriesRaw []logpb.Entry
	// recoveryEvents holds every other TELEMETRY entry, decoded as a
	// recovery_event.
	recoveryEvents []eventpb.RecoveryEvent
}

// Intercept implements log.Interceptor. It decodes the intercepted log
// entry, ignores everything that is not on the TELEMETRY channel, and
// records the structured payload: entries whose message mentions
// IMPORT, RESTORE, or BACKUP are recorded as SampledQuery events
// (together with the raw entry); anything else — including a payload
// that failed to parse as a SampledQuery — is attempted as a
// RecoveryEvent. Parse failures are reported via t.Errorf.
func (l *telemetrySpy) Intercept(entry []byte) {
	var rawLog logpb.Entry
	if err := json.Unmarshal(entry, &rawLog); err != nil {
		l.t.Errorf("failed unmarshaling %s: %s", entry, err)
		// Without a valid envelope there is nothing meaningful to
		// inspect; don't process a zero-value entry.
		return
	}

	if rawLog.Channel != logpb.Channel_TELEMETRY {
		return
	}

	// The structured JSON payload is embedded within the message; slice
	// it out once for both decode attempts below.
	payload := []byte(rawLog.Message[rawLog.StructuredStart:rawLog.StructuredEnd])

	if strings.Contains(rawLog.Message, "IMPORT") ||
		strings.Contains(rawLog.Message, "RESTORE") ||
		strings.Contains(rawLog.Message, "BACKUP") {
		var sq eventpb.SampledQuery
		if err := json.Unmarshal(payload, &sq); err == nil {
			l.sampledQueries = append(l.sampledQueries, sq)
			l.sampledQueriesRaw = append(l.sampledQueriesRaw, rawLog)
			return
		} else {
			l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
			// Fall through and try the payload as a RecoveryEvent,
			// matching the original behavior.
		}
	}

	var re eventpb.RecoveryEvent
	if err := json.Unmarshal(payload, &re); err != nil {
		l.t.Errorf("failed unmarshaling %s: %s", rawLog.Message, err)
		return
	}
	l.recoveryEvents = append(l.recoveryEvents, re)
}

// Compile-time assertion that telemetrySpy satisfies log.Interceptor.
var _ log.Interceptor = (*telemetrySpy)(nil)

// TODO(janexing): add event telemetry tests for failed or canceled bulk jobs.
func TestBulkJobTelemetryLogging(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := log.ScopeWithoutShowLogs(t)
defer sc.Close(t)

skip.WithIssue(t, 120115)

ctx := context.Background()

cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY)
spy := &telemetrySpy{
t: t,
}
cleanup := log.InterceptWith(ctx, spy)
defer cleanup()

st := logtestutils.StubTime{}
Expand Down Expand Up @@ -325,64 +367,20 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
execTimestamp++
}

log.FlushFiles()

var filteredSampleQueries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
filteredSampleQueries = []logpb.Entry{}
sampleQueryEntries, err := log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`"EventType":"sampled_query"`),
log.WithMarkedSensitiveData,
)
require.NoError(t, err)

for _, sq := range sampleQueryEntries {
if !(strings.Contains(sq.Message, "IMPORT") || strings.Contains(sq.Message, "RESTORE") || strings.Contains(sq.Message, "BACKUP")) {
continue
}
filteredSampleQueries = append(filteredSampleQueries, sq)
}
if len(filteredSampleQueries) < len(testData) {
return errors.New("not enough sample query events fetched")
}
return nil
})

var recoveryEventEntries []logpb.Entry
testutils.SucceedsSoon(t, func() error {
recoveryEventEntries, err = log.FetchEntriesFromFiles(
0,
math.MaxInt64,
10000,
regexp.MustCompile(`"EventType":"recovery_event"`),
log.WithMarkedSensitiveData,
)
require.NoError(t, err)
if len(recoveryEventEntries) < len(testData) {
return errors.New("not enough recovery events fetched")
}
return nil
})
log.FlushAllSync()

for _, tc := range testData {
t.Run(tc.name, func(t *testing.T) {
var foundSampleQuery bool
for i := len(filteredSampleQueries) - 1; i >= 0; i-- {
e := filteredSampleQueries[i]
var sq eventpb.SampledQuery
jsonPayload := []byte(e.Message)
if err := json.Unmarshal(jsonPayload, &sq); err != nil {
t.Errorf("unmarshalling %q: %v", e.Message, err)
}
for i := len(spy.sampledQueries) - 1; i >= 0; i-- {
sq := spy.sampledQueries[i]
if sq.Statement.StripMarkers() == tc.sampleQueryEvent.stmt {
foundSampleQuery = true
if strings.Contains(e.Message, "NumRows:") {
rawEvent := spy.sampledQueriesRaw[i]
if strings.Contains(rawEvent.Message, "NumRows:") {
t.Errorf("for bulk jobs (IMPORT/BACKUP/RESTORE), "+
"there shouldn't be NumRows entry in the event message: %s",
e.Message)
rawEvent.Message)
}
require.Greater(t, sq.BulkJobId, uint64(0))
tc.recoveryEvent.bulkJobId = sq.BulkJobId
Expand All @@ -394,18 +392,13 @@ func TestBulkJobTelemetryLogging(t *testing.T) {
}

var foundRecoveryEvent bool
for i := len(recoveryEventEntries) - 1; i >= 0; i-- {
e := recoveryEventEntries[i]
var re eventpb.RecoveryEvent
jsonPayload := []byte(e.Message)
if err := json.Unmarshal(jsonPayload, &re); err != nil {
t.Errorf("unmarshalling %q: %v", e.Message, err)
}
if string(re.RecoveryType) == tc.recoveryEvent.recoveryType &&
tc.recoveryEvent.bulkJobId == re.JobID &&
re.ResultStatus == "succeeded" {
for i := len(spy.recoveryEvents) - 1; i >= 0; i-- {
e := spy.recoveryEvents[i]
if string(e.RecoveryType) == tc.recoveryEvent.recoveryType &&
tc.recoveryEvent.bulkJobId == e.JobID &&
e.ResultStatus == "succeeded" {
foundRecoveryEvent = true
require.Equal(t, tc.recoveryEvent.numRows, re.NumRows)
require.Equal(t, tc.recoveryEvent.numRows, e.NumRows)
break
}
}
Expand Down

0 comments on commit cffcc30

Please sign in to comment.