Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: feat: paramseed - cluster-p2p paramfetch #333

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions harmony/harmonydb/sql/20241128-paramfetch-reseed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE paramfetch_urls (
machine INT,
cid TEXT,

PRIMARY KEY (machine, cid),
FOREIGN KEY (machine) REFERENCES harmony_machines (id) ON DELETE CASCADE
);
122 changes: 90 additions & 32 deletions lib/fastparamfetch/paramfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
Expand All @@ -16,6 +17,7 @@ import (
"sync"
"time"

"github.com/ipfs/go-cid"
fslock "github.com/ipfs/go-fs-lock"
logging "github.com/ipfs/go-log/v2"
"github.com/minio/blake2b-simd"
Expand All @@ -25,7 +27,6 @@ import (

var log = logging.Logger("paramfetch")

// const gateway = "http://198.211.99.118/ipfs/"
const gateway = "https://proofs.filecoin.io/ipfs/"
const paramdir = "/var/tmp/filecoin-proof-parameters"
const dirEnv = "FIL_PROOFS_PARAMETER_CACHE"
Expand All @@ -49,6 +50,8 @@ type fetch struct {
fsLockRelease func()
fsLockOnce sync.Once
lockFail bool // true if we failed to acquire the lock at least once, meaning that is was claimed by another process

ps *ParamServe
}

func getParamDir() string {
Expand All @@ -58,7 +61,7 @@ func getParamDir() string {
return os.Getenv(dirEnv)
}

func GetParams(ctx context.Context, paramBytes []byte, srsBytes []byte, storageSize uint64) error {
func GetParams(ctx context.Context, ps *ParamServe, paramBytes []byte, srsBytes []byte, storageSize uint64) error {
if err := os.Mkdir(getParamDir(), 0755); err != nil && !os.IsExist(err) {
return err
}
Expand All @@ -69,7 +72,9 @@ func GetParams(ctx context.Context, paramBytes []byte, srsBytes []byte, storageS
return err
}

ft := &fetch{}
ft := &fetch{
ps: ps,
}

defer func() {
if ft.fsLockRelease != nil {
Expand Down Expand Up @@ -152,7 +157,7 @@ func (ft *fetch) maybeFetchAsync(ctx context.Context, name string, info paramFil
return
}

if err := doFetch(ctx, path, info); err != nil {
if err := ft.doFetch(ctx, path, info); err != nil {
ft.errs = append(ft.errs, xerrors.Errorf("fetching file %s failed: %w", path, err))
return
}
Expand All @@ -166,7 +171,7 @@ func (ft *fetch) maybeFetchAsync(ctx context.Context, name string, info paramFil
return
}

if err := doFetch(ctx, path, info); err != nil {
if err := ft.doFetch(ctx, path, info); err != nil {
ft.errs = append(ft.errs, xerrors.Errorf("fetching file %s failed: %w", path, err))
return
}
Expand Down Expand Up @@ -226,6 +231,16 @@ func (ft *fetch) checkFile(path string, info paramFile) error {
checked[path] = struct{}{}
checkedLk.Unlock()

// Call ps.allowCid
if ft.ps != nil {
c, err := cid.Parse(info.Cid)
if err != nil {
log.Errorf("Invalid CID %s: %v", info.Cid, err)
} else {
ft.ps.allowCid(context.Background(), c, path)
}
}

return nil
}

Expand All @@ -250,26 +265,78 @@ func (ft *fetch) wait(ctx context.Context) error {
return multierr.Combine(ft.errs...)
}

func doFetch(ctx context.Context, out string, info paramFile) error {
func (ft *fetch) doFetch(ctx context.Context, out string, info paramFile) error {
c, err := cid.Parse(info.Cid)
if err != nil {
return err
}

var urls []string

if ft.ps != nil {
// Get URLs from paramserve
u, err := ft.ps.urlsForCid(ctx, c)
if err != nil {
log.Warnf("Failed to get URLs for CID %s: %v", c.String(), err)
} else {
for _, hostAndPort := range u {
// Build URL
urlStr := fmt.Sprintf("http://%s/params/ipfs/%s", hostAndPort, info.Cid)
urls = append(urls, urlStr)
}
}
}

// Append the default gateway at the end
gw := os.Getenv("IPFS_GATEWAY")
if gw == "" {
gw = gateway
}
log.Infof("Fetching %s from %s", out, gw)
urls = append(urls, gw+info.Cid)

for _, urlStr := range urls {
log.Infof("Fetching %s from %s", out, urlStr)
u, err := url.Parse(urlStr)
if err != nil {
log.Warnf("Invalid URL %s: %v", urlStr, err)
continue
}
// Try aria2c first
if err := fetchWithAria2c(ctx, out, u.String()); err == nil {
return nil
} else {
log.Warnf("aria2c fetch failed: %s", err)
}

url, err := url.Parse(gw + info.Cid)
// Try HTTP client
if err := fetchWithHTTPClient(ctx, out, u); err == nil {
return nil
} else {
log.Warnf("HTTP fetch failed: %s", err)
}
}

return xerrors.Errorf("failed to fetch %s from any source", info.Cid)
}

func fetchWithAria2c(ctx context.Context, out, url string) error {
aria2cPath, err := exec.LookPath("aria2c")
if err != nil {
return err
return xerrors.New("aria2c not found in PATH")
}
log.Infof("GET %s", url)

// Try aria2c first
if err := fetchWithAria2c(ctx, out, url.String()); err == nil {
return nil
} else {
log.Warnf("aria2c fetch failed: %s", err)
cmd := exec.CommandContext(ctx, aria2cPath, "--continue", "-x16", "-s16", "--dir", filepath.Dir(out), "-o", filepath.Base(out), url)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Run(); err != nil {
return err
}

return nil
}

func fetchWithHTTPClient(ctx context.Context, out string, u *url.URL) error {
outf, err := os.OpenFile(out, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return err
Expand All @@ -283,7 +350,7 @@ func doFetch(ctx context.Context, out string, info paramFile) error {
header := http.Header{}
header.Set("Range", "bytes="+strconv.FormatInt(fStat.Size(), 10)+"-")

req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return err
}
Expand All @@ -296,24 +363,15 @@ func doFetch(ctx context.Context, out string, info paramFile) error {
}
defer resp.Body.Close()

_, err = io.Copy(outf, resp.Body)

return err
}

func fetchWithAria2c(ctx context.Context, out, url string) error {
aria2cPath, err := exec.LookPath("aria2c")
if err != nil {
return xerrors.New("aria2c not found in PATH")
if resp.StatusCode == http.StatusNotFound {
return xerrors.New("file not found on server")
}

cmd := exec.CommandContext(ctx, aria2cPath, "--continue", "-x16", "-s16", "--dir", filepath.Dir(out), "-o", filepath.Base(out), url)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Run(); err != nil {
return err
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
return xerrors.Errorf("unexpected HTTP status: %s", resp.Status)
}

return nil
_, err = io.Copy(outf, resp.Body)

return err
}
124 changes: 124 additions & 0 deletions lib/fastparamfetch/paramserve.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package fastparamfetch

import (
"context"
"fmt"

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / build-calibnet

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / build-debug

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / build-2k

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / build-forest

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / build-mainnet

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / gen-check

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used (typecheck)

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used) (typecheck)

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used) (typecheck)

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / test (test-itest-curio, ./itests/curio_test.go)

"fmt" imported and not used

Check failure on line 5 in lib/fastparamfetch/paramserve.go

View workflow job for this annotation

GitHub Actions / test (test-all, `go list ./... | grep -v curio/itests`)

"fmt" imported and not used
"net/http"
"sync"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)

type ParamServe struct {
db *harmonydb.DB

lk sync.Mutex
allow map[string]bool // file CIDs

cidToFile map[string]string // mapping from CID string to file path

machineID int
}

func NewParamServe(db *harmonydb.DB, machineID int) *ParamServe {
return &ParamServe{
db: db,
allow: make(map[string]bool),
cidToFile: make(map[string]string),
machineID: machineID,
}
}

func (ps *ParamServe) allowCid(ctx context.Context, c cid.Cid, path string) {
ps.lk.Lock()
defer ps.lk.Unlock()

ps.allow[c.String()] = true
ps.cidToFile[c.String()] = path

// Insert into the database that this machine has this CID
err := ps.insertCidForMachine(ctx, c.String())
if err != nil {
log.Errorf("Failed to insert CID %s for machine: %v", c.String(), err)
}
}

func (ps *ParamServe) insertCidForMachine(ctx context.Context, cidStr string) error {
// Insert into paramfetch_urls (machine, cid)
_, err := ps.db.Exec(ctx, `INSERT INTO paramfetch_urls (machine, cid) VALUES ($1, $2) ON CONFLICT DO NOTHING`, ps.machineID, cidStr)
return err
}

func (ps *ParamServe) urlsForCid(ctx context.Context, c cid.Cid) ([]string, error) {
rows, err := ps.db.Query(ctx, `SELECT harmony_machines.host_and_port FROM harmony_machines
JOIN paramfetch_urls ON harmony_machines.id = paramfetch_urls.machine
WHERE paramfetch_urls.cid = $1`, c.String())
if err != nil {
return nil, err
}
defer rows.Close()

var urls []string
for rows.Next() {
var hostAndPort string
if err := rows.Scan(&hostAndPort); err != nil {
return nil, err
}
urls = append(urls, hostAndPort)
}
if err := rows.Err(); err != nil {
return nil, err
}
return urls, nil
}

func (ps *ParamServe) getFilePathForCid(c cid.Cid) (string, error) {
ps.lk.Lock()
defer ps.lk.Unlock()

filePath, ok := ps.cidToFile[c.String()]
if !ok {
return "", xerrors.Errorf("file path for CID %s not found", c.String())
}
return filePath, nil
}

func (ps *ParamServe) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cidStr := vars["cid"]
if cidStr == "" {
http.Error(w, "CID not specified", http.StatusBadRequest)
return
}

// Parse the CID
c, err := cid.Parse(cidStr)
if err != nil {
http.Error(w, "Invalid CID", http.StatusBadRequest)
return
}

ps.lk.Lock()
allowed := ps.allow[c.String()]
ps.lk.Unlock()
if !allowed {
http.Error(w, "CID not allowed", http.StatusNotFound)
return
}

filePath, err := ps.getFilePathForCid(c)
if err != nil {
http.Error(w, "File not found", http.StatusNotFound)
return
}

http.ServeFile(w, r, filePath)
}

func Routes(r *mux.Router, deps *deps.Deps, serve *ParamServe) {
r.Methods("GET", "HEAD").Path("/params/ipfs/{cid}").HandlerFunc(serve.ServeHTTP)
}
Loading