This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Add a storetheindex delegated provider #158

Merged · 8 commits · Feb 16, 2022
32 changes: 24 additions & 8 deletions head/head.go
@@ -137,19 +137,35 @@ func NewHead(ctx context.Context, options ...opts.Option) (*Head, chan Bootstrap
}

var cachingProviderStore *hproviders.CachingProviderStore
if cfg.ProvidersFinder != nil {
cachingProviderStore = hproviders.NewCachingProviderStore(providerStore, cfg.ProvidersFinder, nil)
providerStore = cachingProviderStore
if cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr == "" {
providerStore = hproviders.NewCachingProviderStore(providerStore, providerStore, cfg.ProvidersFinder, nil)
}
if cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr != "" {
Contributor:

I think this sequence of if statements is correct, but for future reference: when the intention is to have mutually exclusive cases, it is best, both for readability and safety, to capture them either as a switch statement or as an if/else chain. Here a switch statement would be best:

switch {
case cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr == "":
...
break
case cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr != "":
...
break
case cfg.ProvidersFinder == nil && cfg.StoreTheIndexAddr != "":
...
break
default:
... something is not right ...
}
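
For reference, collapsed into one mutually exclusive switch using this PR's own constructors, the wiring might look like the sketch below; this is an illustration of the suggestion, not the merged code (when neither the finder nor the STI address is configured, the base providerStore is simply kept):

switch {
case cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr == "":
	providerStore = hproviders.NewCachingProviderStore(providerStore, providerStore, cfg.ProvidersFinder, nil)
case cfg.ProvidersFinder != nil && cfg.StoreTheIndexAddr != "":
	// Read through the combined local+STI store, but cache network finds
	// only into the local store. Go switch cases don't fall through, so
	// no break statements are needed.
	stiPS, err := hproviders.NewStoreTheIndexProviderStore(cfg.DelegateHTTPClient, cfg.StoreTheIndexAddr)
	if err != nil {
		return nil, nil, fmt.Errorf("creating StoreTheIndex providerstore: %w", err)
	}
	providerStore = hproviders.NewCachingProviderStore(
		hproviders.CombineProviders(providerStore, stiPS),
		providerStore,
		cfg.ProvidersFinder,
		nil,
	)
case cfg.ProvidersFinder == nil && cfg.StoreTheIndexAddr != "":
	stiPS, err := hproviders.NewStoreTheIndexProviderStore(cfg.DelegateHTTPClient, cfg.StoreTheIndexAddr)
	if err != nil {
		return nil, nil, fmt.Errorf("creating StoreTheIndex providerstore: %w", err)
	}
	providerStore = stiPS
}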

stiProviderStore, err := hproviders.NewStoreTheIndexProviderStore(cfg.DelegateHTTPClient, cfg.StoreTheIndexAddr)
if err != nil {
return nil, nil, fmt.Errorf("creating StoreTheIndex providerstore: %w", err)
}

cachingProviderStore = hproviders.NewCachingProviderStore(
hproviders.CombineProviders(providerStore, stiProviderStore),
providerStore,
cfg.ProvidersFinder,
nil,
)

if cfg.DelegateAddr != "" {
log.Infof("will delegate to %v with timeout %v", cfg.DelegateAddr, cfg.DelegateTimeout)
delegateProvider, err := hproviders.DelegateProvider(cfg.DelegateAddr, cfg.DelegateTimeout)
// we still want to use the caching provider store instead of the provider store directly b/c it publishes cache metrics
fmt.Printf("Will delegate to %v with timeout %v.\n", cfg.StoreTheIndexAddr, cfg.DelegateHTTPClient.Timeout)
providerStore = cachingProviderStore
}
if cfg.ProvidersFinder == nil && cfg.StoreTheIndexAddr != "" {
stiPS, err := hproviders.NewStoreTheIndexProviderStore(cfg.DelegateHTTPClient, cfg.StoreTheIndexAddr)
if err != nil {
return nil, nil, fmt.Errorf("failed to instantiate delegation client (%w)", err)
return nil, nil, fmt.Errorf("creating StoreTheIndex provider store: %w", err)
}
providerStore = hproviders.CombineProviders(providerStore, hproviders.AddProviderNotSupported(delegateProvider))
fmt.Printf("Will delegate to %v with timeout %v.\n", cfg.StoreTheIndexAddr, cfg.DelegateHTTPClient.Timeout)
providerStore = stiPS
}

dhtOpts = append(dhtOpts, dht.ProviderStore(providerStore))

dhtNode, err := dht.New(ctx, node, dhtOpts...)
19 changes: 9 additions & 10 deletions head/opts/options.go
@@ -2,7 +2,7 @@ package opts

import (
"fmt"
"time"
"net/http"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
@@ -24,8 +24,8 @@ type Options struct {
Datastore ds.Batching
Peerstore peerstore.Peerstore
ProviderStoreBuilder ProviderStoreBuilderFunc
DelegateAddr string
DelegateTimeout time.Duration
StoreTheIndexAddr string
DelegateHTTPClient *http.Client
RoutingTable *kbucket.RoutingTable
EnableRelay bool
Addrs []multiaddr.Multiaddr
@@ -92,19 +92,18 @@ func ProviderStoreBuilder(builder func(Options, host.Host) (providers.ProviderSt
}
}

// DelegateAddr configures the Hydra Head to delegate routing also to this addr.
// Defaults to empty string which indicates no delegation.
func DelegateAddr(addr string) Option {
func DelegateHTTPClient(c *http.Client) Option {
return func(o *Options) error {
o.DelegateAddr = addr
o.DelegateHTTPClient = c
return nil
}
}

// DelegateTimeout configures the Hydra Head timeout for delegate routing requests.
func DelegateTimeout(timeout time.Duration) Option {
// StoreTheIndexAddr configures the Hydra Head to delegate routing also to this storetheindex addr.
// Defaults to empty string which indicates no delegation.
func StoreTheIndexAddr(addr string) Option {
return func(o *Options) error {
o.DelegateTimeout = timeout
o.StoreTheIndexAddr = addr
return nil
}
}
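Each setter above returns an opts.Option, which from the signatures shown is a func(*Options) error that mutates the config. A minimal sketch of composing them (the endpoint URL is hypothetical, and NewHead presumably applies the options the same way internally):

o := &opts.Options{}
for _, opt := range []opts.Option{
	opts.DelegateHTTPClient(&http.Client{Timeout: time.Second}),
	opts.StoreTheIndexAddr("https://indexer.example.com"), // hypothetical addr
} {
	if err := opt(o); err != nil {
		log.Fatal(err) // each Option may reject invalid input
	}
}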
10 changes: 7 additions & 3 deletions hydra/hydra.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"
"strconv"
"strings"
@@ -59,8 +60,8 @@ type Options struct {
DatastorePath string
PeerstorePath string
ProviderStore string
DelegateAddr string
DelegateTimeout time.Duration
StoreTheIndexAddr string
GetPort func() int
NHeads int
ProtocolPrefix protocol.ID
@@ -128,6 +129,9 @@ func NewHydra(ctx context.Context, options Options) (*Hydra, error) {
providersFinder := hproviders.NewAsyncProvidersFinder(10*time.Second, options.NHeads, 1*time.Hour)
providersFinder.Run(ctx, 1000)

// reuse the HTTP client across all the heads
delegateHTTPClient := &http.Client{Timeout: options.DelegateTimeout}

for i := 0; i < options.NHeads; i++ {
time.Sleep(options.Stagger)
fmt.Fprintf(os.Stderr, ".")
@@ -148,8 +152,8 @@ func NewHydra(ctx context.Context, options Options) (*Hydra, error) {
opts.Limiter(limiter),
opts.ID(priv),
opts.BootstrapPeers(options.BootstrapPeers),
opts.DelegateAddr(options.DelegateAddr),
opts.DelegateTimeout(options.DelegateTimeout),
opts.DelegateHTTPClient(delegateHTTPClient),
opts.StoreTheIndexAddr(options.StoreTheIndexAddr),
}
if options.EnableRelay {
hdOpts = append(hdOpts, opts.EnableRelay())
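Sharing one *http.Client across heads works because the client is safe for concurrent use and pools connections, and Client.Timeout bounds each whole request, including reading the body. A self-contained sketch of those semantics (httptest stands in for the real indexer endpoint):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	// Fake endpoint; responds well within the client's timeout.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(50 * time.Millisecond)
		fmt.Fprintln(w, "ok")
	}))
	defer srv.Close()

	client := &http.Client{Timeout: 1000 * time.Millisecond}
	for i := 0; i < 3; i++ { // several "heads" reusing one client
		resp, err := client.Get(srv.URL)
		if err != nil {
			fmt.Println("request failed:", err)
			continue
		}
		resp.Body.Close()
		fmt.Println("status:", resp.Status)
	}
}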
2 changes: 1 addition & 1 deletion k8s/alasybil.yaml
@@ -49,7 +49,7 @@ spec:
- name: HYDRA_DELEGATED_ROUTING_ADDR
value: "http://127.0.0.1:9999/" # must be a valid URL
- name: HYDRA_DELEGATED_ROUTING_TIMEOUT
value: "2"
value: "1000"
image: libp2p/hydra-booster:master
resources:
limits:
16 changes: 8 additions & 8 deletions main.go
@@ -42,8 +42,8 @@ func main() {
pstorePath := flag.String("pstore", "", "Peerstore directory for LevelDB store (defaults to in-memory store)")
providerStore := flag.String("provider-store", "", "A non-default provider store to use, \"dynamodb,table=<string>,ttl=<ttl-in-seconds>,queryLimit=<int>\"")
httpAPIAddr := flag.String("httpapi-addr", defaultHTTPAPIAddr, "Specify an IP and port to run the HTTP API server on")
delegateAddr := flag.String("delegate-addr", "", "API endpoint for delegated routing")
delegateTimeout := flag.Int("delegate-timeout", 0, "Timeout for delegated routing in seconds")
delegateTimeout := flag.Int("delegate-timeout", 0, "Timeout for delegated routing in milliseconds")
stiAddr := flag.String("store-the-index-addr", "", "StoreTheIndex API endpoint for delegated routing")
inmem := flag.Bool("mem", false, "Use an in-memory database. This overrides the -db option")
metricsAddr := flag.String("metrics-addr", defaultMetricsAddr, "Specify an IP and port to run Prometheus metrics and pprof HTTP server on")
enableRelay := flag.Bool("enable-relay", false, "Enable libp2p circuit relaying for this node (default false).")
@@ -113,11 +113,11 @@ func main() {
if *providerStore == "" {
*providerStore = os.Getenv("HYDRA_PROVIDER_STORE")
}
if *delegateAddr == "" {
*delegateAddr = os.Getenv("HYDRA_DELEGATED_ROUTING_ADDR")
}
if *delegateTimeout == 0 {
*delegateTimeout = mustGetEnvInt("HYDRA_DELEGATED_ROUTING_TIMEOUT", 0)
*delegateTimeout = mustGetEnvInt("HYDRA_DELEGATED_ROUTING_TIMEOUT", 1000)
}
if *stiAddr == "" {
*stiAddr = os.Getenv("HYDRA_STORE_THE_INDEX_ADDR")
}

// Allow short keys. Otherwise, we'll refuse connections from the bootsrappers and break the network.
@@ -160,8 +160,8 @@ func main() {
DatastorePath: *dbpath,
PeerstorePath: *pstorePath,
ProviderStore: *providerStore,
DelegateAddr: *delegateAddr,
DelegateTimeout: time.Second * time.Duration(*delegateTimeout),
DelegateTimeout: time.Millisecond * time.Duration(*delegateTimeout),
StoreTheIndexAddr: *stiAddr,
EnableRelay: *enableRelay,
ProtocolPrefix: protocol.ID(*protocolPrefix),
BucketSize: *bucketSize,
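mustGetEnvInt is referenced above but not shown in this diff; a plausible sketch of such a helper (an assumption about its behavior, not the repo's code):

// Hypothetical helper: fall back to def when the variable is unset,
// abort on a malformed value. Needs "log", "os" and "strconv".
func mustGetEnvInt(key string, def int) int {
	v, ok := os.LookupEnv(key)
	if !ok || v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		log.Fatalf("invalid %s: %s", key, err)
	}
	return n
}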
15 changes: 15 additions & 0 deletions metrics/definitions.go
@@ -55,6 +55,9 @@ var (
DelegatedFindProvs = stats.Int64("delegated_find_provs_total", "Total delegated find provider attempts that were found locally, or not found locally and succeeded, failed or were discarded", stats.UnitDimensionless)
DelegatedFindProvsDuration = stats.Float64("delegated_find_provs_duration", "The time it took delegated find provider attempts from the network to succeed or fail because of timeout or completion", stats.UnitMilliseconds)

STIFindProvs = stats.Int64("sti_find_provs_total", "Total store the index find provider attempts that were found locally, or not found locally and succeeded, failed or were discarded", stats.UnitDimensionless)
STIFindProvsDuration = stats.Float64("sti_find_provs_duration_nanoseconds", "The time it took storetheindex finds from the network to succeed or fail because of timeout or completion", stats.UnitSeconds)

AWSRequests = stats.Int64("aws_requests", "Requests made to AWS", stats.UnitDimensionless)
AWSRequestDurationMillis = stats.Float64("aws_request_duration", "The time it took to make an AWS request and receive a response", stats.UnitMilliseconds)
AWSRequestRetries = stats.Int64("aws_retries", "Retried requests to AWS", stats.UnitDimensionless)
@@ -157,6 +160,16 @@ var (
TagKeys: []tag.Key{KeyName},
Aggregation: view.Sum(),
}
STIFindProvsView = &view.View{
Measure: STIFindProvs,
TagKeys: []tag.Key{KeyName, KeyStatus},
Aggregation: view.Sum(),
}
STIFindProvsDurationView = &view.View{
Measure: STIFindProvsDuration,
TagKeys: []tag.Key{KeyName, KeyStatus},
Aggregation: view.Sum(),
}
// DHT views
ReceivedMessagesView = &view.View{
Measure: dhtmetrics.ReceivedMessages,
@@ -219,6 +232,8 @@ var DefaultViews = []*view.View{
UniquePeersView,
RoutingTableSizeView,
ProviderRecordsView,
STIFindProvsView,
STIFindProvsDurationView,
ProviderRecordsPerKeyView,
PrefetchesView,
PrefetchDurationMillisView,
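The new views only yield data once they are registered and the measures are recorded with matching tags; a minimal sketch using standard opencensus-go calls and the names from this diff (ctx and elapsed are assumed in scope, and note that STIFindProvsDuration is declared in seconds despite its name):

if err := view.Register(metrics.DefaultViews...); err != nil {
	log.Fatal(err)
}
stats.RecordWithTags(ctx,
	[]tag.Mutator{tag.Upsert(metrics.KeyStatus, "succeeded")},
	metrics.STIFindProvs.M(1),
	metrics.STIFindProvsDuration.M(elapsed.Seconds()),
)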
41 changes: 16 additions & 25 deletions providers/caching.go
@@ -8,58 +8,49 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/hydra-booster/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

// CachingProviderStore wraps a providerstore, and finds providers for CIDs that were requested
// but not found in the underlying providerstore, and then caches them in the providerstore.
// CachingProviderStore checks the ReadProviderStore for providers. If no providers are returned,
// then the Finder is used to find providers, which are then added to the WriteProviderStore.
type CachingProviderStore struct {
Delegate providers.ProviderStore
Finder ProvidersFinder
Router ReadContentRouting
ProvidersToFind int
log logging.EventLogger
ReadProviderStore providers.ProviderStore
WriteProviderStore providers.ProviderStore
Finder ProvidersFinder
Router ReadContentRouting
log logging.EventLogger
}

func NewCachingProviderStore(delegate providers.ProviderStore, finder ProvidersFinder, router ReadContentRouting) *CachingProviderStore {
func NewCachingProviderStore(getDelegate providers.ProviderStore, addDelegate providers.ProviderStore, finder ProvidersFinder, router ReadContentRouting) *CachingProviderStore {
return &CachingProviderStore{
Delegate: delegate,
Finder: finder,
Router: router,
log: logging.Logger("hydra/providersn"),
ReadProviderStore: getDelegate,
WriteProviderStore: addDelegate,
Finder: finder,
Router: router,
log: logging.Logger("hydra/providers"),
}
}

func (s *CachingProviderStore) AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error {
return s.Delegate.AddProvider(ctx, key, prov)
return s.WriteProviderStore.AddProvider(ctx, key, prov)
}

// GetProviders gets providers for the given key from the providerstore.
// If the providerstore does not have providers for the key, then the ProvidersFinder is queried and the results are cached.
func (d *CachingProviderStore) GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error) {
addrInfos, err := d.Delegate.GetProviders(ctx, key)
addrInfos, err := d.ReadProviderStore.GetProviders(ctx, key)
if err != nil {
return addrInfos, err
}

if len(addrInfos) > 0 {
recordPrefetches(ctx, "local")
return addrInfos, nil
}

return nil, d.Finder.Find(ctx, d.Router, key, func(ai peer.AddrInfo) {
Contributor:

This is probably not worth the effort, but for future reference: it is bad style to reuse the error return value for two different purposes. Above, returned errors indicate errors in the providerstores; here, errors indicate errors in the finder. Considering that the finder is async functionality, independent of the GetProviders execution path, its error should be logged, not returned.

Contributor:

Otherwise this function seems to be a correct implementation of the logic we discussed in colo.

Contributor:

I haven't heard of that style guideline before; if a method is supposed to do two things and it can't do one of them, it's normal to return an error, regardless of which one failed. The finder is best effort, so an error from the finder here doesn't represent an error in the finding itself; it means we couldn't even try to find (e.g. in the async case, that the work couldn't be queued for some reason). My intention was for the CachingProviderStore to neither know nor care that things are happening async, but given that we don't have a sync version of this, I can see how that just adds confusion, so I can rename things a bit here to clarify.

err := d.Delegate.AddProvider(ctx, key, ai)
err := d.WriteProviderStore.AddProvider(ctx, key, ai)
if err != nil {
d.log.Errorf("failed to add provider to providerstore: %s", err)
stats.Record(ctx, metrics.PrefetchFailedToCache.M(1))
}
})
}

func recordPrefetches(ctx context.Context, status string, extraMeasures ...stats.Measurement) {
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyStatus, status)},
append([]stats.Measurement{metrics.Prefetches.M(1)}, extraMeasures...)...,
)
}
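
The read/write split above is what lets head.go read through the combined local+StoreTheIndex store while caching network finds only into the local one; a condensed usage sketch (localStore, stiStore, finder, ctx and key are assumed in scope):

// Reads consult local + STI; providers found via the finder are written
// back only to the local store.
combined := hproviders.CombineProviders(localStore, stiStore)
cache := hproviders.NewCachingProviderStore(combined, localStore, finder, nil)
provs, err := cache.GetProviders(ctx, key) // falls back to the finder on a miss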
41 changes: 22 additions & 19 deletions providers/caching_test.go
@@ -19,6 +19,9 @@ func (m *mockProviderStore) AddProvider(ctx context.Context, key []byte, prov pe
if m.err != nil {
return m.err
}
if m.providers == nil {
m.providers = map[string][]peer.AddrInfo{}
}
m.providers[string(key)] = append(m.providers[string(key)], prov)
return nil
}
@@ -46,36 +49,33 @@ func TestCachingProviderStore_GetProviders(t *testing.T) {
name string
mh string

delegateErr error
delegateProviders map[string][]peer.AddrInfo
routerProviders map[string][]peer.AddrInfo
finderProviders map[string][]peer.AddrInfo
delegateErr error
readProviders map[string][]peer.AddrInfo
routerProviders map[string][]peer.AddrInfo
finderProviders map[string][]peer.AddrInfo

expProviders []peer.AddrInfo
expDelegateProviders map[string][]peer.AddrInfo
expErr error
expProviders []peer.AddrInfo
expWriteProviders map[string][]peer.AddrInfo
expErr error
}{
{
name: "returns providers when delegate has them",
mh: "mh1",
delegateProviders: map[string][]peer.AddrInfo{
readProviders: map[string][]peer.AddrInfo{
"mh1": {peer.AddrInfo{ID: peer.ID([]byte("peer1"))}},
},
expProviders: []peer.AddrInfo{
{ID: peer.ID([]byte("peer1"))},
},
expDelegateProviders: map[string][]peer.AddrInfo{
"mh1": {peer.AddrInfo{ID: peer.ID([]byte("peer1"))}},
},
},
{
name: "finds and caches providers when delegate doesn't have them",
mh: "mh1",
delegateProviders: map[string][]peer.AddrInfo{},
name: "finds and caches providers when delegate doesn't have them",
mh: "mh1",
readProviders: map[string][]peer.AddrInfo{},
finderProviders: map[string][]peer.AddrInfo{
"mh1": {peer.AddrInfo{ID: peer.ID([]byte("peer1"))}},
},
expDelegateProviders: map[string][]peer.AddrInfo{
expWriteProviders: map[string][]peer.AddrInfo{
"mh1": {peer.AddrInfo{ID: peer.ID([]byte("peer1"))}},
},
},
@@ -90,20 +90,23 @@
t.Run(c.name, func(t *testing.T) {
ctx, stop := context.WithTimeout(context.Background(), 2*time.Second)
defer stop()
delegate := &mockProviderStore{
providers: c.delegateProviders,
writePS := &mockProviderStore{
err: c.delegateErr,
}
readPS := &mockProviderStore{
providers: c.readProviders,
err: c.delegateErr,
}
finder := &mockFinder{
providers: c.finderProviders,
}

ps := NewCachingProviderStore(delegate, finder, nil)
ps := NewCachingProviderStore(readPS, writePS, finder, nil)

provs, err := ps.GetProviders(ctx, []byte(c.mh))
assert.Equal(t, c.expErr, err)
assert.Equal(t, c.expProviders, provs)
assert.Equal(t, c.expDelegateProviders, delegate.providers)
assert.Equal(t, c.expWriteProviders, writePS.providers)
})
}
}
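
The mockFinder used above isn't shown in this diff; given the Finder.Find call in caching.go, a plausible sketch (an assumption, not the repo's code):

// Hypothetical mock: invoke the callback for each provider recorded for
// the key, ignore the router, and report no error.
type mockFinder struct {
	providers map[string][]peer.AddrInfo
}

func (m *mockFinder) Find(ctx context.Context, router ReadContentRouting, key []byte, onFound func(peer.AddrInfo)) error {
	for _, ai := range m.providers[string(key)] {
		onFound(ai)
	}
	return nil
}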
4 changes: 3 additions & 1 deletion providers/combined.go
@@ -80,7 +80,9 @@ func mergeAddrInfos(infos []peer.AddrInfo) []peer.AddrInfo {
}
var r []peer.AddrInfo
for k, v := range m {
r = append(r, peer.AddrInfo{ID: k, Addrs: v})
if k.Validate() == nil {
Contributor:

This seems fine, but why wouldn't we do this check further up, next to the if r.Err == nil check, when accumulating the addresses before merging them?

Contributor (author):

Because we aren't doing an explicit iteration through the AddrInfos/keys during that part of the accumulation.

r = append(r, peer.AddrInfo{ID: k, Addrs: v})
}
}
return r
}
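Only the tail of mergeAddrInfos is visible above; a hedged reconstruction of the whole helper, inferring the accumulation map from that tail:

// Reconstruction under assumptions: dedupe AddrInfos by peer ID, merge
// their addresses, and drop entries whose ID fails Validate().
func mergeAddrInfos(infos []peer.AddrInfo) []peer.AddrInfo {
	m := map[peer.ID][]multiaddr.Multiaddr{}
	for _, info := range infos {
		m[info.ID] = append(m[info.ID], info.Addrs...)
	}
	var r []peer.AddrInfo
	for k, v := range m {
		if k.Validate() == nil {
			r = append(r, peer.AddrInfo{ID: k, Addrs: v})
		}
	}
	return r
}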
5 changes: 5 additions & 0 deletions providers/ddb.go
@@ -136,6 +136,11 @@ func (d *dynamoDBProviderStore) GetProviders(ctx context.Context, key []byte) ([
}

stats.Record(ctx, stats.Measurement(metrics.ProviderRecordsPerKey.M(int64(len(providers)))))

if len(providers) > 0 {
recordPrefetches(ctx, "local")
}

return providers, nil
}
