This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Add a storetheindex delegated provider #158

Merged: 8 commits, Feb 16, 2022
Changes from 6 commits
9 changes: 9 additions & 0 deletions head/head.go
@@ -150,6 +150,15 @@ func NewHead(ctx context.Context, options ...opts.Option) (*Head, chan Bootstrap
}
providerStore = hproviders.CombineProviders(providerStore, hproviders.AddProviderNotSupported(delegateProvider))
}
if cfg.StoreTheIndexAddr != "" {
log.Infof("will delegate to %v with timeout %v", cfg.StoreTheIndexAddr, cfg.DelegateTimeout)
stiProvider, err := hproviders.StoreTheIndexProvider(cfg.StoreTheIndexAddr, cfg.DelegateTimeout)
if err != nil {
return nil, nil, fmt.Errorf("failed to instantiate delegation client (%w)", err)
}
providerStore = hproviders.CombineProviders(providerStore, hproviders.AddProviderNotSupported(stiProvider))
@guseggert (Contributor), Feb 11, 2022:
IIUC this will try the caching provider store concurrently, which we expect to fail, which will then enqueue an async DHT lookup. Those are expensive, will always fail, and will contend for resources (the work queue) with regular DHT queries...is there a way to avoid that?

@aschmahmann (Contributor), Feb 11, 2022:
Are you saying you're concerned that all the requests that end up being fulfilled by the indexers will result in DHT queries that will likely fail and you're concerned about the load?

If so, we have some options here depending on the semantics we want. One would be a "fallback provider" that, instead of trying all the providers in parallel, tries them sequentially and only moves on when the previous ones fail. In that case we could then decide to do a DHT lookup only if the Datastore and Indexer systems returned no records.

This wouldn't cover every case (e.g. if there's some record in the DHT that we're missing for a given multihash, but the data is separately advertised by the indexers)
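
For illustration, a rough sketch of the fallback idea; the type, field names, and wiring are hypothetical, only GetProviders is shown, and the ProviderStore interface shape is the one from go-libp2p-kad-dht/providers already used elsewhere in this PR:

type fallbackProviderStore struct {
    stores []providers.ProviderStore // e.g. the caching provider store first, then the storetheindex client
}

func (f *fallbackProviderStore) GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error) {
    var lastErr error
    for _, s := range f.stores {
        infos, err := s.GetProviders(ctx, key)
        if err == nil && len(infos) > 0 {
            return infos, nil // stop at the first store that returns records
        }
        lastErr = err
    }
    return nil, lastErr
}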

Contributor Author:
If there's a change in logic here, it should be in a different PR in order to keep the scope of this one reasonable.

Contributor:
Sounds good, but can we agree to not deploy this to the Hydras until this is fixed?

Contributor Author:
This change is not making the current situation any worse, right?

Do we have a consensus agreement for something better?

Contributor:
I think this change does make it worse, for the reasons listed above.

What @aschmahmann brought up seems like a good compromise. I can make the change if it helps.

Contributor:
Agreed with Gus. In practice, merging this code will make it worse. The change I proposed should be pretty small, though.

Contributor Author:
I think I wasn't clear earlier: what I meant by 'this change' is that this PR uses the same composition structure as the already-merged delegated routing code. I agree that spinning up load in this path is something we need to watch in case it leads to lots of failing DHT queries, and that the proposed change to the composition structure seems good.

  • There isn't going to be a substantial amount of upstream bitswap data loaded into storetheindex in the coming week. It would be useful for providers to begin testing the end-to-end flow, though, so if the additional change is going to take more than this coming week, we should consider whether we can get away without it temporarily.

  • @guseggert if you're able to make the proposed change, that would be great!


}
dhtOpts = append(dhtOpts, dht.ProviderStore(providerStore))

dhtNode, err := dht.New(ctx, node, dhtOpts...)
10 changes: 10 additions & 0 deletions head/opts/options.go
@@ -25,6 +25,7 @@ type Options struct {
Peerstore peerstore.Peerstore
ProviderStoreBuilder ProviderStoreBuilderFunc
DelegateAddr string
StoreTheIndexAddr string
DelegateTimeout time.Duration
RoutingTable *kbucket.RoutingTable
EnableRelay bool
@@ -101,6 +102,15 @@ func DelegateAddr(addr string) Option {
}
}

// StoreTheIndexAddr configures the Hydra Head to also delegate routing to this storetheindex addr.
// Defaults to empty string which indicates no delegation.
func StoreTheIndexAddr(addr string) Option {
return func(o *Options) error {
o.StoreTheIndexAddr = addr
return nil
}
}

// DelegateTimeout configures the Hydra Head timeout for delegate routing requests.
func DelegateTimeout(timeout time.Duration) Option {
return func(o *Options) error {
2 changes: 2 additions & 0 deletions hydra/hydra.go
@@ -61,6 +61,7 @@ type Options struct {
ProviderStore string
DelegateAddr string
DelegateTimeout time.Duration
StoreTheIndexAddr string
GetPort func() int
NHeads int
ProtocolPrefix protocol.ID
@@ -150,6 +151,7 @@ func NewHydra(ctx context.Context, options Options) (*Hydra, error) {
opts.BootstrapPeers(options.BootstrapPeers),
opts.DelegateAddr(options.DelegateAddr),
opts.DelegateTimeout(options.DelegateTimeout),
opts.StoreTheIndexAddr(options.StoreTheIndexAddr),
}
if options.EnableRelay {
hdOpts = append(hdOpts, opts.EnableRelay())
2 changes: 1 addition & 1 deletion k8s/alasybil.yaml
@@ -49,7 +49,7 @@ spec:
- name: HYDRA_DELEGATED_ROUTING_ADDR
value: "http://127.0.0.1:9999/" # must be a valid URL
- name: HYDRA_DELEGATED_ROUTING_TIMEOUT
value: "2"
value: "1000"
image: libp2p/hydra-booster:master
resources:
limits:
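
For completeness, wiring the new storetheindex target into a manifest like this would presumably just mean one more env entry next to the two above; the URL is a placeholder, the env var name is the one main.go reads (HYDRA_STORE_THE_INDEX_ADDR), and note that the delegated routing timeout value is now interpreted in milliseconds (so 1000 above means one second):

- name: HYDRA_STORE_THE_INDEX_ADDR
  value: "https://indexer.example.net" # placeholder; must be a valid URL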
7 changes: 6 additions & 1 deletion main.go
@@ -44,6 +44,7 @@ func main() {
httpAPIAddr := flag.String("httpapi-addr", defaultHTTPAPIAddr, "Specify an IP and port to run the HTTP API server on")
delegateAddr := flag.String("delegate-addr", "", "API endpoint for delegated routing")
delegateTimeout := flag.Int("delegate-timeout", 0, "Timeout for delegated routing in milliseconds")
stiAddr := flag.String("store-the-index-addr", "", "StoreTheIndex API endpoint for delegated routing")
inmem := flag.Bool("mem", false, "Use an in-memory database. This overrides the -db option")
metricsAddr := flag.String("metrics-addr", defaultMetricsAddr, "Specify an IP and port to run Prometheus metrics and pprof HTTP server on")
enableRelay := flag.Bool("enable-relay", false, "Enable libp2p circuit relaying for this node (default false).")
@@ -119,6 +120,9 @@ func main() {
if *delegateTimeout == 0 {
*delegateTimeout = mustGetEnvInt("HYDRA_DELEGATED_ROUTING_TIMEOUT", 0)
}
if *stiAddr == "" {
*stiAddr = os.Getenv("HYDRA_STORE_THE_INDEX_ADDR")
}

// Allow short keys. Otherwise, we'll refuse connections from the bootstrappers and break the network.
// TODO: Remove this when we shut those bootstrappers down.
@@ -161,7 +165,8 @@ func main() {
PeerstorePath: *pstorePath,
ProviderStore: *providerStore,
DelegateAddr: *delegateAddr,
DelegateTimeout: time.Second * time.Duration(*delegateTimeout),
DelegateTimeout: time.Millisecond * time.Duration(*delegateTimeout),
StoreTheIndexAddr: *stiAddr,
EnableRelay: *enableRelay,
ProtocolPrefix: protocol.ID(*protocolPrefix),
BucketSize: *bucketSize,
15 changes: 15 additions & 0 deletions metrics/definitions.go
@@ -55,6 +55,9 @@ var (
DelegatedFindProvs = stats.Int64("delegated_find_provs_total", "Total delegated find provider attempts that were found locally, or not found locally and succeeded, failed or were discarded", stats.UnitDimensionless)
DelegatedFindProvsDuration = stats.Float64("delegated_find_provs_duration", "The time it took delegated find provider attempts from the network to succeed or fail because of timeout or completion", stats.UnitMilliseconds)

STIFindProvs = stats.Int64("sti_find_provs_total", "Total store the index find provider attempts that were found locally, or not found locally and succeeded, failed or were discarded", stats.UnitDimensionless)
STIFindProvsDuration = stats.Float64("sti_find_provs_duration_nanoseconds", "The time it took storetheindex finds from the network to succeed or fail because of timeout or completion", stats.UnitSeconds)

AWSRequests = stats.Int64("aws_requests", "Requests made to AWS", stats.UnitDimensionless)
AWSRequestDurationMillis = stats.Float64("aws_request_duration", "The time it took to make an AWS request and receive a response", stats.UnitMilliseconds)
AWSRequestRetries = stats.Int64("aws_retries", "Retried requests to AWS", stats.UnitDimensionless)
@@ -157,6 +160,16 @@ var (
TagKeys: []tag.Key{KeyName},
Aggregation: view.Sum(),
}
STIFindProvsView = &view.View{
Measure: STIFindProvs,
TagKeys: []tag.Key{KeyName, KeyStatus},
Aggregation: view.Sum(),
}
STIFindProvsDurationView = &view.View{
Measure: STIFindProvsDuration,
TagKeys: []tag.Key{KeyName, KeyStatus},
Aggregation: view.Sum(),
}
// DHT views
ReceivedMessagesView = &view.View{
Measure: dhtmetrics.ReceivedMessages,
@@ -219,6 +232,8 @@ var DefaultViews = []*view.View{
UniquePeersView,
RoutingTableSizeView,
ProviderRecordsView,
STIFindProvsView,
STIFindProvsDurationView,
ProviderRecordsPerKeyView,
PrefetchesView,
PrefetchDurationMillisView,
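
These views only surface once registered with OpenCensus; a minimal sketch of that registration (hydra-booster presumably already does the equivalent where its metrics exporter is set up):

import (
    "log"

    "github.com/libp2p/hydra-booster/metrics"
    "go.opencensus.io/stats/view"
)

func registerViews() {
    if err := view.Register(metrics.DefaultViews...); err != nil {
        log.Fatalf("failed to register metric views: %v", err)
    }
}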
4 changes: 3 additions & 1 deletion providers/combined.go
@@ -80,7 +80,9 @@ func mergeAddrInfos(infos []peer.AddrInfo) []peer.AddrInfo {
}
var r []peer.AddrInfo
for k, v := range m {
r = append(r, peer.AddrInfo{ID: k, Addrs: v})
if k.Validate() == nil {
Contributor:
This seems fine, but why wouldn't we do this check further up next to the if r.Err == nil check when accumulating the addresses before merging them?

Contributor Author:
Because we aren't doing an explicit iteration through the AddrInfos / keys during that part of the accumulation.

r = append(r, peer.AddrInfo{ID: k, Addrs: v})
}
}
return r
}
52 changes: 52 additions & 0 deletions providers/storetheindex.go
@@ -0,0 +1,52 @@
package providers

import (
"context"
"fmt"
"net/http"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/hydra-booster/metrics"
"github.com/libp2p/hydra-booster/providers/storetheindex"
"github.com/multiformats/go-multihash"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)

func StoreTheIndexProvider(endpointURL string, timeout time.Duration) (providers.ProviderStore, error) {
c, err := storetheindex.New(endpointURL, storetheindex.WithHTTPClient(&http.Client{Timeout: timeout}))
if err != nil {
return nil, err
}
return &StoreTheIndexProviderStore{c: c}, nil
}

type StoreTheIndexProviderStore struct {
c storetheindex.Client
}

func (s *StoreTheIndexProviderStore) AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error {
return fmt.Errorf("adding providers not supported")
}

func (s *StoreTheIndexProviderStore) GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error) {
h, err := multihash.Cast(key)
if err != nil {
return nil, err
}
t0 := time.Now()
infos, err := s.c.FindProviders(ctx, h)
dur := time.Now().Sub(t0)
recordSTIFindProvsComplete(ctx, statusFromErr(err), metrics.STIFindProvsDuration.M(float64(dur)))
return infos, err
}

func recordSTIFindProvsComplete(ctx context.Context, status string, extraMeasures ...stats.Measurement) {
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyStatus, status)},
append([]stats.Measurement{metrics.STIFindProvs.M(1)}, extraMeasures...)...,
)
}
42 changes: 42 additions & 0 deletions providers/storetheindex/client.go
@@ -0,0 +1,42 @@
package storetheindex

import (
"context"
"net/http"
"net/url"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multihash"
)

type Client interface {
FindProviders(ctx context.Context, mh multihash.Multihash) ([]peer.AddrInfo, error)
}

type Option func(*client) error

type client struct {
client *http.Client
endpoint *url.URL
}

func WithHTTPClient(hc *http.Client) Option {
return func(c *client) error {
c.client = hc
return nil
}
}

func New(endpoint string, opts ...Option) (*client, error) {
u, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
c := &client{endpoint: u, client: http.DefaultClient}
for _, o := range opts {
if err := o(c); err != nil {
return nil, err
}
}
return c, nil
}
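
Usage mirrors what providers/storetheindex.go (earlier in this diff) does when building the ProviderStore; a minimal sketch, assuming ctx and mh are already in scope and using a placeholder endpoint and timeout:

c, err := storetheindex.New("https://indexer.example.net", storetheindex.WithHTTPClient(&http.Client{Timeout: 10 * time.Second}))
if err != nil {
    // handle error
}
infos, err := c.FindProviders(ctx, mh)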
72 changes: 72 additions & 0 deletions providers/storetheindex/findproviders.go
@@ -0,0 +1,72 @@
package storetheindex

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("sti/client")

func (c *client) FindProviders(ctx context.Context, mh multihash.Multihash) ([]peer.AddrInfo, error) {
// encode request in URL
u := fmt.Sprint(c.endpoint.String(), "/", mh.B58String())
Contributor:
Would it make sense to use multibase b58 encoding here?

Contributor:
Yes, but this is an existing endpoint which we are planning on replacing with the delegated routing one anyway.

Side note: @willscott, you probably want to change the endpoint at some point in the future to use multibase. Omitting that one extra character is almost never worth it.

@guseggert (Contributor), Feb 11, 2022:
> yes, but this is an existing endpoint which we are planning on replacing with the delegated routing one anyway.

Yeah, I was just wondering if we should change the endpoint to use multibase encoding, if it's not too late. If it's getting replaced soon, then disregard :) (and consider doing this for the replacement).

Contributor Author:
This whole setup is the 'works now' variant until go-delegated-routing is solidified and migrated to.
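
For reference, the multibase form discussed above would differ only by a leading base indicator; a sketch using go-multibase (not code in this PR):

s, err := multibase.Encode(multibase.Base58BTC, mh) // yields e.g. "zQm...", versus the bare "Qm..." from mh.B58String()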

httpReq, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return nil, err
}
httpReq.Header.Set("Content-Type", "application/json")

resp, err := c.client.Do(httpReq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotFound {
return []peer.AddrInfo{}, nil
}
return nil, fmt.Errorf("find query failed: %v", http.StatusText(resp.StatusCode))
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

parsedResponse := indexFindResponse{}
if err := json.Unmarshal(body, &parsedResponse); err != nil {
return nil, err
}

if len(parsedResponse.MultihashResults) != 1 {
return nil, fmt.Errorf("unexpected number of responses")
}
Comment on lines +56 to +58
Contributor:
If this always has one element, then why is it in an array? Is it expected to change in the future? If so, can we just loop over the array so this is forwards compatible?

Contributor Author:
The query endpoint allows an array of multihashes to be queried; this client only queries for an individual one at a time.
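
For reference, the forwards-compatible variant suggested above would just flatten every result (a sketch, not what this PR does):

var result []peer.AddrInfo
for _, mhr := range parsedResponse.MultihashResults {
    for _, pr := range mhr.ProviderResults {
        result = append(result, pr.Provider)
    }
}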

result := make([]peer.AddrInfo, 0, len(parsedResponse.MultihashResults[0].ProviderResults))
for _, m := range parsedResponse.MultihashResults[0].ProviderResults {
result = append(result, m.Provider)
}

return result, nil
}

type indexFindResponse struct {
MultihashResults []indexMultihashResult
}

type indexMultihashResult struct {
Multihash multihash.Multihash
ProviderResults []indexProviderResult
}

type indexProviderResult struct {
ContextID []byte
Metadata json.RawMessage
Contributor:
What are these fields for? They look unused.

Contributor Author:
The response from the indexer node contains these fields, which are used by some providers. They're here for completeness of the message format. There have been some conversations about providers using them in ways that could be relevant here, for instance to express priorities, or to indicate that multiple records with the same ContextID should be de-duplicated.

Provider peer.AddrInfo
}
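
For reference, a response body that these structs will accept looks roughly like the following. The shape is inferred from the Go types rather than from storetheindex documentation: the values are placeholders, Multihash and ContextID are base64 strings because they are byte slices, Metadata can be any JSON, and Provider follows peer.AddrInfo's JSON encoding.

{
  "MultihashResults": [
    {
      "Multihash": "...",
      "ProviderResults": [
        {
          "ContextID": "...",
          "Metadata": {},
          "Provider": { "ID": "12D3Koo...", "Addrs": ["/ip4/203.0.113.7/tcp/4001"] }
        }
      ]
    }
  ]
}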