Skip to content

Commit

Permalink
feat(indexer-client): support file segment uploading
Browse files Browse the repository at this point in the history
- Improve indexer test stub setup to wait for port being available.
- Add integration tests for the file segment uploading.
  • Loading branch information
wanliqun committed Nov 15, 2024
1 parent 54ba802 commit fbcf7a0
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 1 deletion.
52 changes: 52 additions & 0 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,58 @@ func (c *Client) BatchUpload(ctx context.Context, w3Client *web3go.Client, datas
}
}

// NewUploaderFromIndexerNodes return a file segment uploader with selected storage nodes from indexer service.
func (c *Client) NewFileSegmentUploaderFromIndexerNodes(
ctx context.Context, segNum uint64, expectedReplica uint, dropped []string) (*transfer.FileSegmentUploader, error) {

clients, err := c.SelectNodes(ctx, segNum, expectedReplica, dropped)
if err != nil {
return nil, err
}
urls := make([]string, len(clients))
for i, client := range clients {
urls[i] = client.URL()
}
c.logger.Infof("get %v storage nodes from indexer: %v", len(urls), urls)
return transfer.NewFileSegementUploader(clients, c.option.LogOption), nil
}

// UploadFileSegments transfer segment data of a file, which should has already been submitted to the 0g storage contract,
// to the storage nodes selected from indexer service.
func (c *Client) UploadFileSegments(
ctx context.Context, fileSeg transfer.FileSegmentsWithProof, option ...transfer.UploadOption) error {

if fileSeg.FileInfo == nil {
return errors.New("file not found")
}

if len(fileSeg.Segments) == 0 {
return errors.New("segment data is empty")
}

expectedReplica := uint(1)
if len(option) > 0 {
expectedReplica = max(expectedReplica, option[0].ExpectedReplica)
}

numSeg := core.NumSplits(int64(fileSeg.FileInfo.Tx.Size), core.DefaultSegmentSize)
dropped := make([]string, 0)
for {
uploader, err := c.NewFileSegmentUploaderFromIndexerNodes(ctx, numSeg, expectedReplica, dropped)
if err != nil {
return err
}

var rpcError *node.RPCError
if err := uploader.Upload(ctx, fileSeg, option...); errors.As(err, &rpcError) {
dropped = append(dropped, rpcError.URL)
c.logger.Infof("dropped problematic node and retry: %v", rpcError.Error())
} else {
return err
}
}
}

func (c *Client) NewDownloaderFromIndexerNodes(ctx context.Context, root string) (*transfer.Downloader, error) {
locations, err := c.GetFileLocations(ctx, root)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion tests/client_test_framework/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import time
import traceback
import json
import requests
from pathlib import Path

from eth_utils import encode_hex
Expand Down Expand Up @@ -393,7 +394,15 @@ def setup_indexer(self, trusted, discover_node, discover_ports = None):
env=os.environ.copy(),
)
self.indexer_rpc_url = "http://127.0.0.1:{}".format(indexer_port(0))


def is_port_available(url):
try:
response = requests.get(url, timeout=20)
return response.status_code is not None
except requests.RequestException:
return False
wait_until(lambda: is_port_available(self.indexer_rpc_url), timeout=20)

def stop_indexer(self):
if self.indexer_process is not None:
self.indexer_process.terminate()
Expand Down
165 changes: 165 additions & 0 deletions tests/go_tests/segment_upload_test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"context"
"io"
"math/rand"
"os"
"strings"
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/common/util"
"github.com/0glabs/0g-storage-client/core"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/0glabs/0g-storage-client/indexer"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func randomBytes(length int) ([]byte, error) {
b := make([]byte, length)
r := rand.New(rand.NewSource(time.Now().UnixNano()))

_, err := io.ReadFull(r, b)
if err != nil {
return nil, err
}
return b, nil
}

func getFileSegments(tree *merkle.Tree, data core.IterableData) (res []node.SegmentWithProof, err error) {
numChunks := data.NumChunks()
for segIndex := uint64(0); segIndex < data.NumSegments(); segIndex++ {
// check segment index
startIndex := segIndex * core.DefaultSegmentMaxChunks
if startIndex >= numChunks {
break
}
// get segment
segment, err := core.ReadAt(data, core.DefaultSegmentSize, int64(segIndex*core.DefaultSegmentSize), data.PaddedSize())
if err != nil {
return nil, errors.WithMessage(err, "failed to read segment")
}
if startIndex+uint64(len(segment))/core.DefaultChunkSize >= numChunks {
// last segment has real data
expectedLen := core.DefaultChunkSize * int(numChunks-startIndex)
segment = segment[:expectedLen]
}
// fill proof
proof := tree.ProofAt(int(segIndex))
segWithProof := node.SegmentWithProof{
Root: tree.Root(),
Data: segment,
Index: segIndex,
Proof: proof,
FileSize: uint64(data.Size()),
}
res = append(res, segWithProof)
}

return res, nil
}

func runTest() error {
ctx := context.Background()
// load args
args := os.Args[1:]
key := args[0]
chainUrl := args[1]
zgsNodeUrls := strings.Split(args[2], ",")
indexerUrl := args[3]

w3client := blockchain.MustNewWeb3(chainUrl, key)
defer w3client.Close()

// submit log entry
bytes, err := randomBytes(core.DefaultSegmentSize*3 + 100)
if err != nil {
return errors.WithMessage(err, "failed to generate random bytes")
}
data, err := core.NewDataInMemory(bytes)
if err != nil {
return errors.WithMessage(err, "failed to initialize data")
}
tree, err := core.MerkleTree(data)
if err != nil {
return errors.WithMessage(err, "failed to build merkle tree")
}
indexerClient, err := indexer.NewClient(indexerUrl, indexer.IndexerClientOption{LogOption: common.LogOption{Logger: logrus.StandardLogger()}})
if err != nil {
return errors.WithMessage(err, "failed to initialize indexer client")
}
defer indexerClient.Close()
uploader, err := indexerClient.NewUploaderFromIndexerNodes(ctx, data.NumSegments(), w3client, 1, nil)
if err != nil {
return errors.WithMessage(err, "failed to initialize uploader")
}
_, _, err = uploader.SubmitLogEntry(ctx, []core.IterableData{data}, make([][]byte, 1), nil, nil)
if err != nil {
return errors.WithMessage(err, "failed to sub log entry")
}
// wait for log entry
var info *node.FileInfo
zgsClients := node.MustNewZgsClients(zgsNodeUrls)
for i := range zgsClients {
zgsClients[i].Close()
}
waitLoop:
for tryN, maxTries := 0, 15; tryN < maxTries; tryN++ {
time.Sleep(time.Second)
for _, client := range zgsClients {
info, err = client.GetFileInfo(ctx, tree.Root())
if err != nil {
return errors.WithMessage(err, "failed to get file info")
}
if info != nil {
break waitLoop
}
}
if tryN == maxTries-1 {
return errors.New("failed to get file info after too many retries")
}
}

// upload file segments
segments, err := getFileSegments(tree, data)
if err != nil {
return errors.WithMessage(err, "failed to get file segments")
}
if err := indexerClient.UploadFileSegments(ctx, transfer.FileSegmentsWithProof{
Segments: segments,
FileInfo: info,
}); err != nil {
return errors.WithMessage(err, "failed to upload file segments")
}

// check upload result
checkLoop:
for tryN, maxTries := 0, 15; tryN < maxTries; tryN++ {
time.Sleep(time.Second)
for _, client := range zgsClients {
info, err = client.GetFileInfo(ctx, tree.Root())
if err != nil {
return errors.WithMessage(err, "failed to get file info")
}
if info != nil && info.UploadedSegNum == data.NumSegments() {
break checkLoop
}
}
if tryN == maxTries-1 {
return errors.New("failed to get file info to check result after too many retries")
}
}

return nil
}

func main() {
if err := util.WaitUntil(runTest, time.Minute*3); err != nil {
logrus.WithError(err).Fatalf("file segments upload test failed")
}
}
29 changes: 29 additions & 0 deletions tests/segment_upload_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

from client_test_framework.test_framework import ClientTestFramework
from config.node_config import GENESIS_PRIV_KEY
from client_utility.run_go_test import run_go_test

class FileSegmentUploadTest(ClientTestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 1

def run_test(self):
ports = ",".join([x.rpc_url.split(":")[-1] for x in self.nodes])
self.setup_indexer(self.nodes[0].rpc_url, self.nodes[0].rpc_url, ports)
test_args = [
"go",
"run",
os.path.join(os.path.dirname(__file__), "go_tests", "segment_upload_test", "main.go"),
# arguments passed to go
GENESIS_PRIV_KEY,
self.blockchain_nodes[0].rpc_url,
",".join([x.rpc_url for x in self.nodes]),
self.indexer_rpc_url
]
self.log.info("run go test with args: {}".format(test_args))
run_go_test(self.root_dir, test_args)

if __name__ == "__main__":
FileSegmentUploadTest().main()

0 comments on commit fbcf7a0

Please sign in to comment.