feat(indexer-gateway): add new rest api (#72)
- add new rest api to get file info and node status
- improve storage nodes selection for file upload and download
wanliqun authored Nov 22, 2024
1 parent e8e028d commit 3b8bd11
Showing 8 changed files with 304 additions and 140 deletions.
14 changes: 5 additions & 9 deletions cmd/indexer.go
@@ -17,7 +17,6 @@ var (
 		locations           indexer.IPLocationConfig
 		locationCache       indexer.FileLocationCacheConfig
 		maxDownloadFileSize uint64
-		ExpectedReplica     uint
 	}

 	indexerCmd = &cobra.Command{
@@ -45,7 +44,6 @@ func init() {
 	indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache")

 	indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download")
-	indexerCmd.Flags().UintVar(&indexerArgs.ExpectedReplica, "expected-replica", 1, "Expected number of replications to upload")

 	indexerCmd.MarkFlagsOneRequired("trusted", "node")

@@ -58,17 +56,17 @@ func startIndexer(*cobra.Command, []string) {

 	indexer.InitDefaultIPLocationManager(indexerArgs.locations)

-	nodeManagerClosable, err := indexer.InitDefaultNodeManager(indexerArgs.nodes)
+	nodeManager, err := indexer.InitDefaultNodeManager(indexerArgs.nodes)
 	if err != nil {
 		logrus.WithError(err).Fatal("Failed to initialize the default node manager")
 	}
-	defer nodeManagerClosable()
+	defer nodeManager.Close()

-	fileLocationCacheClosable, err := indexer.InitFileLocationCache(indexerArgs.locationCache)
+	fileLocationCache, err := indexer.InitFileLocationCache(indexerArgs.locationCache)
 	if err != nil {
 		logrus.WithError(err).Fatal("Failed to initialize the default file location cache")
 	}
-	defer fileLocationCacheClosable()
+	defer fileLocationCache.Close()

 	api := indexer.NewIndexerApi()

@@ -77,11 +75,9 @@ func startIndexer(*cobra.Command, []string) {
 		"discover": len(indexerArgs.nodes.DiscoveryNode) > 0,
 	}).Info("Starting indexer service ...")

-	gateway.MustServeWithRPC(gateway.Config{
+	gateway.MustServeWithRPC(nodeManager, fileLocationCache, gateway.Config{
 		Endpoint:            indexerArgs.endpoint,
-		Nodes:               indexerArgs.nodes.TrustedNodes,
 		MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
-		ExpectedReplica:     indexerArgs.ExpectedReplica,
 		RPCHandler: rpc.MustNewHandler(map[string]interface{}{
 			api.Namespace: api,
 		}),
6 changes: 3 additions & 3 deletions indexer/file_location_cache.go
@@ -41,18 +41,18 @@ type FileLocationCache struct {

 var defaultFileLocationCache FileLocationCache

-func InitFileLocationCache(config FileLocationCacheConfig) (closable func(), err error) {
+func InitFileLocationCache(config FileLocationCacheConfig) (cache *FileLocationCache, err error) {
 	if len(config.DiscoveryNode) > 0 {
 		if defaultFileLocationCache.discoverNode, err = node.NewAdminClient(config.DiscoveryNode, defaultZgsClientOpt); err != nil {
 			return nil, errors.WithMessage(err, "Failed to create admin client to discover peers")
 		}
 	}
 	defaultFileLocationCache.cache = expirable.NewLRU[uint64, []*shard.ShardedNode](config.CacheSize, nil, config.Expiry)
 	defaultFileLocationCache.discoveryPorts = config.DiscoveryPorts
-	return defaultFileLocationCache.close, nil
+	return &defaultFileLocationCache, nil
 }

-func (c *FileLocationCache) close() {
+func (c *FileLocationCache) Close() {
 	if c.discoverNode != nil {
 		c.discoverNode.Close()
 	}
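With this change, InitFileLocationCache returns the cache itself rather than an opaque cleanup closure, so callers release it through the exported Close method, as the updated cmd/indexer.go does above. A minimal caller-side sketch, with illustrative configuration values:

package main

import (
	"github.com/0glabs/0g-storage-client/indexer"
	"github.com/sirupsen/logrus"
)

func main() {
	// Initialize the default file location cache; the cache value is returned directly.
	cache, err := indexer.InitFileLocationCache(indexer.FileLocationCacheConfig{
		CacheSize: 100000, // illustrative value, matching the CLI default
	})
	if err != nil {
		logrus.WithError(err).Fatal("Failed to initialize the default file location cache")
	}
	// Cleanup is now an exported method instead of a returned closable function.
	defer cache.Close()
}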
142 changes: 142 additions & 0 deletions indexer/gateway/controller.go
@@ -0,0 +1,142 @@
package gateway

import (
	"context"
	"fmt"
	"strconv"

	"github.com/0glabs/0g-storage-client/common/shard"
	"github.com/0glabs/0g-storage-client/indexer"
	"github.com/0glabs/0g-storage-client/node"
	eth_common "github.com/ethereum/go-ethereum/common"
	"github.com/pkg/errors"
)

type Cid struct {
	Root  string  `form:"root" json:"root"`
	TxSeq *uint64 `form:"txSeq" json:"txSeq"`
}

// NewCid parses a CID from the input string.
func NewCid(cidStr string) Cid {
	var cid Cid
	if v, err := strconv.ParseUint(cidStr, 10, 64); err == nil { // TxnSeq is used as CID
		cid.TxSeq = &v
	} else {
		cid.Root = cidStr
	}
	return cid
}

type RestController struct {
	nodeManager       *indexer.NodeManager
	fileLocationCache *indexer.FileLocationCache

	maxDownloadFileSize uint64 // max download file size
}

func NewRestController(nodeManager *indexer.NodeManager, locationCache *indexer.FileLocationCache, maxDownloadFileSize uint64) *RestController {
	return &RestController{
		nodeManager:         nodeManager,
		fileLocationCache:   locationCache,
		maxDownloadFileSize: maxDownloadFileSize,
	}
}

// getAvailableFileLocations returns a list of available file locations for a file with the given CID.
func (ctrl *RestController) getAvailableFileLocations(ctx context.Context, cid Cid) ([]*shard.ShardedNode, error) {
	if cid.TxSeq != nil {
		return ctrl.fileLocationCache.GetFileLocations(ctx, *cid.TxSeq)
	}

	// find corresponding tx sequence
	hash := eth_common.HexToHash(cid.Root)
	for _, client := range ctrl.nodeManager.TrustedClients() {
		info, err := client.GetFileInfo(ctx, hash)
		if err == nil && info != nil {
			return ctrl.fileLocationCache.GetFileLocations(ctx, info.Tx.Seq)
		}
	}

	return nil, nil
}

// getAvailableStorageNodes returns a list of available storage nodes for a file with the given CID.
func (ctrl *RestController) getAvailableStorageNodes(ctx context.Context, cid Cid) ([]*node.ZgsClient, error) {
	nodes, err := ctrl.getAvailableFileLocations(ctx, cid)
	if err != nil {
		return nil, errors.WithMessage(err, "failed to get file locations")
	}

	var clients []*node.ZgsClient
	for i := range nodes {
		client, err := node.NewZgsClient(nodes[i].URL)
		if err != nil {
			return nil, errors.WithMessage(err, "failed to create zgs client")
		}

		clients = append(clients, client)
	}

	return clients, nil
}

// fetchFileInfo encapsulates the logic for attempting to retrieve file info from storage nodes.
func (ctrl *RestController) fetchFileInfo(ctx context.Context, cid Cid) (*node.FileInfo, error) {
	clients, err := ctrl.getAvailableStorageNodes(ctx, cid)
	if err != nil {
		return nil, fmt.Errorf("failed to get available storage nodes: %v", err)
	}

	fileInfo, err := getOverallFileInfo(ctx, clients, cid)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve file info from storage nodes: %v", err)
	}

	if fileInfo != nil {
		return fileInfo, nil
	}

	// Attempt retrieval from trusted clients as a fallback
	fileInfo, err = getOverallFileInfo(ctx, ctrl.nodeManager.TrustedClients(), cid)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve file info from trusted clients: %v", err)
	}

	return fileInfo, nil
}

func getOverallFileInfo(ctx context.Context, clients []*node.ZgsClient, cid Cid) (info *node.FileInfo, err error) {
	var rootHash eth_common.Hash
	if cid.TxSeq == nil {
		rootHash = eth_common.HexToHash(cid.Root)
	}

	var finalInfo *node.FileInfo
	for _, client := range clients {
		if cid.TxSeq != nil {
			info, err = client.GetFileInfoByTxSeq(ctx, *cid.TxSeq)
		} else {
			info, err = client.GetFileInfo(ctx, rootHash)
		}

		if err != nil {
			return nil, err
		}

		if info == nil {
			return nil, nil
		}

		if finalInfo == nil {
			finalInfo = info
			continue
		}
		finalInfo.Finalized = finalInfo.Finalized && info.Finalized
		finalInfo.IsCached = finalInfo.IsCached && info.IsCached
		finalInfo.Pruned = finalInfo.Pruned || info.Pruned
		finalInfo.UploadedSegNum = min(finalInfo.UploadedSegNum, info.UploadedSegNum)
	}

	return finalInfo, nil
}
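The merge in getOverallFileInfo is deliberately conservative across the queried storage nodes: the file is reported as finalized or cached only if every node reports it so, as pruned if any node has pruned it, and the uploaded segment count is the minimum observed. Below is a standalone sketch of that rule; the FileStatus type and the sample values are hypothetical stand-ins for node.FileInfo, used only for illustration:

package main

import "fmt"

// FileStatus is a hypothetical, trimmed-down stand-in for node.FileInfo,
// carrying only some of the fields the gateway aggregates.
type FileStatus struct {
	Finalized      bool
	Pruned         bool
	UploadedSegNum uint64
}

// aggregate mirrors the conservative merge in getOverallFileInfo.
// It assumes a non-empty input slice.
func aggregate(statuses []FileStatus) FileStatus {
	overall := statuses[0]
	for _, s := range statuses[1:] {
		overall.Finalized = overall.Finalized && s.Finalized                   // finalized only if all nodes agree
		overall.Pruned = overall.Pruned || s.Pruned                            // pruned if any node pruned it
		overall.UploadedSegNum = min(overall.UploadedSegNum, s.UploadedSegNum) // report worst-case upload progress
	}
	return overall
}

func main() {
	// Two nodes have the file finalized with 8 segments; a third is still syncing.
	fmt.Println(aggregate([]FileStatus{
		{Finalized: true, UploadedSegNum: 8},
		{Finalized: true, UploadedSegNum: 8},
		{Finalized: false, UploadedSegNum: 5},
	})) // prints {false false 5}
}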