This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

More instrumentation of context cancellations and reward low latency nodes #85

Merged: 5 commits, Apr 7, 2023
Changes from 2 commits
8 changes: 8 additions & 0 deletions caboose.go
@@ -256,6 +256,10 @@ func (c *Caboose) Close() {

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
+fetchCalledTotalMetric.WithLabelValues(resourceTypeCar).Add(1)
+if recordIfContextErr(resourceTypeCar, ctx, "FetchApi") {
+return ctx.Err()
+}
return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
}

@@ -268,6 +272,10 @@ func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
+fetchCalledTotalMetric.WithLabelValues(resourceTypeBlock).Add(1)
+if recordIfContextErr(resourceTypeBlock, ctx, "FetchBlockApi") {
+return nil, ctx.Err()
+}
willscott (Contributor):

this is only a partial view, right? Because requests to Has and GetSize also trigger parallel block requests via Get?

PR author:

@willscott Yeah, just wanna focus on fetch for now.

willscott (Contributor):

then let's not record the block API at all - misreporting will lead to confusion?

PR author:

@willscott

Fixed the instrumentation. We now record on the lower-level block and car fetchers (fetchBlockWith and fetchResourceWith) instead of the public API, to capture accurate numbers.

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
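The pattern discussed in the thread above is: check the context before doing any work, bump a labelled counter when it is already cancelled or past its deadline, and return the context error to the caller. A minimal, self-contained sketch of that pattern, with a plain map standing in for the Prometheus counter vector (illustrative only, not the exact code from this PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-in for the Prometheus counter vector used in the PR.
var contextErrCount = map[string]int{}

// recordIfContextErr mirrors the reworked helper in fetcher.go: if the context
// is already done, it records a labelled increment and tells the caller to bail out.
func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool {
	if ce := ctx.Err(); ce != nil {
		key := fmt.Sprintf("%s/%s/cancelled=%t", resourceType, requestState, errors.Is(ce, context.Canceled))
		contextErrCount[key]++
		return true
	}
	return false
}

// fetchBlock shows the call pattern: record and short-circuit before any
// pool or network work is done.
func fetchBlock(ctx context.Context) error {
	if recordIfContextErr("block", ctx, "fetchBlockWith") {
		return ctx.Err()
	}
	// ... real fetch logic would go here ...
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a caller that has already given up
	fmt.Println(fetchBlock(ctx)) // context canceled
	fmt.Println(contextErrCount) // block/fetchBlockWith/cancelled=true -> 1
}
```

The real helper lives in fetcher.go below and is reused by the pool-level fetchers in pool.go.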
25 changes: 12 additions & 13 deletions fetcher.go
@@ -21,12 +21,14 @@ import (

var saturnReqTmpl = "/ipfs/%s?format=raw"

-var (
+const (
saturnNodeIdKey = "Saturn-Node-Id"
saturnTransferIdKey = "Saturn-Transfer-Id"
saturnCacheHitKey = "Saturn-Cache-Status"
saturnCacheHit = "HIT"
saturnRetryAfterKey = "Retry-After"
+resourceTypeCar = "car"
+resourceTypeBlock = "block"
)

// doFetch attempts to fetch a block from a given Saturn endpoint. It sends the retrieval logs to the logging endpoint upon a successful or failed attempt.
@@ -71,13 +73,12 @@ func (p *pool) doFetch(ctx context.Context, from string, c cid.Cid, attempt int)

// TODO Refactor to use a metrics collector that separates the collection of metrics from the actual fetching
func (p *pool) fetchResource(ctx context.Context, from string, resource string, mime string, attempt int, cb DataCallback) (err error) {
resourceType := "car"
resourceType := resourceTypeCar
if mime == "application/vnd.ipld.raw" {
resourceType = "block"
resourceType = resourceTypeBlock
}
-fetchCalledTotalMetric.WithLabelValues(resourceType).Add(1)
if ce := ctx.Err(); ce != nil {
-fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "init").Add(1)
+fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), "fetchResource-init").Add(1)
return ce
}

@@ -214,7 +215,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "build-request", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "build-http-request")
return err
}

@@ -233,7 +234,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
var resp *http.Response
resp, err = p.config.SaturnClient.Do(req)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "send-request", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "send-http-request")

networkError = err.Error()
return fmt.Errorf("http request failed: %w", err)
@@ -298,7 +299,7 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
// drain body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)
if err != nil {
recordIfContextErr(resourceType, reqCtx, "read-response", start, requestTimeout)
recordIfContextErr(resourceType, reqCtx, "read-http-response")
return
}

@@ -317,14 +318,12 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
return
}

-func recordIfContextErr(resourceType string, ctx context.Context, requestState string, start time.Time, timeout time.Duration) {
+func recordIfContextErr(resourceType string, ctx context.Context, requestState string) bool {
if ce := ctx.Err(); ce != nil {
fetchRequestContextErrorTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("%t", errors.Is(ce, context.Canceled)), requestState).Add(1)

-if errors.Is(ce, context.DeadlineExceeded) && time.Since(start) < (timeout-5*time.Second) {
-fetchIncorrectDeadlineErrorTotalMetric.WithLabelValues(resourceType, requestState).Add(1)
-}
+return true
}
+return false
}

// todo: refactor for dryness
10 changes: 5 additions & 5 deletions metrics.go
@@ -84,6 +84,10 @@ var (
Name: prometheus.BuildFQName("ipfs", "caboose", "pool_new_members"),
Help: "New members added to the Caboose pool",
}, []string{"weight"})

+poolWeightBumpMetric = prometheus.NewGauge(prometheus.GaugeOpts{
+Name: prometheus.BuildFQName("ipfs", "caboose", "pool_weight_bump"),
+})
)

var (
@@ -222,10 +226,6 @@ var (
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_called_total"),
}, []string{"resourceType"})

-fetchIncorrectDeadlineErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
-Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_incorrect_deadline_error_total"),
-}, []string{"resourceType", "requestStage"})

fetchRequestContextErrorTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prometheus.BuildFQName("ipfs", "caboose", "fetch_request_context_error_total"),
}, []string{"resourceType", "errorType", "requestStage"})
@@ -276,5 +276,5 @@ func init() {
CabooseMetrics.MustRegister(fetchCalledTotalMetric)
CabooseMetrics.MustRegister(fetchRequestSuccessTimeTraceMetric)

-CabooseMetrics.MustRegister(fetchIncorrectDeadlineErrorTotalMetric)
+CabooseMetrics.MustRegister(poolWeightBumpMetric)
}
41 changes: 28 additions & 13 deletions pool.go
@@ -202,12 +202,6 @@ func (p *pool) doRefresh() {

peerLatencyDistribution = latencyHist
peerSpeedDistribution = speedHist

-// TODO: The orchestrator periodically prunes "bad" L1s based on a reputation system
-// it owns and runs. We should probably just forget about the Saturn endpoints that were
-// previously in the pool but are no longer being returned by the orchestrator. It's highly
-// likely that the Orchestrator has deemed them to be non-functional/malicious.
-// Let's just override the old pool with the new endpoints returned here.
oldMap := make(map[string]bool)
n := make([]*Member, 0, len(newEP))
for _, o := range p.endpoints {
@@ -245,6 +239,20 @@ func (p *pool) doRefresh() {
}
}

+// give weight bumps to low latency peers that have served > 100 successful "low latency" cache hit retrievals.
+poolWeightBumpMetric.Set(0)
+for _, m := range n {
+m := m
+if perf, ok := p.nodePerf[m.url]; ok {
+// Our analysis so far shows that we do have ~10-15 peers with p75 < 200ms latency.
+// It's not the best but it's a good start and we can tune as we go along.
+if perf.latencyDigest.Count() > 100 && perf.latencyDigest.Quantile(0.75) <= 200 {
willscott (Contributor):

these parameters probably shouldn't be hard coded? - maybe environment variables at least?

PR author:

@willscott Made these variables for now. Will make them env in the next Caboose PR that I'm gonna write for much better pool membership and weighting algo.
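For illustration, lifting the two thresholds into variables with optional environment-variable overrides, as suggested above, might look roughly like the sketch below. The variable names and the CABOOSE_MIN_LATENCY_SAMPLES / CABOOSE_MAX_P75_LATENCY_MS keys are hypothetical, not part of this PR:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Defaults matching the hard-coded values discussed above; names are illustrative.
var (
	minLatencySampleCount = 100   // successful retrievals required before a node qualifies
	maxP75LatencyMillis   = 200.0 // 75th-percentile latency threshold for a weight bump
)

// envInt and envFloat fall back to the default when the variable is unset or invalid.
func envInt(key string, def int) int {
	if v := os.Getenv(key); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return def
}

func envFloat(key string, def float64) float64 {
	if v := os.Getenv(key); v != "" {
		if f, err := strconv.ParseFloat(v, 64); err == nil {
			return f
		}
	}
	return def
}

func init() {
	minLatencySampleCount = envInt("CABOOSE_MIN_LATENCY_SAMPLES", minLatencySampleCount)
	maxP75LatencyMillis = envFloat("CABOOSE_MAX_P75_LATENCY_MS", maxP75LatencyMillis)
}

func main() {
	fmt.Printf("bump nodes with > %d samples and p75 <= %.0fms\n",
		minLatencySampleCount, maxP75LatencyMillis)
}
```

Reading the overrides once in init keeps the refresh loop free of repeated environment lookups.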

+poolWeightBumpMetric.Add(1)
+m.weight = maxWeight
+}
+}
+}

// If we have more than maxPoolSize nodes, pick the top maxPoolSize sorted by (weight * age).
if len(n) > maxPoolSize {
sort.Slice(n, func(i, j int) bool {
@@ -262,7 +270,6 @@
p.c.UpdateWithWeights(p.endpoints.ToWeights())
}
poolSizeMetric.Set(float64(len(n)))

poolNewMembersMetric.Reset()
// periodic update of a pool health metric
byWeight := make(map[int]int)
@@ -329,6 +336,9 @@ func cidToKey(c cid.Cid) string {
}

func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk blocks.Block, err error) {
if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWith") {
return nil, ctx.Err()
}
// wait for pool to be initialised
<-p.started

@@ -352,6 +362,9 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk

blockFetchStart := time.Now()
for i := 0; i < len(nodes); i++ {
if recordIfContextErr(resourceTypeBlock, ctx, "fetchBlockWithLoop") {
return nil, ctx.Err()
}
blk, err = p.fetchBlockAndUpdate(ctx, nodes[i], c, i)

if err == nil {
@@ -360,9 +373,6 @@ func (p *pool) fetchBlockWith(ctx context.Context, c cid.Cid, with string) (blk

return
}
-if ce := ctx.Err(); ce != nil {
-return nil, ce
-}
}

fetchDurationBlockFailureMetric.Observe(float64(time.Since(blockFetchStart).Milliseconds()))
@@ -471,6 +481,10 @@ func (p *pool) getNodesToFetch(key string, with string) ([]string, error) {
}

func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallback, with string) (err error) {
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWith") {
return ctx.Err()
}

// wait for pool to be initialised
<-p.started

@@ -496,6 +510,10 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba

pq := []string{path}
for i := 0; i < len(nodes); i++ {
if recordIfContextErr(resourceTypeCar, ctx, "fetchResourceWithLoop") {
return ctx.Err()
}

err = p.fetchResourceAndUpdate(ctx, nodes[i], pq[0], i, cb)

var epr = ErrPartialResponse{}
@@ -528,9 +546,6 @@ func (p *pool) fetchResourceWith(ctx context.Context, path string, cb DataCallba
// for now: reset i on partials so we also give them a chance to retry.
i = -1
}
-if ce := ctx.Err(); ce != nil {
-return ce
-}
}

fetchDurationCarFailureMetric.Observe(float64(time.Since(carFetchStart).Milliseconds()))