From 3b8bd1102e15a720874d7b71570c995871b94167 Mon Sep 17 00:00:00 2001 From: Liqun Date: Fri, 22 Nov 2024 18:32:01 +0800 Subject: [PATCH] feat(indexer-gateway): add new rest api (#72) - add new rest api to get file info and node status - improve storage nodes selection for file upload and download --- cmd/indexer.go | 14 ++-- indexer/file_location_cache.go | 6 +- indexer/gateway/controller.go | 142 +++++++++++++++++++++++++++++++++ indexer/gateway/download.go | 134 +++++++++++-------------------- indexer/gateway/server.go | 35 +++----- indexer/gateway/status.go | 53 ++++++++++++ indexer/gateway/upload.go | 54 +++++++++---- indexer/node_manager.go | 6 +- 8 files changed, 304 insertions(+), 140 deletions(-) create mode 100644 indexer/gateway/controller.go create mode 100644 indexer/gateway/status.go diff --git a/cmd/indexer.go b/cmd/indexer.go index d160638..cb45454 100644 --- a/cmd/indexer.go +++ b/cmd/indexer.go @@ -17,7 +17,6 @@ var ( locations indexer.IPLocationConfig locationCache indexer.FileLocationCacheConfig maxDownloadFileSize uint64 - ExpectedReplica uint } indexerCmd = &cobra.Command{ @@ -45,7 +44,6 @@ func init() { indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache") indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download") - indexerCmd.Flags().UintVar(&indexerArgs.ExpectedReplica, "expected-replica", 1, "Expected number of replications to upload") indexerCmd.MarkFlagsOneRequired("trusted", "node") @@ -58,17 +56,17 @@ func startIndexer(*cobra.Command, []string) { indexer.InitDefaultIPLocationManager(indexerArgs.locations) - nodeManagerClosable, err := indexer.InitDefaultNodeManager(indexerArgs.nodes) + nodeManager, err := indexer.InitDefaultNodeManager(indexerArgs.nodes) if err != nil { logrus.WithError(err).Fatal("Failed to initialize the default node manager") } - defer nodeManagerClosable() + defer nodeManager.Close() - fileLocationCacheClosable, err := indexer.InitFileLocationCache(indexerArgs.locationCache) + fileLocationCache, err := indexer.InitFileLocationCache(indexerArgs.locationCache) if err != nil { logrus.WithError(err).Fatal("Failed to initialize the default file location cache") } - defer fileLocationCacheClosable() + defer fileLocationCache.Close() api := indexer.NewIndexerApi() @@ -77,11 +75,9 @@ func startIndexer(*cobra.Command, []string) { "discover": len(indexerArgs.nodes.DiscoveryNode) > 0, }).Info("Starting indexer service ...") - gateway.MustServeWithRPC(gateway.Config{ + gateway.MustServeWithRPC(nodeManager, fileLocationCache, gateway.Config{ Endpoint: indexerArgs.endpoint, - Nodes: indexerArgs.nodes.TrustedNodes, MaxDownloadFileSize: indexerArgs.maxDownloadFileSize, - ExpectedReplica: indexerArgs.ExpectedReplica, RPCHandler: rpc.MustNewHandler(map[string]interface{}{ api.Namespace: api, }), diff --git a/indexer/file_location_cache.go b/indexer/file_location_cache.go index 506fbad..f9dce94 100644 --- a/indexer/file_location_cache.go +++ b/indexer/file_location_cache.go @@ -41,7 +41,7 @@ type FileLocationCache struct { var defaultFileLocationCache FileLocationCache -func InitFileLocationCache(config FileLocationCacheConfig) (closable func(), err error) { +func InitFileLocationCache(config FileLocationCacheConfig) (cache *FileLocationCache, err error) { if len(config.DiscoveryNode) > 0 { if defaultFileLocationCache.discoverNode, err = node.NewAdminClient(config.DiscoveryNode, defaultZgsClientOpt); err != nil { return nil, errors.WithMessage(err, "Failed to create admin client to discover peers") @@ -49,10 +49,10 @@ func InitFileLocationCache(config FileLocationCacheConfig) (closable func(), err } defaultFileLocationCache.cache = expirable.NewLRU[uint64, []*shard.ShardedNode](config.CacheSize, nil, config.Expiry) defaultFileLocationCache.discoveryPorts = config.DiscoveryPorts - return defaultFileLocationCache.close, nil + return &defaultFileLocationCache, nil } -func (c *FileLocationCache) close() { +func (c *FileLocationCache) Close() { if c.discoverNode != nil { c.discoverNode.Close() } diff --git a/indexer/gateway/controller.go b/indexer/gateway/controller.go new file mode 100644 index 0000000..185373c --- /dev/null +++ b/indexer/gateway/controller.go @@ -0,0 +1,142 @@ +package gateway + +import ( + "context" + "fmt" + "strconv" + + "github.com/0glabs/0g-storage-client/common/shard" + "github.com/0glabs/0g-storage-client/indexer" + "github.com/0glabs/0g-storage-client/node" + eth_common "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" +) + +type Cid struct { + Root string `form:"root" json:"root"` + TxSeq *uint64 `form:"txSeq" json:"txSeq"` +} + +// NewCid parsing the CID from the input string. +func NewCid(cidStr string) Cid { + var cid Cid + if v, err := strconv.ParseUint(cidStr, 10, 64); err == nil { // TxnSeq is used as CID + cid.TxSeq = &v + } else { + cid.Root = cidStr + } + return cid +} + +type RestController struct { + nodeManager *indexer.NodeManager + fileLocationCache *indexer.FileLocationCache + + maxDownloadFileSize uint64 // max download file size +} + +func NewRestController(nodeManager *indexer.NodeManager, locationCache *indexer.FileLocationCache, maxDownloadFileSize uint64) *RestController { + return &RestController{ + nodeManager: nodeManager, + fileLocationCache: locationCache, + maxDownloadFileSize: maxDownloadFileSize, + } +} + +// getAvailableFileLocations returns a list of available file locations for a file with the given CID. +func (ctrl *RestController) getAvailableFileLocations(ctx context.Context, cid Cid) ([]*shard.ShardedNode, error) { + if cid.TxSeq != nil { + return ctrl.fileLocationCache.GetFileLocations(ctx, *cid.TxSeq) + } + + // find corresponding tx sequence + hash := eth_common.HexToHash(cid.Root) + for _, client := range ctrl.nodeManager.TrustedClients() { + info, err := client.GetFileInfo(ctx, hash) + if err == nil && info != nil { + return ctrl.fileLocationCache.GetFileLocations(ctx, info.Tx.Seq) + } + } + + return nil, nil +} + +// getAvailableStorageNodes returns a list of available storage nodes for a file with the given CID. +func (ctrl *RestController) getAvailableStorageNodes(ctx context.Context, cid Cid) ([]*node.ZgsClient, error) { + nodes, err := ctrl.getAvailableFileLocations(ctx, cid) + if err != nil { + return nil, errors.WithMessage(err, "failed to get file locations") + } + + var clients []*node.ZgsClient + for i := range nodes { + client, err := node.NewZgsClient(nodes[i].URL) + if err != nil { + return nil, errors.WithMessage(err, "failed to create zgs client") + } + + clients = append(clients, client) + } + + return clients, nil +} + +// fetchFileInfo encapsulates the logic for attempting to retrieve file info from storage nodes. +func (ctrl *RestController) fetchFileInfo(ctx context.Context, cid Cid) (*node.FileInfo, error) { + clients, err := ctrl.getAvailableStorageNodes(ctx, cid) + if err != nil { + return nil, fmt.Errorf("failed to get available storage nodes: %v", err) + } + + fileInfo, err := getOverallFileInfo(ctx, clients, cid) + if err != nil { + return nil, fmt.Errorf("failed to retrieve file info from storage nodes: %v", err) + } + + if fileInfo != nil { + return fileInfo, nil + } + + // Attempt retrieval from trusted clients as a fallback + fileInfo, err = getOverallFileInfo(ctx, ctrl.nodeManager.TrustedClients(), cid) + if err != nil { + return nil, fmt.Errorf("failed to retrieve file info from trusted clients: %v", err) + } + + return fileInfo, nil +} + +func getOverallFileInfo(ctx context.Context, clients []*node.ZgsClient, cid Cid) (info *node.FileInfo, err error) { + var rootHash eth_common.Hash + if cid.TxSeq == nil { + rootHash = eth_common.HexToHash(cid.Root) + } + + var finalInfo *node.FileInfo + for _, client := range clients { + if cid.TxSeq != nil { + info, err = client.GetFileInfoByTxSeq(ctx, *cid.TxSeq) + } else { + info, err = client.GetFileInfo(ctx, rootHash) + } + + if err != nil { + return nil, err + } + + if info == nil { + return nil, nil + } + + if finalInfo == nil { + finalInfo = info + continue + } + finalInfo.Finalized = finalInfo.Finalized && info.Finalized + finalInfo.IsCached = finalInfo.IsCached && info.IsCached + finalInfo.Pruned = finalInfo.Pruned || info.Pruned + finalInfo.UploadedSegNum = min(finalInfo.UploadedSegNum, info.UploadedSegNum) + } + + return finalInfo, nil +} diff --git a/indexer/gateway/download.go b/indexer/gateway/download.go index d0c6eda..166b957 100644 --- a/indexer/gateway/download.go +++ b/indexer/gateway/download.go @@ -1,30 +1,22 @@ package gateway import ( - "context" - "errors" "fmt" "net/http" "os" "path/filepath" - "strconv" + "strings" - "github.com/0glabs/0g-storage-client/node" "github.com/0glabs/0g-storage-client/transfer" "github.com/0glabs/0g-storage-client/transfer/dir" - "github.com/ethereum/go-ethereum/common" "github.com/gin-gonic/gin" ) -var clients []*node.ZgsClient -var maxDownloadFileSize uint64 - // downloadFile handles file downloads by root hash or transaction sequence. -func downloadFile(c *gin.Context) { +func (ctrl *RestController) downloadFile(c *gin.Context) { var input struct { - Name string `form:"name" json:"name"` - Root string `form:"root" json:"root"` - TxSeq *uint64 `form:"txSeq" json:"txSeq"` + Cid + Name string `form:"name" json:"name"` } if err := c.ShouldBind(&input); err != nil { @@ -37,59 +29,22 @@ func downloadFile(c *gin.Context) { return } - fileInfo, err := getFileInfo(c, common.HexToHash(input.Root), input.TxSeq) - if err != nil { - c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to retrieve file info: %v", err)) - return - } - - if fileInfo == nil { - c.JSON(http.StatusNotFound, "File not found") - return - } - - if fileInfo.Pruned { - c.JSON(http.StatusBadRequest, "File already pruned") - return - } - - if !fileInfo.Finalized { - c.JSON(http.StatusBadRequest, "File not finalized yet") - return - } - - if fileInfo.Tx.Size > maxDownloadFileSize { - errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fileInfo.Tx.Size, maxDownloadFileSize) - c.JSON(http.StatusBadRequest, errMsg) - return - } - - downloader, err := transfer.NewDownloader(clients) - if err != nil { - c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to create downloader, %v", err.Error())) - return - } - - if err := downloadAndServeFile(c, downloader, fileInfo.Tx.DataMerkleRoot.Hex(), input.Name); err != nil { - c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file: %v", err)) - } + ctrl.downloadAndServeFile(c, input.Cid, input.Name) } // downloadFileInFolder handles file downloads from a directory structure. -func downloadFileInFolder(c *gin.Context) { - cid := c.Param("cid") +func (ctrl *RestController) downloadFileInFolder(c *gin.Context) { + cidStr := strings.TrimSpace(c.Param("cid")) filePath := filepath.Clean(c.Param("filePath")) + cid := NewCid(cidStr) - var root common.Hash - var txSeq *uint64 - - if v, err := strconv.ParseUint(cid, 10, 64); err == nil { // TxnSeq is used as cid - txSeq = &v - } else { - root = common.HexToHash(cid) + clients, err := ctrl.getAvailableStorageNodes(c, cid) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get available storage nodes, %v", err.Error())) + return } - fileInfo, err := getFileInfo(c, root, txSeq) + fileInfo, err := getOverallFileInfo(c, clients, cid) if err != nil { c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to retrieve file info: %v", err)) return @@ -116,7 +71,7 @@ func downloadFileInFolder(c *gin.Context) { return } - root = fileInfo.Tx.DataMerkleRoot + root := fileInfo.Tx.DataMerkleRoot ftree, err := transfer.BuildFileTree(c, downloader, root.Hex(), true) if err != nil { @@ -140,53 +95,61 @@ func downloadFileInFolder(c *gin.Context) { // This prevents the server from following the symbolic link and returning the target file's content. c.JSON(http.StatusOK, fnode) case dir.FileTypeFile: - if fnode.Size > int64(maxDownloadFileSize) { - errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fnode.Size, maxDownloadFileSize) + if fnode.Size > int64(ctrl.maxDownloadFileSize) { + errMsg := fmt.Sprintf("Requested file size too large, actual = %v, max = %v", fnode.Size, ctrl.maxDownloadFileSize) c.JSON(http.StatusBadRequest, errMsg) return } - if err := downloadAndServeFile(c, downloader, fnode.Root, fnode.Name); err != nil { - c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file: %v", err)) - } + ctrl.downloadAndServeFile(c, Cid{Root: fnode.Root}, fnode.Name) default: c.JSON(http.StatusInternalServerError, fmt.Sprintf("Unsupported file type: %v", fnode.Type)) return } } -// getFileInfo retrieves file info based on root or transaction sequence. -func getFileInfo(ctx context.Context, root common.Hash, txSeq *uint64) (info *node.FileInfo, err error) { - if len(clients) == 0 { - return nil, errors.New("no clients available") +// downloadAndServeFile downloads the file and serves it as an attachment. +func (ctrl *RestController) downloadAndServeFile(c *gin.Context, cid Cid, filename string) { + clients, err := ctrl.getAvailableStorageNodes(c, cid) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to get available storage nodes, %v", err.Error())) + return } - for _, client := range clients { - if txSeq != nil { - info, err = client.GetFileInfoByTxSeq(ctx, *txSeq) - } else { - info, err = client.GetFileInfo(ctx, root) - } + fileInfo, err := getOverallFileInfo(c, clients, cid) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to retrieve file info: %v", err)) + return + } - if err != nil { - return nil, err - } + if fileInfo == nil { + c.JSON(http.StatusNotFound, "File not found") + return + } - if info != nil { - return info, nil - } + if fileInfo.Pruned { + c.JSON(http.StatusBadRequest, "File already pruned") + return } - return nil, nil -} + if !fileInfo.Finalized { + c.JSON(http.StatusBadRequest, "File not finalized yet") + return + } -// downloadAndServeFile downloads the file and serves it as an attachment. -func downloadAndServeFile(c *gin.Context, downloader *transfer.Downloader, root, filename string) error { + downloader, err := transfer.NewDownloader(clients) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to create downloader, %v", err.Error())) + return + } + + root := fileInfo.Tx.DataMerkleRoot.Hex() tmpfile := filepath.Join(os.TempDir(), fmt.Sprintf("zgs_indexer_download_%v", root)) defer os.Remove(tmpfile) if err := downloader.Download(c, root, tmpfile, true); err != nil { - return fmt.Errorf("failed to download file: %w", err) + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to download file: %v", err.Error())) + return } if len(filename) == 0 { @@ -194,7 +157,6 @@ func downloadAndServeFile(c *gin.Context, downloader *transfer.Downloader, root, } c.FileAttachment(tmpfile, filename) - return nil } // serveDirectoryListing serves the list of files in a directory. diff --git a/indexer/gateway/server.go b/indexer/gateway/server.go index 6308d56..6b986bd 100644 --- a/indexer/gateway/server.go +++ b/indexer/gateway/server.go @@ -4,34 +4,23 @@ import ( "net/http" "github.com/0glabs/0g-storage-client/common/rpc" - "github.com/0glabs/0g-storage-client/node" + "github.com/0glabs/0g-storage-client/indexer" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" ) type Config struct { - Endpoint string // http endpoint - - Nodes []string // storage nodes for file upload or download - MaxDownloadFileSize uint64 // max download file size - ExpectedReplica uint // expected upload replica number - - RPCHandler http.Handler // enable to provide both RPC and REST API service + Endpoint string // http endpoint + RPCHandler http.Handler // enable to provide both RPC and REST API service + MaxDownloadFileSize uint64 // max download file size } -func MustServeWithRPC(config Config) { - if len(config.Nodes) == 0 { - logrus.Fatal("Nodes not specified to start HTTP server") - } - - // init global variables - clients = node.MustNewZgsClients(config.Nodes) - maxDownloadFileSize = config.MaxDownloadFileSize - expectedReplica = config.ExpectedReplica +func MustServeWithRPC(nodeManager *indexer.NodeManager, locationCache *indexer.FileLocationCache, config Config) { + controller := NewRestController(nodeManager, locationCache, config.MaxDownloadFileSize) // init router - router := newRouter() + router := newRouter(controller) if config.RPCHandler != nil { router.POST("/", gin.WrapH(config.RPCHandler)) } @@ -39,7 +28,7 @@ func MustServeWithRPC(config Config) { rpc.Start(config.Endpoint, router) } -func newRouter() *gin.Engine { +func newRouter(controller *RestController) *gin.Engine { router := gin.New() // middlewares @@ -50,9 +39,11 @@ func newRouter() *gin.Engine { router.Use(middlewareCors()) // handlers - router.GET("/file", downloadFile) - router.GET("/file/:cid/*filePath", downloadFileInFolder) - router.POST("/file/segment", uploadSegment) + router.GET("/file", controller.downloadFile) + router.GET("/file/:cid/*filePath", controller.downloadFileInFolder) + router.GET("/file/info/:cid", controller.getFileStatus) + router.GET("/node/status", controller.getNodeStatus) + router.POST("/file/segment", controller.uploadSegment) return router } diff --git a/indexer/gateway/status.go b/indexer/gateway/status.go new file mode 100644 index 0000000..b6cf4b1 --- /dev/null +++ b/indexer/gateway/status.go @@ -0,0 +1,53 @@ +package gateway + +import ( + "fmt" + "net/http" + "strings" + + "github.com/0glabs/0g-storage-client/node" + "github.com/gin-gonic/gin" +) + +func (ctrl *RestController) getFileStatus(c *gin.Context) { + cidStr := strings.TrimSpace(c.Param("cid")) + cid := NewCid(cidStr) + + fileInfo, err := ctrl.fetchFileInfo(c, cid) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to retrieve file info: %v", err)) + return + } + + if fileInfo == nil { + c.JSON(http.StatusNotFound, "File not found") + return + } + + c.JSON(http.StatusOK, fileInfo) +} + +func (ctrl *RestController) getNodeStatus(c *gin.Context) { + var finalStatus *node.Status + for _, client := range ctrl.nodeManager.TrustedClients() { + status, err := client.GetStatus(c) + if err != nil { + c.JSON(http.StatusInternalServerError, fmt.Sprintf("Failed to retrieve node status: %v", err)) + } + + if finalStatus == nil { + finalStatus = &status + continue + } + + if finalStatus.LogSyncHeight > status.LogSyncHeight { + finalStatus.LogSyncHeight = status.LogSyncHeight + finalStatus.LogSyncBlock = status.LogSyncBlock + } + + finalStatus.ConnectedPeers = max(finalStatus.ConnectedPeers, status.ConnectedPeers) + finalStatus.NextTxSeq = min(finalStatus.NextTxSeq, status.NextTxSeq) + } + + c.JSON(http.StatusOK, finalStatus) +} diff --git a/indexer/gateway/upload.go b/indexer/gateway/upload.go index a4cd6fb..5092cab 100644 --- a/indexer/gateway/upload.go +++ b/indexer/gateway/upload.go @@ -1,6 +1,7 @@ package gateway import ( + "context" "net/http" "github.com/0glabs/0g-storage-client/core" @@ -12,17 +13,16 @@ import ( "github.com/pkg/errors" ) -var expectedReplica uint - type UploadSegmentRequest struct { - Root common.Hash `json:"root"` // file merkle root - TxSeq *uint64 `json:"txSeq"` // Transaction sequence - Data []byte `json:"data"` // segment data - Index uint64 `json:"index"` // segment index - Proof merkle.Proof `json:"proof"` // segment merkle proof + Root common.Hash `json:"root"` // file merkle root + TxSeq *uint64 `json:"txSeq"` // Transaction sequence + Data []byte `json:"data"` // segment data + Index uint64 `json:"index"` // segment index + Proof merkle.Proof `json:"proof"` // segment merkle proof + ExpectedReplica uint `json:"expectedReplica"` // expected replica count, default 1 } -func uploadSegment(c *gin.Context) { +func (ctrl *RestController) uploadSegment(c *gin.Context) { var input UploadSegmentRequest // bind the `application/json` request @@ -37,13 +37,27 @@ func uploadSegment(c *gin.Context) { return } - // retrieve and validate file info - fileInfo, err := getFileInfo(c, input.Root, input.TxSeq) - if err != nil { - c.JSON(http.StatusInternalServerError, errors.WithMessage(err, "Failed to retrieve file info").Error()) - return + cid := Cid{ + Root: input.Root.String(), + TxSeq: input.TxSeq, + } + + var fileInfo *node.FileInfo + var selectedClients []*node.ZgsClient + + // select trusted storage nodes that have already synced the submitted event + for _, client := range ctrl.nodeManager.TrustedClients() { + info, err := getOverallFileInfo(c, []*node.ZgsClient{client}, cid) + if err != nil { + c.JSON(http.StatusInternalServerError, errors.WithMessage(err, "Failed to retrieve file info").Error()) + return + } + if info != nil { + selectedClients = append(selectedClients, client) + fileInfo = info + } } - if fileInfo == nil { + if len(selectedClients) == 0 || fileInfo == nil { c.JSON(http.StatusNotFound, "File not found") return } @@ -62,7 +76,7 @@ func uploadSegment(c *gin.Context) { Proof: input.Proof, FileSize: fileInfo.Tx.Size, } - if err := uploadSegmentWithProof(c, segment, fileInfo); err != nil { + if err := uploadSegmentWithProof(c, selectedClients, segment, fileInfo, input.ExpectedReplica); err != nil { c.JSON(http.StatusInternalServerError, errors.WithMessage(err, "Failed to upload segment with proof").Error()) return } @@ -79,7 +93,13 @@ func validateMerkleProof(req UploadSegmentRequest, fileInfo *node.FileInfo) erro } // uploadSegmentWithProof is a helper function to upload the segment with proof -func uploadSegmentWithProof(c *gin.Context, segment node.SegmentWithProof, fileInfo *node.FileInfo) error { +func uploadSegmentWithProof( + ctx context.Context, clients []*node.ZgsClient, segment node.SegmentWithProof, fileInfo *node.FileInfo, expectedReplica uint) error { + + if expectedReplica == 0 { + expectedReplica = 1 + } + opt := transfer.UploadOption{ ExpectedReplica: expectedReplica, } @@ -87,5 +107,5 @@ func uploadSegmentWithProof(c *gin.Context, segment node.SegmentWithProof, fileI Segments: []node.SegmentWithProof{segment}, FileInfo: fileInfo, } - return transfer.NewFileSegementUploader(clients).Upload(c, fileSegements, opt) + return transfer.NewFileSegementUploader(clients).Upload(ctx, fileSegements, opt) } diff --git a/indexer/node_manager.go b/indexer/node_manager.go index 5482e89..6a176e2 100644 --- a/indexer/node_manager.go +++ b/indexer/node_manager.go @@ -51,7 +51,7 @@ type NodeManager struct { } // InitDefaultNodeManager initializes the default `NodeManager`. -func InitDefaultNodeManager(config NodeManagerConfig) (closable func(), err error) { +func InitDefaultNodeManager(config NodeManagerConfig) (mgr *NodeManager, err error) { if len(config.DiscoveryNode) > 0 { if defaultNodeManager.discoverNode, err = node.NewAdminClient(config.DiscoveryNode, defaultZgsClientOpt); err != nil { return nil, errors.WithMessage(err, "Failed to create admin client to discover peers") @@ -68,7 +68,7 @@ func InitDefaultNodeManager(config NodeManagerConfig) (closable func(), err erro go util.Schedule(defaultNodeManager.update, config.UpdateInterval, "Failed to update shard configs once") } - return defaultNodeManager.close, nil + return &defaultNodeManager, nil } // TrustedClients returns trusted clients. @@ -159,7 +159,7 @@ func (nm *NodeManager) AddTrustedNodes(nodes ...string) error { return nil } -func (nm *NodeManager) close() { +func (nm *NodeManager) Close() { nm.trusted.Range(func(key, value any) bool { value.(*node.ZgsClient).Close() return true