Skip to content

Commit

Permalink
opt indexer to query shard configs in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu committed Sep 9, 2024
1 parent f32de44 commit 781c925
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 57 deletions.
9 changes: 4 additions & 5 deletions common/parallel/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type rpcExecutor[CLIENT closable, T any] struct {
}

// QueryZgsRpc calls zgs RPC with given nodes in parallel.
func QueryZgsRpc[T any](ctx context.Context, nodes []string, rpcFunc func(*node.ZgsClient, context.Context) (T, error), option ...RpcOption) (map[string]*RpcResult[T], error) {
func QueryZgsRpc[T any](ctx context.Context, nodes []string, rpcFunc func(*node.ZgsClient, context.Context) (T, error), option ...RpcOption) map[string]*RpcResult[T] {
var opt RpcOption
if len(option) > 0 {
opt = option[0]
Expand All @@ -53,11 +53,10 @@ func QueryZgsRpc[T any](ctx context.Context, nodes []string, rpcFunc func(*node.
lastReportTime: time.Now(),
}

if err := Serial(ctx, &executor, len(nodes), opt.Parallel); err != nil {
return nil, err
}
// should not return err
Serial(ctx, &executor, len(nodes), opt.Parallel)

return executor.node2Results, nil
return executor.node2Results
}

func (executor *rpcExecutor[CLIENT, T]) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) {
Expand Down
120 changes: 68 additions & 52 deletions indexer/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/common/shard"
"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/node"
Expand All @@ -20,6 +21,13 @@ var (
RequestTimeout: 3 * time.Second,
}

defaultRpcOpt = parallel.RpcOption{
Parallel: parallel.SerialOption{
Routines: 500,
},
Provider: defaultZgsClientOpt,
}

defaultNodeManager = NodeManager{}
)

Expand Down Expand Up @@ -176,6 +184,7 @@ func (nm *NodeManager) discover() error {
}).Debug("Succeeded to retrieve peers from storage node")

var numNew int
var newPeers []string

for _, v := range peers {
// public ip address required
Expand All @@ -200,65 +209,39 @@ func (nm *NodeManager) discover() error {
continue
}

// add new storage node
node, err := nm.updateNode(url)
if err != nil {
logrus.WithError(err).WithField("url", url).Debug("Failed to add new peer")
} else {
logrus.WithFields(logrus.Fields{
"url": url,
"shard": node.Config,
"latency": node.Latency,
}).Debug("New peer discovered")
}

numNew++
newPeers = append(newPeers, url)
break
}
}

if numNew > 0 {
logrus.WithField("count", numNew).Info("New peers discovered")
}

return nil
}

// updateNode updates the shard config of specified storage node by `url`.
func (nm *NodeManager) updateNode(url string) (*shard.ShardedNode, error) {
// query ip location at first
ip := parseIP(url)
if _, err := defaultIPLocationManager.Query(ip); err != nil {
logrus.WithError(err).WithField("ip", ip).Warn("Failed to query IP location")
}

zgsClient, err := node.NewZgsClient(url, defaultZgsClientOpt)
if err != nil {
return nil, errors.WithMessage(err, "Failed to create zgs client")
}
defer zgsClient.Close()
result := queryShardConfigs(newPeers)
for url, rpcResult := range result {
if rpcResult.Err != nil {
logrus.WithError(rpcResult.Err).WithField("url", url).Debug("Failed to add new peer")
continue
}

start := time.Now()
nm.discovered.Store(url, &shard.ShardedNode{
URL: url,
Config: rpcResult.Data,
Latency: rpcResult.Latency.Milliseconds(),
Since: time.Now().Unix(),
})

config, err := zgsClient.GetShardConfig(context.Background())
if err != nil {
return nil, errors.WithMessage(err, "Failed to retrieve shard config from storage node")
}
numNew++

if !config.IsValid() {
return nil, errors.Errorf("Invalid shard config retrieved %v", config)
logrus.WithFields(logrus.Fields{
"url": url,
"shard": rpcResult.Data,
"latency": rpcResult.Latency.Milliseconds(),
}).Debug("New peer discovered")
}

node := &shard.ShardedNode{
URL: url,
Config: config,
Latency: time.Since(start).Milliseconds(),
Since: time.Now().Unix(),
if numNew > 0 {
logrus.WithField("count", numNew).Info("New peers discovered")
}

nm.discovered.Store(url, node)

return node, nil
return nil
}

// update updates shard configs of all storage nodes.
Expand All @@ -277,10 +260,18 @@ func (nm *NodeManager) update() error {

start := time.Now()

for _, v := range urls {
if _, err := nm.updateNode(v); err != nil {
logrus.WithError(err).WithField("url", v).Debug("Failed to update shard config, remove from cache")
nm.discovered.Delete(v)
result := queryShardConfigs(urls)
for url, rpcResult := range result {
if rpcResult.Err == nil {
nm.discovered.Store(url, &shard.ShardedNode{
URL: url,
Config: rpcResult.Data,
Latency: rpcResult.Latency.Milliseconds(),
Since: time.Now().Unix(),
})
} else {
logrus.WithError(rpcResult.Err).WithField("url", url).Debug("Failed to update shard config, remove from cache")
nm.discovered.Delete(url)
}
}

Expand All @@ -291,3 +282,28 @@ func (nm *NodeManager) update() error {

return nil
}

func queryShardConfigs(nodes []string) map[string]*parallel.RpcResult[shard.ShardConfig] {
// update IP if absent
for _, v := range nodes {
ip := parseIP(v)
if _, err := defaultIPLocationManager.Query(v); err != nil {
logrus.WithError(err).WithField("ip", ip).Warn("Failed to query IP location")
}
}

rpcFunc := func(client *node.ZgsClient, ctx context.Context) (shard.ShardConfig, error) {
config, err := client.GetShardConfig(ctx)
if err != nil {
return shard.ShardConfig{}, err
}

if !config.IsValid() {
return shard.ShardConfig{}, errors.Errorf("Invalid shard config retrieved %v", config)
}

return config, nil
}

return parallel.QueryZgsRpc(context.Background(), nodes, rpcFunc, defaultRpcOpt)
}

0 comments on commit 781c925

Please sign in to comment.