From 5d6a04cd5198f0f8c663cf588100a4ce2a1a764e Mon Sep 17 00:00:00 2001 From: Kemal Akkoyun Date: Tue, 14 Jul 2020 17:15:13 +0200 Subject: [PATCH] Add timeout for all uploads Merge uploader routines Address review issues Signed-off-by: Kemal Akkoyun --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 80 ++++++++++++++++++++++--------------------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a737ca623..0125f67cde5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page - [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. +- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails. ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 83f9ed15e3e..ec4b9c0e875 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -82,9 +82,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64() - forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden()) + forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) - uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden()) + uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for each block upload request.").Default("10m").Hidden()) tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) @@ -316,15 +316,17 @@ func runReceive( // Before quitting, ensure the WAL is flushed and the DBs are closed. defer func() { - level.Info(logger).Log("msg", "shutting down Multi TSDB") + level.Info(logger).Log("msg", "shutting down storage") if err := dbs.Flush(); err != nil { level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") } if err := dbs.Close(); err != nil { level.Error(logger).Log("err", err, "msg", "failed to close storage") return } - level.Info(logger).Log("msg", "Multi TSDB is closed") + level.Info(logger).Log("msg", "storage is closed") }() for { @@ -336,7 +338,7 @@ func runReceive( return nil } dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating Multi TSDB") + level.Info(logger).Log("msg", "updating storage") if err := dbs.Flush(); err != nil { return errors.Wrap(err, "flushing storage") @@ -498,69 +500,69 @@ func runReceive( if upload { logger := log.With(logger, "component", "uploader") - { - level.Info(logger).Log("msg", "upload enabled, starting initial sync") - ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout) + upload := func(ctx context.Context) error { + level.Debug(logger).Log("msg", "upload starting") + start := time.Now() + ctx, cancel := context.WithTimeout(ctx, uploadTimeout) + defer cancel() + if err := dbs.Sync(ctx); err != nil { - cancel() - level.Warn(logger).Log("msg", "initial upload failed", "err", err) + level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) + } else { + level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start)) } - cancel() - level.Info(logger).Log("msg", "initial sync done") + + return nil } { - // Run the uploader in a loop. - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "recurring upload starting") - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("msg", "recurring upload failed", "err", err) - } - level.Debug(logger).Log("msg", "upload done") - return nil - }) - }, func(error) { - cancel() - }) + level.Info(logger).Log("msg", "upload enabled, starting initial sync") + if err := upload(context.Background()); err != nil { + return errors.Wrapf(err, "initial upload failed") + } + level.Info(logger).Log("msg", "initial sync done") } - { - // Upload on demand. ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { // Ensure we clean up everything properly. defer func() { runutil.CloseWithLogOnErr(logger, bkt, "bucket client") }() + // Before quitting, ensure all blocks are uploaded. defer func() { - <-uploadC - level.Info(logger).Log("msg", "uploading the last cut block before exiting") + <-uploadC // Closed by storage routine when it's done. + level.Info(logger).Log("msg", "uploading the final cut block before exiting") dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout) if err := dbs.Sync(dctx); err != nil { dCancel() - level.Error(logger).Log("msg", "on demand upload failed", "err", err) + level.Error(logger).Log("msg", "the final upload failed", "err", err) return } dCancel() - level.Info(logger).Log("msg", "the last cut block is uploaded") + level.Info(logger).Log("msg", "the final cut block was uploaded") }() + defer close(uploadDone) + + // Run the uploader in a loop. + tick := time.NewTicker(30 * time.Second) + defer tick.Stop() + for { - select { - case <-ctx.Done(): - return nil - default: - } select { case <-ctx.Done(): return nil case <-uploadC: - if err := dbs.Sync(ctx); err != nil { - level.Warn(logger).Log("err", err) + // Upload on demand. + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "on demand upload failed", "err", err) } uploadDone <- struct{}{} + case <-tick.C: + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "recurring upload failed", "err", err) + } } } }, func(error) {