feat(retrieval): extract retrievalmarket
Extract retrieval market and modify shared types
This commit is contained in:
parent
d6b0648610
commit
ccf359d057
@ -11,14 +11,14 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-cbor-util"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket/discovery"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storagemarket"
|
||||
)
|
||||
|
||||
|
3
go.mod
3
go.mod
@ -16,6 +16,7 @@ require (
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
|
||||
github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1
|
||||
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254
|
||||
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5
|
||||
@ -29,7 +30,7 @@ require (
|
||||
github.com/ipfs/go-bitswap v0.1.8
|
||||
github.com/ipfs/go-block-format v0.0.2
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
|
||||
github.com/ipfs/go-car v0.0.2
|
||||
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1
|
||||
github.com/ipfs/go-cid v0.0.4
|
||||
github.com/ipfs/go-datastore v0.3.1
|
||||
github.com/ipfs/go-ds-badger2 v0.0.0-20200108185345-7f650e6b2521
|
||||
|
7
go.sum
7
go.sum
@ -111,6 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
|
||||
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
|
||||
github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133 h1:/L916kY3hyq8w18rLO9VMSHqw25/9pwRB3nVW6b+Sm4=
|
||||
github.com/filecoin-project/go-fil-components v0.0.0-20200110033229-671d79221133/go.mod h1:7M0YUI2CSVmqEmXNeXNq5L/pjk7C1Q5ifhirfMADD/k=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
|
||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
||||
@ -120,6 +122,7 @@ github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyC
|
||||
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254 h1:4IvlPad82JaNBtqh8fEAUIKWv8I3tguAJjGvUyHNZS4=
|
||||
github.com/filecoin-project/go-sectorbuilder v0.0.0-20200109194458-9656ce473254/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
|
||||
github.com/filecoin-project/go-statestore v0.0.0-20191219195854-7a95521e8f15/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5 h1:NZXq90YlfakSmB2/84dGr0AVmKYFA97+yyViBIgTFbk=
|
||||
github.com/filecoin-project/go-statestore v0.0.0-20200102200712-1f63c701c1e5/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
|
||||
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
|
||||
@ -210,8 +213,8 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
|
||||
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
|
||||
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
|
||||
github.com/ipfs/go-car v0.0.2 h1:j02lzgeijorstzoMl3nQmvvb8wjJUVCiOAl8XEwYMCQ=
|
||||
github.com/ipfs/go-car v0.0.2/go.mod h1:60pzeu308k5kVFHzq0HIi2kPtITgor+1ll1xuGk5JwQ=
|
||||
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 h1:Nq8xEW+2KZq7IkRlkOh0rTEUI8FgunhMoLj5EMkJzbQ=
|
||||
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
|
||||
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
|
||||
|
@ -19,6 +19,8 @@ import (
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket/discovery"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||
@ -43,8 +45,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/paych"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
"github.com/filecoin-project/lotus/storagemarket"
|
||||
@ -222,7 +222,7 @@ func Online() Option {
|
||||
Override(RunPeerMgrKey, modules.RunPeerMgr),
|
||||
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
|
||||
|
||||
Override(new(*discovery.Local), discovery.NewLocal),
|
||||
Override(new(*discovery.Local), modules.NewLocalDiscovery),
|
||||
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),
|
||||
|
||||
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
@ -32,7 +33,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
"github.com/filecoin-project/lotus/node/impl/paych"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrievaladapter"
|
||||
"github.com/filecoin-project/lotus/storagemarket"
|
||||
)
|
||||
|
||||
@ -164,7 +165,7 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
|
||||
out[k] = api.QueryOffer{
|
||||
Root: root,
|
||||
Size: queryResponse.Size,
|
||||
MinPrice: queryResponse.PieceRetrievalPrice(),
|
||||
MinPrice: retrievaladapter.FromSharedTokenAmount(queryResponse.PieceRetrievalPrice()),
|
||||
Miner: p.Address, // TODO: check
|
||||
MinerPeerID: p.ID,
|
||||
}
|
||||
@ -292,9 +293,13 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
|
||||
})
|
||||
|
||||
a.Retrieval.Retrieve(
|
||||
ctx, order.Root.Bytes(), retrievalmarket.Params{
|
||||
PricePerByte: types.BigDiv(order.Total, types.NewInt(order.Size)),
|
||||
}, order.Total, order.MinerPeerID, order.Client, order.Miner)
|
||||
ctx,
|
||||
order.Root.Bytes(),
|
||||
retrievalmarket.NewParamsV0(types.BigDiv(order.Total, types.NewInt(order.Size)).Int, 0, 0),
|
||||
retrievaladapter.ToSharedTokenAmount(order.Total),
|
||||
order.MinerPeerID,
|
||||
order.Client,
|
||||
order.Miner)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return xerrors.New("Retrieval Timed Out")
|
||||
|
@ -6,11 +6,12 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/paych"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
graphsync "github.com/ipfs/go-graphsync/impl"
|
||||
|
@ -8,15 +8,16 @@ import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket/discovery"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/sub"
|
||||
"github.com/filecoin-project/lotus/node/hello"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/peermgr"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storagemarket"
|
||||
)
|
||||
|
||||
@ -81,6 +82,10 @@ func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c storagemarket.Sto
|
||||
})
|
||||
}
|
||||
|
||||
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
||||
return discovery.NewLocal(ds)
|
||||
}
|
||||
|
||||
func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver {
|
||||
return discovery.Multi(l)
|
||||
}
|
||||
|
@ -25,6 +25,8 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl"
|
||||
"github.com/filecoin-project/go-sectorbuilder"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -35,8 +37,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||
|
||||
"github.com/filecoin-project/lotus/retrievaladapter"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
@ -265,6 +266,6 @@ func SealTicketGen(api api.FullNode) storage.TicketFn {
|
||||
|
||||
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
|
||||
func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider {
|
||||
adapter := retrievaladapter.NewRetrievalProviderNode(full)
|
||||
return retrievalimpl.NewProvider(sblks, adapter)
|
||||
adapter := retrievaladapter.NewRetrievalProviderNode(sblks, full)
|
||||
return retrievalimpl.NewProvider(adapter)
|
||||
}
|
||||
|
@ -1,20 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl"
|
||||
)
|
||||
|
||||
// main func has ONE JOB
|
||||
func main() {
|
||||
fmt.Print("Generating Cbor Marshal/Unmarshal...")
|
||||
|
||||
if err := retrievalimpl.RunCborGen(); err != nil {
|
||||
fmt.Println("Failed: ")
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
fmt.Println("Done.")
|
||||
}
|
@ -1,15 +0,0 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
)
|
||||
|
||||
func init() {
|
||||
cbor.RegisterCborType(retrievalmarket.RetrievalPeer{})
|
||||
}
|
||||
|
||||
func Multi(r retrievalmarket.PeerResolver) retrievalmarket.PeerResolver { // TODO: actually support multiple mechanisms
|
||||
return r
|
||||
}
|
@ -1,54 +0,0 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
)
|
||||
|
||||
var log = logging.Logger("ret-discovery")
|
||||
|
||||
type Local struct {
|
||||
ds datastore.Datastore
|
||||
}
|
||||
|
||||
func NewLocal(ds dtypes.MetadataDS) *Local {
|
||||
return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))}
|
||||
}
|
||||
|
||||
func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
|
||||
// TODO: allow multiple peers here
|
||||
// (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too)
|
||||
|
||||
log.Warn("Tracking multiple retrieval peers not implemented")
|
||||
|
||||
entry, err := cbor.DumpObject(peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
|
||||
}
|
||||
|
||||
func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
|
||||
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
|
||||
if err == datastore.ErrNotFound {
|
||||
return []retrievalmarket.RetrievalPeer{}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var peer retrievalmarket.RetrievalPeer
|
||||
if err := cbor.DecodeInto(entry, &peer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []retrievalmarket.RetrievalPeer{peer}, nil
|
||||
}
|
||||
|
||||
var _ retrievalmarket.PeerResolver = &Local{}
|
@ -1,476 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
xerrors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||
|
||||
var _ = xerrors.Errorf
|
||||
|
||||
func (t *RetParams) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{129}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct)
|
||||
if err := t.Unixfs0.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *RetParams) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 1 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Unixfs0 (retrievalimpl.Unixfs0Offer) (struct)
|
||||
|
||||
{
|
||||
|
||||
pb, err := br.PeekByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pb == cbg.CborNull[0] {
|
||||
var nbuf [1]byte
|
||||
if _, err := br.Read(nbuf[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
t.Unixfs0 = new(Unixfs0Offer)
|
||||
if err := t.Unixfs0.UnmarshalCBOR(br); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldQuery) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{129}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Piece (cid.Cid) (struct)
|
||||
|
||||
if err := cbg.WriteCid(w, t.Piece); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.Piece: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldQuery) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 1 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Piece (cid.Cid) (struct)
|
||||
|
||||
{
|
||||
|
||||
c, err := cbg.ReadCid(br)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.Piece: %w", err)
|
||||
}
|
||||
|
||||
t.Piece = c
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{131}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Status (retrievalimpl.OldQueryResponseStatus) (uint64)
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Size (uint64) (uint64)
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.MinPrice (types.BigInt) (struct)
|
||||
if err := t.MinPrice.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 3 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Status (retrievalimpl.OldQueryResponseStatus) (uint64)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.Status = OldQueryResponseStatus(extra)
|
||||
// t.Size (uint64) (uint64)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.Size = uint64(extra)
|
||||
// t.MinPrice (types.BigInt) (struct)
|
||||
|
||||
{
|
||||
|
||||
if err := t.MinPrice.UnmarshalCBOR(br); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Unixfs0Offer) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{130}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Offset (uint64) (uint64)
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Offset))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Size (uint64) (uint64)
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Size))); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Unixfs0Offer) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 2 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Offset (uint64) (uint64)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.Offset = uint64(extra)
|
||||
// t.Size (uint64) (uint64)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.Size = uint64(extra)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldDealProposal) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{131}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Payment (api.PaymentInfo) (struct)
|
||||
if err := t.Payment.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Ref (cid.Cid) (struct)
|
||||
|
||||
if err := cbg.WriteCid(w, t.Ref); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.Ref: %w", err)
|
||||
}
|
||||
|
||||
// t.Params (retrievalimpl.RetParams) (struct)
|
||||
if err := t.Params.MarshalCBOR(w); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 3 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Payment (api.PaymentInfo) (struct)
|
||||
|
||||
{
|
||||
|
||||
if err := t.Payment.UnmarshalCBOR(br); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
// t.Ref (cid.Cid) (struct)
|
||||
|
||||
{
|
||||
|
||||
c, err := cbg.ReadCid(br)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.Ref: %w", err)
|
||||
}
|
||||
|
||||
t.Ref = c
|
||||
|
||||
}
|
||||
// t.Params (retrievalimpl.RetParams) (struct)
|
||||
|
||||
{
|
||||
|
||||
if err := t.Params.UnmarshalCBOR(br); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldDealResponse) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{130}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Status (uint64) (uint64)
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Message (string) (string)
|
||||
if len(t.Message) > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field t.Message was too long")
|
||||
}
|
||||
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Message)))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte(t.Message)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *OldDealResponse) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 2 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Status (uint64) (uint64)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.Status = uint64(extra)
|
||||
// t.Message (string) (string)
|
||||
|
||||
{
|
||||
sval, err := cbg.ReadString(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.Message = string(sval)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Block) MarshalCBOR(w io.Writer) error {
|
||||
if t == nil {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{130}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Prefix ([]uint8) (slice)
|
||||
if len(t.Prefix) > cbg.ByteArrayMaxLen {
|
||||
return xerrors.Errorf("Byte array in field t.Prefix was too long")
|
||||
}
|
||||
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Prefix)))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(t.Prefix); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.Data ([]uint8) (slice)
|
||||
if len(t.Data) > cbg.ByteArrayMaxLen {
|
||||
return xerrors.Errorf("Byte array in field t.Data was too long")
|
||||
}
|
||||
|
||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Data)))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write(t.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Block) UnmarshalCBOR(r io.Reader) error {
|
||||
br := cbg.GetPeeker(r)
|
||||
|
||||
maj, extra, err := cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajArray {
|
||||
return fmt.Errorf("cbor input should be of type array")
|
||||
}
|
||||
|
||||
if extra != 2 {
|
||||
return fmt.Errorf("cbor input had wrong number of fields")
|
||||
}
|
||||
|
||||
// t.Prefix ([]uint8) (slice)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.ByteArrayMaxLen {
|
||||
return fmt.Errorf("t.Prefix: byte array too large (%d)", extra)
|
||||
}
|
||||
if maj != cbg.MajByteString {
|
||||
return fmt.Errorf("expected byte array")
|
||||
}
|
||||
t.Prefix = make([]byte, extra)
|
||||
if _, err := io.ReadFull(br, t.Prefix); err != nil {
|
||||
return err
|
||||
}
|
||||
// t.Data ([]uint8) (slice)
|
||||
|
||||
maj, extra, err = cbg.CborReadHeader(br)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if extra > cbg.ByteArrayMaxLen {
|
||||
return fmt.Errorf("t.Data: byte array too large (%d)", extra)
|
||||
}
|
||||
if maj != cbg.MajByteString {
|
||||
return fmt.Errorf("expected byte array")
|
||||
}
|
||||
t.Data = make([]byte, extra)
|
||||
if _, err := io.ReadFull(br, t.Data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,389 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
)
|
||||
|
||||
var log = logging.Logger("retrieval")
|
||||
|
||||
type client struct {
|
||||
h host.Host
|
||||
bs blockstore.Blockstore
|
||||
node retrievalmarket.RetrievalClientNode
|
||||
// The parameters should be replaced by RetrievalClientNode
|
||||
|
||||
nextDealLk sync.Mutex
|
||||
nextDealID retrievalmarket.DealID
|
||||
subscribersLk sync.RWMutex
|
||||
subscribers []retrievalmarket.ClientSubscriber
|
||||
}
|
||||
|
||||
// NewClient creates a new retrieval client
|
||||
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
|
||||
return &client{h: h, bs: bs, node: node}
|
||||
}
|
||||
|
||||
// V0
|
||||
|
||||
// TODO: Implement for retrieval provider V0 epic
|
||||
// https://github.com/filecoin-project/go-retrieval-market-project/issues/12
|
||||
func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// TODO: Update to match spec for V0 epic
|
||||
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
|
||||
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
|
||||
cid, err := cid.Cast(pieceCID)
|
||||
if err != nil {
|
||||
log.Warn(err)
|
||||
return retrievalmarket.QueryResponseUndefined, err
|
||||
}
|
||||
|
||||
s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
|
||||
if err != nil {
|
||||
log.Warn(err)
|
||||
return retrievalmarket.QueryResponseUndefined, err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
err = cborutil.WriteCborRPC(s, &OldQuery{
|
||||
Piece: cid,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn(err)
|
||||
return retrievalmarket.QueryResponseUndefined, err
|
||||
}
|
||||
|
||||
var oldResp OldQueryResponse
|
||||
if err := oldResp.UnmarshalCBOR(s); err != nil {
|
||||
log.Warn(err)
|
||||
return retrievalmarket.QueryResponseUndefined, err
|
||||
}
|
||||
|
||||
resp := retrievalmarket.QueryResponse{
|
||||
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
|
||||
Size: oldResp.Size,
|
||||
MinPricePerByte: types.BigDiv(oldResp.MinPrice, types.NewInt(oldResp.Size)),
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// TODO: Update to match spec for V0 Epic:
|
||||
// https://github.com/filecoin-project/go-retrieval-market-project/issues/9
|
||||
func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID {
|
||||
/* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces
|
||||
-- it will be replaced when we do the V0 implementation of the module */
|
||||
c.nextDealLk.Lock()
|
||||
c.nextDealID++
|
||||
dealID := c.nextDealID
|
||||
c.nextDealLk.Unlock()
|
||||
|
||||
dealState := retrievalmarket.ClientDealState{
|
||||
DealProposal: retrievalmarket.DealProposal{
|
||||
PieceCID: pieceCID,
|
||||
ID: dealID,
|
||||
Params: params,
|
||||
},
|
||||
Status: retrievalmarket.DealStatusFailed,
|
||||
Sender: miner,
|
||||
}
|
||||
|
||||
go func() {
|
||||
evt := retrievalmarket.ClientEventError
|
||||
converted, err := cid.Cast(pieceCID)
|
||||
|
||||
if err == nil {
|
||||
err = c.retrieveUnixfs(ctx, converted, types.BigDiv(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet)
|
||||
if err == nil {
|
||||
evt = retrievalmarket.ClientEventComplete
|
||||
dealState.Status = retrievalmarket.DealStatusCompleted
|
||||
}
|
||||
}
|
||||
|
||||
c.notifySubscribers(evt, dealState)
|
||||
}()
|
||||
|
||||
return dealID
|
||||
}
|
||||
|
||||
// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
|
||||
// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order.
|
||||
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||
func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||
return func() {
|
||||
c.subscribersLk.Lock()
|
||||
defer c.subscribersLk.Unlock()
|
||||
curLen := len(c.subscribers)
|
||||
for i, el := range c.subscribers {
|
||||
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||
c.subscribers[i] = c.subscribers[curLen-1]
|
||||
c.subscribers = c.subscribers[:curLen-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) {
|
||||
c.subscribersLk.RLock()
|
||||
defer c.subscribersLk.RUnlock()
|
||||
for _, cb := range c.subscribers {
|
||||
cb(evt, ds)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
|
||||
c.subscribersLk.Lock()
|
||||
c.subscribers = append(c.subscribers, subscriber)
|
||||
c.subscribersLk.Unlock()
|
||||
|
||||
return c.unsubscribeAt(subscriber)
|
||||
}
|
||||
|
||||
// V1
|
||||
func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (c *client) CancelDeal(id retrievalmarket.DealID) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (c *client) RetrievalStatus(id retrievalmarket.DealID) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDealState {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
type clientStream struct {
|
||||
node retrievalmarket.RetrievalClientNode
|
||||
stream network.Stream
|
||||
peeker cbg.BytePeeker
|
||||
|
||||
root cid.Cid
|
||||
size types.BigInt
|
||||
offset uint64
|
||||
|
||||
paych retrievalmarket.Address
|
||||
lane uint64
|
||||
total types.BigInt
|
||||
transferred types.BigInt
|
||||
|
||||
windowSize uint64 // how much we "trust" the peer
|
||||
verifier BlockVerifier
|
||||
bs blockstore.Blockstore
|
||||
}
|
||||
|
||||
/* This is the old retrieval code that is NOT spec compliant */
|
||||
|
||||
// C > S
|
||||
//
|
||||
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
|
||||
//
|
||||
// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
|
||||
// < Resp{Accept}
|
||||
// < ..(Intermediate Block)
|
||||
// < ..Blocks
|
||||
// < ..(Intermediate Block)
|
||||
// < ..Blocks
|
||||
// > DealProposal(...)
|
||||
// < ...
|
||||
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error {
|
||||
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer s.Close()
|
||||
|
||||
initialOffset := uint64(0) // TODO: Check how much data we have locally
|
||||
// TODO: Support in handler
|
||||
// TODO: Allow client to specify this
|
||||
|
||||
paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting payment channel: %w", err)
|
||||
}
|
||||
lane, err := c.node.AllocateLane(paych)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("allocating payment lane: %w", err)
|
||||
}
|
||||
|
||||
cst := clientStream{
|
||||
node: c.node,
|
||||
stream: s,
|
||||
peeker: cbg.GetPeeker(s),
|
||||
|
||||
root: root,
|
||||
size: types.NewInt(size),
|
||||
offset: initialOffset,
|
||||
|
||||
paych: paych,
|
||||
lane: lane,
|
||||
total: total,
|
||||
transferred: types.NewInt(0),
|
||||
|
||||
windowSize: build.UnixfsChunkSize,
|
||||
verifier: &UnixFs0Verifier{Root: root},
|
||||
bs: c.bs,
|
||||
}
|
||||
|
||||
for cst.offset != size+initialOffset {
|
||||
toFetch := cst.windowSize
|
||||
if toFetch+cst.offset > size {
|
||||
toFetch = size - cst.offset
|
||||
}
|
||||
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
|
||||
|
||||
err := cst.doOneExchange(ctx, toFetch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("retrieval exchange: %w", err)
|
||||
}
|
||||
|
||||
cst.offset += toFetch
|
||||
}
|
||||
log.Info("RETRIEVE SUCCESSFUL")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) error {
|
||||
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
|
||||
|
||||
payment, err := cst.setupPayment(ctx, payAmount)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("setting up retrieval payment: %w", err)
|
||||
}
|
||||
|
||||
deal := &OldDealProposal{
|
||||
Payment: payment,
|
||||
Ref: cst.root,
|
||||
Params: RetParams{
|
||||
Unixfs0: &Unixfs0Offer{
|
||||
Offset: cst.offset,
|
||||
Size: toFetch,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var resp OldDealResponse
|
||||
if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.Status != Accepted {
|
||||
cst.windowSize = build.UnixfsChunkSize
|
||||
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
|
||||
|
||||
if resp.Status == Error {
|
||||
return xerrors.Errorf("storage deal error: %s", resp.Message)
|
||||
}
|
||||
if resp.Status == Rejected {
|
||||
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
|
||||
}
|
||||
return xerrors.New("storage deal response had no Accepted section")
|
||||
}
|
||||
|
||||
log.Info("Retrieval accepted, fetching blocks")
|
||||
|
||||
return cst.fetchBlocks(toFetch)
|
||||
|
||||
// TODO: maybe increase miner window size after success
|
||||
}
|
||||
|
||||
func (cst *clientStream) fetchBlocks(toFetch uint64) error {
|
||||
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
|
||||
for i := uint64(0); i < blocksToFetch; {
|
||||
log.Infof("block %d of %d", i+1, blocksToFetch)
|
||||
|
||||
var block Block
|
||||
if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil {
|
||||
return xerrors.Errorf("reading fetchBlock response: %w", err)
|
||||
}
|
||||
|
||||
dataBlocks, err := cst.consumeBlockMessage(block)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("consuming retrieved blocks: %w", err)
|
||||
}
|
||||
|
||||
i += dataBlocks
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) {
|
||||
prefix, err := cid.PrefixFromBytes(block.Prefix)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
cid, err := prefix.Sum(block.Data)
|
||||
|
||||
blk, err := blocks.NewBlockWithCid(block.Data, cid)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
internal, err := cst.verifier.Verify(context.TODO(), blk)
|
||||
if err != nil {
|
||||
log.Warnf("block verify failed: %s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// TODO: Smarter out, maybe add to filestore automagically
|
||||
// (Also, persist intermediate nodes)
|
||||
err = cst.bs.Put(blk)
|
||||
if err != nil {
|
||||
log.Warnf("block write failed: %s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if internal {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
|
||||
amount := types.BigAdd(cst.transferred, toSend)
|
||||
|
||||
sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane)
|
||||
if err != nil {
|
||||
return api.PaymentInfo{}, err
|
||||
}
|
||||
|
||||
cst.transferred = amount
|
||||
|
||||
return api.PaymentInfo{
|
||||
Channel: cst.paych,
|
||||
ChannelMessage: nil,
|
||||
Vouchers: []*types.SignedVoucher{sv},
|
||||
}, nil
|
||||
}
|
@ -1,323 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
unixfile "github.com/ipfs/go-unixfs/file"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
// RetrMinerAPI are the node functions needed by a retrieval provider
|
||||
type RetrMinerAPI interface {
|
||||
PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
|
||||
}
|
||||
|
||||
type provider struct {
|
||||
|
||||
// TODO: Replace with RetrievalProviderNode & FileStore for https://github.com/filecoin-project/go-retrieval-market-project/issues/9
|
||||
sectorBlocks *sectorblocks.SectorBlocks
|
||||
|
||||
// TODO: Replace with RetrievalProviderNode for
|
||||
// https://github.com/filecoin-project/go-retrieval-market-project/issues/4
|
||||
node retrievalmarket.RetrievalProviderNode
|
||||
|
||||
pricePerByte retrievalmarket.BigInt
|
||||
|
||||
subscribersLk sync.RWMutex
|
||||
subscribers []retrievalmarket.ProviderSubscriber
|
||||
}
|
||||
|
||||
// NewProvider returns a new retrieval provider
|
||||
func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider {
|
||||
return &provider{
|
||||
sectorBlocks: sblks,
|
||||
node: node,
|
||||
|
||||
pricePerByte: types.NewInt(2), // TODO: allow setting
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins listening for deals on the given host
|
||||
func (p *provider) Start(host host.Host) {
|
||||
host.SetStreamHandler(retrievalmarket.QueryProtocolID, p.handleQueryStream)
|
||||
host.SetStreamHandler(retrievalmarket.ProtocolID, p.handleDealStream)
|
||||
}
|
||||
|
||||
// V0
|
||||
// SetPricePerByte sets the price per byte a miner charges for retrievals
|
||||
func (p *provider) SetPricePerByte(price retrievalmarket.BigInt) {
|
||||
p.pricePerByte = price
|
||||
}
|
||||
|
||||
// SetPaymentInterval sets the maximum number of bytes a a provider will send before
|
||||
// requesting further payment, and the rate at which that value increases
|
||||
// TODO: Implement for https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||
func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// unsubscribeAt returns a function that removes an item from the subscribers list by comparing
|
||||
// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order.
|
||||
// Subsequent, repeated calls to the func with the same Subscriber are a no-op.
|
||||
func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||
return func() {
|
||||
p.subscribersLk.Lock()
|
||||
defer p.subscribersLk.Unlock()
|
||||
curLen := len(p.subscribers)
|
||||
for i, el := range p.subscribers {
|
||||
if reflect.ValueOf(sub) == reflect.ValueOf(el) {
|
||||
p.subscribers[i] = p.subscribers[curLen-1]
|
||||
p.subscribers = p.subscribers[:curLen-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retrievalmarket.ProviderDealState) {
|
||||
p.subscribersLk.RLock()
|
||||
defer p.subscribersLk.RUnlock()
|
||||
for _, cb := range p.subscribers {
|
||||
cb(evt, ds)
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||
// TODO: Implement updates as part of https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||
func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe {
|
||||
p.subscribersLk.Lock()
|
||||
p.subscribers = append(p.subscribers, subscriber)
|
||||
p.subscribersLk.Unlock()
|
||||
|
||||
return p.unsubscribeAt(subscriber)
|
||||
}
|
||||
|
||||
// V1
|
||||
func (p *provider) SetPricePerUnseal(price retrievalmarket.BigInt) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func writeErr(stream network.Stream, err error) {
|
||||
log.Errorf("Retrieval deal error: %+v", err)
|
||||
_ = cborutil.WriteCborRPC(stream, &OldDealResponse{
|
||||
Status: Error,
|
||||
Message: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/8
|
||||
func (p *provider) handleQueryStream(stream network.Stream) {
|
||||
defer stream.Close()
|
||||
|
||||
var query OldQuery
|
||||
if err := cborutil.ReadCborRPC(stream, &query); err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
|
||||
size, err := p.sectorBlocks.GetSize(query.Piece)
|
||||
if err != nil && err != sectorblocks.ErrNotFound {
|
||||
log.Errorf("Retrieval query: GetRefs: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
answer := &OldQueryResponse{
|
||||
Status: Unavailable,
|
||||
}
|
||||
if err == nil {
|
||||
answer.Status = Available
|
||||
|
||||
// TODO: get price, look for already unsealed ref to reduce work
|
||||
answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), p.pricePerByte)
|
||||
answer.Size = uint64(size) // TODO: verify on intermediate
|
||||
}
|
||||
|
||||
if err := cborutil.WriteCborRPC(stream, answer); err != nil {
|
||||
log.Errorf("Retrieval query: WriteCborRPC: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type handlerDeal struct {
|
||||
p *provider
|
||||
stream network.Stream
|
||||
|
||||
ufsr sectorblocks.UnixfsReader
|
||||
open cid.Cid
|
||||
at uint64
|
||||
size uint64
|
||||
}
|
||||
|
||||
// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/7
|
||||
func (p *provider) handleDealStream(stream network.Stream) {
|
||||
defer stream.Close()
|
||||
|
||||
hnd := &handlerDeal{
|
||||
p: p,
|
||||
|
||||
stream: stream,
|
||||
}
|
||||
|
||||
var err error
|
||||
more := true
|
||||
|
||||
for more {
|
||||
more, err = hnd.handleNext() // TODO: 'more' bool
|
||||
if err != nil {
|
||||
writeErr(stream, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) handleNext() (bool, error) {
|
||||
var deal OldDealProposal
|
||||
if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil {
|
||||
if err == io.EOF { // client sent all deals
|
||||
err = nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
|
||||
if deal.Params.Unixfs0 == nil {
|
||||
return false, xerrors.New("unknown deal type")
|
||||
}
|
||||
|
||||
unixfs0 := deal.Params.Unixfs0
|
||||
|
||||
if len(deal.Payment.Vouchers) != 1 {
|
||||
return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers))
|
||||
}
|
||||
|
||||
expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size))
|
||||
if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil {
|
||||
return false, xerrors.Errorf("processing retrieval payment: %w", err)
|
||||
}
|
||||
|
||||
// If the file isn't open (new deal stream), isn't the right file, or isn't
|
||||
// at the right offset, (re)open it
|
||||
if hnd.open != deal.Ref || hnd.at != unixfs0.Offset {
|
||||
log.Infof("opening file for sending (open '%s') (@%d, want %d)", deal.Ref, hnd.at, unixfs0.Offset)
|
||||
if err := hnd.openFile(deal); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
if unixfs0.Offset+unixfs0.Size > hnd.size {
|
||||
return false, xerrors.Errorf("tried to read too much %d+%d > %d", unixfs0.Offset, unixfs0.Size, hnd.size)
|
||||
}
|
||||
|
||||
err := hnd.accept(deal)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) openFile(deal OldDealProposal) error {
|
||||
unixfs0 := deal.Params.Unixfs0
|
||||
|
||||
if unixfs0.Offset != 0 {
|
||||
// TODO: Implement SeekBlock (like ReadBlock) in go-unixfs
|
||||
return xerrors.New("sending merkle proofs for nonzero offset not supported yet")
|
||||
}
|
||||
hnd.at = unixfs0.Offset
|
||||
|
||||
bstore := hnd.p.sectorBlocks.SealedBlockstore(func() error {
|
||||
return nil // TODO: approve unsealing based on amount paid
|
||||
})
|
||||
|
||||
ds := merkledag.NewDAGService(blockservice.New(bstore, nil))
|
||||
rootNd, err := ds.Get(context.TODO(), deal.Ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fsr, err := unixfile.NewUnixfsFile(context.TODO(), ds, rootNd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
hnd.ufsr, ok = fsr.(sectorblocks.UnixfsReader)
|
||||
if !ok {
|
||||
return xerrors.Errorf("file %s didn't implement sectorblocks.UnixfsReader", deal.Ref)
|
||||
}
|
||||
|
||||
isize, err := hnd.ufsr.Size()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hnd.size = uint64(isize)
|
||||
|
||||
hnd.open = deal.Ref
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (hnd *handlerDeal) accept(deal OldDealProposal) error {
|
||||
unixfs0 := deal.Params.Unixfs0
|
||||
|
||||
resp := &OldDealResponse{
|
||||
Status: Accepted,
|
||||
}
|
||||
if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil {
|
||||
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
blocksToSend := (unixfs0.Size + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
|
||||
for i := uint64(0); i < blocksToSend; {
|
||||
data, offset, nd, err := hnd.ufsr.ReadBlock(context.TODO())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("sending block for a deal: %s", nd.Cid())
|
||||
|
||||
if offset != unixfs0.Offset {
|
||||
return xerrors.Errorf("ReadBlock on wrong offset: want %d, got %d", unixfs0.Offset, offset)
|
||||
}
|
||||
|
||||
/*if uint64(len(data)) != deal.Unixfs0.Size { // TODO: Fix for internal nodes (and any other node too)
|
||||
writeErr(stream, xerrors.Errorf("ReadBlock data with wrong size: want %d, got %d", deal.Unixfs0.Size, len(data)))
|
||||
return
|
||||
}*/
|
||||
|
||||
block := &Block{
|
||||
Prefix: nd.Cid().Prefix().Bytes(),
|
||||
Data: nd.RawData(),
|
||||
}
|
||||
|
||||
if err := cborutil.WriteCborRPC(hnd.stream, block); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(data) > 0 { // don't count internal nodes
|
||||
hnd.at += uint64(len(data))
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
cborgen "github.com/whyrusleeping/cbor-gen"
|
||||
)
|
||||
|
||||
func RunCborGen() error {
|
||||
genName := "./impl/cbor_gen.go"
|
||||
reName := "./impl/cbor_gen_old.go"
|
||||
if err := os.Rename(genName, reName); err != nil {
|
||||
return fmt.Errorf("could not rename %s to %s", genName, reName)
|
||||
}
|
||||
if err := cborgen.WriteTupleEncodersToFile(
|
||||
genName,
|
||||
"retrievalimpl",
|
||||
RetParams{},
|
||||
OldQuery{},
|
||||
OldQueryResponse{},
|
||||
Unixfs0Offer{},
|
||||
OldDealProposal{},
|
||||
OldDealResponse{},
|
||||
Block{},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.Remove(reName); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,67 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
/* These types are all the types provided by Lotus, which diverge even from
|
||||
spec V0 -- prior to the "update to spec epic", we are using these types internally
|
||||
and switching to spec at the boundaries of the module */
|
||||
|
||||
type OldQueryResponseStatus uint64
|
||||
|
||||
const (
|
||||
Available OldQueryResponseStatus = iota
|
||||
Unavailable
|
||||
)
|
||||
|
||||
const (
|
||||
Accepted = iota
|
||||
Error
|
||||
Rejected
|
||||
Unsealing
|
||||
)
|
||||
|
||||
type OldQuery struct {
|
||||
Piece cid.Cid
|
||||
// TODO: payment
|
||||
}
|
||||
|
||||
type OldQueryResponse struct {
|
||||
Status OldQueryResponseStatus
|
||||
|
||||
Size uint64 // TODO: spec
|
||||
// TODO: unseal price (+spec)
|
||||
// TODO: sectors to unseal
|
||||
// TODO: address to send money for the deal?
|
||||
MinPrice types.BigInt
|
||||
}
|
||||
|
||||
type Unixfs0Offer struct {
|
||||
Offset uint64
|
||||
Size uint64
|
||||
}
|
||||
|
||||
type RetParams struct {
|
||||
Unixfs0 *Unixfs0Offer
|
||||
}
|
||||
|
||||
type OldDealProposal struct {
|
||||
Payment api.PaymentInfo
|
||||
|
||||
Ref cid.Cid
|
||||
Params RetParams
|
||||
}
|
||||
|
||||
type OldDealResponse struct {
|
||||
Status uint64
|
||||
Message string
|
||||
}
|
||||
|
||||
type Block struct { // TODO: put in spec
|
||||
Prefix []byte // TODO: fix cid.Prefix marshaling somehow
|
||||
Data []byte
|
||||
}
|
@ -1,141 +0,0 @@
|
||||
package retrievalimpl
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
"github.com/ipfs/go-merkledag"
|
||||
"github.com/ipfs/go-unixfs"
|
||||
pb "github.com/ipfs/go-unixfs/pb"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
)
|
||||
|
||||
type BlockVerifier interface {
|
||||
Verify(context.Context, blocks.Block) (internal bool, err error)
|
||||
}
|
||||
|
||||
type OptimisticVerifier struct {
|
||||
}
|
||||
|
||||
func (o *OptimisticVerifier) Verify(context.Context, blocks.Block) (bool, error) {
|
||||
// It's probably fine
|
||||
return false, nil
|
||||
}
|
||||
|
||||
type UnixFs0Verifier struct {
|
||||
Root cid.Cid
|
||||
rootBlk blocks.Block
|
||||
|
||||
expect int
|
||||
seen int
|
||||
|
||||
sub *UnixFs0Verifier
|
||||
}
|
||||
|
||||
func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block) (last bool, internal bool, err error) {
|
||||
if b.sub != nil {
|
||||
// TODO: check links here (iff b.sub.sub == nil)
|
||||
|
||||
subLast, internal, err := b.sub.verify(ctx, blk)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
if subLast {
|
||||
b.sub = nil
|
||||
b.seen++
|
||||
}
|
||||
|
||||
return b.seen == b.expect, internal, nil
|
||||
}
|
||||
|
||||
if b.seen >= b.expect { // this is probably impossible
|
||||
return false, false, xerrors.New("unixfs verifier: too many nodes in level")
|
||||
}
|
||||
|
||||
links, err := b.checkInternal(blk)
|
||||
if err != nil {
|
||||
return false, false, err
|
||||
}
|
||||
|
||||
if links > 0 { // TODO: check if all links are intermediate (or all aren't)
|
||||
if links > build.UnixfsLinksPerLevel {
|
||||
return false, false, xerrors.New("unixfs verifier: too many links in intermediate node")
|
||||
}
|
||||
|
||||
if b.seen+1 == b.expect && links != build.UnixfsLinksPerLevel {
|
||||
return false, false, xerrors.New("unixfs verifier: too few nodes in level")
|
||||
}
|
||||
|
||||
b.sub = &UnixFs0Verifier{
|
||||
Root: blk.Cid(),
|
||||
rootBlk: blk,
|
||||
expect: links,
|
||||
}
|
||||
|
||||
// don't mark as seen yet
|
||||
return false, true, nil
|
||||
}
|
||||
|
||||
b.seen++
|
||||
return b.seen == b.expect, false, nil
|
||||
}
|
||||
|
||||
func (b *UnixFs0Verifier) checkInternal(blk blocks.Block) (int, error) {
|
||||
nd, err := ipld.Decode(blk)
|
||||
if err != nil {
|
||||
log.Warnf("IPLD Decode failed: %s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// TODO: check size
|
||||
switch nd := nd.(type) {
|
||||
case *merkledag.ProtoNode:
|
||||
fsn, err := unixfs.FSNodeFromBytes(nd.Data())
|
||||
if err != nil {
|
||||
log.Warnf("unixfs.FSNodeFromBytes failed: %s", err)
|
||||
return 0, err
|
||||
}
|
||||
if fsn.Type() != pb.Data_File {
|
||||
return 0, xerrors.New("internal nodes must be a file")
|
||||
}
|
||||
if len(fsn.Data()) > 0 {
|
||||
return 0, xerrors.New("internal node with data")
|
||||
}
|
||||
if len(nd.Links()) == 0 {
|
||||
return 0, xerrors.New("internal node with no links")
|
||||
}
|
||||
return len(nd.Links()), nil
|
||||
|
||||
case *merkledag.RawNode:
|
||||
return 0, nil
|
||||
default:
|
||||
return 0, xerrors.New("verifier: unknown node type")
|
||||
}
|
||||
}
|
||||
|
||||
func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block) (bool, error) {
|
||||
// root is special
|
||||
if b.rootBlk == nil {
|
||||
if !b.Root.Equals(blk.Cid()) {
|
||||
return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid())
|
||||
}
|
||||
b.rootBlk = blk
|
||||
links, err := b.checkInternal(blk)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
b.expect = links
|
||||
return links != 0, nil
|
||||
}
|
||||
|
||||
_, internal, err := b.verify(ctx, blk)
|
||||
return internal, err
|
||||
}
|
||||
|
||||
var _ BlockVerifier = &OptimisticVerifier{}
|
||||
var _ BlockVerifier = &UnixFs0Verifier{}
|
@ -1,364 +0,0 @@
|
||||
package retrievalmarket
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipld/go-ipld-prime"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
)
|
||||
|
||||
// type aliases
|
||||
// TODO: Remove and use native types or extract for
|
||||
// https://github.com/filecoin-project/go-retrieval-market-project/issues/5
|
||||
|
||||
// BigInt is used for token amounts in retrieval deals
|
||||
type BigInt = types.BigInt
|
||||
|
||||
// Address is an address in the filecoin network
|
||||
type Address = address.Address
|
||||
|
||||
// SignedVoucher is a signed payment voucher
|
||||
type SignedVoucher = types.SignedVoucher
|
||||
|
||||
// ProtocolID is the protocol for proposing / responding to retrieval deals
|
||||
const ProtocolID = "/fil/retrieval/0.0.1"
|
||||
|
||||
// QueryProtocolID is the protocol for querying infromation about retrieval
|
||||
// deal parameters
|
||||
const QueryProtocolID = "/fil/retrieval/qry/0.0.1" // TODO: spec
|
||||
|
||||
// Unsubscribe is a function that unsubscribes a subscriber for either the
|
||||
// client or the provider
|
||||
type Unsubscribe func()
|
||||
|
||||
// ClientDealState is the current state of a deal from the point of view
|
||||
// of a retrieval client
|
||||
type ClientDealState struct {
|
||||
DealProposal
|
||||
Status DealStatus
|
||||
Sender peer.ID
|
||||
TotalReceived uint64
|
||||
FundsSpent BigInt
|
||||
}
|
||||
|
||||
// ClientEvent is an event that occurs in a deal lifecycle on the client
|
||||
type ClientEvent uint64
|
||||
|
||||
const (
|
||||
// ClientEventOpen indicates a deal was initiated
|
||||
ClientEventOpen ClientEvent = iota
|
||||
|
||||
// ClientEventFundsExpended indicates a deal has run out of funds in the payment channel
|
||||
// forcing the client to add more funds to continue the deal
|
||||
ClientEventFundsExpended // when totalFunds is expended
|
||||
|
||||
// ClientEventProgress indicates more data was received for a retrieval
|
||||
ClientEventProgress
|
||||
|
||||
// ClientEventError indicates an error occurred during a deal
|
||||
ClientEventError
|
||||
|
||||
// ClientEventComplete indicates a deal has completed
|
||||
ClientEventComplete
|
||||
)
|
||||
|
||||
// ClientSubscriber is a callback that is registered to listen for retrieval events
|
||||
type ClientSubscriber func(event ClientEvent, state ClientDealState)
|
||||
|
||||
// RetrievalClient is a client interface for making retrieval deals
|
||||
type RetrievalClient interface {
|
||||
// V0
|
||||
|
||||
// Find Providers finds retrieval providers who may be storing a given piece
|
||||
FindProviders(pieceCID []byte) []RetrievalPeer
|
||||
|
||||
// Query asks a provider for information about a piece it is storing
|
||||
Query(
|
||||
ctx context.Context,
|
||||
p RetrievalPeer,
|
||||
pieceCID []byte,
|
||||
params QueryParams,
|
||||
) (QueryResponse, error)
|
||||
|
||||
// Retrieve retrieves all or part of a piece with the given retrieval parameters
|
||||
Retrieve(
|
||||
ctx context.Context,
|
||||
pieceCID []byte,
|
||||
params Params,
|
||||
totalFunds BigInt,
|
||||
miner peer.ID,
|
||||
clientWallet Address,
|
||||
minerWallet Address,
|
||||
) DealID
|
||||
|
||||
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||
SubscribeToEvents(subscriber ClientSubscriber) Unsubscribe
|
||||
|
||||
// V1
|
||||
AddMoreFunds(id DealID, amount BigInt) error
|
||||
CancelDeal(id DealID) error
|
||||
RetrievalStatus(id DealID)
|
||||
ListDeals() map[DealID]ClientDealState
|
||||
}
|
||||
|
||||
// RetrievalClientNode are the node depedencies for a RetrevalClient
|
||||
type RetrievalClientNode interface {
|
||||
|
||||
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
||||
// between a client and a miner and insures the client has the given amount of funds available in the channel
|
||||
GetOrCreatePaymentChannel(ctx context.Context, clientAddress Address, minerAddress Address, clientFundsAvailable BigInt) (Address, error)
|
||||
|
||||
// Allocate late creates a lane within a payment channel so that calls to
|
||||
// CreatePaymentVoucher will automatically make vouchers only for the difference
|
||||
// in total
|
||||
AllocateLane(paymentChannel Address) (uint64, error)
|
||||
|
||||
// CreatePaymentVoucher creates a new payment voucher in the given lane for a
|
||||
// given payment channel so that all the payment vouchers in the lane add up
|
||||
// to the given amount (so the payment voucher will be for the difference)
|
||||
CreatePaymentVoucher(ctx context.Context, paymentChannel Address, amount BigInt, lane uint64) (*SignedVoucher, error)
|
||||
}
|
||||
|
||||
// ProviderDealState is the current state of a deal from the point of view
|
||||
// of a retrieval provider
|
||||
type ProviderDealState struct {
|
||||
DealProposal
|
||||
Status DealStatus
|
||||
Receiver peer.ID
|
||||
TotalSent uint64
|
||||
FundsReceived BigInt
|
||||
}
|
||||
|
||||
// ProviderEvent is an event that occurs in a deal lifecycle on the provider
|
||||
type ProviderEvent uint64
|
||||
|
||||
const (
|
||||
|
||||
// ProviderEventOpen indicates a new deal was received from a client
|
||||
ProviderEventOpen ProviderEvent = iota
|
||||
|
||||
// ProviderEventProgress indicates more data was sent to a client
|
||||
ProviderEventProgress
|
||||
|
||||
// ProviderEventError indicates an error occurred in processing a deal for a client
|
||||
ProviderEventError
|
||||
|
||||
// ProviderEventComplete indicates a retrieval deal was completed for a client
|
||||
ProviderEventComplete
|
||||
)
|
||||
|
||||
// ProviderDealID is a unique identifier for a deal on a provider -- it is
|
||||
// a combination of DealID set by the client and the peer ID of the client
|
||||
type ProviderDealID struct {
|
||||
From peer.ID
|
||||
ID DealID
|
||||
}
|
||||
|
||||
// ProviderSubscriber is a callback that is registered to listen for retrieval events on a provider
|
||||
type ProviderSubscriber func(event ProviderEvent, state ProviderDealState)
|
||||
|
||||
// RetrievalProvider is an interface by which a provider configures their
|
||||
// retrieval operations and monitors deals received and process
|
||||
type RetrievalProvider interface {
|
||||
// Start begins listening for deals on the given host
|
||||
Start(host.Host)
|
||||
|
||||
// V0
|
||||
|
||||
// SetPricePerByte sets the price per byte a miner charges for retrievals
|
||||
SetPricePerByte(price BigInt)
|
||||
|
||||
// SetPaymentInterval sets the maximum number of bytes a a provider will send before
|
||||
// requesting further payment, and the rate at which that value increases
|
||||
SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64)
|
||||
|
||||
// SubscribeToEvents listens for events that happen related to client retrievals
|
||||
SubscribeToEvents(subscriber ProviderSubscriber) Unsubscribe
|
||||
|
||||
// V1
|
||||
SetPricePerUnseal(price BigInt)
|
||||
ListDeals() map[ProviderDealID]ProviderDealState
|
||||
}
|
||||
|
||||
// RetrievalProviderNode are the node depedencies for a RetrevalProvider
|
||||
type RetrievalProviderNode interface {
|
||||
SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *SignedVoucher, proof []byte, expectedAmount BigInt) (BigInt, error)
|
||||
}
|
||||
|
||||
// PeerResolver is an interface for looking up providers that may have a piece
|
||||
type PeerResolver interface {
|
||||
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
|
||||
}
|
||||
|
||||
// RetrievalPeer is a provider address/peer.ID pair (everything needed to make
|
||||
// deals for with a miner)
|
||||
type RetrievalPeer struct {
|
||||
Address Address
|
||||
ID peer.ID // optional
|
||||
}
|
||||
|
||||
// QueryResponseStatus indicates whether a queried piece is available
|
||||
type QueryResponseStatus uint64
|
||||
|
||||
const (
|
||||
// QueryResponseAvailable indicates a provider has a piece and is prepared to
|
||||
// return it
|
||||
QueryResponseAvailable QueryResponseStatus = iota
|
||||
|
||||
// QueryResponseUnavailable indicates a provider either does not have or cannot
|
||||
// serve the queried piece to the client
|
||||
QueryResponseUnavailable
|
||||
)
|
||||
|
||||
// QueryItemStatus (V1) indicates whether the requested part of a piece (payload or selector)
|
||||
// is available for retrieval
|
||||
type QueryItemStatus uint64
|
||||
|
||||
const (
|
||||
// QueryItemAvailable indicates requested part of the piece is available to be
|
||||
// served
|
||||
QueryItemAvailable QueryItemStatus = iota
|
||||
|
||||
// QueryItemUnavailable indicates the piece either does not contain the requested
|
||||
// item or it cannot be served
|
||||
QueryItemUnavailable
|
||||
|
||||
// QueryItemUnknown indicates the provider cannot determine if the given item
|
||||
// is part of the requested piece (for example, if the piece is sealed and the
|
||||
// miner does not maintain a payload CID index)
|
||||
QueryItemUnknown
|
||||
)
|
||||
|
||||
// QueryParams indicate what specific information about a piece that a retrieval
|
||||
// client is interested in, as well as specific parameters the client is seeking
|
||||
// for the retrieval deal
|
||||
type QueryParams struct {
|
||||
PayloadCID cid.Cid // optional, query if miner has this cid in this piece. some miners may not be able to respond.
|
||||
Selector ipld.Node // optional, query if miner has this cid in this piece. some miners may not be able to respond.
|
||||
MaxPricePerByte BigInt // optional, tell miner uninterested if more expensive than this
|
||||
MinPaymentInterval uint64 // optional, tell miner uninterested unless payment interval is greater than this
|
||||
MinPaymentIntervalIncrease uint64 // optional, tell miner uninterested unless payment interval increase is greater than this
|
||||
}
|
||||
|
||||
// Query is a query to a given provider to determine information about a piece
|
||||
// they may have available for retrieval
|
||||
type Query struct {
|
||||
PieceCID []byte // V0
|
||||
// QueryParams // V1
|
||||
}
|
||||
|
||||
// QueryResponse is a miners response to a given retrieval query
|
||||
type QueryResponse struct {
|
||||
Status QueryResponseStatus
|
||||
//PayloadCIDFound QueryItemStatus // V1 - if a PayloadCid was requested, the result
|
||||
//SelectorFound QueryItemStatus // V1 - if a Selector was requested, the result
|
||||
|
||||
Size uint64 // Total size of piece in bytes
|
||||
//ExpectedPayloadSize uint64 // V1 - optional, if PayloadCID + selector are specified and miner knows, can offer an expected size
|
||||
|
||||
PaymentAddress Address // address to send funds to -- may be different than miner addr
|
||||
MinPricePerByte BigInt
|
||||
MaxPaymentInterval uint64
|
||||
MaxPaymentIntervalIncrease uint64
|
||||
}
|
||||
|
||||
// QueryResponseUndefined is an empty QueryResponse
|
||||
var QueryResponseUndefined = QueryResponse{}
|
||||
|
||||
// PieceRetrievalPrice is the total price to retrieve the piece (size * MinPricePerByte)
|
||||
func (qr QueryResponse) PieceRetrievalPrice() BigInt {
|
||||
return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.Size))
|
||||
}
|
||||
|
||||
// PayloadRetrievalPrice is the expected price to retrieve just the given payload
|
||||
// & selector (V1)
|
||||
//func (qr QueryResponse) PayloadRetrievalPrice() BigInt {
|
||||
// return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.ExpectedPayloadSize))
|
||||
//}
|
||||
|
||||
// DealStatus is the status of a retrieval deal returned by a provider
|
||||
// in a DealResponse
|
||||
type DealStatus uint64
|
||||
|
||||
const (
|
||||
// DealStatusAccepted means a deal has been accepted by a provider
|
||||
// and its is ready to proceed with retrieval
|
||||
DealStatusAccepted DealStatus = iota
|
||||
|
||||
// DealStatusFailed indicates something went wrong during a retrieval
|
||||
DealStatusFailed
|
||||
|
||||
// DealStatusRejected indicates the provider rejected a client's deal proposal
|
||||
// for some reason
|
||||
DealStatusRejected
|
||||
|
||||
// DealStatusUnsealing indicates the provider is currently unsealing the sector
|
||||
// needed to serve the retrieval deal
|
||||
DealStatusUnsealing
|
||||
|
||||
// DealStatusFundsNeeded indicates the provider is awaiting a payment voucher to
|
||||
// continue processing the deal
|
||||
DealStatusFundsNeeded
|
||||
|
||||
// DealStatusOngoing indicates the provider is continuing to process a deal
|
||||
DealStatusOngoing
|
||||
|
||||
// DealStatusFundsNeededLastPayment indicates the provider is awaiting funds for
|
||||
// a final payment in order to complete a deal
|
||||
DealStatusFundsNeededLastPayment
|
||||
|
||||
// DealStatusCompleted indicates a deal is complete
|
||||
DealStatusCompleted
|
||||
|
||||
// DealStatusDealNotFound indicates an update was received for a deal that could
|
||||
// not be identified
|
||||
DealStatusDealNotFound
|
||||
)
|
||||
|
||||
// Params are the parameters requested for a retrieval deal proposal
|
||||
type Params struct {
|
||||
//PayloadCID cid.Cid // V1
|
||||
//Selector ipld.Node // V1
|
||||
PricePerByte BigInt
|
||||
PaymentInterval uint64
|
||||
PaymentIntervalIncrease uint64
|
||||
}
|
||||
|
||||
// DealID is an identifier for a retrieval deal (unique to a client)
|
||||
type DealID uint64
|
||||
|
||||
// DealProposal is a proposal for a new retrieval deal
|
||||
type DealProposal struct {
|
||||
PieceCID []byte
|
||||
ID DealID
|
||||
Params
|
||||
}
|
||||
|
||||
// Block is an IPLD block in bitswap format
|
||||
type Block struct {
|
||||
Prefix []byte
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// DealResponse is a response to a retrieval deal proposal
|
||||
type DealResponse struct {
|
||||
Status DealStatus
|
||||
ID DealID
|
||||
|
||||
// payment required to proceed
|
||||
PaymentOwed BigInt
|
||||
|
||||
Message string
|
||||
Blocks []Block // V0 only
|
||||
}
|
||||
|
||||
// DealPayment is a payment for an in progress retrieval deal
|
||||
type DealPayment struct {
|
||||
ID DealID
|
||||
PaymentChannel address.Address
|
||||
PaymentVoucher *types.SignedVoucher
|
||||
}
|
@ -3,9 +3,13 @@ package retrievaladapter
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
retrievaltoken "github.com/filecoin-project/go-fil-components/shared/tokenamount"
|
||||
retrievaltypes "github.com/filecoin-project/go-fil-components/shared/types"
|
||||
|
||||
payapi "github.com/filecoin-project/lotus/node/impl/paych"
|
||||
"github.com/filecoin-project/lotus/paych"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
)
|
||||
|
||||
type retrievalClientNode struct {
|
||||
@ -21,21 +25,25 @@ func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retriev
|
||||
|
||||
// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist
|
||||
// between a client and a miner and insures the client has the given amount of funds available in the channel
|
||||
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) {
|
||||
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable)
|
||||
func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable retrievaltoken.TokenAmount) (address.Address, error) {
|
||||
paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, FromSharedTokenAmount(clientFundsAvailable))
|
||||
return paych, err
|
||||
}
|
||||
|
||||
// Allocate late creates a lane within a payment channel so that calls to
|
||||
// CreatePaymentVoucher will automatically make vouchers only for the difference
|
||||
// in total
|
||||
func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) {
|
||||
func (rcn *retrievalClientNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
|
||||
return rcn.pmgr.AllocateLane(paymentChannel)
|
||||
}
|
||||
|
||||
// CreatePaymentVoucher creates a new payment voucher in the given lane for a
|
||||
// given payment channel so that all the payment vouchers in the lane add up
|
||||
// to the given amount (so the payment voucher will be for the difference)
|
||||
func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) {
|
||||
return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane)
|
||||
func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount retrievaltoken.TokenAmount, lane uint64) (*retrievaltypes.SignedVoucher, error) {
|
||||
voucher, err := rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, FromSharedTokenAmount(amount), lane)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ToSharedSignedVoucher(voucher)
|
||||
}
|
||||
|
45
retrievaladapter/converters.go
Normal file
45
retrievaladapter/converters.go
Normal file
@ -0,0 +1,45 @@
|
||||
package retrievaladapter
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
sharedamount "github.com/filecoin-project/go-fil-components/shared/tokenamount"
|
||||
sharedtypes "github.com/filecoin-project/go-fil-components/shared/types"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
|
||||
func FromSharedTokenAmount(in sharedamount.TokenAmount) types.BigInt {
|
||||
return types.BigInt{Int: in.Int}
|
||||
}
|
||||
|
||||
func ToSharedTokenAmount(in types.BigInt) sharedamount.TokenAmount {
|
||||
return sharedamount.TokenAmount{Int: in.Int}
|
||||
}
|
||||
|
||||
func ToSharedSignedVoucher(in *types.SignedVoucher) (*sharedtypes.SignedVoucher, error) {
|
||||
var encoded bytes.Buffer
|
||||
err := in.MarshalCBOR(&encoded)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out sharedtypes.SignedVoucher
|
||||
err = out.UnmarshalCBOR(&encoded)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func FromSharedSignedVoucher(in *sharedtypes.SignedVoucher) (*types.SignedVoucher, error) {
|
||||
var encoded bytes.Buffer
|
||||
err := in.MarshalCBOR(&encoded)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out types.SignedVoucher
|
||||
err = out.UnmarshalCBOR(&encoded)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &out, nil
|
||||
}
|
@ -4,20 +4,43 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-fil-components/retrievalmarket"
|
||||
retrievaltoken "github.com/filecoin-project/go-fil-components/shared/tokenamount"
|
||||
retrievaltypes "github.com/filecoin-project/go-fil-components/shared/types"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
retrievalmarket "github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
)
|
||||
|
||||
type retrievalProviderNode struct {
|
||||
sectorBlocks *sectorblocks.SectorBlocks
|
||||
full api.FullNode
|
||||
}
|
||||
|
||||
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
|
||||
// Lotus Node
|
||||
func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode {
|
||||
return &retrievalProviderNode{full}
|
||||
func NewRetrievalProviderNode(sectorBlocks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProviderNode {
|
||||
return &retrievalProviderNode{sectorBlocks, full}
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) {
|
||||
return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount)
|
||||
func (rpn *retrievalProviderNode) GetPieceSize(pieceCid []byte) (uint64, error) {
|
||||
asCid, err := cid.Cast(pieceCid)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return rpn.sectorBlocks.GetSize(asCid)
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) SealedBlockstore(approveUnseal func() error) blockstore.Blockstore {
|
||||
return rpn.sectorBlocks.SealedBlockstore(approveUnseal)
|
||||
}
|
||||
|
||||
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievaltypes.SignedVoucher, proof []byte, expectedAmount retrievaltoken.TokenAmount) (retrievaltoken.TokenAmount, error) {
|
||||
localVoucher, err := FromSharedSignedVoucher(voucher)
|
||||
if err != nil {
|
||||
return retrievaltoken.FromInt(0), err
|
||||
}
|
||||
added, err := rpn.full.PaychVoucherAdd(ctx, paymentChannel, localVoucher, proof, FromSharedTokenAmount(expectedAmount))
|
||||
return ToSharedTokenAmount(added), err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user