Skip to content

Commit

Permalink
Merge pull request #533 from ipfs-force-community/feat/spark
Browse files Browse the repository at this point in the history
feat: support spark
  • Loading branch information
LinZexiao authored Jul 31, 2024
2 parents 199cfaf + d459f62 commit aabfd6d
Show file tree
Hide file tree
Showing 17 changed files with 1,782 additions and 182 deletions.
120 changes: 120 additions & 0 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"context"
"fmt"
"io"
"os"
"sort"
"time"
Expand All @@ -24,12 +25,14 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"github.com/pkg/errors"

"github.com/ipfs-force-community/sophon-auth/jwtclient"

clients2 "github.com/ipfs-force-community/droplet/v2/api/clients"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/indexprovider"
"github.com/ipfs-force-community/droplet/v2/minermgr"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/ipfs-force-community/droplet/v2/network"
Expand Down Expand Up @@ -66,6 +69,7 @@ type MarketNodeImpl struct {
DataTransfer network.ProviderDataTransfer
DealPublisher *storageprovider.DealPublisher
DealAssigner storageprovider.DealAssiger
IndexProviderMgr *indexprovider.IndexProviderMgr

DirectDealProvider *storageprovider.DirectDealProvider

Expand Down Expand Up @@ -1392,3 +1396,119 @@ func (m *MarketNodeImpl) UpdateDirectDealState(ctx context.Context, id uuid.UUID

return m.Repo.DirectDealRepo().SaveDeal(ctx, deal)
}

func (m *MarketNodeImpl) IndexerAnnounceAllDeals(ctx context.Context, minerAddr address.Address) error {
return m.IndexProviderMgr.IndexAnnounceAllDeals(ctx, minerAddr)
}

func (m *MarketNodeImpl) getDeal(ctx context.Context, contextID []byte) (any, bool, error) {
propCID, err := cid.Cast(contextID)
if err == nil {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, propCID)
if err != nil {
return address.Address{}, false, err
}
return deal, false, nil
}
dealUUID, err := uuid.FromBytes(contextID)
if err != nil {
return address.Address{}, false, err
}

directDeal, err := m.Repo.DirectDealRepo().GetDeal(ctx, dealUUID)
if err == nil {
return directDeal, true, nil
}

deal, err := m.Repo.StorageDealRepo().GetDealByUUID(ctx, dealUUID)
if err != nil {
return address.Address{}, false, err
}

return deal, false, nil
}

func (m *MarketNodeImpl) IndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) {
deal, isDDO, err := m.getDeal(ctx, contextID)
if err != nil {
return nil, err
}
var miner address.Address
if isDDO {
miner = deal.(*types.DirectDeal).Provider
} else {
miner = deal.(*types.MinerDeal).Proposal.Provider
}

it, err := m.IndexProviderMgr.MultihashLister(ctx, miner, "", contextID)
if err != nil {
return nil, err
}

var mhs []multihash.Multihash
mh, err := it.Next()
for {
if err != nil {
if errors.Is(err, io.EOF) {
return mhs, nil
}
return nil, err
}
mhs = append(mhs, mh)

mh, err = it.Next()
}
}

func (m *MarketNodeImpl) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) {
var c cid.Cid
var err error
for _, miner := range m.Config.Miners {
c, err = m.IndexProviderMgr.IndexerAnnounceLatest(ctx, address.Address(miner.Addr))
if err != nil {
return c, err
}
}

return c, nil
}

func (m *MarketNodeImpl) IndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) {
var c cid.Cid
var err error
for _, miner := range m.Config.Miners {
c, err = m.IndexProviderMgr.IndexerAnnounceLatestHttp(ctx, address.Address(miner.Addr), urls)
if err != nil {
return c, err
}
}

return c, nil
}

func (m *MarketNodeImpl) IndexerAnnounceDealRemoved(ctx context.Context, contextID []byte) (cid.Cid, error) {
deal, isDDO, err := m.getDeal(ctx, contextID)
if err != nil {
return cid.Undef, err
}
var miner address.Address
if isDDO {
miner = deal.(*types.DirectDeal).Provider
} else {
miner = deal.(*types.MinerDeal).Proposal.Provider
}

return m.IndexProviderMgr.AnnounceDealRemoved(ctx, miner, contextID)
}

func (m *MarketNodeImpl) IndexerAnnounceDeal(ctx context.Context, contextID []byte) (cid.Cid, error) {
deal, isDDO, err := m.getDeal(ctx, contextID)
if err != nil {
return cid.Undef, err
}
if isDDO {
return m.IndexProviderMgr.AnnounceDirectDeal(ctx, deal.(*types.DirectDeal))
}

return m.IndexProviderMgr.AnnounceDeal(ctx, deal.(*types.MinerDeal))
}
Loading

0 comments on commit aabfd6d

Please sign in to comment.