From d02488d9be253223393515a0f9c1fce5318ca3af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Mon, 22 Jul 2019 12:29:15 +0300 Subject: [PATCH] runutil: add Exhaust* fns, initial users (#1302) * runutil: add Exhaust* fns, initial users * runutil: explicitly ignore return values * runutil: fix according to comments * pkg/*: convert more users to Exhaust*() * runutil: fix errcheck * CHANGELOG: add item * runutil: inform the user if exhaustion fails * CHANGELOG: fix up the merge * runutil: add missing `if err != nil` --- CHANGELOG.md | 2 ++ pkg/alert/alert.go | 2 +- pkg/objstore/cos/cos.go | 2 +- pkg/promclient/promclient.go | 10 +++++----- pkg/reloader/reloader.go | 2 +- pkg/runutil/runutil.go | 31 +++++++++++++++++++++++++++++++ pkg/store/prometheus.go | 6 +++--- 7 files changed, 44 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62b2e874a8..bf3ebe3dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1327](https://github.com/thanos-io/thanos/pull/1327) `/series` API end-point now properly returns an empty array just like Prometheus if there are no results +- [#1302](https://github.com/thanos-io/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections + ## [v0.6.0](https://github.com/thanos-io/thanos/releases/tag/v0.6.0) - 2019.07.18 ### Added diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 52bd880691..ca8ba09234 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -374,7 +374,7 @@ func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error { if err != nil { return errors.Wrapf(err, "send request to %q", url) } - defer runutil.CloseWithLogOnErr(s.logger, resp.Body, "send one alert") + defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert") if resp.StatusCode/100 != 2 { return errors.Errorf("bad response status %v from %q", resp.Status, url) diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index d348152cce..d81264ef0e 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -145,7 +145,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } if _, err := resp.Body.Read(nil); err != nil { - runutil.CloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close") + runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close") return nil, err } diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index a67ed041b9..054d1efc9a 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -68,7 +68,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe if err != nil { return nil, errors.Wrapf(err, "request flags against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -185,7 +185,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla if err != nil { return Flags{}, errors.Wrapf(err, "request config against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -234,7 +234,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo if err != nil { return "", errors.Wrapf(err, "request snapshot against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") b, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -317,7 +317,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s if err != nil { return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body") // Decode only ResultType and load Result only as RawJson since we don't know // structure of the Result yet. @@ -452,7 +452,7 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr if err != nil { return errors.Wrapf(err, "perform GET request against %s", u.String()) } - defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body") + defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "metrics body") if resp.StatusCode != http.StatusOK { return errors.Errorf("server returned HTTP status %s", resp.Status) diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 44dceaaf6d..5efa0f9b6d 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -329,7 +329,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error { if err != nil { return errors.Wrap(err, "reload request failed") } - defer runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") + defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") if resp.StatusCode != 200 { return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status) diff --git a/pkg/runutil/runutil.go b/pkg/runutil/runutil.go index ae63cae4a2..48ceb0a124 100644 --- a/pkg/runutil/runutil.go +++ b/pkg/runutil/runutil.go @@ -38,11 +38,18 @@ // // ... // // If Close() returns error, err will capture it and return by argument. +// +// The rununtil.Exhaust* family of functions provide the same functionality but +// they take an io.ReadCloser and they exhaust the whole reader before closing +// them. They are useful when trying to use http keep-alive connections because +// for the same connection to be re-used the whole response body needs to be +// exhausted. package runutil import ( "fmt" "io" + "io/ioutil" "os" "time" @@ -108,6 +115,16 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ... level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...))) } +// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before. +func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) { + _, err := io.Copy(ioutil.Discard, r) + if err != nil { + level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err) + } + + CloseWithLogOnErr(logger, r, format, a...) +} + // CloseWithErrCapture runs function and on error return error by argument including the given error (usually // from caller function). func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) { @@ -118,3 +135,17 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter *err = merr.Err() } + +// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before. +func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) { + _, copyErr := io.Copy(ioutil.Discard, r) + + CloseWithErrCapture(err, r, format, a...) + + // Prepend the io.Copy error. + merr := tsdberrors.MultiError{} + merr.Add(copyErr) + merr.Add(*err) + + *err = merr.Err() +} diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d3fdaf5de4..2874884f17 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -258,7 +258,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom return nil, errors.Wrap(err, "send request") } spanReqDo.Finish() - defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body") if presp.StatusCode/100 != 2 { return nil, errors.Errorf("request failed with code %s", presp.Status) @@ -388,7 +388,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body") if resp.StatusCode/100 != 2 { return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status)) @@ -448,7 +448,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body") + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body") if resp.StatusCode/100 != 2 { return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))