
Commit

Implement Tiered Hashing with small pool sizes, kick out nodes for correctness, talk more to fast nodes and reduce pool churn (#86)

Switch to 2 tiers of measurement:
* a 'main' pool used for 80% of requests
* an 'unknown' pool used for 20% of requests

Nodes in the unknown pool that perform better than those in the main pool over a recent sliding window of requests can be swapped into the main pool.
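
In outline, the routing split works like the minimal sketch below. This illustrates only the 80/20 tier selection, not the actual tieredhashing implementation; the pool type and pickTier helper are hypothetical names:

package main

import (
	"fmt"
	"math/rand"
)

// share of requests served by the 'main' tier (~80%)
const mainTierShare = 0.8

type pool struct {
	main    []string // nodes with proven correctness/latency
	unknown []string // nodes still being measured
}

// pickTier routes roughly 80% of requests to the main tier and 20% to the
// unknown tier, so unproven nodes keep receiving enough traffic to be measured.
func (p *pool) pickTier() []string {
	if len(p.main) > 0 && (len(p.unknown) == 0 || rand.Float64() < mainTierShare) {
		return p.main
	}
	return p.unknown
}

func main() {
	p := &pool{main: []string{"node-a"}, unknown: []string{"node-b"}}
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[p.pickTier()[0]]++
	}
	fmt.Println(counts) // expect roughly node-a:800, node-b:200
}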
aarshkshah1992 authored May 2, 2023
1 parent 5e676ee commit 34094f8
Showing 17 changed files with 1,664 additions and 967 deletions.
53 changes: 11 additions & 42 deletions caboose.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-saturn/caboose/tieredhashing"
"io"
"net/http"
"net/url"
@@ -18,6 +19,10 @@ import (
"go.opentelemetry.io/otel/trace"
)

const (
EnvironmentKey = "STRN_ENV_TAG"
)

type Config struct {
// OrchestratorEndpoint is the URL of the Saturn orchestrator.
OrchestratorEndpoint *url.URL
@@ -47,16 +52,6 @@ type Config struct {
// PoolRefresh is the interval at which we refresh the pool of Saturn nodes.
PoolRefresh time.Duration

// PoolWeightChangeDebounce is the amount of time we wait between consecutive updates to the weight of a Saturn node
// in our pool after a retrieval success/failure.
PoolWeightChangeDebounce time.Duration

// PoolMembershipDebounce is the amount of time we wait after a saturn node is removed from the pool
// before we add it again to the pool.
PoolMembershipDebounce time.Duration

// trigger early refreshes when pool size drops below this low watermark
PoolLowWatermark int
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
MaxRetrievalAttempts int

@@ -71,10 +66,7 @@ type Config struct {
// SaturnNodeCoolOff is the cool off duration for a saturn node once we determine that we shouldn't be sending requests to it for a while.
SaturnNodeCoolOff time.Duration

MinCoolOff time.Duration

// MaxNCoolOff is the number of times we will cool off a node before downvoting it.
MaxNCoolOff int
TieredHashingOpts []tieredhashing.Option
}

const DefaultLoggingInterval = 5 * time.Second
@@ -85,9 +77,6 @@ const DefaultSaturnBlockRequestTimeout = 19 * time.Second
const DefaultSaturnCarRequestTimeout = 30 * time.Minute

const DefaultMaxRetries = 3
const DefaultPoolFailureDownvoteDebounce = 1 * time.Minute
const DefaultPoolMembershipDebounce = 3 * DefaultPoolRefreshInterval
const DefaultPoolLowWatermark = 5

const maxBlockSize = 4194305 // 4 MiB + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"
@@ -103,7 +92,6 @@ const DefaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane
// we cool off sending requests to a Saturn node if it returns transient errors rather than immediately downvoting it;
// however, only up to a certain max number of cool-offs.
const DefaultSaturnNodeCoolOff = 5 * time.Minute
const DefaultMaxNCoolOff = 3

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available saturn backend")
@@ -189,13 +177,6 @@ func NewCaboose(config *Config) (*Caboose, error) {
if config.SaturnNodeCoolOff == 0 {
config.SaturnNodeCoolOff = DefaultSaturnNodeCoolOff
}
if config.MinCoolOff == 0 {
config.MinCoolOff = 1 * time.Minute
}

if config.MaxNCoolOff == 0 {
config.MaxNCoolOff = DefaultMaxNCoolOff
}

c := Caboose{
config: config,
@@ -221,15 +202,6 @@ func NewCaboose(config *Config) (*Caboose, error) {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}

if c.config.PoolWeightChangeDebounce == 0 {
c.config.PoolWeightChangeDebounce = DefaultPoolFailureDownvoteDebounce
}
if c.config.PoolMembershipDebounce == 0 {
c.config.PoolMembershipDebounce = DefaultPoolMembershipDebounce
}
if c.config.PoolLowWatermark == 0 {
c.config.PoolLowWatermark = DefaultPoolLowWatermark
}
if c.config.MaxRetrievalAttempts == 0 {
c.config.MaxRetrievalAttempts = DefaultMaxRetries
}
@@ -243,14 +215,6 @@ func NewCaboose(config *Config) (*Caboose, error) {
// Caboose is a blockstore.
var _ ipfsblockstore.Blockstore = (*Caboose)(nil)

// GetMemberWeights is for testing ONLY
func (c *Caboose) GetMemberWeights() map[string]int {
c.pool.lk.RLock()
defer c.pool.lk.RUnlock()

return c.pool.endpoints.ToWeights()
}

func (c *Caboose) Close() {
c.pool.Close()
c.logger.Close()
@@ -275,6 +239,11 @@ func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
return blk != nil, nil
}

// GetPoolPerf is for testing ONLY
func (c *Caboose) GetPoolPerf() map[string]*tieredhashing.NodePerf {
return c.pool.th.GetPerf()
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()
214 changes: 137 additions & 77 deletions caboose_test.go
@@ -6,32 +6,31 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/boxo/ipld/car/v2"
"github.com/filecoin-saturn/caboose/tieredhashing"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
)

func TestCidCoolDown(t *testing.T) {
ctx := context.Background()
ch := BuildCabooseHarness(t, 3, 3, WithMaxFailuresBeforeCoolDown(2), WithCidCoolDownDuration(1*time.Second))

testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock)
ch.fetchAndAssertSuccess(t, ctx, testCid)

// Invalidate all servers so we cool down cids
ch.failNodesWithCode(t, func(e *ep) bool {
@@ -61,88 +60,73 @@ func TestCidCoolDown(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond)
}

type HarnessOption func(config *caboose.Config)
func TestFetchBlock(t *testing.T) {
ctx := context.Background()
h := BuildCabooseHarness(t, 3, 3, WithTieredHashingOpts(
[]tieredhashing.Option{tieredhashing.WithMaxMainTierSize(1), tieredhashing.WithCorrectnessWindowSize(2),
tieredhashing.WithLatencyWindowSize(2)}))

func WithPoolMembershipDebounce(d time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.PoolMembershipDebounce = d
}
}
testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock)

func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.MaxFetchFailuresBeforeCoolDown = max
}
}
h.fetchAndAssertSuccess(t, ctx, testCid)

func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.FetchKeyCoolDownDuration = duration
}
}
// ensure we have a success recording
h.assertPoolSize(t, 0, 3, 3)
h.assertCorrectnessCount(t, 1)
h.assertLatencyCount(t, 1)

func WithMaxNCoolOff(n int) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.MaxNCoolOff = n
}
h.fetchAndAssertSuccess(t, ctx, testCid)
h.assertCorrectnessCount(t, 2)
h.assertLatencyCount(t, 2)

// all nodes fail
h.failNodesWithCode(t, func(e *ep) bool {
return true
}, http.StatusNotAcceptable)

// one node gets evicted as correctness window is full
h.fetchAndAssertFailure(t, ctx, testCid, "406")
h.assertPoolSize(t, 0, 2, 2)
}

func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOption) *CabooseHarness {
ch := &CabooseHarness{}
func (h *CabooseHarness) assertLatencyCount(t *testing.T, expected int) {
nds := h.c.GetPoolPerf()
count := 0

ch.pool = make([]*ep, n)
purls := make([]string, n)
for i := 0; i < len(ch.pool); i++ {
ch.pool[i] = &ep{}
ch.pool[i].Setup()
purls[i] = strings.TrimPrefix(ch.pool[i].server.URL, "https://")
for _, perf := range nds {
count += int(perf.NLatencyDigest)
}
ch.goodOrch = true
orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ch.gol.Lock()
defer ch.gol.Unlock()
if ch.goodOrch {
json.NewEncoder(w).Encode(purls)
} else {
json.NewEncoder(w).Encode([]string{})
}
}))
require.EqualValues(t, expected, count)
}

saturnClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: "example.com",
},
},
func (h *CabooseHarness) assertCorrectnessCount(t *testing.T, expected int) {
nds := h.c.GetPoolPerf()
count := 0

for _, perf := range nds {
count += int(perf.NCorrectnessDigest)
}
require.EqualValues(t, expected, count)
}

ourl, _ := url.Parse(orch.URL)
func (h *CabooseHarness) assertPoolSize(t *testing.T, mainS, unknownS, totalS int) {
nds := h.c.GetPoolPerf()
require.Equal(t, totalS, len(nds))

conf := &caboose.Config{
OrchestratorEndpoint: ourl,
OrchestratorClient: http.DefaultClient,
LoggingEndpoint: *ourl,
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,
var eMain int
var eUnknown int

SaturnClient: saturnClient,
DoValidation: false,
PoolWeightChangeDebounce: time.Duration(1),
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: maxRetries,
PoolMembershipDebounce: 1,
}

for _, opt := range opts {
opt(conf)
for _, perf := range nds {
if perf.Tier == "main" {
eMain++
}
if perf.Tier == "unknown" {
eUnknown++
}
}

bs, err := caboose.NewCaboose(conf)
require.NoError(t, err)

ch.c = bs
return ch
require.EqualValues(t, eMain, mainS)
require.EqualValues(t, eUnknown, unknownS)
}

func TestResource(t *testing.T) {
@@ -205,3 +189,79 @@ func TestResource(t *testing.T) {
t.Fatal("expected fall-over progress")
}
}

type HarnessOption func(config *caboose.Config)

func WithTieredHashingOpts(opts []tieredhashing.Option) HarnessOption {
return func(config *caboose.Config) {
config.TieredHashingOpts = opts
}
}

func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.MaxFetchFailuresBeforeCoolDown = max
}
}

func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) {
return func(config *caboose.Config) {
config.FetchKeyCoolDownDuration = duration
}
}

func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOption) *CabooseHarness {
ch := &CabooseHarness{}

ch.pool = make([]*ep, n)
purls := make([]string, n)
for i := 0; i < len(ch.pool); i++ {
ch.pool[i] = &ep{}
ch.pool[i].Setup()
purls[i] = strings.TrimPrefix(ch.pool[i].server.URL, "https://")
}
ch.goodOrch = true
orch := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ch.gol.Lock()
defer ch.gol.Unlock()
if ch.goodOrch {
json.NewEncoder(w).Encode(purls)
} else {
json.NewEncoder(w).Encode([]string{})
}
}))

saturnClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
ServerName: "example.com",
},
},
}

ourl, _ := url.Parse(orch.URL)

conf := &caboose.Config{
OrchestratorEndpoint: ourl,
OrchestratorClient: http.DefaultClient,
LoggingEndpoint: *ourl,
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

SaturnClient: saturnClient,
DoValidation: false,
PoolRefresh: time.Millisecond * 50,
MaxRetrievalAttempts: maxRetries,
}

for _, opt := range opts {
opt(conf)
}

bs, err := caboose.NewCaboose(conf)
require.NoError(t, err)

ch.c = bs
return ch
}
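
For context, a minimal consumer-side sketch of wiring up the new option, using only the API surface visible in this diff (Config.TieredHashingOpts, WithMaxMainTierSize, WithCorrectnessWindowSize, WithLatencyWindowSize). The option values here are illustrative, not recommended defaults:

package main

import (
	"net/http"
	"net/url"

	"github.com/filecoin-saturn/caboose"
	"github.com/filecoin-saturn/caboose/tieredhashing"
)

func main() {
	// Point at the default orchestrator; any reachable endpoint works.
	ourl, _ := url.Parse(caboose.DefaultOrchestratorEndpoint)

	c, err := caboose.NewCaboose(&caboose.Config{
		OrchestratorEndpoint: ourl,
		OrchestratorClient:   http.DefaultClient,
		LoggingEndpoint:      *ourl,
		LoggingClient:        http.DefaultClient,
		SaturnClient:         http.DefaultClient,
		// New in this commit: tune tier sizes and measurement windows.
		// Values are illustrative only.
		TieredHashingOpts: []tieredhashing.Option{
			tieredhashing.WithMaxMainTierSize(10),
			tieredhashing.WithCorrectnessWindowSize(100),
			tieredhashing.WithLatencyWindowSize(100),
		},
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
}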