From 64a26df54f6226d739c8a5b57b32ad5af07d3061 Mon Sep 17 00:00:00 2001 From: Alexey Sharp Date: Mon, 25 Apr 2022 16:47:30 +0100 Subject: [PATCH] Calculate total size of downloaded files --- cmd/downloader/downloader/grpc_server.go | 25 ++++++++++++++++++------ eth/stagedsync/stage_headers.go | 22 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/cmd/downloader/downloader/grpc_server.go b/cmd/downloader/downloader/grpc_server.go index 06db650ed42..f6a88897df4 100644 --- a/cmd/downloader/downloader/grpc_server.go +++ b/cmd/downloader/downloader/grpc_server.go @@ -3,6 +3,7 @@ package downloader import ( "context" "errors" + "fmt" "path/filepath" "github.com/anacrolix/torrent" @@ -74,13 +75,26 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow infoHashes[i] = gointerfaces.ConvertH160toAddress(it.TorrentHash) } } - if err := ResolveAbsentTorrents(ctx, s.t.TorrentClient, infoHashes, s.snapshotDir, s.silent); err != nil { - return nil, err - } - for _, t := range s.t.TorrentClient.Torrents() { - t.AllowDataDownload() + if len(request.Items) == 1 { + t, ok := s.t.TorrentClient.Torrent(infoHashes[0]) + if !ok { + return nil, fmt.Errorf("torrent not found: [%x]", infoHashes[0]) + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-t.GotInfo(): + if !t.Complete.Bool() { + t.DownloadAll() + } + mi := t.Metainfo() + if err := CreateTorrentFileIfNotExists(s.snapshotDir, t.Info(), &mi); err != nil { + return nil, err + } + } t.AllowDataUpload() if !t.Complete.Bool() { + t.AllowDataDownload() t.DownloadAll() } } @@ -102,7 +116,6 @@ func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsR reply.BytesTotal += uint64(t.Info().TotalLength()) reply.Completed = reply.Completed && t.Complete.Bool() reply.Connections += uint64(len(t.PeerConns())) - for _, peer := range t.PeerConns() { peers[peer.PeerID] = struct{}{} } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index f6c7ecb06ac..2e6d72d3f58 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1177,6 +1177,28 @@ func WaitForDownloader(ctx context.Context, tx kv.RwTx, cfg HeadersCfg) error { var prevBytesCompleted uint64 logEvery := time.NewTicker(logInterval) defer logEvery.Stop() + // Initially send all info hashes to calculate total size of all files, to calculate correct progress + req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, len(preverified))} + for i, p := range preverified { + req.Items[i] = &proto_downloader.DownloadItem{ + TorrentHash: downloadergrpc.String2Proto(p.Hash), + Path: p.Name, + } + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if _, err := cfg.snapshotDownloader.Download(ctx, req); err != nil { + log.Error("[Snapshots] Can't call downloader", "err", err) + time.Sleep(10 * time.Second) + continue + } + break + } + // Now send all info hashes 1 by one for _, p := range preverified { req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 1)} req.Items[0] = &proto_downloader.DownloadItem{