Skip to content

Commit

Permalink
runutil: add Exhaust* fns, initial users (#1302)
Browse files Browse the repository at this point in the history
* runutil: add Exhaust* fns, initial users

* runutil: explicitly ignore return values

* runutil: fix according to comments

* pkg/*: convert more users to Exhaust*()

* runutil: fix errcheck

* CHANGELOG: add item

* runutil: inform the user if exhaustion fails

* CHANGELOG: fix up the merge

* runutil: add missing `if err != nil`
  • Loading branch information
GiedriusS authored Jul 22, 2019
1 parent 546fdcd commit d02488d
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1327](https://github.com/thanos-io/thanos/pull/1327) `/series` API end-point now properly returns an empty array just like Prometheus if there are no results

- [#1302](https://github.com/thanos-io/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections

## [v0.6.0](https://github.com/thanos-io/thanos/releases/tag/v0.6.0) - 2019.07.18

### Added
Expand Down
2 changes: 1 addition & 1 deletion pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error {
if err != nil {
return errors.Wrapf(err, "send request to %q", url)
}
defer runutil.CloseWithLogOnErr(s.logger, resp.Body, "send one alert")
defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert")

if resp.StatusCode/100 != 2 {
return errors.Errorf("bad response status %v from %q", resp.Status, url)
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
if _, err := resp.Body.Read(nil); err != nil {
runutil.CloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
if err != nil {
return nil, errors.Wrapf(err, "request flags against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -185,7 +185,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla
if err != nil {
return Flags{}, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -234,7 +234,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo
if err != nil {
return "", errors.Wrapf(err, "request snapshot against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -317,7 +317,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s
if err != nil {
return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
Expand Down Expand Up @@ -452,7 +452,7 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
if err != nil {
return errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "metrics body")

if resp.StatusCode != http.StatusOK {
return errors.Errorf("server returned HTTP status %s", resp.Status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "reload request failed")
}
defer runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")

if resp.StatusCode != 200 {
return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
Expand Down
31 changes: 31 additions & 0 deletions pkg/runutil/runutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@
// // ...
//
// If Close() returns error, err will capture it and return by argument.
//
// The rununtil.Exhaust* family of functions provide the same functionality but
// they take an io.ReadCloser and they exhaust the whole reader before closing
// them. They are useful when trying to use http keep-alive connections because
// for the same connection to be re-used the whole response body needs to be
// exhausted.
package runutil

import (
"fmt"
"io"
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -108,6 +115,16 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
}

// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before.
func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) {
_, err := io.Copy(ioutil.Discard, r)
if err != nil {
level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err)
}

CloseWithLogOnErr(logger, r, format, a...)
}

// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
Expand All @@ -118,3 +135,17 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter

*err = merr.Err()
}

// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before.
func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) {
_, copyErr := io.Copy(ioutil.Discard, r)

CloseWithErrCapture(err, r, format, a...)

// Prepend the io.Copy error.
merr := tsdberrors.MultiError{}
merr.Add(copyErr)
merr.Add(*err)

*err = merr.Err()
}
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom
return nil, errors.Wrap(err, "send request")
}
spanReqDo.Finish()
defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body")

if presp.StatusCode/100 != 2 {
return nil, errors.Errorf("request failed with code %s", presp.Status)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down Expand Up @@ -448,7 +448,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down

0 comments on commit d02488d

Please sign in to comment.