Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Add test for mirroring #98

Merged
merged 9 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/filecoin-saturn/caboose/tieredhashing"
Expand All @@ -29,6 +31,8 @@ type Config struct {
OrchestratorEndpoint *url.URL
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -184,6 +188,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
if config.MirrorFraction == 0 {
config.MirrorFraction = DefaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
}

c := Caboose{
config: config,
Expand Down
36 changes: 19 additions & 17 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,25 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
}

if err == nil || !errors.Is(err, context.Canceled) {
p.logger.queue <- log{
CacheHit: isCacheHit,
URL: reqUrl,
StartTime: start,
NumBytesSent: received,
RequestDurationSec: durationSecs,
RequestID: saturnTransferId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from,
IfNetworkError: networkError,
if p.logger != nil {
p.logger.queue <- log{
CacheHit: isCacheHit,
URL: reqUrl,
StartTime: start,
NumBytesSent: received,
RequestDurationSec: durationSecs,
RequestID: saturnTransferId,
HTTPStatusCode: code,
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from,
IfNetworkError: networkError,
}
}
}
}()
Expand Down
5 changes: 2 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"math/rand"
"net/url"
"os"
"strings"
"sync"
"time"

Expand All @@ -34,8 +33,8 @@ const (
// loadPool refreshes the set of Saturn endpoints in the pool by fetching an updated list of responsive Saturn nodes from the
// Saturn Orchestrator.
func (p *pool) loadPool() ([]string, error) {
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
return strings.Split(override, ","), nil
if p.config.OrchestratorOverride != nil {
return p.config.OrchestratorOverride, nil
}
resp, err := p.config.OrchestratorClient.Get(p.config.OrchestratorEndpoint.String())
if err != nil {
Expand Down
150 changes: 150 additions & 0 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package caboose

import (
"bytes"
"context"
"crypto/tls"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
"time"
"unsafe"

"github.com/filecoin-saturn/caboose/tieredhashing"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/multiformats/go-multicodec"
)

type ep struct {
server *httptest.Server
valid bool
cnt int
httpCode int
resp []byte
lk sync.Mutex
}

var testBlock = []byte("hello World")

func (e *ep) Setup() {
e.valid = true
e.resp = testBlock
e.server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
e.lk.Lock()
defer e.lk.Unlock()
e.cnt++
if e.valid {
w.Write(e.resp)
} else {
if e.httpCode == http.StatusTooManyRequests {
w.Header().Set("Retry-After", "1")
}
if e.httpCode == 0 {
e.httpCode = 500
}
w.WriteHeader(e.httpCode)
w.Write([]byte("error"))
}
}))
}

func TestPoolMiroring(t *testing.T) {
if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 {
t.Skip("skipping for 32bit architectures because too slow")
}
opts := []tieredhashing.Option{
tieredhashing.WithCorrectnessWindowSize(2),
tieredhashing.WithLatencyWindowSize(2),
tieredhashing.WithMaxMainTierSize(1),
}

saturnClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}

data := []byte("hello world")
ls := cidlink.DefaultLinkSystem()
lsm := memstore.Store{}
ls.SetReadStorage(&lsm)
ls.SetWriteStorage(&lsm)
finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(data))
finalC := finalCL.(cidlink.Link).Cid
cw, err := car.NewSelectiveWriter(context.TODO(), &ls, finalC, selectorparse.CommonSelector_MatchAllRecursively)
if err != nil {
t.Fatal(err)
}
carBytes := bytes.NewBuffer(nil)
cw.WriteTo(carBytes)

e := ep{}
e.Setup()
e.lk.Lock()
e.resp = carBytes.Bytes()
eURL := strings.TrimPrefix(e.server.URL, "https://")
e.lk.Unlock()

e2 := ep{}
e2.Setup()
e2.lk.Lock()
e2.resp = carBytes.Bytes()
e2URL := strings.TrimPrefix(e2.server.URL, "https://")
e2.lk.Unlock()

conf := Config{
OrchestratorEndpoint: &url.URL{},
OrchestratorClient: http.DefaultClient,
OrchestratorOverride: []string{eURL, e2URL},
LoggingEndpoint: url.URL{},
LoggingClient: http.DefaultClient,
LoggingInterval: time.Hour,

SaturnClient: saturnClient,
DoValidation: false,
PoolRefresh: time.Minute,
MaxRetrievalAttempts: 1,
TieredHashingOpts: opts,
MirrorFraction: 1.0,
}

p := newPool(&conf)
p.doRefresh()
p.config.OrchestratorOverride = nil
p.Start()

// promote one node to main pool. other will remain in uknown pool.
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.RecordSuccess(eURL, tieredhashing.ResponseMetrics{Success: true, TTFBMs: 30, SpeedPerMs: 30})
p.th.UpdateMainTierWithTopN()

_, err = p.fetchBlockWith(context.Background(), finalC, "")
if err != nil {
t.Fatal(err)
}

time.Sleep(100 * time.Millisecond)
p.Close()

e.lk.Lock()
defer e.lk.Unlock()
if e.cnt != 1 {
t.Fatalf("expected 1 primary fetch, got %d", e.cnt)
}
e2.lk.Lock()
defer e2.lk.Unlock()
if e2.cnt != 1 {
t.Fatalf("expected 1 mirrored fetch, got %d", e2.cnt)
}
}