Skip to content

Commit

Permalink
Add timeout for all uploads
Browse files Browse the repository at this point in the history
Merge uploader routines

Address review issues

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jul 14, 2020
1 parent 642b609 commit 5d6a04c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) receive: Add time-out for each block upload. And receiver fails when the initial upload fails.

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

Expand Down
80 changes: 41 additions & 39 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())
forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden())
uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for each block upload request.").Default("10m").Hidden())

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
Expand Down Expand Up @@ -316,15 +316,17 @@ func runReceive(

// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
level.Info(logger).Log("msg", "shutting down Multi TSDB")
level.Info(logger).Log("msg", "shutting down storage")
if err := dbs.Flush(); err != nil {
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
} else {
level.Info(logger).Log("msg", "storage is flushed successfully")
}
if err := dbs.Close(); err != nil {
level.Error(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "Multi TSDB is closed")
level.Info(logger).Log("msg", "storage is closed")
}()

for {
Expand All @@ -336,7 +338,7 @@ func runReceive(
return nil
}
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi TSDB")
level.Info(logger).Log("msg", "updating storage")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
Expand Down Expand Up @@ -498,69 +500,69 @@ func runReceive(

if upload {
logger := log.With(logger, "component", "uploader")
{
level.Info(logger).Log("msg", "upload enabled, starting initial sync")
ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout)
upload := func(ctx context.Context) error {
level.Debug(logger).Log("msg", "upload starting")
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, uploadTimeout)
defer cancel()

if err := dbs.Sync(ctx); err != nil {
cancel()
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err)
} else {
level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start))
}
cancel()
level.Info(logger).Log("msg", "initial sync done")

return nil
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "recurring upload starting")
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
}
level.Debug(logger).Log("msg", "upload done")
return nil
})
}, func(error) {
cancel()
})
level.Info(logger).Log("msg", "upload enabled, starting initial sync")
if err := upload(context.Background()); err != nil {
return errors.Wrapf(err, "initial upload failed")
}
level.Info(logger).Log("msg", "initial sync done")
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Ensure we clean up everything properly.
defer func() {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}()

// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
level.Info(logger).Log("msg", "uploading the last cut block before exiting")
<-uploadC // Closed by storage routine when it's done.
level.Info(logger).Log("msg", "uploading the final cut block before exiting")
dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout)
if err := dbs.Sync(dctx); err != nil {
dCancel()
level.Error(logger).Log("msg", "on demand upload failed", "err", err)
level.Error(logger).Log("msg", "the final upload failed", "err", err)
return
}
dCancel()
level.Info(logger).Log("msg", "the last cut block is uploaded")
level.Info(logger).Log("msg", "the final cut block was uploaded")
}()

defer close(uploadDone)

// Run the uploader in a loop.
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()

for {
select {
case <-ctx.Done():
return nil
default:
}
select {
case <-ctx.Done():
return nil
case <-uploadC:
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err)
// Upload on demand.
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "on demand upload failed", "err", err)
}
uploadDone <- struct{}{}
case <-tick.C:
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
}
}
}
}, func(error) {
Expand Down

0 comments on commit 5d6a04c

Please sign in to comment.