Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
35241: stats: delay automatic stats refresh to reflect latest changes r=rytaft a=rytaft

Prior to this commit, it was possible for an `IMPORT` or `RESTORE` to
trigger a stats refresh, but because the stats refresh was run with
`AS OF SYSTEM TIME '-30s'`, it appeared as though the table was empty.
This commit adds a 30-second delay before starting a refresh cycle,
so all of the latest changes that triggered the refresh will be
reflected.

Release note: None

35256: rsg: add some errors to the ignored list r=jordanlewis a=jordanlewis

Release note: None

Co-authored-by: Rebecca Taft <[email protected]>
Co-authored-by: Jordan Lewis <[email protected]>
  • Loading branch information
3 people committed Feb 28, 2019
3 parents cc38281 + bad3a17 + 0e1c1e7 commit 71681f6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 48 deletions.
80 changes: 41 additions & 39 deletions pkg/sql/stats/automatic_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ func (r *Refresher) Start(
// on startup.
r.ensureAllTables(ctx, st)

// We always sleep for r.asOfTime at the beginning of each refresh, so
// subtract it from the refreshInterval.
refreshInterval -= r.asOfTime
if refreshInterval < 0 {
refreshInterval = 0
}

timer := time.NewTimer(refreshInterval)
defer timer.Stop()

Expand All @@ -205,6 +212,17 @@ func (r *Refresher) Start(
mutationCounts := r.mutationCounts
if err := stopper.RunAsyncTask(
ctx, "stats.Refresher: maybeRefreshStats", func(ctx context.Context) {
// Wait so that the latest changes will be reflected according to the
// AS OF time.
timerAsOf := time.NewTimer(r.asOfTime)
defer timerAsOf.Stop()
select {
case <-timerAsOf.C:
break
case <-stopper.ShouldQuiesce():
return
}

for tableID, rowsAffected := range mutationCounts {
r.maybeRefreshStats(ctx, stopper, tableID, rowsAffected, r.asOfTime)
select {
Expand Down Expand Up @@ -348,48 +366,32 @@ func (r *Refresher) maybeRefreshStats(

if err := r.refreshStats(ctx, tableID, asOf); err != nil {
pgerr, ok := errors.Cause(err).(*pgerror.Error)
if ok && pgerr.Code == pgerror.CodeUndefinedTableError {
// Wait so that the latest changes will be reflected according to the
// AS OF time, then try again.
timer := time.NewTimer(asOf)
defer timer.Stop()
select {
case <-timer.C:
break
case <-stopper.ShouldQuiesce():
return
if ok && pgerr.Code == pgerror.CodeLockNotAvailableError {
// Another stats job was already running. Attempt to reschedule this
// refresh.
if mustRefresh {
// For the cases where mustRefresh=true (stats don't yet exist or it
// has been 2x the average time since a refresh), we want to make sure
// that maybeRefreshStats is called on this table during the next
// cycle so that we have another chance to trigger a refresh. We pass
// rowsAffected=0 so that we don't force a refresh if another node has
// already done it.
r.mutations <- mutation{tableID: tableID, rowsAffected: 0}
} else {
// If this refresh was caused by a "dice roll", we want to make sure
// that the refresh is rescheduled so that we adhere to the
// targetFractionOfRowsUpdatedBeforeRefresh statistical ideal. We
// ensure that the refresh is triggered during the next cycle by
// passing a very large number for rowsAffected.
r.mutations <- mutation{tableID: tableID, rowsAffected: math.MaxInt32}
}
err = r.refreshStats(ctx, tableID, asOf)
}
if err != nil {
pgerr, ok := errors.Cause(err).(*pgerror.Error)
if ok && pgerr.Code == pgerror.CodeLockNotAvailableError {
// Another stats job was already running. Attempt to reschedule this
// refresh.
if mustRefresh {
// For the cases where mustRefresh=true (stats don't yet exist or it
// has been 2x the average time since a refresh), we want to make sure
// that maybeRefreshStats is called on this table during the next
// cycle so that we have another chance to trigger a refresh. We pass
// rowsAffected=0 so that we don't force a refresh if another node has
// already done it.
r.mutations <- mutation{tableID: tableID, rowsAffected: 0}
} else {
// If this refresh was caused by a "dice roll", we want to make sure
// that the refresh is rescheduled so that we adhere to the
// targetFractionOfRowsUpdatedBeforeRefresh statistical ideal. We
// ensure that the refresh is triggered during the next cycle by
// passing a very large number for rowsAffected.
r.mutations <- mutation{tableID: tableID, rowsAffected: math.MaxInt32}
}
return
}

// Log other errors but don't automatically reschedule the refresh, since
// that could lead to endless retries.
log.Errorf(ctx, "failed to create statistics on table %d: %v", tableID, err)
return
}

// Log other errors but don't automatically reschedule the refresh, since
// that could lead to endless retries.
log.Errorf(ctx, "failed to create statistics on table %d: %v", tableID, err)
return
}
}

Expand Down
29 changes: 20 additions & 9 deletions pkg/sql/stats/automatic_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,23 +65,26 @@ func TestMaybeRefreshStats(t *testing.T) {

// There are no stats yet, so this must refresh the statistics on table t
// even though rowsAffected=0.
refresher.maybeRefreshStats(ctx, s.Stopper(), descA.ID, 0 /* rowsAffected */, time.Microsecond /* asOf */)
refresher.maybeRefreshStats(
ctx, s.Stopper(), descA.ID, 0 /* rowsAffected */, time.Microsecond, /* asOf */
)
if err := checkStatsCount(ctx, cache, descA.ID, 1 /* expected */); err != nil {
t.Fatal(err)
}

// Try to refresh again. With rowsAffected=0, the probability of a refresh
// is 0, so refreshing will not succeed.
refresher.maybeRefreshStats(ctx, s.Stopper(), descA.ID, 0 /* rowsAffected */, time.Microsecond /* asOf */)
refresher.maybeRefreshStats(
ctx, s.Stopper(), descA.ID, 0 /* rowsAffected */, time.Microsecond, /* asOf */
)
if err := checkStatsCount(ctx, cache, descA.ID, 1 /* expected */); err != nil {
t.Fatal(err)
}

// With rowsAffected=10, refreshing should work. Since there are more rows
// updated than exist in the table, the probability of a refresh is 100%.
// Use a non-zero asOf time to test that the thread will sleep if necessary.
refresher.maybeRefreshStats(
ctx, s.Stopper(), descA.ID, 10 /* rowsAffected */, time.Second, /* asOf */
ctx, s.Stopper(), descA.ID, 10 /* rowsAffected */, time.Microsecond, /* asOf */
)
if err := checkStatsCount(ctx, cache, descA.ID, 2 /* expected */); err != nil {
t.Fatal(err)
Expand All @@ -91,7 +94,9 @@ func TestMaybeRefreshStats(t *testing.T) {
// enqueuing the attempt.
// TODO(rytaft): Should not enqueue views to begin with.
descVW := sqlbase.GetTableDescriptor(s.DB(), "t", "vw")
refresher.maybeRefreshStats(ctx, s.Stopper(), descVW.ID, 0 /* rowsAffected */, 0 /* asOf */)
refresher.maybeRefreshStats(
ctx, s.Stopper(), descVW.ID, 0 /* rowsAffected */, time.Microsecond, /* asOf */
)
select {
case <-refresher.mutations:
t.Fatal("refresher should not re-enqueue attempt to create stats over view")
Expand Down Expand Up @@ -262,7 +267,9 @@ func TestAverageRefreshTime(t *testing.T) {
// average time between refreshes, so this call is not required to refresh
// the statistics on table t. With rowsAffected=0, the probability of refresh
// is 0.
refresher.maybeRefreshStats(ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond /* asOf */)
refresher.maybeRefreshStats(
ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond, /* asOf */
)
if err := checkStatsCount(ctx, cache, tableID, 20 /* expected */); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -307,7 +314,9 @@ func TestAverageRefreshTime(t *testing.T) {
// on table t even though rowsAffected=0. After refresh, only 15 stats should
// remain (5 from column k and 10 from column v), since the old stats on k
// were deleted.
refresher.maybeRefreshStats(ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond /* asOf */)
refresher.maybeRefreshStats(
ctx, s.Stopper(), tableID, 0 /* rowsAffected */, time.Microsecond, /* asOf */
)
if err := checkStatsCount(ctx, cache, tableID, 15 /* expected */); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -362,10 +371,12 @@ func TestNoRetryOnFailure(t *testing.T) {

executor := s.InternalExecutor().(sqlutil.InternalExecutor)
cache := NewTableStatisticsCache(10 /* cacheSize */, s.Gossip(), kvDB, executor)
r := MakeRefresher(executor, cache, 0 /* asOfTime */)
r := MakeRefresher(executor, cache, time.Microsecond /* asOfTime */)

// Try to refresh stats on a table that doesn't exist.
r.maybeRefreshStats(ctx, s.Stopper(), 100 /* tableID */, math.MaxInt32, 0 /* asOfTime */)
r.maybeRefreshStats(
ctx, s.Stopper(), 100 /* tableID */, math.MaxInt32, time.Microsecond, /* asOfTime */
)

// Ensure that we will not try to refresh tableID 100 again.
if expected, actual := 0, len(r.mutations); expected != actual {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/tests/rsg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ var ignoredErrorPatterns = []string{
"out of int64 range",
"underflow, subnormal",
"overflow",
"requested length too large",
// Type checking
"value type .* doesn't match type .* of column",
"incompatible value type",
Expand Down Expand Up @@ -418,17 +419,24 @@ var ignoredErrorPatterns = []string{
"cannot get array length of a non-array",
"cannot be called on a non-array",
"cannot call json_object_keys on an array",
"cannot set path in scalar",
// Builtins that have funky preconditions
"cannot delete from scalar",
"lastval is not yet defined",
"negative substring length",
"non-positive substring length",
"bit strings of different sizes",
"inet addresses with different sizes",
"zero length IP",
"values of different sizes",
"must have even number of elements",
"cannot take logarithm of a negative number",
"input value must be",
"formats are supported for decode",
"only available in ccl",
"expect comma-separated list of filename",
"unknown constraint",
"invalid destination encoding name",
}

var ignoredRegex = regexp.MustCompile(strings.Join(ignoredErrorPatterns, "|"))
Expand Down

0 comments on commit 71681f6

Please sign in to comment.