Skip to content

Commit

Permalink
streamingccl: small logging and tracing cleanups
Browse files Browse the repository at this point in the history
- Add a tracing span to ingestion job cutover.
- Move a particularly noisy log message to VInfo(3).
- Prefer log.VInfo to `if log.V(n) {}` in cases where we aren't doing
  expensive argument construction.

Release note: None
  • Loading branch information
stevendanna committed Jul 7, 2022
1 parent 5fb4478 commit 75971c3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 13 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -191,6 +192,9 @@ func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx inter
func revertToCutoverTimestamp(
ctx context.Context, execCtx interface{}, ingestionJobID jobspb.JobID,
) error {
ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp")
defer span.Finish()

p := execCtx.(sql.JobExecContext)
db := p.ExecCfg().DB
j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, ingestionJobID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
}

func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
log.Infof(sip.Ctx, "got checkpoint %v", event.GetResolved())
log.VInfof(sip.Ctx, 3, "got checkpoint %v", event.GetResolved())
resolvedTimePtr := event.GetResolved()
if resolvedTimePtr == nil {
return errors.New("checkpoint event expected to have a resolved timestamp")
Expand Down
16 changes: 4 additions & 12 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,19 +223,15 @@ func (s *eventStream) onValue(ctx context.Context, value *roachpb.RangeFeedValue
select {
case <-ctx.Done():
case s.eventsCh <- roachpb.RangeFeedEvent{Val: value}:
if log.V(1) {
log.Infof(ctx, "onValue: %s@%s", value.Key, value.Value.Timestamp)
}
log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp)
}
}

func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
select {
case <-ctx.Done():
case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: checkpoint}:
if log.V(1) {
log.Infof(ctx, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS)
}
log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS)
}
}

Expand All @@ -248,9 +244,7 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro
case <-ctx.Done():
return ctx.Err()
case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: &checkpoint}:
if log.V(1) {
log.Infof(ctx, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS)
}
log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS)
return nil
}
}
Expand All @@ -259,9 +253,7 @@ func (s *eventStream) onSSTable(ctx context.Context, sst *roachpb.RangeFeedSSTab
select {
case <-ctx.Done():
case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}:
if log.V(1) {
log.Infof(ctx, "onSSTable: %s@%s", sst.Span, sst.WriteTS)
}
log.VInfof(ctx, 1, "onSSTable: %s@%s", sst.Span, sst.WriteTS)
}
}

Expand Down

0 comments on commit 75971c3

Please sign in to comment.