-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy patherrors.go
139 lines (113 loc) · 4.12 KB
/
errors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2019 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package changefeedbase
import (
"context"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/errors"
)
// FailureType is the reason for the changefeed failure that maps to the
// failure_type values we emit in the ChangefeedFailed telemetry events
type FailureType = string
const (
// ConnectionClosed means the user disconnected from a core changefeed
ConnectionClosed FailureType = "connection_closed"
// UserInput applies to errors in the create statement prior to job creation
UserInput FailureType = "user_input"
// OnStartup applies to all non-user-input errors during Planning
OnStartup FailureType = "on_startup"
// UnknownError applies to all errors not otherwise categorized
UnknownError FailureType = "unknown_error"
)
// Used for categorizing errors in logging
type taggedError struct {
wrapped error
tag string
}
// MarkTaggedError wraps the given error with an extra tag string property to be
// read afterwards in cases such as logging
func MarkTaggedError(e error, tag string) error {
return &taggedError{wrapped: e, tag: tag}
}
// IsTaggedError returns whether or not the top level error is a TaggedError
// along with its tag. This does not work if the tagged error is wrapped.
func IsTaggedError(err error) (bool, string) {
if err == nil {
return false, ""
}
if tagged := (*taggedError)(nil); errors.As(err, &tagged) {
return true, tagged.tag
}
return false, ""
}
// Error implements the error interface.
func (e *taggedError) Error() string {
return e.wrapped.Error()
}
// Cause implements the github.com/pkg/errors.causer interface.
func (e *taggedError) Cause() error { return e.wrapped }
// Unwrap implements the github.com/golang/xerrors.Wrapper interface, which is
// planned to be moved to the stdlib in go 1.13.
func (e *taggedError) Unwrap() error { return e.wrapped }
type terminalError struct{}
func (e *terminalError) Error() string {
return "terminal changefeed error"
}
// TODO(yevgeniy): retryableError and all machinery related
// to MarkRetryableError maybe removed once 23.1 is released.
type retryableError struct{}
func (e *retryableError) Error() string {
return "retryable changefeed error"
}
// WithTerminalError decorates underlying error to indicate
// that the error is a terminal changefeed error.
func WithTerminalError(cause error) error {
if cause == nil {
return nil
}
return errors.Mark(cause, &terminalError{})
}
// MarkRetryableError wraps the given error, marking it as retryable.s
func MarkRetryableError(cause error) error {
if cause == nil {
return nil
}
return errors.Mark(cause, &retryableError{})
}
// AsTerminalError determines if the cause error is a terminal changefeed
// error. Returns non-nil error if changefeed should terminate with the
// returned error.
func AsTerminalError(ctx context.Context, lm *lease.Manager, cause error) (termErr error) {
if cause == nil {
return nil
}
if err := ctx.Err(); err != nil {
// If context has been cancelled, we must respect that; this happens
// if, e.g. this changefeed is being cancelled.
return err
}
if lm.IsDraining() {
// This node is being drained. It's safe to propagate this error (to the
// job registry) since job registry should not be able to commit this error
// to the jobs table; but to be safe, make sure this error is marked as jobs
// retryable error to ensure that some other node retries this changefeed.
return jobs.MarkAsRetryJobError(cause)
}
// GC TTL errors are always fatal.
if errors.HasType(cause, (*roachpb.BatchTimestampBeforeGCError)(nil)) {
return WithTerminalError(cause)
}
// Explicitly marked terminal errors are terminal.
if errors.Is(cause, &terminalError{}) {
return cause
}
// All other errors retry.
return nil
}