receive: Added tenant ID error handling of remote write requests. (#5269)

Plus better explanation.

Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka authored Apr 7, 2022
1 parent 6458129 commit 36e9d2a
Showing 2 changed files with 47 additions and 34 deletions.
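The gist of the change: receiveHTTP now resolves the tenant from the configured request header (falling back to the default tenant ID) before decoding the body, builds a tenant-scoped logger, and uses that logger for every request-level message, including the new early exits for empty and metadata-only write requests. A minimal, self-contained sketch of that pattern follows; it is not the actual Thanos handler, and the header name and default tenant used in main are assumed flag defaults, so adjust them if your receiver is configured differently.

package main

import (
	"net/http"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Simplified stand-ins for the handler options referenced in the diff
// (h.options.TenantHeader, h.options.DefaultTenantID).
type options struct {
	TenantHeader    string
	DefaultTenantID string
}

// tenantLogger resolves the tenant from the request header, falling back to the
// configured default, and returns a logger scoped to that tenant: the pattern
// this commit threads through receiveHTTP, fanoutForward and Writer.Write.
func tenantLogger(base log.Logger, opts options, r *http.Request) (string, log.Logger) {
	tenant := r.Header.Get(opts.TenantHeader)
	if tenant == "" {
		tenant = opts.DefaultTenantID
	}
	return tenant, log.With(base, "tenant", tenant)
}

func main() {
	base := log.NewLogfmtLogger(os.Stderr)
	// "THANOS-TENANT" and "default-tenant" are assumed defaults for this sketch.
	opts := options{TenantHeader: "THANOS-TENANT", DefaultTenantID: "default-tenant"}

	// Example request against a hypothetical receiver endpoint.
	req, _ := http.NewRequest(http.MethodPost, "http://receiver.example/api/v1/receive", nil)
	req.Header.Set(opts.TenantHeader, "team-a")

	tenant, tLogger := tenantLogger(base, opts, req)
	level.Debug(tLogger).Log("msg", "resolved tenant for remote write request", "resolved", tenant)
}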
59 changes: 35 additions & 24 deletions pkg/receive/handler.go
@@ -259,28 +259,27 @@ type replica struct {
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
tLogger := log.With(h.logger, "tenant", tenant)

// This replica value is used to detect cycles in cyclic topologies.
// A non-zero value indicates that the request has already been replicated by a previous receive instance.
// For almost all users, this is only used in fully connected topologies of IngestorRouter instances.
// For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data.
// See discussion in: https://github.com/thanos-io/thanos/issues/4359
// See discussion in: https://github.com/thanos-io/thanos/issues/4359.
if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly {
rep = 0
}

// The replica value in the header is one-indexed, thus we need >.
if rep > h.options.ReplicationFactor {
level.Error(h.logger).Log("err", errBadReplica, "msg", "write request rejected",
level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected",
"request_replica", rep, "replication_factor", h.options.ReplicationFactor)
return errBadReplica
}

r := replica{
n: rep,
replicated: rep != 0,
}
r := replica{n: rep, replicated: rep != 0}

// On the wire, format is 1-indexed and in-code is 0-indexed so we decrement the value if it was already replicated.
// On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated.
if r.replicated {
r.n--
}
@@ -295,6 +294,13 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

tenant := r.Header.Get(h.options.TenantHeader)
if tenant == "" {
tenant = h.options.DefaultTenantID
}

tLogger := log.With(h.logger, "tenant", tenant)

// ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B.
// Since this is receive hot path, grow upfront saving allocations and CPU time.
compressed := bytes.Buffer{}
@@ -311,7 +317,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {

reqBuf, err := s2.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
level.Error(tLogger).Log("msg", "snappy decode error", "err", err)
http.Error(w, errors.Wrap(err, "snappy decode error").Error(), http.StatusBadRequest)
return
}
@@ -334,21 +340,22 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}
}

tenant := r.Header.Get(h.options.TenantHeader)
if tenant == "" {
tenant = h.options.DefaultTenantID
}

// TODO(yeya24): handle remote write metadata.
// exit early if the request contained no data
// Exit early if the request contained no data. We don't support metadata yet. We also cannot fail here, because
// this would mean lack of forward compatibility for remote write proto.
if len(wreq.Timeseries) == 0 {
level.Debug(h.logger).Log("msg", "empty timeseries from client", "tenant", tenant)
// TODO(yeya24): Handle remote write metadata.
if len(wreq.Metadata) > 0 {
// TODO(bwplotka): Do we need this error message?
level.Debug(tLogger).Log("msg", "only metadata from client; metadata ingestion not supported; skipping")
return
}
level.Debug(tLogger).Log("msg", "empty remote write request; client bug or newer remote write protocol used?; skipping")
return
}

err = h.handleRequest(ctx, rep, tenant, &wreq)
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err)
}

switch determineWriteErrorCause(err, 1) {
@@ -363,7 +370,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
case errBadReplica:
http.Error(w, err.Error(), http.StatusBadRequest)
default:
level.Error(h.logger).Log("err", err, "msg", "internal server error")
level.Error(tLogger).Log("err", err, "msg", "internal server error")
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
@@ -436,9 +443,13 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
}
}()

logTags := []interface{}{"tenant", tenant}
if id, ok := middleware.RequestIDFromContext(pctx); ok {
logTags = append(logTags, "request-id", id)
var tLogger log.Logger
{
logTags := []interface{}{"tenant", tenant}
if id, ok := middleware.RequestIDFromContext(pctx); ok {
logTags = append(logTags, "request-id", id)
}
tLogger = log.With(h.logger, logTags)
}

ec := make(chan error)
@@ -488,7 +499,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(h.logger).Log(append(logTags, "msg", "local tsdb write failed", "err", err.Error()))
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
return
}
@@ -551,7 +562,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur))
level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
@@ -580,7 +591,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
go func() {
for err := range ec {
if err != nil {
level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err))
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
}
}
}()
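For context on the first hunk above: the replica number travels one-indexed in the request header (zero meaning "not replicated yet"), is validated against the replication factor, and is then decremented to the zero-indexed form used in code; an out-of-range value becomes errBadReplica, which the status switch at the end of receiveHTTP turns into HTTP 400. A rough sketch of that normalization, with the error message text invented for the example:

package main

import (
	"errors"
	"fmt"
)

// Only the errBadReplica name comes from handler.go; the message is made up here.
var errBadReplica = errors.New("request replica exceeds receiver replication factor")

// replica mirrors the struct in handler.go: n is the zero-indexed replica number,
// replicated marks requests that another receiver has already replicated.
type replica struct {
	n          uint64
	replicated bool
}

// normalizeReplica sketches the checks in handleRequest: values above the
// replication factor are rejected, and already-replicated values are
// decremented from the one-indexed wire form to the zero-indexed in-code form.
func normalizeReplica(rep, replicationFactor uint64) (replica, error) {
	if rep > replicationFactor {
		return replica{}, errBadReplica
	}
	r := replica{n: rep, replicated: rep != 0}
	if r.replicated {
		r.n--
	}
	return r, nil
}

func main() {
	fmt.Println(normalizeReplica(2, 3)) // {1 true} <nil>
	fmt.Println(normalizeReplica(0, 3)) // {0 false} <nil>: not replicated yet
	fmt.Println(normalizeReplica(4, 3)) // errBadReplica, surfaced as HTTP 400 by receiveHTTP
}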
22 changes: 12 additions & 10 deletions pkg/receive/writer.go
@@ -40,6 +40,8 @@ func NewWriter(logger log.Logger, multiTSDB TenantStorage) *Writer {
}

func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteRequest) error {
tLogger := log.With(r.logger, "tenant", tenantID)

var (
numOutOfOrder = 0
numDuplicates = 0
@@ -85,13 +87,13 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(r.logger).Log("msg", "Out of order sample", "lset", lset, "sample", s)
level.Debug(tLogger).Log("msg", "Out of order sample", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(r.logger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "sample", s)
level.Debug(tLogger).Log("msg", "Duplicate sample for timestamp", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrOutOfBounds:
numOutOfBounds++
level.Debug(r.logger).Log("msg", "Out of bounds metric", "lset", lset, "sample", s)
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
}
}

@@ -100,7 +102,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref != 0 && len(t.Exemplars) > 0 {
for _, ex := range t.Exemplars {
exLset := labelpb.ZLabelsToPromLabels(ex.Labels)
logger := log.With(r.logger, "exemplarLset", exLset, "exemplar", ex.String())
logger := log.With(tLogger, "exemplarLset", exLset, "exemplar", ex.String())

_, err = app.AppendExemplar(ref, lset, exemplar.Exemplar{
Labels: exLset,
@@ -128,27 +130,27 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

if numOutOfOrder > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderSample, "add %d samples", numOutOfOrder))
}
if numDuplicates > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
errs.Add(errors.Wrapf(storage.ErrDuplicateSampleForTimestamp, "add %d samples", numDuplicates))
}
if numOutOfBounds > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numOutOfBounds))
}
if numExemplarsOutOfOrder > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
errs.Add(errors.Wrapf(storage.ErrOutOfOrderExemplar, "add %d exemplars", numExemplarsOutOfOrder))
}
if numExemplarsDuplicate > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting duplicate exemplars", "numDropped", numExemplarsDuplicate)
level.Warn(tLogger).Log("msg", "Error on ingesting duplicate exemplars", "numDropped", numExemplarsDuplicate)
errs.Add(errors.Wrapf(storage.ErrDuplicateExemplar, "add %d exemplars", numExemplarsDuplicate))
}
if numExemplarsLabelLength > 0 {
level.Warn(r.logger).Log("msg", "Error on ingesting exemplars with label length exceeding maximum limit", "numDropped", numExemplarsLabelLength)
level.Warn(tLogger).Log("msg", "Error on ingesting exemplars with label length exceeding maximum limit", "numDropped", numExemplarsLabelLength)
errs.Add(errors.Wrapf(storage.ErrExemplarLabelLength, "add %d exemplars", numExemplarsLabelLength))
}

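On the writer side the change is mechanical: every per-sample debug line and per-request warning in Write now goes through the tenant-scoped tLogger, so the aggregated warnings emitted at the end of a request carry the tenant as well. A stripped-down sketch of that aggregation, a toy stand-in rather than the real Writer:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/prometheus/storage"
)

// summarizeAppendErrors is a toy version of the counting done in Writer.Write:
// individual append failures are classified by cause and summarized once per
// request through the tenant-scoped logger instead of once per sample.
func summarizeAppendErrors(tLogger log.Logger, appendErrs []error) {
	var numOutOfOrder, numDuplicates, numOutOfBounds int
	for _, err := range appendErrs {
		switch err {
		case storage.ErrOutOfOrderSample:
			numOutOfOrder++
		case storage.ErrDuplicateSampleForTimestamp:
			numDuplicates++
		case storage.ErrOutOfBounds:
			numOutOfBounds++
		}
	}
	if numOutOfOrder > 0 {
		level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order samples", "numDropped", numOutOfOrder)
	}
	if numDuplicates > 0 {
		level.Warn(tLogger).Log("msg", "Error on ingesting samples with different value but same timestamp", "numDropped", numDuplicates)
	}
	if numOutOfBounds > 0 {
		level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numOutOfBounds)
	}
}

func main() {
	tLogger := log.With(log.NewLogfmtLogger(os.Stderr), "tenant", "team-a")
	summarizeAppendErrors(tLogger, []error{storage.ErrOutOfOrderSample, storage.ErrOutOfOrderSample})
}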
