Merge pull request #826 from filecoin-project/feat/retrieval-market-skeleton
Retrieval Market Cleanup W/ Node Interfaces
This commit is contained in:
commit
f0f8d83980
@ -23,7 +23,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/wallet"
|
"github.com/filecoin-project/lotus/chain/wallet"
|
||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -252,7 +252,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
|
|||||||
|
|
||||||
c.incoming <- deal
|
c.incoming <- deal
|
||||||
|
|
||||||
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
|
return deal.ProposalCid, c.discovery.AddPeer(p.Data, retrievalmarket.RetrievalPeer{
|
||||||
Address: dealProposal.Provider,
|
Address: dealProposal.Provider,
|
||||||
ID: deal.Miner,
|
ID: deal.Miner,
|
||||||
})
|
})
|
||||||
|
16
gen/main.go
16
gen/main.go
@ -12,7 +12,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/retrieval"
|
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -59,21 +58,6 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = gen.WriteTupleEncodersToFile("./retrieval/cbor_gen.go", "retrieval",
|
|
||||||
retrieval.RetParams{},
|
|
||||||
|
|
||||||
retrieval.Query{},
|
|
||||||
retrieval.QueryResponse{},
|
|
||||||
retrieval.Unixfs0Offer{},
|
|
||||||
retrieval.DealProposal{},
|
|
||||||
retrieval.DealResponse{},
|
|
||||||
retrieval.Block{},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync",
|
err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync",
|
||||||
blocksync.BlockSyncRequest{},
|
blocksync.BlockSyncRequest{},
|
||||||
blocksync.BlockSyncResponse{},
|
blocksync.BlockSyncResponse{},
|
||||||
|
@ -43,7 +43,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/paych"
|
"github.com/filecoin-project/lotus/paych"
|
||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
"github.com/filecoin-project/lotus/retrieval"
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
@ -221,9 +221,9 @@ func Online() Option {
|
|||||||
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
|
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
|
||||||
|
|
||||||
Override(new(*discovery.Local), discovery.NewLocal),
|
Override(new(*discovery.Local), discovery.NewLocal),
|
||||||
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
|
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),
|
||||||
|
|
||||||
Override(new(*retrieval.Client), retrieval.NewClient),
|
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
|
||||||
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
|
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
|
||||||
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
|
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer),
|
||||||
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
|
Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator),
|
||||||
@ -246,7 +246,7 @@ func Online() Option {
|
|||||||
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
|
||||||
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
Override(new(dtypes.StagingDAG), modules.StagingDAG),
|
||||||
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
|
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
|
||||||
Override(new(*retrieval.Miner), retrieval.NewMiner),
|
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
|
||||||
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
|
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
|
||||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||||
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
|
Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator),
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
@ -17,6 +18,7 @@ import (
|
|||||||
files "github.com/ipfs/go-ipfs-files"
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
ipld "github.com/ipfs/go-ipld-format"
|
ipld "github.com/ipfs/go-ipld-format"
|
||||||
"github.com/ipfs/go-merkledag"
|
"github.com/ipfs/go-merkledag"
|
||||||
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
"github.com/ipfs/go-unixfs/importer/balanced"
|
"github.com/ipfs/go-unixfs/importer/balanced"
|
||||||
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
|
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -31,8 +33,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/impl/full"
|
"github.com/filecoin-project/lotus/node/impl/full"
|
||||||
"github.com/filecoin-project/lotus/node/impl/paych"
|
"github.com/filecoin-project/lotus/node/impl/paych"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/retrieval"
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type API struct {
|
type API struct {
|
||||||
@ -44,8 +45,8 @@ type API struct {
|
|||||||
paych.PaychAPI
|
paych.PaychAPI
|
||||||
|
|
||||||
DealClient *deals.Client
|
DealClient *deals.Client
|
||||||
RetDiscovery discovery.PeerResolver
|
RetDiscovery retrievalmarket.PeerResolver
|
||||||
Retrieval *retrieval.Client
|
Retrieval retrievalmarket.RetrievalClient
|
||||||
Chain *store.ChainStore
|
Chain *store.ChainStore
|
||||||
|
|
||||||
LocalDAG dtypes.ClientDAG
|
LocalDAG dtypes.ClientDAG
|
||||||
@ -153,7 +154,18 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
|
|||||||
|
|
||||||
out := make([]api.QueryOffer, len(peers))
|
out := make([]api.QueryOffer, len(peers))
|
||||||
for k, p := range peers {
|
for k, p := range peers {
|
||||||
out[k] = a.Retrieval.Query(ctx, p, root)
|
queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{})
|
||||||
|
if err != nil {
|
||||||
|
out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
|
||||||
|
} else {
|
||||||
|
out[k] = api.QueryOffer{
|
||||||
|
Root: root,
|
||||||
|
Size: queryResponse.Size,
|
||||||
|
MinPrice: queryResponse.PieceRetrievalPrice(),
|
||||||
|
Miner: p.Address, // TODO: check
|
||||||
|
MinerPeerID: p.ID,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return out, nil
|
return out, nil
|
||||||
@ -263,18 +275,43 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
|
|||||||
order.MinerPeerID = pid
|
order.MinerPeerID = pid
|
||||||
}
|
}
|
||||||
|
|
||||||
outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777)
|
retrievalResult := make(chan error, 1)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile)
|
unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
|
||||||
|
if bytes.Equal(state.PieceCID, order.Root.Bytes()) {
|
||||||
|
switch event {
|
||||||
|
case retrievalmarket.ClientEventError:
|
||||||
|
retrievalResult <- xerrors.New("Retrieval Error")
|
||||||
|
case retrievalmarket.ClientEventComplete:
|
||||||
|
retrievalResult <- nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
a.Retrieval.Retrieve(
|
||||||
|
ctx, order.Root.Bytes(), retrievalmarket.Params{
|
||||||
|
PricePerByte: types.BigDiv(order.Total, types.NewInt(order.Size)),
|
||||||
|
}, order.Total, order.MinerPeerID, order.Client, order.Miner)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return xerrors.New("Retrieval Timed Out")
|
||||||
|
case err := <-retrievalResult:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = outFile.Close()
|
|
||||||
return xerrors.Errorf("RetrieveUnixfs: %w", err)
|
return xerrors.Errorf("RetrieveUnixfs: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return outFile.Close()
|
unsubscribe()
|
||||||
|
|
||||||
|
nd, err := a.LocalDAG.Get(ctx, order.Root)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||||
|
}
|
||||||
|
file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("ClientRetrieve: %w", err)
|
||||||
|
}
|
||||||
|
return files.WriteTo(file, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
|
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) {
|
||||||
|
@ -2,11 +2,15 @@ package modules
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/lotus/retrievaladapter"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-statestore"
|
"github.com/filecoin-project/go-statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
|
"github.com/filecoin-project/lotus/paych"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
|
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||||
"github.com/ipfs/go-bitswap"
|
"github.com/ipfs/go-bitswap"
|
||||||
"github.com/ipfs/go-bitswap/network"
|
"github.com/ipfs/go-bitswap/network"
|
||||||
graphsync "github.com/ipfs/go-graphsync/impl"
|
graphsync "github.com/ipfs/go-graphsync/impl"
|
||||||
@ -26,6 +30,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
"github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||||
"github.com/filecoin-project/lotus/chain/deals"
|
"github.com/filecoin-project/lotus/chain/deals"
|
||||||
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
@ -97,3 +102,9 @@ func ClientGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Client
|
|||||||
|
|
||||||
return gs
|
return gs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RetrievalClient creates a new retrieval client attached to the client blockstore
|
||||||
|
func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient {
|
||||||
|
adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi)
|
||||||
|
return retrievalimpl.NewClient(h, bs, adapter)
|
||||||
|
}
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/peermgr"
|
"github.com/filecoin-project/lotus/peermgr"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -80,6 +81,6 @@ func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func RetrievalResolver(l *discovery.Local) discovery.PeerResolver {
|
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
|
||||||
return discovery.Multi(l)
|
return discovery.Multi(l)
|
||||||
}
|
}
|
||||||
|
@ -35,8 +35,11 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/retrieval"
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
|
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||||
|
"github.com/filecoin-project/lotus/retrievaladapter"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||||
@ -115,11 +118,10 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
|
|||||||
return sm, nil
|
return sm, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) {
|
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) {
|
||||||
lc.Append(fx.Hook{
|
lc.Append(fx.Hook{
|
||||||
OnStart: func(context.Context) error {
|
OnStart: func(context.Context) error {
|
||||||
host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleQueryStream)
|
m.Start(host)
|
||||||
host.SetStreamHandler(retrieval.ProtocolID, m.HandleDealStream)
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@ -261,3 +263,9 @@ func SealTicketGen(api api.FullNode) storage.TicketFn {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
|
||||||
|
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
|
||||||
|
adapter := retrievaladapter.NewRetrievalProviderNode(full)
|
||||||
|
return retrievalimpl.NewProvider(sblks, adapter)
|
||||||
|
}
|
||||||
|
20
retrieval/cbor-gen/main.go
Normal file
20
retrieval/cbor-gen/main.go
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||||
|
)
|
||||||
|
|
||||||
|
// main func has ONE JOB
|
||||||
|
func main() {
|
||||||
|
fmt.Print("Generating Cbor Marshal/Unmarshal...")
|
||||||
|
|
||||||
|
if err := retrievalimpl.RunCborGen(); err != nil {
|
||||||
|
fmt.Println("Failed: ")
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
fmt.Println("Done.")
|
||||||
|
}
|
@ -1,272 +0,0 @@
|
|||||||
package retrieval
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
|
||||||
"github.com/libp2p/go-libp2p-core/network"
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-cbor-util"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
|
||||||
"github.com/filecoin-project/lotus/paych"
|
|
||||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
|
||||||
)
|
|
||||||
|
|
||||||
var log = logging.Logger("retrieval")
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
h host.Host
|
|
||||||
|
|
||||||
pmgr *paych.Manager
|
|
||||||
payapi payapi.PaychAPI
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewClient(h host.Host, pmgr *paych.Manager, payapi payapi.PaychAPI) *Client {
|
|
||||||
return &Client{h: h, pmgr: pmgr, payapi: payapi}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer {
|
|
||||||
s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn(err)
|
|
||||||
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
|
|
||||||
}
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
err = cborutil.WriteCborRPC(s, &Query{
|
|
||||||
Piece: data,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Warn(err)
|
|
||||||
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
|
|
||||||
}
|
|
||||||
|
|
||||||
var resp QueryResponse
|
|
||||||
if err := resp.UnmarshalCBOR(s); err != nil {
|
|
||||||
log.Warn(err)
|
|
||||||
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
|
|
||||||
}
|
|
||||||
|
|
||||||
return api.QueryOffer{
|
|
||||||
Root: data,
|
|
||||||
Size: resp.Size,
|
|
||||||
MinPrice: resp.MinPrice,
|
|
||||||
Miner: p.Address, // TODO: check
|
|
||||||
MinerPeerID: p.ID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientStream struct {
|
|
||||||
payapi payapi.PaychAPI
|
|
||||||
stream network.Stream
|
|
||||||
peeker cbg.BytePeeker
|
|
||||||
|
|
||||||
root cid.Cid
|
|
||||||
size types.BigInt
|
|
||||||
offset uint64
|
|
||||||
|
|
||||||
paych address.Address
|
|
||||||
lane uint64
|
|
||||||
total types.BigInt
|
|
||||||
transferred types.BigInt
|
|
||||||
|
|
||||||
windowSize uint64 // how much we "trust" the peer
|
|
||||||
verifier BlockVerifier
|
|
||||||
}
|
|
||||||
|
|
||||||
// C > S
|
|
||||||
//
|
|
||||||
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
|
|
||||||
//
|
|
||||||
// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
|
|
||||||
// < Resp{Accept}
|
|
||||||
// < ..(Intermediate Block)
|
|
||||||
// < ..Blocks
|
|
||||||
// < ..(Intermediate Block)
|
|
||||||
// < ..Blocks
|
|
||||||
// > DealProposal(...)
|
|
||||||
// < ...
|
|
||||||
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error {
|
|
||||||
s, err := c.h.NewStream(ctx, miner, ProtocolID)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err)
|
|
||||||
}
|
|
||||||
defer s.Close()
|
|
||||||
|
|
||||||
initialOffset := uint64(0) // TODO: Check how much data we have locally
|
|
||||||
// TODO: Support in handler
|
|
||||||
// TODO: Allow client to specify this
|
|
||||||
|
|
||||||
paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("getting payment channel: %w", err)
|
|
||||||
}
|
|
||||||
lane, err := c.pmgr.AllocateLane(paych)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("allocating payment lane: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cst := clientStream{
|
|
||||||
payapi: c.payapi,
|
|
||||||
stream: s,
|
|
||||||
peeker: cbg.GetPeeker(s),
|
|
||||||
|
|
||||||
root: root,
|
|
||||||
size: types.NewInt(size),
|
|
||||||
offset: initialOffset,
|
|
||||||
|
|
||||||
paych: paych,
|
|
||||||
lane: lane,
|
|
||||||
total: total,
|
|
||||||
transferred: types.NewInt(0),
|
|
||||||
|
|
||||||
windowSize: build.UnixfsChunkSize,
|
|
||||||
verifier: &UnixFs0Verifier{Root: root},
|
|
||||||
}
|
|
||||||
|
|
||||||
for cst.offset != size+initialOffset {
|
|
||||||
toFetch := cst.windowSize
|
|
||||||
if toFetch+cst.offset > size {
|
|
||||||
toFetch = size - cst.offset
|
|
||||||
}
|
|
||||||
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
|
|
||||||
|
|
||||||
err := cst.doOneExchange(ctx, toFetch, out)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("retrieval exchange: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cst.offset += toFetch
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error {
|
|
||||||
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
|
|
||||||
|
|
||||||
payment, err := cst.setupPayment(ctx, payAmount)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("setting up retrieval payment: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
deal := &DealProposal{
|
|
||||||
Payment: payment,
|
|
||||||
Ref: cst.root,
|
|
||||||
Params: RetParams{
|
|
||||||
Unixfs0: &Unixfs0Offer{
|
|
||||||
Offset: cst.offset,
|
|
||||||
Size: toFetch,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil {
|
|
||||||
return xerrors.Errorf("sending incremental retrieval request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var resp DealResponse
|
|
||||||
if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil {
|
|
||||||
return xerrors.Errorf("reading retrieval response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.Status != Accepted {
|
|
||||||
cst.windowSize = build.UnixfsChunkSize
|
|
||||||
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
|
|
||||||
|
|
||||||
if resp.Status == Error {
|
|
||||||
return xerrors.Errorf("storage deal error: %s", resp.Message)
|
|
||||||
}
|
|
||||||
if resp.Status == Rejected {
|
|
||||||
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
|
|
||||||
}
|
|
||||||
return xerrors.New("storage deal response had no Accepted section")
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Retrieval accepted, fetching blocks")
|
|
||||||
|
|
||||||
return cst.fetchBlocks(toFetch, out)
|
|
||||||
|
|
||||||
// TODO: maybe increase miner window size after success
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
|
|
||||||
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
|
||||||
|
|
||||||
for i := uint64(0); i < blocksToFetch; {
|
|
||||||
log.Infof("block %d of %d", i+1, blocksToFetch)
|
|
||||||
|
|
||||||
var block Block
|
|
||||||
if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil {
|
|
||||||
return xerrors.Errorf("reading fetchBlock response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
dataBlocks, err := cst.consumeBlockMessage(block, out)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("consuming retrieved blocks: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
i += dataBlocks
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64, error) {
|
|
||||||
prefix, err := cid.PrefixFromBytes(block.Prefix)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cid, err := prefix.Sum(block.Data)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
blk, err := blocks.NewBlockWithCid(block.Data, cid)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
internal, err := cst.verifier.Verify(context.TODO(), blk, out)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("block verify failed: %s", err)
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Smarter out, maybe add to filestore automagically
|
|
||||||
// (Also, persist intermediate nodes)
|
|
||||||
|
|
||||||
if internal {
|
|
||||||
return 0, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
|
|
||||||
amount := types.BigAdd(cst.transferred, toSend)
|
|
||||||
|
|
||||||
sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane)
|
|
||||||
if err != nil {
|
|
||||||
return api.PaymentInfo{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cst.transferred = amount
|
|
||||||
|
|
||||||
return api.PaymentInfo{
|
|
||||||
Channel: cst.paych,
|
|
||||||
ChannelMessage: nil,
|
|
||||||
Vouchers: []*types.SignedVoucher{sv},
|
|
||||||
}, nil
|
|
||||||
}
|
|
@ -1,25 +1,15 @@
|
|||||||
package discovery
|
package discovery
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
cbor "github.com/ipfs/go-ipld-cbor"
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
cbor.RegisterCborType(RetrievalPeer{})
|
cbor.RegisterCborType(retrievalmarket.RetrievalPeer{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type RetrievalPeer struct {
|
func Multi(r retrievalmarket.PeerResolver) retrievalmarket.PeerResolver { // TODO: actually support multiple mechanisms
|
||||||
Address address.Address
|
|
||||||
ID peer.ID // optional
|
|
||||||
}
|
|
||||||
|
|
||||||
type PeerResolver interface {
|
|
||||||
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func Multi(r PeerResolver) PeerResolver { // TODO: actually support multiple mechanisms
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("ret-discovery")
|
var log = logging.Logger("ret-discovery")
|
||||||
@ -21,7 +22,7 @@ func NewLocal(ds dtypes.MetadataDS) *Local {
|
|||||||
return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))}
|
return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Local) AddPeer(cid cid.Cid, peer RetrievalPeer) error {
|
func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
|
||||||
// TODO: allow multiple peers here
|
// TODO: allow multiple peers here
|
||||||
// (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too)
|
// (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too)
|
||||||
|
|
||||||
@ -35,19 +36,19 @@ func (l *Local) AddPeer(cid cid.Cid, peer RetrievalPeer) error {
|
|||||||
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
|
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Local) GetPeers(data cid.Cid) ([]RetrievalPeer, error) {
|
func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
|
||||||
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
|
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
|
||||||
if err == datastore.ErrNotFound {
|
if err == datastore.ErrNotFound {
|
||||||
return []RetrievalPeer{}, nil
|
return []retrievalmarket.RetrievalPeer{}, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var peer RetrievalPeer
|
var peer retrievalmarket.RetrievalPeer
|
||||||
if err := cbor.DecodeInto(entry, &peer); err != nil {
|
if err := cbor.DecodeInto(entry, &peer); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return []RetrievalPeer{peer}, nil
|
return []retrievalmarket.RetrievalPeer{peer}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ PeerResolver = &Local{}
|
var _ retrievalmarket.PeerResolver = &Local{}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package retrieval
|
package retrievalimpl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -67,7 +67,7 @@ func (t *RetParams) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Query) MarshalCBOR(w io.Writer) error {
|
func (t *OldQuery) MarshalCBOR(w io.Writer) error {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
@ -85,7 +85,7 @@ func (t *Query) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Query) UnmarshalCBOR(r io.Reader) error {
|
func (t *OldQuery) UnmarshalCBOR(r io.Reader) error {
|
||||||
br := cbg.GetPeeker(r)
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
@ -115,7 +115,7 @@ func (t *Query) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *QueryResponse) MarshalCBOR(w io.Writer) error {
|
func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
@ -124,7 +124,7 @@ func (t *QueryResponse) MarshalCBOR(w io.Writer) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.Status (retrieval.QueryResponseStatus) (uint64)
|
// t.t.Status (retrieval.OldQueryResponseStatus) (uint64)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -141,7 +141,7 @@ func (t *QueryResponse) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error {
|
func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error {
|
||||||
br := cbg.GetPeeker(r)
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
@ -156,7 +156,7 @@ func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
// t.Status (retrieval.QueryResponseStatus) (uint64)
|
// t.t.Status (retrieval.OldQueryResponseStatus) (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -165,8 +165,8 @@ func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error {
|
|||||||
if maj != cbg.MajUnsignedInt {
|
if maj != cbg.MajUnsignedInt {
|
||||||
return fmt.Errorf("wrong type for uint64 field")
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
}
|
}
|
||||||
t.Status = QueryResponseStatus(extra)
|
t.Status = OldQueryResponseStatus(extra)
|
||||||
// t.Size (uint64) (uint64)
|
// t.t.Size (uint64) (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -247,7 +247,7 @@ func (t *Unixfs0Offer) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DealProposal) MarshalCBOR(w io.Writer) error {
|
func (t *OldDealProposal) MarshalCBOR(w io.Writer) error {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
@ -274,7 +274,7 @@ func (t *DealProposal) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DealProposal) UnmarshalCBOR(r io.Reader) error {
|
func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error {
|
||||||
br := cbg.GetPeeker(r)
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
||||||
@ -322,7 +322,7 @@ func (t *DealProposal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DealResponse) MarshalCBOR(w io.Writer) error {
|
func (t *OldDealResponse) MarshalCBOR(w io.Writer) error {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
@ -350,7 +350,7 @@ func (t *DealResponse) MarshalCBOR(w io.Writer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *DealResponse) UnmarshalCBOR(r io.Reader) error {
|
func (t *OldDealResponse) UnmarshalCBOR(r io.Reader) error {
|
||||||
br := cbg.GetPeeker(r)
|
br := cbg.GetPeeker(r)
|
||||||
|
|
||||||
maj, extra, err := cbg.CborReadHeader(br)
|
maj, extra, err := cbg.CborReadHeader(br)
|
389
retrieval/impl/client.go
Normal file
389
retrieval/impl/client.go
Normal file
@ -0,0 +1,389 @@
|
|||||||
|
package retrievalimpl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
blocks "github.com/ipfs/go-block-format"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/build"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("retrieval")
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
h host.Host
|
||||||
|
bs blockstore.Blockstore
|
||||||
|
node retrievalmarket.RetrievalClientNode
|
||||||
|
// The parameters should be replaced by RetrievalClientNode
|
||||||
|
|
||||||
|
nextDealLk sync.Mutex
|
||||||
|
nextDealID retrievalmarket.DealID
|
||||||
|
subscribersLk sync.RWMutex
|
||||||
|
subscribers []retrievalmarket.ClientSubscriber
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClient creates a new retrieval client
|
||||||
|
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
|
||||||
|
return &client{h: h, bs: bs, node: node}
|
||||||
|
}
|
||||||
|
|
||||||
|
// V0
|
||||||
|
|
||||||
|
// TODO: Implement for retrieval provider V0 epic
|
||||||
|
// https://github.com/filecoin-project/go-retrieval-market-project/issues/12
|
||||||
|
func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Update to match spec for V0 epic
|
||||||
|
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
|
||||||
|
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
|
||||||
|
cid, err := cid.Cast(pieceCID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return retrievalmarket.QueryResponseUndefined, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return retrievalmarket.QueryResponseUndefined, err
|
||||||
|
}
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
err = cborutil.WriteCborRPC(s, &OldQuery{
|
||||||
|
Piece: cid,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return retrievalmarket.QueryResponseUndefined, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var oldResp OldQueryResponse
|
||||||
|
if err := oldResp.UnmarshalCBOR(s); err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return retrievalmarket.QueryResponseUndefined, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := retrievalmarket.QueryResponse{
|
||||||
|
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
|
||||||
|
Size: oldResp.Size,
|
||||||
|
MinPricePerByte: types.BigDiv(oldResp.MinPrice, types.NewInt(oldResp.Size)),
|
||||||
|
}
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Update to match spec for V0 Epic:
|
||||||
|
// https://github.com/filecoin-project/go-retrieval-market-project/issues/9
|
||||||
|
func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID {
|
||||||
|
/* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces
|
||||||
|
-- it will be replaced when we do the V0 implementation of the module */
|
||||||
|
c.nextDealLk.Lock()
|
||||||
|
c.nextDealID++
|
||||||
|
dealID := c.nextDealID
|
||||||
|
c.nextDealLk.Unlock()
|
||||||
|
|
||||||
|
dealState := retrievalmarket.ClientDealState{
|
||||||
|
DealProposal: retrievalmarket.DealProposal{
|
||||||
|
PieceCID: pieceCID,
|
||||||
|
ID: dealID,
|
||||||
|
Params: params,
|
||||||
|
},
|
||||||
|
Status: retrievalmarket.DealStatusFailed,
|
||||||
|
Sender: miner,
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
evt := retrievalmarket.ClientEventError
|
||||||
|
converted, err := cid.Cast(pieceCID)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
err = c.retrieveUnixfs(ctx, converted, types.BigDiv(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet)
|
||||||
|
if err == nil {
|
||||||
|
evt = retrievalmarket.ClientEventComplete
|
||||||
|
dealState.Status = retrievalmarket.DealStatusCompleted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
c.notifySubscribers(evt, dealState)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return dealID
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
|
||||||
|
// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order.
|
||||||
|
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||||
|
func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||||
|
return func() {
|
||||||
|
c.subscribersLk.Lock()
|
||||||
|
defer c.subscribersLk.Unlock()
|
||||||
|
curLen := len(c.subscribers)
|
||||||
|
for i, el := range c.subscribers {
|
||||||
|
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||||
|
c.subscribers[i] = c.subscribers[curLen-1]
|
||||||
|
c.subscribers = c.subscribers[:curLen-1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) {
|
||||||
|
c.subscribersLk.RLock()
|
||||||
|
defer c.subscribersLk.RUnlock()
|
||||||
|
for _, cb := range c.subscribers {
|
||||||
|
cb(evt, ds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||||
|
c.subscribersLk.Lock()
|
||||||
|
c.subscribers = append(c.subscribers, subscriber)
|
||||||
|
c.subscribersLk.Unlock()
|
||||||
|
|
||||||
|
return c.unsubscribeAt(subscriber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// V1
|
||||||
|
func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) CancelDeal(id retrievalmarket.DealID) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) RetrievalStatus(id retrievalmarket.DealID) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDealState {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientStream struct {
|
||||||
|
node retrievalmarket.RetrievalClientNode
|
||||||
|
stream network.Stream
|
||||||
|
peeker cbg.BytePeeker
|
||||||
|
|
||||||
|
root cid.Cid
|
||||||
|
size types.BigInt
|
||||||
|
offset uint64
|
||||||
|
|
||||||
|
paych retrievalmarket.Address
|
||||||
|
lane uint64
|
||||||
|
total types.BigInt
|
||||||
|
transferred types.BigInt
|
||||||
|
|
||||||
|
windowSize uint64 // how much we "trust" the peer
|
||||||
|
verifier BlockVerifier
|
||||||
|
bs blockstore.Blockstore
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This is the old retrieval code that is NOT spec compliant */
|
||||||
|
|
||||||
|
// C > S
|
||||||
|
//
|
||||||
|
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
|
||||||
|
//
|
||||||
|
// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
|
||||||
|
// < Resp{Accept}
|
||||||
|
// < ..(Intermediate Block)
|
||||||
|
// < ..Blocks
|
||||||
|
// < ..(Intermediate Block)
|
||||||
|
// < ..Blocks
|
||||||
|
// > DealProposal(...)
|
||||||
|
// < ...
|
||||||
|
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error {
|
||||||
|
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
initialOffset := uint64(0) // TODO: Check how much data we have locally
|
||||||
|
// TODO: Support in handler
|
||||||
|
// TODO: Allow client to specify this
|
||||||
|
|
||||||
|
paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting payment channel: %w", err)
|
||||||
|
}
|
||||||
|
lane, err := c.node.AllocateLane(paych)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("allocating payment lane: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cst := clientStream{
|
||||||
|
node: c.node,
|
||||||
|
stream: s,
|
||||||
|
peeker: cbg.GetPeeker(s),
|
||||||
|
|
||||||
|
root: root,
|
||||||
|
size: types.NewInt(size),
|
||||||
|
offset: initialOffset,
|
||||||
|
|
||||||
|
paych: paych,
|
||||||
|
lane: lane,
|
||||||
|
total: total,
|
||||||
|
transferred: types.NewInt(0),
|
||||||
|
|
||||||
|
windowSize: build.UnixfsChunkSize,
|
||||||
|
verifier: &UnixFs0Verifier{Root: root},
|
||||||
|
bs: c.bs,
|
||||||
|
}
|
||||||
|
|
||||||
|
for cst.offset != size+initialOffset {
|
||||||
|
toFetch := cst.windowSize
|
||||||
|
if toFetch+cst.offset > size {
|
||||||
|
toFetch = size - cst.offset
|
||||||
|
}
|
||||||
|
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
|
||||||
|
|
||||||
|
err := cst.doOneExchange(ctx, toFetch)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("retrieval exchange: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cst.offset += toFetch
|
||||||
|
}
|
||||||
|
log.Info("RETRIEVE SUCCESSFUL")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) error {
|
||||||
|
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
|
||||||
|
|
||||||
|
payment, err := cst.setupPayment(ctx, payAmount)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("setting up retrieval payment: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
deal := &OldDealProposal{
|
||||||
|
Payment: payment,
|
||||||
|
Ref: cst.root,
|
||||||
|
Params: RetParams{
|
||||||
|
Unixfs0: &Unixfs0Offer{
|
||||||
|
Offset: cst.offset,
|
||||||
|
Size: toFetch,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp OldDealResponse
|
||||||
|
if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.Status != Accepted {
|
||||||
|
cst.windowSize = build.UnixfsChunkSize
|
||||||
|
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
|
||||||
|
|
||||||
|
if resp.Status == Error {
|
||||||
|
return xerrors.Errorf("storage deal error: %s", resp.Message)
|
||||||
|
}
|
||||||
|
if resp.Status == Rejected {
|
||||||
|
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
|
||||||
|
}
|
||||||
|
return xerrors.New("storage deal response had no Accepted section")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Retrieval accepted, fetching blocks")
|
||||||
|
|
||||||
|
return cst.fetchBlocks(toFetch)
|
||||||
|
|
||||||
|
// TODO: maybe increase miner window size after success
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cst *clientStream) fetchBlocks(toFetch uint64) error {
|
||||||
|
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||||
|
|
||||||
|
for i := uint64(0); i < blocksToFetch; {
|
||||||
|
log.Infof("block %d of %d", i+1, blocksToFetch)
|
||||||
|
|
||||||
|
var block Block
|
||||||
|
if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil {
|
||||||
|
return xerrors.Errorf("reading fetchBlock response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dataBlocks, err := cst.consumeBlockMessage(block)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("consuming retrieved blocks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
i += dataBlocks
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) {
|
||||||
|
prefix, err := cid.PrefixFromBytes(block.Prefix)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cid, err := prefix.Sum(block.Data)
|
||||||
|
|
||||||
|
blk, err := blocks.NewBlockWithCid(block.Data, cid)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
internal, err := cst.verifier.Verify(context.TODO(), blk)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("block verify failed: %s", err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Smarter out, maybe add to filestore automagically
|
||||||
|
// (Also, persist intermediate nodes)
|
||||||
|
err = cst.bs.Put(blk)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("block write failed: %s", err)
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if internal {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
|
||||||
|
amount := types.BigAdd(cst.transferred, toSend)
|
||||||
|
|
||||||
|
sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane)
|
||||||
|
if err != nil {
|
||||||
|
return api.PaymentInfo{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cst.transferred = amount
|
||||||
|
|
||||||
|
return api.PaymentInfo{
|
||||||
|
Channel: cst.paych,
|
||||||
|
ChannelMessage: nil,
|
||||||
|
Vouchers: []*types.SignedVoucher{sv},
|
||||||
|
}, nil
|
||||||
|
}
|
@ -1,76 +1,153 @@
|
|||||||
package retrieval
|
package retrievalimpl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-merkledag"
|
"github.com/ipfs/go-merkledag"
|
||||||
unixfile "github.com/ipfs/go-unixfs/file"
|
unixfile "github.com/ipfs/go-unixfs/file"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/network"
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-cbor-util"
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RetrMinerApi interface {
|
// RetrMinerAPI are the node functions needed by a retrieval provider
|
||||||
|
type RetrMinerAPI interface {
|
||||||
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
|
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Miner struct {
|
type provider struct {
|
||||||
sectorBlocks *sectorblocks.SectorBlocks
|
|
||||||
full RetrMinerApi
|
|
||||||
|
|
||||||
pricePerByte types.BigInt
|
// TODO: Replace with RetrievalProviderNode & FileStore for https://github.com/filecoin-project/go-retrieval-market-project/issues/9
|
||||||
// TODO: Unseal price
|
sectorBlocks *sectorblocks.SectorBlocks
|
||||||
|
|
||||||
|
// TODO: Replace with RetrievalProviderNode for
|
||||||
|
// https://github.com/filecoin-project/go-retrieval-market-project/issues/4
|
||||||
|
node retrievalmarket.RetrievalProviderNode
|
||||||
|
|
||||||
|
pricePerByte retrievalmarket.BigInt
|
||||||
|
|
||||||
|
subscribersLk sync.RWMutex
|
||||||
|
subscribers []retrievalmarket.ProviderSubscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(sblks *sectorblocks.SectorBlocks, full api.FullNode) *Miner {
|
// NewProvider returns a new retrieval provider
|
||||||
return &Miner{
|
func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider {
|
||||||
|
return &provider{
|
||||||
sectorBlocks: sblks,
|
sectorBlocks: sblks,
|
||||||
full: full,
|
node: node,
|
||||||
|
|
||||||
pricePerByte: types.NewInt(2), // TODO: allow setting
|
pricePerByte: types.NewInt(2), // TODO: allow setting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start begins listening for deals on the given host
|
||||||
|
func (p *provider) Start(host host.Host) {
|
||||||
|
host.SetStreamHandler(retrievalmarket.QueryProtocolID, p.handleQueryStream)
|
||||||
|
host.SetStreamHandler(retrievalmarket.ProtocolID, p.handleDealStream)
|
||||||
|
}
|
||||||
|
|
||||||
|
// V0
|
||||||
|
// SetPricePerByte sets the price per byte a miner charges for retrievals
|
||||||
|
func (p *provider) SetPricePerByte(price retrievalmarket.BigInt) {
|
||||||
|
p.pricePerByte = price
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPaymentInterval sets the maximum number of bytes a a provider will send before
|
||||||
|
// requesting further payment, and the rate at which that value increases
|
||||||
|
// TODO: Implement for https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||||
|
func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
|
||||||
|
// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order.
|
||||||
|
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||||
|
func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||||
|
return func() {
|
||||||
|
p.subscribersLk.Lock()
|
||||||
|
defer p.subscribersLk.Unlock()
|
||||||
|
curLen := len(p.subscribers)
|
||||||
|
for i, el := range p.subscribers {
|
||||||
|
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||||
|
p.subscribers[i] = p.subscribers[curLen-1]
|
||||||
|
p.subscribers = p.subscribers[:curLen-1]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retrievalmarket.ProviderDealState) {
|
||||||
|
p.subscribersLk.RLock()
|
||||||
|
defer p.subscribersLk.RUnlock()
|
||||||
|
for _, cb := range p.subscribers {
|
||||||
|
cb(evt, ds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||||
|
// TODO: Implement updates as part of https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||||
|
func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||||
|
p.subscribersLk.Lock()
|
||||||
|
p.subscribers = append(p.subscribers, subscriber)
|
||||||
|
p.subscribersLk.Unlock()
|
||||||
|
|
||||||
|
return p.unsubscribeAt(subscriber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// V1
|
||||||
|
func (p *provider) SetPricePerUnseal(price retrievalmarket.BigInt) {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func writeErr(stream network.Stream, err error) {
|
func writeErr(stream network.Stream, err error) {
|
||||||
log.Errorf("Retrieval deal error: %+v", err)
|
log.Errorf("Retrieval deal error: %+v", err)
|
||||||
_ = cborutil.WriteCborRPC(stream, &DealResponse{
|
_ = cborutil.WriteCborRPC(stream, &OldDealResponse{
|
||||||
Status: Error,
|
Status: Error,
|
||||||
Message: err.Error(),
|
Message: err.Error(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) HandleQueryStream(stream network.Stream) {
|
// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/8
|
||||||
|
func (p *provider) handleQueryStream(stream network.Stream) {
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
var query Query
|
var query OldQuery
|
||||||
if err := cborutil.ReadCborRPC(stream, &query); err != nil {
|
if err := cborutil.ReadCborRPC(stream, &query); err != nil {
|
||||||
writeErr(stream, err)
|
writeErr(stream, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
size, err := m.sectorBlocks.GetSize(query.Piece)
|
size, err := p.sectorBlocks.GetSize(query.Piece)
|
||||||
if err != nil && err != sectorblocks.ErrNotFound {
|
if err != nil && err != sectorblocks.ErrNotFound {
|
||||||
log.Errorf("Retrieval query: GetRefs: %s", err)
|
log.Errorf("Retrieval query: GetRefs: %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
answer := &QueryResponse{
|
answer := &OldQueryResponse{
|
||||||
Status: Unavailable,
|
Status: Unavailable,
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
answer.Status = Available
|
answer.Status = Available
|
||||||
|
|
||||||
// TODO: get price, look for already unsealed ref to reduce work
|
// TODO: get price, look for already unsealed ref to reduce work
|
||||||
answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), m.pricePerByte)
|
answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), p.pricePerByte)
|
||||||
answer.Size = uint64(size) // TODO: verify on intermediate
|
answer.Size = uint64(size) // TODO: verify on intermediate
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +158,7 @@ func (m *Miner) HandleQueryStream(stream network.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type handlerDeal struct {
|
type handlerDeal struct {
|
||||||
m *Miner
|
p *provider
|
||||||
stream network.Stream
|
stream network.Stream
|
||||||
|
|
||||||
ufsr sectorblocks.UnixfsReader
|
ufsr sectorblocks.UnixfsReader
|
||||||
@ -90,11 +167,12 @@ type handlerDeal struct {
|
|||||||
size uint64
|
size uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Miner) HandleDealStream(stream network.Stream) {
|
// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||||
|
func (p *provider) handleDealStream(stream network.Stream) {
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
|
|
||||||
hnd := &handlerDeal{
|
hnd := &handlerDeal{
|
||||||
m: m,
|
p: p,
|
||||||
|
|
||||||
stream: stream,
|
stream: stream,
|
||||||
}
|
}
|
||||||
@ -113,7 +191,7 @@ func (m *Miner) HandleDealStream(stream network.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (hnd *handlerDeal) handleNext() (bool, error) {
|
func (hnd *handlerDeal) handleNext() (bool, error) {
|
||||||
var deal DealProposal
|
var deal OldDealProposal
|
||||||
if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil {
|
if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil {
|
||||||
if err == io.EOF { // client sent all deals
|
if err == io.EOF { // client sent all deals
|
||||||
err = nil
|
err = nil
|
||||||
@ -131,8 +209,8 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
|
|||||||
return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers))
|
return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers))
|
||||||
}
|
}
|
||||||
|
|
||||||
expPayment := types.BigMul(hnd.m.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
|
expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
|
||||||
if _, err := hnd.m.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
|
if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
|
||||||
return false, xerrors.Errorf("processing retrieval payment: %w", err)
|
return false, xerrors.Errorf("processing retrieval payment: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,7 +234,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hnd *handlerDeal) openFile(deal DealProposal) error {
|
func (hnd *handlerDeal) openFile(deal OldDealProposal) error {
|
||||||
unixfs0 := deal.Params.Unixfs0
|
unixfs0 := deal.Params.Unixfs0
|
||||||
|
|
||||||
if unixfs0.Offset != 0 {
|
if unixfs0.Offset != 0 {
|
||||||
@ -165,7 +243,7 @@ func (hnd *handlerDeal) openFile(deal DealProposal) error {
|
|||||||
}
|
}
|
||||||
hnd.at = unixfs0.Offset
|
hnd.at = unixfs0.Offset
|
||||||
|
|
||||||
bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error {
|
bstore := hnd.p.sectorBlocks.SealedBlockstore(func() error {
|
||||||
return nil // TODO: approve unsealing based on amount paid
|
return nil // TODO: approve unsealing based on amount paid
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -197,10 +275,10 @@ func (hnd *handlerDeal) openFile(deal DealProposal) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (hnd *handlerDeal) accept(deal DealProposal) error {
|
func (hnd *handlerDeal) accept(deal OldDealProposal) error {
|
||||||
unixfs0 := deal.Params.Unixfs0
|
unixfs0 := deal.Params.Unixfs0
|
||||||
|
|
||||||
resp := &DealResponse{
|
resp := &OldDealResponse{
|
||||||
Status: Accepted,
|
Status: Accepted,
|
||||||
}
|
}
|
||||||
if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil {
|
if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil {
|
33
retrieval/impl/run_cbor_gen.go
Normal file
33
retrieval/impl/run_cbor_gen.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package retrievalimpl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
cborgen "github.com/whyrusleeping/cbor-gen"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RunCborGen() error {
|
||||||
|
genName := "./impl/cbor_gen.go"
|
||||||
|
reName := "./impl/cbor_gen_old.go"
|
||||||
|
if err := os.Rename(genName, reName); err != nil {
|
||||||
|
return fmt.Errorf("could not rename %s to %s", genName, reName)
|
||||||
|
}
|
||||||
|
if err := cborgen.WriteTupleEncodersToFile(
|
||||||
|
genName,
|
||||||
|
"retrievalimpl",
|
||||||
|
RetParams{},
|
||||||
|
OldQuery{},
|
||||||
|
OldQueryResponse{},
|
||||||
|
Unixfs0Offer{},
|
||||||
|
OldDealProposal{},
|
||||||
|
OldDealResponse{},
|
||||||
|
Block{},
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := os.Remove(reName); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
67
retrieval/impl/types.go
Normal file
67
retrieval/impl/types.go
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
package retrievalimpl
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
/* These types are all the types provided by Lotus, which diverge even from
|
||||||
|
spec V0 -- prior to the "update to spec epic", we are using these types internally
|
||||||
|
and switching to spec at the boundaries of the module */
|
||||||
|
|
||||||
|
type OldQueryResponseStatus uint64
|
||||||
|
|
||||||
|
const (
|
||||||
|
Available OldQueryResponseStatus = iota
|
||||||
|
Unavailable
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
Accepted = iota
|
||||||
|
Error
|
||||||
|
Rejected
|
||||||
|
Unsealing
|
||||||
|
)
|
||||||
|
|
||||||
|
type OldQuery struct {
|
||||||
|
Piece cid.Cid
|
||||||
|
// TODO: payment
|
||||||
|
}
|
||||||
|
|
||||||
|
type OldQueryResponse struct {
|
||||||
|
Status OldQueryResponseStatus
|
||||||
|
|
||||||
|
Size uint64 // TODO: spec
|
||||||
|
// TODO: unseal price (+spec)
|
||||||
|
// TODO: sectors to unseal
|
||||||
|
// TODO: address to send money for the deal?
|
||||||
|
MinPrice types.BigInt
|
||||||
|
}
|
||||||
|
|
||||||
|
type Unixfs0Offer struct {
|
||||||
|
Offset uint64
|
||||||
|
Size uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type RetParams struct {
|
||||||
|
Unixfs0 *Unixfs0Offer
|
||||||
|
}
|
||||||
|
|
||||||
|
type OldDealProposal struct {
|
||||||
|
Payment api.PaymentInfo
|
||||||
|
|
||||||
|
Ref cid.Cid
|
||||||
|
Params RetParams
|
||||||
|
}
|
||||||
|
|
||||||
|
type OldDealResponse struct {
|
||||||
|
Status uint64
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
type Block struct { // TODO: put in spec
|
||||||
|
Prefix []byte // TODO: fix cid.Prefix marshaling somehow
|
||||||
|
Data []byte
|
||||||
|
}
|
@ -1,8 +1,7 @@
|
|||||||
package retrieval
|
package retrievalimpl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
|
|
||||||
blocks "github.com/ipfs/go-block-format"
|
blocks "github.com/ipfs/go-block-format"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -16,13 +15,13 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type BlockVerifier interface {
|
type BlockVerifier interface {
|
||||||
Verify(context.Context, blocks.Block, io.Writer) (internal bool, err error)
|
Verify(context.Context, blocks.Block) (internal bool, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type OptimisticVerifier struct {
|
type OptimisticVerifier struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *OptimisticVerifier) Verify(context.Context, blocks.Block, io.Writer) (bool, error) {
|
func (o *OptimisticVerifier) Verify(context.Context, blocks.Block) (bool, error) {
|
||||||
// It's probably fine
|
// It's probably fine
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -37,11 +36,11 @@ type UnixFs0Verifier struct {
|
|||||||
sub *UnixFs0Verifier
|
sub *UnixFs0Verifier
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) {
|
func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block) (last bool, internal bool, err error) {
|
||||||
if b.sub != nil {
|
if b.sub != nil {
|
||||||
// TODO: check links here (iff b.sub.sub == nil)
|
// TODO: check links here (iff b.sub.sub == nil)
|
||||||
|
|
||||||
subLast, internal, err := b.sub.verify(ctx, blk, out)
|
subLast, internal, err := b.sub.verify(ctx, blk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
@ -57,7 +56,7 @@ func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.W
|
|||||||
return false, false, xerrors.New("unixfs verifier: too many nodes in level")
|
return false, false, xerrors.New("unixfs verifier: too many nodes in level")
|
||||||
}
|
}
|
||||||
|
|
||||||
links, err := b.checkInternal(blk, out)
|
links, err := b.checkInternal(blk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, false, err
|
return false, false, err
|
||||||
}
|
}
|
||||||
@ -85,7 +84,7 @@ func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.W
|
|||||||
return b.seen == b.expect, false, nil
|
return b.seen == b.expect, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, error) {
|
func (b *UnixFs0Verifier) checkInternal(blk blocks.Block) (int, error) {
|
||||||
nd, err := ipld.Decode(blk)
|
nd, err := ipld.Decode(blk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("IPLD Decode failed: %s", err)
|
log.Warnf("IPLD Decode failed: %s", err)
|
||||||
@ -112,21 +111,20 @@ func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, e
|
|||||||
return len(nd.Links()), nil
|
return len(nd.Links()), nil
|
||||||
|
|
||||||
case *merkledag.RawNode:
|
case *merkledag.RawNode:
|
||||||
_, err := out.Write(nd.RawData())
|
return 0, nil
|
||||||
return 0, err
|
|
||||||
default:
|
default:
|
||||||
return 0, xerrors.New("verifier: unknown node type")
|
return 0, xerrors.New("verifier: unknown node type")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Writer) (bool, error) {
|
func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block) (bool, error) {
|
||||||
// root is special
|
// root is special
|
||||||
if b.rootBlk == nil {
|
if b.rootBlk == nil {
|
||||||
if !b.Root.Equals(blk.Cid()) {
|
if !b.Root.Equals(blk.Cid()) {
|
||||||
return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid())
|
return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid())
|
||||||
}
|
}
|
||||||
b.rootBlk = blk
|
b.rootBlk = blk
|
||||||
links, err := b.checkInternal(blk, w)
|
links, err := b.checkInternal(blk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -135,7 +133,7 @@ func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Wri
|
|||||||
return links != 0, nil
|
return links != 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, internal, err := b.verify(ctx, blk, w)
|
_, internal, err := b.verify(ctx, blk)
|
||||||
return internal, err
|
return internal, err
|
||||||
}
|
}
|
||||||
|
|
@ -1,66 +1,364 @@
|
|||||||
package retrieval
|
package retrievalmarket
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/filecoin-project/lotus/api"
|
"context"
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/ipld/go-ipld-prime"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ProtocolID = "/fil/retrieval/-1.0.0" // TODO: spec
|
// type aliases
|
||||||
const QueryProtocolID = "/fil/retrieval/qry/-1.0.0" // TODO: spec
|
// TODO: Remove and use native types or extract for
|
||||||
|
// https://github.com/filecoin-project/go-retrieval-market-project/issues/5
|
||||||
|
|
||||||
|
// BigInt is used for token amounts in retrieval deals
|
||||||
|
type BigInt = types.BigInt
|
||||||
|
|
||||||
|
// Address is an address in the filecoin network
|
||||||
|
type Address = address.Address
|
||||||
|
|
||||||
|
// SignedVoucher is a signed payment voucher
|
||||||
|
type SignedVoucher = types.SignedVoucher
|
||||||
|
|
||||||
|
// ProtocolID is the protocol for proposing / responding to retrieval deals
|
||||||
|
const ProtocolID = "/fil/retrieval/0.0.1"
|
||||||
|
|
||||||
|
// QueryProtocolID is the protocol for querying infromation about retrieval
|
||||||
|
// deal parameters
|
||||||
|
const QueryProtocolID = "/fil/retrieval/qry/0.0.1" // TODO: spec
|
||||||
|
|
||||||
|
// Unsubscribe is a function that unsubscribes a subscriber for either the
|
||||||
|
// client or the provider
|
||||||
|
type Unsubscribe func()
|
||||||
|
|
||||||
|
// ClientDealState is the current state of a deal from the point of view
|
||||||
|
// of a retrieval client
|
||||||
|
type ClientDealState struct {
|
||||||
|
DealProposal
|
||||||
|
Status DealStatus
|
||||||
|
Sender peer.ID
|
||||||
|
TotalReceived uint64
|
||||||
|
FundsSpent BigInt
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientEvent is an event that occurs in a deal lifecycle on the client
|
||||||
|
type ClientEvent uint64
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ClientEventOpen indicates a deal was initiated
|
||||||
|
ClientEventOpen ClientEvent = iota
|
||||||
|
|
||||||
|
// ClientEventFundsExpended indicates a deal has run out of funds in the payment channel
|
||||||
|
// forcing the client to add more funds to continue the deal
|
||||||
|
ClientEventFundsExpended // when totalFunds is expended
|
||||||
|
|
||||||
|
// ClientEventProgress indicates more data was received for a retrieval
|
||||||
|
ClientEventProgress
|
||||||
|
|
||||||
|
// ClientEventError indicates an error occurred during a deal
|
||||||
|
ClientEventError
|
||||||
|
|
||||||
|
// ClientEventComplete indicates a deal has completed
|
||||||
|
ClientEventComplete
|
||||||
|
)
|
||||||
|
|
||||||
|
// ClientSubscriber is a callback that is registered to listen for retrieval events
|
||||||
|
type ClientSubscriber func(event ClientEvent, state ClientDealState)
|
||||||
|
|
||||||
|
// RetrievalClient is a client interface for making retrieval deals
|
||||||
|
type RetrievalClient interface {
|
||||||
|
// V0
|
||||||
|
|
||||||
|
// Find Providers finds retrieval providers who may be storing a given piece
|
||||||
|
FindProviders(pieceCID []byte) []RetrievalPeer
|
||||||
|
|
||||||
|
// Query asks a provider for information about a piece it is storing
|
||||||
|
Query(
|
||||||
|
ctx context.Context,
|
||||||
|
p RetrievalPeer,
|
||||||
|
pieceCID []byte,
|
||||||
|
params QueryParams,
|
||||||
|
) (QueryResponse, error)
|
||||||
|
|
||||||
|
// Retrieve retrieves all or part of a piece with the given retrieval parameters
|
||||||
|
Retrieve(
|
||||||
|
ctx context.Context,
|
||||||
|
pieceCID []byte,
|
||||||
|
params Params,
|
||||||
|
totalFunds BigInt,
|
||||||
|
miner peer.ID,
|
||||||
|
clientWallet Address,
|
||||||
|
minerWallet Address,
|
||||||
|
) DealID
|
||||||
|
|
||||||
|
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||||
|
SubscribeToEvents(subscriber ClientSubscriber) Unsubscribe
|
||||||
|
|
||||||
|
// V1
|
||||||
|
AddMoreFunds(id DealID, amount BigInt) error
|
||||||
|
CancelDeal(id DealID) error
|
||||||
|
RetrievalStatus(id DealID)
|
||||||
|
ListDeals() map[DealID]ClientDealState
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetrievalClientNode are the node depedencies for a RetrevalClient
|
||||||
|
type RetrievalClientNode interface {
|
||||||
|
|
||||||
|
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
||||||
|
// between a client and a miner and insures the client has the given amount of funds available in the channel
|
||||||
|
GetOrCreatePaymentChannel(ctx context.Context, clientAddress Address, minerAddress Address, clientFundsAvailable BigInt) (Address, error)
|
||||||
|
|
||||||
|
// Allocate late creates a lane within a payment channel so that calls to
|
||||||
|
// CreatePaymentVoucher will automatically make vouchers only for the difference
|
||||||
|
// in total
|
||||||
|
AllocateLane(paymentChannel Address) (uint64, error)
|
||||||
|
|
||||||
|
// CreatePaymentVoucher creates a new payment voucher in the given lane for a
|
||||||
|
// given payment channel so that all the payment vouchers in the lane add up
|
||||||
|
// to the given amount (so the payment voucher will be for the difference)
|
||||||
|
CreatePaymentVoucher(ctx context.Context, paymentChannel Address, amount BigInt, lane uint64) (*SignedVoucher, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProviderDealState is the current state of a deal from the point of view
|
||||||
|
// of a retrieval provider
|
||||||
|
type ProviderDealState struct {
|
||||||
|
DealProposal
|
||||||
|
Status DealStatus
|
||||||
|
Receiver peer.ID
|
||||||
|
TotalSent uint64
|
||||||
|
FundsReceived BigInt
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProviderEvent is an event that occurs in a deal lifecycle on the provider
|
||||||
|
type ProviderEvent uint64
|
||||||
|
|
||||||
|
const (
|
||||||
|
|
||||||
|
// ProviderEventOpen indicates a new deal was received from a client
|
||||||
|
ProviderEventOpen ProviderEvent = iota
|
||||||
|
|
||||||
|
// ProviderEventProgress indicates more data was sent to a client
|
||||||
|
ProviderEventProgress
|
||||||
|
|
||||||
|
// ProviderEventError indicates an error occurred in processing a deal for a client
|
||||||
|
ProviderEventError
|
||||||
|
|
||||||
|
// ProviderEventComplete indicates a retrieval deal was completed for a client
|
||||||
|
ProviderEventComplete
|
||||||
|
)
|
||||||
|
|
||||||
|
// ProviderDealID is a unique identifier for a deal on a provider -- it is
|
||||||
|
// a combination of DealID set by the client and the peer ID of the client
|
||||||
|
type ProviderDealID struct {
|
||||||
|
From peer.ID
|
||||||
|
ID DealID
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProviderSubscriber is a callback that is registered to listen for retrieval events on a provider
|
||||||
|
type ProviderSubscriber func(event ProviderEvent, state ProviderDealState)
|
||||||
|
|
||||||
|
// RetrievalProvider is an interface by which a provider configures their
|
||||||
|
// retrieval operations and monitors deals received and process
|
||||||
|
type RetrievalProvider interface {
|
||||||
|
// Start begins listening for deals on the given host
|
||||||
|
Start(host.Host)
|
||||||
|
|
||||||
|
// V0
|
||||||
|
|
||||||
|
// SetPricePerByte sets the price per byte a miner charges for retrievals
|
||||||
|
SetPricePerByte(price BigInt)
|
||||||
|
|
||||||
|
// SetPaymentInterval sets the maximum number of bytes a a provider will send before
|
||||||
|
// requesting further payment, and the rate at which that value increases
|
||||||
|
SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64)
|
||||||
|
|
||||||
|
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||||
|
SubscribeToEvents(subscriber ProviderSubscriber) Unsubscribe
|
||||||
|
|
||||||
|
// V1
|
||||||
|
SetPricePerUnseal(price BigInt)
|
||||||
|
ListDeals() map[ProviderDealID]ProviderDealState
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetrievalProviderNode are the node depedencies for a RetrevalProvider
|
||||||
|
type RetrievalProviderNode interface {
|
||||||
|
SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *SignedVoucher, proof []byte, expectedAmount BigInt) (BigInt, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PeerResolver is an interface for looking up providers that may have a piece
|
||||||
|
type PeerResolver interface {
|
||||||
|
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetrievalPeer is a provider address/peer.ID pair (everything needed to make
|
||||||
|
// deals for with a miner)
|
||||||
|
type RetrievalPeer struct {
|
||||||
|
Address Address
|
||||||
|
ID peer.ID // optional
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryResponseStatus indicates whether a queried piece is available
|
||||||
type QueryResponseStatus uint64
|
type QueryResponseStatus uint64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Available QueryResponseStatus = iota
|
// QueryResponseAvailable indicates a provider has a piece and is prepared to
|
||||||
Unavailable
|
// return it
|
||||||
|
QueryResponseAvailable QueryResponseStatus = iota
|
||||||
|
|
||||||
|
// QueryResponseUnavailable indicates a provider either does not have or cannot
|
||||||
|
// serve the queried piece to the client
|
||||||
|
QueryResponseUnavailable
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// QueryItemStatus (V1) indicates whether the requested part of a piece (payload or selector)
|
||||||
|
// is available for retrieval
|
||||||
|
type QueryItemStatus uint64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Accepted = iota
|
// QueryItemAvailable indicates requested part of the piece is available to be
|
||||||
Error
|
// served
|
||||||
Rejected
|
QueryItemAvailable QueryItemStatus = iota
|
||||||
Unsealing
|
|
||||||
|
// QueryItemUnavailable indicates the piece either does not contain the requested
|
||||||
|
// item or it cannot be served
|
||||||
|
QueryItemUnavailable
|
||||||
|
|
||||||
|
// QueryItemUnknown indicates the provider cannot determine if the given item
|
||||||
|
// is part of the requested piece (for example, if the piece is sealed and the
|
||||||
|
// miner does not maintain a payload CID index)
|
||||||
|
QueryItemUnknown
|
||||||
)
|
)
|
||||||
|
|
||||||
type Query struct {
|
// QueryParams indicate what specific information about a piece that a retrieval
|
||||||
Piece cid.Cid
|
// client is interested in, as well as specific parameters the client is seeking
|
||||||
// TODO: payment
|
// for the retrieval deal
|
||||||
|
type QueryParams struct {
|
||||||
|
PayloadCID cid.Cid // optional, query if miner has this cid in this piece. some miners may not be able to respond.
|
||||||
|
Selector ipld.Node // optional, query if miner has this cid in this piece. some miners may not be able to respond.
|
||||||
|
MaxPricePerByte BigInt // optional, tell miner uninterested if more expensive than this
|
||||||
|
MinPaymentInterval uint64 // optional, tell miner uninterested unless payment interval is greater than this
|
||||||
|
MinPaymentIntervalIncrease uint64 // optional, tell miner uninterested unless payment interval increase is greater than this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Query is a query to a given provider to determine information about a piece
|
||||||
|
// they may have available for retrieval
|
||||||
|
type Query struct {
|
||||||
|
PieceCID []byte // V0
|
||||||
|
// QueryParams // V1
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryResponse is a miners response to a given retrieval query
|
||||||
type QueryResponse struct {
|
type QueryResponse struct {
|
||||||
Status QueryResponseStatus
|
Status QueryResponseStatus
|
||||||
|
//PayloadCIDFound QueryItemStatus // V1 - if a PayloadCid was requested, the result
|
||||||
|
//SelectorFound QueryItemStatus // V1 - if a Selector was requested, the result
|
||||||
|
|
||||||
Size uint64 // TODO: spec
|
Size uint64 // Total size of piece in bytes
|
||||||
// TODO: unseal price (+spec)
|
//ExpectedPayloadSize uint64 // V1 - optional, if PayloadCID + selector are specified and miner knows, can offer an expected size
|
||||||
// TODO: sectors to unseal
|
|
||||||
// TODO: address to send money for the deal?
|
PaymentAddress Address // address to send funds to -- may be different than miner addr
|
||||||
MinPrice types.BigInt
|
MinPricePerByte BigInt
|
||||||
|
MaxPaymentInterval uint64
|
||||||
|
MaxPaymentIntervalIncrease uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type Unixfs0Offer struct {
|
// QueryResponseUndefined is an empty QueryResponse
|
||||||
Offset uint64
|
var QueryResponseUndefined = QueryResponse{}
|
||||||
Size uint64
|
|
||||||
|
// PieceRetrievalPrice is the total price to retrieve the piece (size * MinPricePerByte)
|
||||||
|
func (qr QueryResponse) PieceRetrievalPrice() BigInt {
|
||||||
|
return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.Size))
|
||||||
}
|
}
|
||||||
|
|
||||||
type RetParams struct {
|
// PayloadRetrievalPrice is the expected price to retrieve just the given payload
|
||||||
Unixfs0 *Unixfs0Offer
|
// & selector (V1)
|
||||||
|
//func (qr QueryResponse) PayloadRetrievalPrice() BigInt {
|
||||||
|
// return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.ExpectedPayloadSize))
|
||||||
|
//}
|
||||||
|
|
||||||
|
// DealStatus is the status of a retrieval deal returned by a provider
|
||||||
|
// in a DealResponse
|
||||||
|
type DealStatus uint64
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DealStatusAccepted means a deal has been accepted by a provider
|
||||||
|
// and its is ready to proceed with retrieval
|
||||||
|
DealStatusAccepted DealStatus = iota
|
||||||
|
|
||||||
|
// DealStatusFailed indicates something went wrong during a retrieval
|
||||||
|
DealStatusFailed
|
||||||
|
|
||||||
|
// DealStatusRejected indicates the provider rejected a client's deal proposal
|
||||||
|
// for some reason
|
||||||
|
DealStatusRejected
|
||||||
|
|
||||||
|
// DealStatusUnsealing indicates the provider is currently unsealing the sector
|
||||||
|
// needed to serve the retrieval deal
|
||||||
|
DealStatusUnsealing
|
||||||
|
|
||||||
|
// DealStatusFundsNeeded indicates the provider is awaiting a payment voucher to
|
||||||
|
// continue processing the deal
|
||||||
|
DealStatusFundsNeeded
|
||||||
|
|
||||||
|
// DealStatusOngoing indicates the provider is continuing to process a deal
|
||||||
|
DealStatusOngoing
|
||||||
|
|
||||||
|
// DealStatusFundsNeededLastPayment indicates the provider is awaiting funds for
|
||||||
|
// a final payment in order to complete a deal
|
||||||
|
DealStatusFundsNeededLastPayment
|
||||||
|
|
||||||
|
// DealStatusCompleted indicates a deal is complete
|
||||||
|
DealStatusCompleted
|
||||||
|
|
||||||
|
// DealStatusDealNotFound indicates an update was received for a deal that could
|
||||||
|
// not be identified
|
||||||
|
DealStatusDealNotFound
|
||||||
|
)
|
||||||
|
|
||||||
|
// Params are the parameters requested for a retrieval deal proposal
|
||||||
|
type Params struct {
|
||||||
|
//PayloadCID cid.Cid // V1
|
||||||
|
//Selector ipld.Node // V1
|
||||||
|
PricePerByte BigInt
|
||||||
|
PaymentInterval uint64
|
||||||
|
PaymentIntervalIncrease uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DealID is an identifier for a retrieval deal (unique to a client)
|
||||||
|
type DealID uint64
|
||||||
|
|
||||||
|
// DealProposal is a proposal for a new retrieval deal
|
||||||
type DealProposal struct {
|
type DealProposal struct {
|
||||||
Payment api.PaymentInfo
|
PieceCID []byte
|
||||||
|
ID DealID
|
||||||
Ref cid.Cid
|
Params
|
||||||
Params RetParams
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type DealResponse struct {
|
// Block is an IPLD block in bitswap format
|
||||||
Status uint64
|
type Block struct {
|
||||||
Message string
|
Prefix []byte
|
||||||
}
|
|
||||||
|
|
||||||
type Block struct { // TODO: put in spec
|
|
||||||
Prefix []byte // TODO: fix cid.Prefix marshaling somehow
|
|
||||||
Data []byte
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DealResponse is a response to a retrieval deal proposal
|
||||||
|
type DealResponse struct {
|
||||||
|
Status DealStatus
|
||||||
|
ID DealID
|
||||||
|
|
||||||
|
// payment required to proceed
|
||||||
|
PaymentOwed BigInt
|
||||||
|
|
||||||
|
Message string
|
||||||
|
Blocks []Block // V0 only
|
||||||
|
}
|
||||||
|
|
||||||
|
// DealPayment is a payment for an in progress retrieval deal
|
||||||
|
type DealPayment struct {
|
||||||
|
ID DealID
|
||||||
|
PaymentChannel address.Address
|
||||||
|
PaymentVoucher *types.SignedVoucher
|
||||||
|
}
|
||||||
|
41
retrievaladapter/client.go
Normal file
41
retrievaladapter/client.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package retrievaladapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||||
|
"github.com/filecoin-project/lotus/paych"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
|
)
|
||||||
|
|
||||||
|
type retrievalClientNode struct {
|
||||||
|
pmgr *paych.Manager
|
||||||
|
payapi payapi.PaychAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the
|
||||||
|
// Lotus Node
|
||||||
|
func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClientNode {
|
||||||
|
return &retrievalClientNode{pmgr: pmgr, payapi: payapi}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
||||||
|
// between a client and a miner and insures the client has the given amount of funds available in the channel
|
||||||
|
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) {
|
||||||
|
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
|
||||||
|
return paych, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allocate late creates a lane within a payment channel so that calls to
|
||||||
|
// CreatePaymentVoucher will automatically make vouchers only for the difference
|
||||||
|
// in total
|
||||||
|
func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) {
|
||||||
|
return rcn.pmgr.AllocateLane(paymentChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreatePaymentVoucher creates a new payment voucher in the given lane for a
|
||||||
|
// given payment channel so that all the payment vouchers in the lane add up
|
||||||
|
// to the given amount (so the payment voucher will be for the difference)
|
||||||
|
func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) {
|
||||||
|
return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane)
|
||||||
|
}
|
23
retrievaladapter/provider.go
Normal file
23
retrievaladapter/provider.go
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
package retrievaladapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||||
|
)
|
||||||
|
|
||||||
|
type retrievalProviderNode struct {
|
||||||
|
full api.FullNode
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
|
||||||
|
// Lotus Node
|
||||||
|
func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode {
|
||||||
|
return &retrievalProviderNode{full}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) {
|
||||||
|
return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user