Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(frontend): clean up tempo and trace references #50

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ var (
)

type Config struct {
Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
TolerateFailedBlocks int `yaml:"tolerate_failed_blocks,omitempty"`
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
TolerateFailedBlocks int `yaml:"tolerate_failed_blocks,omitempty"`
Search SearchConfig `yaml:"search"`
SnapshotByID SnapshotByIDConfig `yaml:"snapshot_by_id"`
}

type SearchConfig struct {
Sharder SearchSharderConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
}

type TraceByIDConfig struct {
type SnapshotByIDConfig struct {
QueryShards int `yaml:"query_shards,omitempty"`
Hedging HedgingConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
Expand Down Expand Up @@ -84,7 +84,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
},
SLO: slo,
}
cfg.TraceByID = TraceByIDConfig{
cfg.SnapshotByID = SnapshotByIDConfig{
QueryShards: 50,
SLO: slo,
Hedging: HedgingConfig{
Expand Down
19 changes: 9 additions & 10 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type QueryFrontend struct {
func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
if cfg.SnapshotByID.QueryShards < minQueryShards || cfg.SnapshotByID.QueryShards > maxQueryShards {
return nil, fmt.Errorf("frontend query shards should be between %d and %d (both inclusive)", minQueryShards, maxQueryShards)
}

Expand All @@ -83,15 +83,15 @@ func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overri

retryWare := newRetryWare(cfg.MaxRetries, registerer)

traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
snapshotByIDMiddleware := MergeMiddlewares(newSnapshotByIDMiddleware(cfg, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, store, logger), retryWare)

snapshotByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": snapshotByIDOp})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
loadTp := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": "loadtp"})
delTp := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": "deltp"})

snapshots := traceByIDMiddleware.Wrap(next)
snapshots := snapshotByIDMiddleware.Wrap(next)
search := searchMiddleware.Wrap(next)

tpMiddleware := newTracepointForwardMiddleware()
Expand All @@ -118,21 +118,20 @@ func newTracepointForwardMiddleware() Middleware {
})
}

// newTraceByIDMiddleware creates a new frontend middleware responsible for handling get traces requests.
func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
// newSnapshotByIDMiddleware creates a new frontend middleware responsible for handling get snapshot requests.
func newSnapshotByIDMiddleware(cfg Config, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
// We're constructing middleware in this statement, each middleware wraps the next one from left-to-right
// - the Deduper dedupes Span IDs for Zipkin support
// - the ShardingWare shards queries by splitting the block ID space
// - the RetryWare retries requests that have failed (error or http status 500)
rt := NewRoundTripper(
next,
newSnapshotByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, cfg.TraceByID.SLO, logger),
newHedgedRequestWare(cfg.TraceByID.Hedging),
newSnapshotByIDSharder(cfg.SnapshotByID.QueryShards, cfg.TolerateFailedBlocks, cfg.SnapshotByID.SLO, logger),
newHedgedRequestWare(cfg.SnapshotByID.Hedging),
)

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// validate traceID
// validate snapshot
_, err := api.ParseSnapshotID(r)
if err != nil {
return &http.Response{
Expand All @@ -158,7 +157,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
marshallingFormat = api.HeaderAcceptProtobuf
}

// enforce all communication internal to Tempo to be in protobuf bytes
// enforce all communication internal to Deep to be in protobuf bytes
r.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)

resp, err := rt.RoundTrip(r)
Expand Down
260 changes: 130 additions & 130 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,133 +17,133 @@

package frontend

//
//import (
// "bytes"
// "io"
// "net/http"
// "net/http/httptest"
// "testing"
// "time"
//
// "github.com/go-kit/log"
// "github.com/stretchr/testify/assert"
// "github.com/stretchr/testify/require"
//)
//
//type mockNextTripperware struct{}
//
//func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error) {
// return &http.Response{
// StatusCode: 200,
// Body: io.NopCloser(bytes.NewReader([]byte("next"))),
// }, nil
//}
//
//func TestFrontendRoundTripsSearch(t *testing.T) {
// next := &mockNextTripperware{}
// f, err := New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: minQueryShards,
// SLO: testSLOcfg,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: defaultConcurrentRequests,
// TargetBytesPerRequest: defaultTargetBytesPerRequest,
// },
// SLO: testSLOcfg,
// },
// }, next, nil, nil, log.NewNopLogger(), nil)
// require.NoError(t, err)
//
// req := httptest.NewRequest("GET", "/", nil)
//
// // search is a blind passthrough. easy!
// res := httptest.NewRecorder()
// f.Search.ServeHTTP(res, req)
// assert.Equal(t, res.Body.String(), "next")
//}
//
//func TestFrontendBadConfigFails(t *testing.T) {
// f, err := New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: minQueryShards - 1,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: defaultConcurrentRequests,
// TargetBytesPerRequest: defaultTargetBytesPerRequest,
// },
// SLO: testSLOcfg,
// },
// }, nil, nil, nil, log.NewNopLogger(), nil)
// assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
// assert.Nil(t, f)
//
// f, err = New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: maxQueryShards + 1,
// SLO: testSLOcfg,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: defaultConcurrentRequests,
// TargetBytesPerRequest: defaultTargetBytesPerRequest,
// },
// SLO: testSLOcfg,
// },
// }, nil, nil, nil, log.NewNopLogger(), nil)
// assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
// assert.Nil(t, f)
//
// f, err = New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: maxQueryShards,
// SLO: testSLOcfg,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: 0,
// TargetBytesPerRequest: defaultTargetBytesPerRequest,
// },
// SLO: testSLOcfg,
// },
// }, nil, nil, nil, log.NewNopLogger(), nil)
// assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
// assert.Nil(t, f)
//
// f, err = New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: maxQueryShards,
// SLO: testSLOcfg,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: defaultConcurrentRequests,
// TargetBytesPerRequest: 0,
// },
// SLO: testSLOcfg,
// },
// }, nil, nil, nil, log.NewNopLogger(), nil)
// assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
// assert.Nil(t, f)
//
// f, err = New(Config{
// TraceByID: TraceByIDConfig{
// QueryShards: maxQueryShards,
// SLO: testSLOcfg,
// },
// Search: SearchConfig{
// Sharder: SearchSharderConfig{
// ConcurrentRequests: defaultConcurrentRequests,
// TargetBytesPerRequest: defaultTargetBytesPerRequest,
// QueryIngestersUntil: time.Minute,
// QueryBackendAfter: time.Hour,
// },
// SLO: testSLOcfg,
// },
// }, nil, nil, nil, log.NewNopLogger(), nil)
// assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
// assert.Nil(t, f)
//}
import (
"bytes"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockNextTripperware struct{}

func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader([]byte("next"))),
}, nil
}

func TestFrontendRoundTripsSearch(t *testing.T) {
next := &mockNextTripperware{}
tpNext := &mockNextTripperware{}
f, err := New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: minQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, next, tpNext, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)

// search is a blind passthrough. easy!
res := httptest.NewRecorder()
f.Search.ServeHTTP(res, req)
assert.Equal(t, res.Body.String(), "next")
}

func TestFrontendBadConfigFails(t *testing.T) {
f, err := New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: minQueryShards - 1,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

f, err = New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: maxQueryShards + 1,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

f, err = New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: 0,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

f, err = New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: 0,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

f, err = New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
QueryIngestersUntil: time.Minute,
QueryBackendAfter: time.Hour,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
2 changes: 1 addition & 1 deletion modules/frontend/hedged_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
prometheus.GaugeOpts{
Namespace: "deep",
Name: "query_frontend_hedged_roundtrips_total",
Help: "Total number of hedged trace by ID requests. Registered as a gauge for code sanity. This is a counter.",
Help: "Total number of hedged snapshot by ID requests. Registered as a gauge for code sanity. This is a counter.",
},
)
)
Expand Down
Loading