Skip to content

Commit

Permalink
Add timeout for uploads
Browse files Browse the repository at this point in the history
Add more logs for receive

Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jul 14, 2020
1 parent f15b0c0 commit 642b609
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 27 deletions.
55 changes: 38 additions & 17 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
Expand Down Expand Up @@ -84,6 +84,8 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())

uploadTimeout := modelDuration(cmd.Flag("receive.upload-timeout", "Timeout for the initial and last upload request.").Default("10m").Hidden())

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
Expand Down Expand Up @@ -163,8 +165,9 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*replicaHeader,
*replicationFactor,
time.Duration(*forwardTimeout),
comp,
time.Duration(*uploadTimeout),
*allowOutOfOrderUpload,
comp,
)
}
}
Expand Down Expand Up @@ -202,8 +205,9 @@ func runReceive(
replicaHeader string,
replicationFactor uint64,
forwardTimeout time.Duration,
comp component.SourceStoreAPI,
uploadTimeout time.Duration,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
Expand Down Expand Up @@ -294,6 +298,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up tsdb")
{
log.With(logger, "component", "storage")
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_attempted_total",
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
Expand All @@ -311,12 +316,15 @@ func runReceive(

// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
level.Info(logger).Log("msg", "shutting down Multi TSDB")
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
}
if err := dbs.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
level.Error(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "Multi TSDB is closed")
}()

for {
Expand All @@ -328,7 +336,7 @@ func runReceive(
return nil
}
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi DB")
level.Info(logger).Log("msg", "updating Multi TSDB")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
Expand All @@ -341,7 +349,7 @@ func runReceive(
<-uploadDone
}
statusProber.Ready()
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
level.Info(logger).Log("msg", "Multi TSDB started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()
dbReady <- struct{}{}
}
Expand Down Expand Up @@ -394,7 +402,7 @@ func runReceive(
return nil
}
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
msg := "hashring has changed; server is not ready to receive web requests"
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
hashringChangedChan <- struct{}{}
Expand Down Expand Up @@ -489,20 +497,27 @@ func runReceive(
}

if upload {
level.Debug(logger).Log("msg", "upload enabled")
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
logger := log.With(logger, "component", "uploader")
{
level.Info(logger).Log("msg", "upload enabled, starting initial sync")
ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout)
if err := dbs.Sync(ctx); err != nil {
cancel()
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
}
cancel()
level.Info(logger).Log("msg", "initial sync done")
}

{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "recurring upload starting")
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "interval upload failed", "err", err)
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
}

level.Debug(logger).Log("msg", "upload done")
return nil
})
}, func(error) {
Expand All @@ -521,9 +536,15 @@ func runReceive(
// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "on demnad upload failed", "err", err)
level.Info(logger).Log("msg", "uploading the last cut block before exiting")
dctx, dCancel := context.WithTimeout(context.Background(), uploadTimeout)
if err := dbs.Sync(dctx); err != nil {
dCancel()
level.Error(logger).Log("msg", "on demand upload failed", "err", err)
return
}
dCancel()
level.Info(logger).Log("msg", "the last cut block is uploaded")
}()
defer close(uploadDone)
for {
Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
terrors "github.com/prometheus/prometheus/tsdb/errors"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
)

type MultiTSDB struct {
Expand Down Expand Up @@ -57,7 +58,7 @@ func NewMultiTSDB(

return &MultiTSDB{
dataDir: dataDir,
logger: l,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
Expand Down
15 changes: 9 additions & 6 deletions pkg/server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
grpc_health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)

// A Server defines parameters to serve RPC requests, a wrapper around grpc.Server.
Expand Down Expand Up @@ -123,10 +124,11 @@ func (s *Server) ListenAndServe() error {
// Shutdown gracefully shuts down the server by waiting,
// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down.
func (s *Server) Shutdown(err error) {
defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err)
level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err)

if s.opts.gracePeriod == 0 {
s.srv.Stop()
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
return
}

Expand All @@ -147,6 +149,7 @@ func (s *Server) Shutdown(err error) {
case <-stopped:
cancel()
}
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
}

// ReadWriteStoreServer is a StoreServer and a WriteableStoreServer.
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
)
Expand Down Expand Up @@ -60,15 +61,15 @@ func (s *Server) ListenAndServe() error {
// Shutdown gracefully shuts down the server by waiting,
// for specified amount of time (by gracePeriod) for connections to return to idle and then shut down.
func (s *Server) Shutdown(err error) {
level.Info(s.logger).Log("msg", "internal server is shutting down", "err", err)
if err == http.ErrServerClosed {
level.Warn(s.logger).Log("msg", "internal server closed unexpectedly")
return
}

defer level.Info(s.logger).Log("msg", "internal server shutdown", "err", err)

if s.opts.gracePeriod == 0 {
s.srv.Close()
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
return
}

Expand All @@ -77,7 +78,9 @@ func (s *Server) Shutdown(err error) {

if err := s.srv.Shutdown(ctx); err != nil {
level.Error(s.logger).Log("msg", "internal server shut down failed", "err", err)
return
}
level.Info(s.logger).Log("msg", "internal server is shutdown", "err", err)
}

// Handle registers the handler for the given pattern.
Expand Down

0 comments on commit 642b609

Please sign in to comment.