diff --git a/chain/deals/client.go b/chain/deals/client.go index 3af97315f..4e1d3df4a 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -23,7 +23,7 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/modules/dtypes" - + retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" ) @@ -252,7 +252,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro c.incoming <- deal - return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{ + return deal.ProposalCid, c.discovery.AddPeer(p.Data, retrievalmarket.RetrievalPeer{ Address: dealProposal.Provider, ID: deal.Miner, }) diff --git a/gen/main.go b/gen/main.go index 52fcf4665..4f90dc236 100644 --- a/gen/main.go +++ b/gen/main.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/paych" - "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/storage" ) @@ -59,21 +58,6 @@ func main() { os.Exit(1) } - err = gen.WriteTupleEncodersToFile("./retrieval/cbor_gen.go", "retrieval", - retrieval.RetParams{}, - - retrieval.Query{}, - retrieval.QueryResponse{}, - retrieval.Unixfs0Offer{}, - retrieval.DealProposal{}, - retrieval.DealResponse{}, - retrieval.Block{}, - ) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - err = gen.WriteTupleEncodersToFile("./chain/blocksync/cbor_gen.go", "blocksync", blocksync.BlockSyncRequest{}, blocksync.BlockSyncResponse{}, diff --git a/node/builder.go b/node/builder.go index ee15f8e70..76e9289fa 100644 --- a/node/builder.go +++ b/node/builder.go @@ -43,7 +43,7 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/paych" "github.com/filecoin-project/lotus/peermgr" - "github.com/filecoin-project/lotus/retrieval" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -221,9 +221,9 @@ func Online() Option { Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(new(*discovery.Local), discovery.NewLocal), - Override(new(discovery.PeerResolver), modules.RetrievalResolver), + Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver), - Override(new(*retrieval.Client), retrieval.NewClient), + Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), Override(new(dtypes.ClientDealStore), modules.NewClientDealStore), Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer), Override(new(*deals.ClientRequestValidator), deals.NewClientRequestValidator), @@ -246,7 +246,7 @@ func Online() Option { Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore), Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync), - Override(new(*retrieval.Miner), retrieval.NewMiner), + Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(*deals.ProviderRequestValidator), deals.NewProviderRequestValidator), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index a0c420e7e..57df600bd 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "context" "errors" "io" @@ -17,6 +18,7 @@ import ( files "github.com/ipfs/go-ipfs-files" ipld "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-merkledag" + unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/libp2p/go-libp2p-core/peer" @@ -31,8 +33,7 @@ import ( "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/retrieval" - "github.com/filecoin-project/lotus/retrieval/discovery" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) type API struct { @@ -44,8 +45,8 @@ type API struct { paych.PaychAPI DealClient *deals.Client - RetDiscovery discovery.PeerResolver - Retrieval *retrieval.Client + RetDiscovery retrievalmarket.PeerResolver + Retrieval retrievalmarket.RetrievalClient Chain *store.ChainStore LocalDAG dtypes.ClientDAG @@ -153,7 +154,18 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe out := make([]api.QueryOffer, len(peers)) for k, p := range peers { - out[k] = a.Retrieval.Query(ctx, p, root) + queryResponse, err := a.Retrieval.Query(ctx, p, root.Bytes(), retrievalmarket.QueryParams{}) + if err != nil { + out[k] = api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} + } else { + out[k] = api.QueryOffer{ + Root: root, + Size: queryResponse.Size, + MinPrice: queryResponse.PieceRetrievalPrice(), + Miner: p.Address, // TODO: check + MinerPeerID: p.ID, + } + } } return out, nil @@ -263,18 +275,43 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path order.MinerPeerID = pid } - outFile, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0777) - if err != nil { - return err + retrievalResult := make(chan error, 1) + + unsubscribe := a.Retrieval.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) { + if bytes.Equal(state.PieceCID, order.Root.Bytes()) { + switch event { + case retrievalmarket.ClientEventError: + retrievalResult <- xerrors.New("Retrieval Error") + case retrievalmarket.ClientEventComplete: + retrievalResult <- nil + } + } + }) + + a.Retrieval.Retrieve( + ctx, order.Root.Bytes(), retrievalmarket.Params{ + PricePerByte: types.BigDiv(order.Total, types.NewInt(order.Size)), + }, order.Total, order.MinerPeerID, order.Client, order.Miner) + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case err := <-retrievalResult: + if err != nil { + return xerrors.Errorf("RetrieveUnixfs: %w", err) + } } - err = a.Retrieval.RetrieveUnixfs(ctx, order.Root, order.Size, order.Total, order.MinerPeerID, order.Client, order.Miner, outFile) - if err != nil { - _ = outFile.Close() - return xerrors.Errorf("RetrieveUnixfs: %w", err) - } + unsubscribe() - return outFile.Close() + nd, err := a.LocalDAG.Get(ctx, order.Root) + if err != nil { + return xerrors.Errorf("ClientRetrieve: %w", err) + } + file, err := unixfile.NewUnixfsFile(ctx, a.LocalDAG, nd) + if err != nil { + return xerrors.Errorf("ClientRetrieve: %w", err) + } + return files.WriteTo(file, path) } func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*types.SignedStorageAsk, error) { diff --git a/node/modules/client.go b/node/modules/client.go index f01bbf8c3..6047df7af 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -2,11 +2,15 @@ package modules import ( "context" + "github.com/filecoin-project/lotus/retrievaladapter" "path/filepath" "reflect" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/paych" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" + retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" graphsync "github.com/ipfs/go-graphsync/impl" @@ -26,6 +30,7 @@ import ( "github.com/filecoin-project/go-data-transfer/impl/graphsync" "github.com/filecoin-project/lotus/chain/deals" + payapi "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" ) @@ -97,3 +102,9 @@ func ClientGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.Client return gs } + +// RetrievalClient creates a new retrieval client attached to the client blockstore +func RetrievalClient(h host.Host, bs dtypes.ClientBlockstore, pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClient { + adapter := retrievaladapter.NewRetrievalClientNode(pmgr, payapi) + return retrievalimpl.NewClient(h, bs, adapter) +} diff --git a/node/modules/services.go b/node/modules/services.go index 92b6264e8..9949d0d69 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/peermgr" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/retrieval/discovery" ) @@ -80,6 +81,6 @@ func RunDealClient(mctx helpers.MetricsCtx, lc fx.Lifecycle, c *deals.Client) { }) } -func RetrievalResolver(l *discovery.Local) discovery.PeerResolver { +func RetrievalResolver(l *discovery.Local) retrievalmarket.PeerResolver { return discovery.Multi(l) } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 9725d3440..37902dd08 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -35,8 +35,11 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/retrieval" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" + retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" + "github.com/filecoin-project/lotus/retrievaladapter" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sectorblocks" ) func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) { @@ -115,11 +118,10 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h return sm, nil } -func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) { +func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider) { lc.Append(fx.Hook{ OnStart: func(context.Context) error { - host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleQueryStream) - host.SetStreamHandler(retrieval.ProtocolID, m.HandleDealStream) + m.Start(host) return nil }, }) @@ -261,3 +263,9 @@ func SealTicketGen(api api.FullNode) storage.TicketFn { }, nil } } + +// RetrievalProvider creates a new retrieval provider attached to the provider blockstore +func RetrievalProvider(sblks *sectorblocks.SectorBlocks, full api.FullNode) retrievalmarket.RetrievalProvider { + adapter := retrievaladapter.NewRetrievalProviderNode(full) + return retrievalimpl.NewProvider(sblks, adapter) +} diff --git a/retrieval/cbor-gen/main.go b/retrieval/cbor-gen/main.go new file mode 100644 index 000000000..cc3fe9887 --- /dev/null +++ b/retrieval/cbor-gen/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" + "os" + + retrievalimpl "github.com/filecoin-project/lotus/retrieval/impl" +) + +// main func has ONE JOB +func main() { + fmt.Print("Generating Cbor Marshal/Unmarshal...") + + if err := retrievalimpl.RunCborGen(); err != nil { + fmt.Println("Failed: ") + fmt.Println(err) + os.Exit(1) + } + fmt.Println("Done.") +} diff --git a/retrieval/client.go b/retrieval/client.go deleted file mode 100644 index 496781222..000000000 --- a/retrieval/client.go +++ /dev/null @@ -1,272 +0,0 @@ -package retrieval - -import ( - "context" - "io" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/network" - "github.com/libp2p/go-libp2p-core/peer" - cbg "github.com/whyrusleeping/cbor-gen" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - payapi "github.com/filecoin-project/lotus/node/impl/paych" - "github.com/filecoin-project/lotus/paych" - "github.com/filecoin-project/lotus/retrieval/discovery" -) - -var log = logging.Logger("retrieval") - -type Client struct { - h host.Host - - pmgr *paych.Manager - payapi payapi.PaychAPI -} - -func NewClient(h host.Host, pmgr *paych.Manager, payapi payapi.PaychAPI) *Client { - return &Client{h: h, pmgr: pmgr, payapi: payapi} -} - -func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer { - s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID) - if err != nil { - log.Warn(err) - return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} - } - defer s.Close() - - err = cborutil.WriteCborRPC(s, &Query{ - Piece: data, - }) - if err != nil { - log.Warn(err) - return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} - } - - var resp QueryResponse - if err := resp.UnmarshalCBOR(s); err != nil { - log.Warn(err) - return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} - } - - return api.QueryOffer{ - Root: data, - Size: resp.Size, - MinPrice: resp.MinPrice, - Miner: p.Address, // TODO: check - MinerPeerID: p.ID, - } -} - -type clientStream struct { - payapi payapi.PaychAPI - stream network.Stream - peeker cbg.BytePeeker - - root cid.Cid - size types.BigInt - offset uint64 - - paych address.Address - lane uint64 - total types.BigInt - transferred types.BigInt - - windowSize uint64 // how much we "trust" the peer - verifier BlockVerifier -} - -// C > S -// -// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size -// -// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} -// < Resp{Accept} -// < ..(Intermediate Block) -// < ..Blocks -// < ..(Intermediate Block) -// < ..Blocks -// > DealProposal(...) -// < ... -func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error { - s, err := c.h.NewStream(ctx, miner, ProtocolID) - if err != nil { - return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err) - } - defer s.Close() - - initialOffset := uint64(0) // TODO: Check how much data we have locally - // TODO: Support in handler - // TODO: Allow client to specify this - - paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total) - if err != nil { - return xerrors.Errorf("getting payment channel: %w", err) - } - lane, err := c.pmgr.AllocateLane(paych) - if err != nil { - return xerrors.Errorf("allocating payment lane: %w", err) - } - - cst := clientStream{ - payapi: c.payapi, - stream: s, - peeker: cbg.GetPeeker(s), - - root: root, - size: types.NewInt(size), - offset: initialOffset, - - paych: paych, - lane: lane, - total: total, - transferred: types.NewInt(0), - - windowSize: build.UnixfsChunkSize, - verifier: &UnixFs0Verifier{Root: root}, - } - - for cst.offset != size+initialOffset { - toFetch := cst.windowSize - if toFetch+cst.offset > size { - toFetch = size - cst.offset - } - log.Infof("Retrieve %dB @%d", toFetch, cst.offset) - - err := cst.doOneExchange(ctx, toFetch, out) - if err != nil { - return xerrors.Errorf("retrieval exchange: %w", err) - } - - cst.offset += toFetch - } - return nil -} - -func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error { - payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size) - - payment, err := cst.setupPayment(ctx, payAmount) - if err != nil { - return xerrors.Errorf("setting up retrieval payment: %w", err) - } - - deal := &DealProposal{ - Payment: payment, - Ref: cst.root, - Params: RetParams{ - Unixfs0: &Unixfs0Offer{ - Offset: cst.offset, - Size: toFetch, - }, - }, - } - - if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil { - return xerrors.Errorf("sending incremental retrieval request: %w", err) - } - - var resp DealResponse - if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil { - return xerrors.Errorf("reading retrieval response: %w", err) - } - - if resp.Status != Accepted { - cst.windowSize = build.UnixfsChunkSize - // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) - - if resp.Status == Error { - return xerrors.Errorf("storage deal error: %s", resp.Message) - } - if resp.Status == Rejected { - return xerrors.Errorf("storage deal rejected: %s", resp.Message) - } - return xerrors.New("storage deal response had no Accepted section") - } - - log.Info("Retrieval accepted, fetching blocks") - - return cst.fetchBlocks(toFetch, out) - - // TODO: maybe increase miner window size after success -} - -func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error { - blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize - - for i := uint64(0); i < blocksToFetch; { - log.Infof("block %d of %d", i+1, blocksToFetch) - - var block Block - if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil { - return xerrors.Errorf("reading fetchBlock response: %w", err) - } - - dataBlocks, err := cst.consumeBlockMessage(block, out) - if err != nil { - return xerrors.Errorf("consuming retrieved blocks: %w", err) - } - - i += dataBlocks - } - - return nil -} - -func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64, error) { - prefix, err := cid.PrefixFromBytes(block.Prefix) - if err != nil { - return 0, err - } - - cid, err := prefix.Sum(block.Data) - if err != nil { - return 0, err - } - - blk, err := blocks.NewBlockWithCid(block.Data, cid) - if err != nil { - return 0, err - } - - internal, err := cst.verifier.Verify(context.TODO(), blk, out) - if err != nil { - log.Warnf("block verify failed: %s", err) - return 0, err - } - - // TODO: Smarter out, maybe add to filestore automagically - // (Also, persist intermediate nodes) - - if internal { - return 0, nil - } - - return 1, nil -} - -func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) { - amount := types.BigAdd(cst.transferred, toSend) - - sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane) - if err != nil { - return api.PaymentInfo{}, err - } - - cst.transferred = amount - - return api.PaymentInfo{ - Channel: cst.paych, - ChannelMessage: nil, - Vouchers: []*types.SignedVoucher{sv}, - }, nil -} diff --git a/retrieval/discovery/discovery.go b/retrieval/discovery/discovery.go index 9933ad793..93da56b60 100644 --- a/retrieval/discovery/discovery.go +++ b/retrieval/discovery/discovery.go @@ -1,25 +1,15 @@ package discovery import ( - "github.com/filecoin-project/go-address" - "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" - "github.com/libp2p/go-libp2p-core/peer" + + retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) func init() { - cbor.RegisterCborType(RetrievalPeer{}) + cbor.RegisterCborType(retrievalmarket.RetrievalPeer{}) } -type RetrievalPeer struct { - Address address.Address - ID peer.ID // optional -} - -type PeerResolver interface { - GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel -} - -func Multi(r PeerResolver) PeerResolver { // TODO: actually support multiple mechanisms +func Multi(r retrievalmarket.PeerResolver) retrievalmarket.PeerResolver { // TODO: actually support multiple mechanisms return r } diff --git a/retrieval/discovery/local.go b/retrieval/discovery/local.go index 984e47da5..c777a0828 100644 --- a/retrieval/discovery/local.go +++ b/retrieval/discovery/local.go @@ -9,6 +9,7 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/lotus/node/modules/dtypes" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" ) var log = logging.Logger("ret-discovery") @@ -21,7 +22,7 @@ func NewLocal(ds dtypes.MetadataDS) *Local { return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))} } -func (l *Local) AddPeer(cid cid.Cid, peer RetrievalPeer) error { +func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error { // TODO: allow multiple peers here // (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too) @@ -35,19 +36,19 @@ func (l *Local) AddPeer(cid cid.Cid, peer RetrievalPeer) error { return l.ds.Put(dshelp.CidToDsKey(cid), entry) } -func (l *Local) GetPeers(data cid.Cid) ([]RetrievalPeer, error) { +func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) { entry, err := l.ds.Get(dshelp.CidToDsKey(data)) if err == datastore.ErrNotFound { - return []RetrievalPeer{}, nil + return []retrievalmarket.RetrievalPeer{}, nil } if err != nil { return nil, err } - var peer RetrievalPeer + var peer retrievalmarket.RetrievalPeer if err := cbor.DecodeInto(entry, &peer); err != nil { return nil, err } - return []RetrievalPeer{peer}, nil + return []retrievalmarket.RetrievalPeer{peer}, nil } -var _ PeerResolver = &Local{} +var _ retrievalmarket.PeerResolver = &Local{} diff --git a/retrieval/cbor_gen.go b/retrieval/impl/cbor_gen.go similarity index 92% rename from retrieval/cbor_gen.go rename to retrieval/impl/cbor_gen.go index c80ab66d6..f2778544c 100644 --- a/retrieval/cbor_gen.go +++ b/retrieval/impl/cbor_gen.go @@ -1,4 +1,4 @@ -package retrieval +package retrievalimpl import ( "fmt" @@ -67,7 +67,7 @@ func (t *RetParams) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *Query) MarshalCBOR(w io.Writer) error { +func (t *OldQuery) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -85,7 +85,7 @@ func (t *Query) MarshalCBOR(w io.Writer) error { return nil } -func (t *Query) UnmarshalCBOR(r io.Reader) error { +func (t *OldQuery) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -115,7 +115,7 @@ func (t *Query) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *QueryResponse) MarshalCBOR(w io.Writer) error { +func (t *OldQueryResponse) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -124,7 +124,7 @@ func (t *QueryResponse) MarshalCBOR(w io.Writer) error { return err } - // t.Status (retrieval.QueryResponseStatus) (uint64) + // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.Status))); err != nil { return err } @@ -141,7 +141,7 @@ func (t *QueryResponse) MarshalCBOR(w io.Writer) error { return nil } -func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error { +func (t *OldQueryResponse) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -156,7 +156,7 @@ func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error { return fmt.Errorf("cbor input had wrong number of fields") } - // t.Status (retrieval.QueryResponseStatus) (uint64) + // t.t.Status (retrieval.OldQueryResponseStatus) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -165,8 +165,8 @@ func (t *QueryResponse) UnmarshalCBOR(r io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.Status = QueryResponseStatus(extra) - // t.Size (uint64) (uint64) + t.Status = OldQueryResponseStatus(extra) + // t.t.Size (uint64) (uint64) maj, extra, err = cbg.CborReadHeader(br) if err != nil { @@ -247,7 +247,7 @@ func (t *Unixfs0Offer) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *DealProposal) MarshalCBOR(w io.Writer) error { +func (t *OldDealProposal) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -274,7 +274,7 @@ func (t *DealProposal) MarshalCBOR(w io.Writer) error { return nil } -func (t *DealProposal) UnmarshalCBOR(r io.Reader) error { +func (t *OldDealProposal) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) @@ -322,7 +322,7 @@ func (t *DealProposal) UnmarshalCBOR(r io.Reader) error { return nil } -func (t *DealResponse) MarshalCBOR(w io.Writer) error { +func (t *OldDealResponse) MarshalCBOR(w io.Writer) error { if t == nil { _, err := w.Write(cbg.CborNull) return err @@ -350,7 +350,7 @@ func (t *DealResponse) MarshalCBOR(w io.Writer) error { return nil } -func (t *DealResponse) UnmarshalCBOR(r io.Reader) error { +func (t *OldDealResponse) UnmarshalCBOR(r io.Reader) error { br := cbg.GetPeeker(r) maj, extra, err := cbg.CborReadHeader(br) diff --git a/retrieval/impl/client.go b/retrieval/impl/client.go new file mode 100644 index 000000000..96bad3503 --- /dev/null +++ b/retrieval/impl/client.go @@ -0,0 +1,381 @@ +package retrievalimpl + +import ( + "context" + "reflect" + "sync" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" + + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" +) + +var log = logging.Logger("retrieval") + +type client struct { + h host.Host + bs blockstore.Blockstore + node retrievalmarket.RetrievalClientNode + // The parameters should be replaced by RetrievalClientNode + + nextDealLk sync.Mutex + nextDealID retrievalmarket.DealID + subscribers []retrievalmarket.ClientSubscriber +} + +// NewClient creates a new retrieval client +func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient { + return &client{h: h, bs: bs, node: node} +} + +// V0 + +// TODO: Implement for retrieval provider V0 epic +// https://github.com/filecoin-project/go-retrieval-market-project/issues/12 +func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer { + panic("not implemented") +} + +// TODO: Update to match spec for V0 epic +// https://github.com/filecoin-project/go-retrieval-market-project/issues/8 +func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) { + cid, err := cid.Cast(pieceCID) + if err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } + + s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID) + if err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } + defer s.Close() + + err = cborutil.WriteCborRPC(s, &OldQuery{ + Piece: cid, + }) + if err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } + + var oldResp OldQueryResponse + if err := oldResp.UnmarshalCBOR(s); err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } + + resp := retrievalmarket.QueryResponse{ + Status: retrievalmarket.QueryResponseStatus(oldResp.Status), + Size: oldResp.Size, + MinPricePerByte: types.BigDiv(oldResp.MinPrice, types.NewInt(oldResp.Size)), + } + return resp, nil +} + +// TODO: Update to match spec for V0 Epic: +// https://github.com/filecoin-project/go-retrieval-market-project/issues/9 +func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrievalmarket.Params, totalFunds types.BigInt, miner peer.ID, clientWallet retrievalmarket.Address, minerWallet retrievalmarket.Address) retrievalmarket.DealID { + /* The implementation of this function is just wrapper for the old code which retrieves UnixFS pieces + -- it will be replaced when we do the V0 implementation of the module */ + c.nextDealLk.Lock() + c.nextDealID++ + dealID := c.nextDealID + c.nextDealLk.Unlock() + + dealState := retrievalmarket.ClientDealState{ + DealProposal: retrievalmarket.DealProposal{ + PieceCID: pieceCID, + ID: dealID, + Params: params, + }, + Status: retrievalmarket.DealStatusFailed, + Sender: miner, + } + + go func() { + evt := retrievalmarket.ClientEventError + converted, err := cid.Cast(pieceCID) + + if err == nil { + err = c.retrieveUnixfs(ctx, converted, types.BigDiv(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet) + if err == nil { + evt = retrievalmarket.ClientEventComplete + dealState.Status = retrievalmarket.DealStatusCompleted + } + } + + c.notifySubscribers(evt, dealState) + }() + + return dealID +} + +// unsubscribeAt returns a function that removes an item from the subscribers list by comparing +// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order. +// Subsequent, repeated calls to the func with the same Subscriber are a no-op. +func (c *client) unsubscribeAt(sub retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe { + return func() { + curLen := len(c.subscribers) + for i, el := range c.subscribers { + if reflect.ValueOf(sub) == reflect.ValueOf(el) { + c.subscribers[i] = c.subscribers[curLen-1] + c.subscribers = c.subscribers[:curLen-1] + return + } + } + } +} + +func (c *client) notifySubscribers(evt retrievalmarket.ClientEvent, ds retrievalmarket.ClientDealState) { + for _, cb := range c.subscribers { + cb(evt, ds) + } +} + +func (c *client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe { + c.subscribers = append(c.subscribers, subscriber) + return c.unsubscribeAt(subscriber) +} + +// V1 +func (c *client) AddMoreFunds(id retrievalmarket.DealID, amount types.BigInt) error { + panic("not implemented") +} + +func (c *client) CancelDeal(id retrievalmarket.DealID) error { + panic("not implemented") +} + +func (c *client) RetrievalStatus(id retrievalmarket.DealID) { + panic("not implemented") +} + +func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDealState { + panic("not implemented") +} + +type clientStream struct { + node retrievalmarket.RetrievalClientNode + stream network.Stream + peeker cbg.BytePeeker + + root cid.Cid + size types.BigInt + offset uint64 + + paych retrievalmarket.Address + lane uint64 + total types.BigInt + transferred types.BigInt + + windowSize uint64 // how much we "trust" the peer + verifier BlockVerifier + bs blockstore.Blockstore +} + +/* This is the old retrieval code that is NOT spec compliant */ + +// C > S +// +// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size +// +// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} +// < Resp{Accept} +// < ..(Intermediate Block) +// < ..Blocks +// < ..(Intermediate Block) +// < ..Blocks +// > DealProposal(...) +// < ... +func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr retrievalmarket.Address) error { + s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID) + if err != nil { + return err + } + defer s.Close() + + initialOffset := uint64(0) // TODO: Check how much data we have locally + // TODO: Support in handler + // TODO: Allow client to specify this + + paych, err := c.node.GetOrCreatePaymentChannel(ctx, client, minerAddr, total) + if err != nil { + return xerrors.Errorf("getting payment channel: %w", err) + } + lane, err := c.node.AllocateLane(paych) + if err != nil { + return xerrors.Errorf("allocating payment lane: %w", err) + } + + cst := clientStream{ + node: c.node, + stream: s, + peeker: cbg.GetPeeker(s), + + root: root, + size: types.NewInt(size), + offset: initialOffset, + + paych: paych, + lane: lane, + total: total, + transferred: types.NewInt(0), + + windowSize: build.UnixfsChunkSize, + verifier: &UnixFs0Verifier{Root: root}, + bs: c.bs, + } + + for cst.offset != size+initialOffset { + toFetch := cst.windowSize + if toFetch+cst.offset > size { + toFetch = size - cst.offset + } + log.Infof("Retrieve %dB @%d", toFetch, cst.offset) + + err := cst.doOneExchange(ctx, toFetch) + if err != nil { + return xerrors.Errorf("retrieval exchange: %w", err) + } + + cst.offset += toFetch + } + log.Info("RETRIEVE SUCCESSFUL") + return nil +} + +func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) error { + payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size) + + payment, err := cst.setupPayment(ctx, payAmount) + if err != nil { + return xerrors.Errorf("setting up retrieval payment: %w", err) + } + + deal := &OldDealProposal{ + Payment: payment, + Ref: cst.root, + Params: RetParams{ + Unixfs0: &Unixfs0Offer{ + Offset: cst.offset, + Size: toFetch, + }, + }, + } + + if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil { + return err + } + + var resp OldDealResponse + if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil { + log.Error(err) + return err + } + + if resp.Status != Accepted { + cst.windowSize = build.UnixfsChunkSize + // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) + + if resp.Status == Error { + return xerrors.Errorf("storage deal error: %s", resp.Message) + } + if resp.Status == Rejected { + return xerrors.Errorf("storage deal rejected: %s", resp.Message) + } + return xerrors.New("storage deal response had no Accepted section") + } + + log.Info("Retrieval accepted, fetching blocks") + + return cst.fetchBlocks(toFetch) + + // TODO: maybe increase miner window size after success +} + +func (cst *clientStream) fetchBlocks(toFetch uint64) error { + blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize + + for i := uint64(0); i < blocksToFetch; { + log.Infof("block %d of %d", i+1, blocksToFetch) + + var block Block + if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil { + return xerrors.Errorf("reading fetchBlock response: %w", err) + } + + dataBlocks, err := cst.consumeBlockMessage(block) + if err != nil { + return xerrors.Errorf("consuming retrieved blocks: %w", err) + } + + i += dataBlocks + } + + return nil +} + +func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) { + prefix, err := cid.PrefixFromBytes(block.Prefix) + if err != nil { + return 0, err + } + + cid, err := prefix.Sum(block.Data) + + blk, err := blocks.NewBlockWithCid(block.Data, cid) + if err != nil { + return 0, err + } + + internal, err := cst.verifier.Verify(context.TODO(), blk) + if err != nil { + log.Warnf("block verify failed: %s", err) + return 0, err + } + + // TODO: Smarter out, maybe add to filestore automagically + // (Also, persist intermediate nodes) + err = cst.bs.Put(blk) + if err != nil { + log.Warnf("block write failed: %s", err) + return 0, err + } + + if internal { + return 0, nil + } + + return 1, nil +} + +func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) { + amount := types.BigAdd(cst.transferred, toSend) + + sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane) + if err != nil { + return api.PaymentInfo{}, err + } + + cst.transferred = amount + + return api.PaymentInfo{ + Channel: cst.paych, + ChannelMessage: nil, + Vouchers: []*types.SignedVoucher{sv}, + }, nil +} diff --git a/retrieval/miner.go b/retrieval/impl/provider.go similarity index 54% rename from retrieval/miner.go rename to retrieval/impl/provider.go index c9b54c5ec..ef1de4dde 100644 --- a/retrieval/miner.go +++ b/retrieval/impl/provider.go @@ -1,76 +1,144 @@ -package retrieval +package retrievalimpl import ( "context" "io" + "reflect" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" "github.com/filecoin-project/lotus/storage/sectorblocks" ) -type RetrMinerApi interface { +// RetrMinerAPI are the node functions needed by a retrieval provider +type RetrMinerAPI interface { PaychVoucherAdd(context.Context, address.Address, *types.SignedVoucher, []byte, types.BigInt) (types.BigInt, error) } -type Miner struct { - sectorBlocks *sectorblocks.SectorBlocks - full RetrMinerApi +type provider struct { - pricePerByte types.BigInt - // TODO: Unseal price + // TODO: Replace with RetrievalProviderNode & FileStore for https://github.com/filecoin-project/go-retrieval-market-project/issues/9 + sectorBlocks *sectorblocks.SectorBlocks + + // TODO: Replace with RetrievalProviderNode for + // https://github.com/filecoin-project/go-retrieval-market-project/issues/4 + node retrievalmarket.RetrievalProviderNode + + pricePerByte retrievalmarket.BigInt + + subscribers []retrievalmarket.ProviderSubscriber } -func NewMiner(sblks *sectorblocks.SectorBlocks, full api.FullNode) *Miner { - return &Miner{ +// NewProvider returns a new retrieval provider +func NewProvider(sblks *sectorblocks.SectorBlocks, node retrievalmarket.RetrievalProviderNode) retrievalmarket.RetrievalProvider { + return &provider{ sectorBlocks: sblks, - full: full, + node: node, pricePerByte: types.NewInt(2), // TODO: allow setting } } +// Start begins listening for deals on the given host +func (p *provider) Start(host host.Host) { + host.SetStreamHandler(retrievalmarket.QueryProtocolID, p.handleQueryStream) + host.SetStreamHandler(retrievalmarket.ProtocolID, p.handleDealStream) +} + +// V0 +// SetPricePerByte sets the price per byte a miner charges for retrievals +func (p *provider) SetPricePerByte(price retrievalmarket.BigInt) { + p.pricePerByte = price +} + +// SetPaymentInterval sets the maximum number of bytes a a provider will send before +// requesting further payment, and the rate at which that value increases +// TODO: Implement for https://github.com/filecoin-project/go-retrieval-market-project/issues/7 +func (p *provider) SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) { + panic("not implemented") +} + +// unsubscribeAt returns a function that removes an item from the subscribers list by comparing +// their reflect.ValueOf before pulling the item out of the slice. Does not preserve order. +// Subsequent, repeated calls to the func with the same Subscriber are a no-op. +func (p *provider) unsubscribeAt(sub retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe { + return func() { + curLen := len(p.subscribers) + for i, el := range p.subscribers { + if reflect.ValueOf(sub) == reflect.ValueOf(el) { + p.subscribers[i] = p.subscribers[curLen-1] + p.subscribers = p.subscribers[:curLen-1] + return + } + } + } +} + +func (p *provider) notifySubscribers(evt retrievalmarket.ProviderEvent, ds retrievalmarket.ProviderDealState) { + for _, cb := range p.subscribers { + cb(evt, ds) + } +} + +// SubscribeToEvents listens for events that happen related to client retrievals +// TODO: Implement updates as part of https://github.com/filecoin-project/go-retrieval-market-project/issues/7 +func (p *provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscriber) retrievalmarket.Unsubscribe { + p.subscribers = append(p.subscribers, subscriber) + return p.unsubscribeAt(subscriber) +} + +// V1 +func (p *provider) SetPricePerUnseal(price retrievalmarket.BigInt) { + panic("not implemented") +} + +func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarket.ProviderDealState { + panic("not implemented") +} + func writeErr(stream network.Stream, err error) { log.Errorf("Retrieval deal error: %+v", err) - _ = cborutil.WriteCborRPC(stream, &DealResponse{ + _ = cborutil.WriteCborRPC(stream, &OldDealResponse{ Status: Error, Message: err.Error(), }) } -func (m *Miner) HandleQueryStream(stream network.Stream) { +// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/8 +func (p *provider) handleQueryStream(stream network.Stream) { defer stream.Close() - var query Query + var query OldQuery if err := cborutil.ReadCborRPC(stream, &query); err != nil { writeErr(stream, err) return } - size, err := m.sectorBlocks.GetSize(query.Piece) + size, err := p.sectorBlocks.GetSize(query.Piece) if err != nil && err != sectorblocks.ErrNotFound { log.Errorf("Retrieval query: GetRefs: %s", err) return } - answer := &QueryResponse{ + answer := &OldQueryResponse{ Status: Unavailable, } if err == nil { answer.Status = Available // TODO: get price, look for already unsealed ref to reduce work - answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), m.pricePerByte) + answer.MinPrice = types.BigMul(types.NewInt(uint64(size)), p.pricePerByte) answer.Size = uint64(size) // TODO: verify on intermediate } @@ -81,7 +149,7 @@ func (m *Miner) HandleQueryStream(stream network.Stream) { } type handlerDeal struct { - m *Miner + p *provider stream network.Stream ufsr sectorblocks.UnixfsReader @@ -90,11 +158,12 @@ type handlerDeal struct { size uint64 } -func (m *Miner) HandleDealStream(stream network.Stream) { +// TODO: Update for https://github.com/filecoin-project/go-retrieval-market-project/issues/7 +func (p *provider) handleDealStream(stream network.Stream) { defer stream.Close() hnd := &handlerDeal{ - m: m, + p: p, stream: stream, } @@ -113,7 +182,7 @@ func (m *Miner) HandleDealStream(stream network.Stream) { } func (hnd *handlerDeal) handleNext() (bool, error) { - var deal DealProposal + var deal OldDealProposal if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil { if err == io.EOF { // client sent all deals err = nil @@ -131,8 +200,8 @@ func (hnd *handlerDeal) handleNext() (bool, error) { return false, xerrors.Errorf("expected one signed voucher, got %d", len(deal.Payment.Vouchers)) } - expPayment := types.BigMul(hnd.m.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size)) - if _, err := hnd.m.full.PaychVoucherAdd(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil { + expPayment := types.BigMul(hnd.p.pricePerByte, types.NewInt(deal.Params.Unixfs0.Size)) + if _, err := hnd.p.node.SavePaymentVoucher(context.TODO(), deal.Payment.Channel, deal.Payment.Vouchers[0], nil, expPayment); err != nil { return false, xerrors.Errorf("processing retrieval payment: %w", err) } @@ -156,7 +225,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) { return true, nil } -func (hnd *handlerDeal) openFile(deal DealProposal) error { +func (hnd *handlerDeal) openFile(deal OldDealProposal) error { unixfs0 := deal.Params.Unixfs0 if unixfs0.Offset != 0 { @@ -165,7 +234,7 @@ func (hnd *handlerDeal) openFile(deal DealProposal) error { } hnd.at = unixfs0.Offset - bstore := hnd.m.sectorBlocks.SealedBlockstore(func() error { + bstore := hnd.p.sectorBlocks.SealedBlockstore(func() error { return nil // TODO: approve unsealing based on amount paid }) @@ -197,10 +266,10 @@ func (hnd *handlerDeal) openFile(deal DealProposal) error { return nil } -func (hnd *handlerDeal) accept(deal DealProposal) error { +func (hnd *handlerDeal) accept(deal OldDealProposal) error { unixfs0 := deal.Params.Unixfs0 - resp := &DealResponse{ + resp := &OldDealResponse{ Status: Accepted, } if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil { diff --git a/retrieval/impl/run_cbor_gen.go b/retrieval/impl/run_cbor_gen.go new file mode 100644 index 000000000..8dbd6f19c --- /dev/null +++ b/retrieval/impl/run_cbor_gen.go @@ -0,0 +1,33 @@ +package retrievalimpl + +import ( + "fmt" + "os" + + cborgen "github.com/whyrusleeping/cbor-gen" +) + +func RunCborGen() error { + genName := "./impl/cbor_gen.go" + reName := "./impl/cbor_gen_old.go" + if err := os.Rename(genName, reName); err != nil { + return fmt.Errorf("could not rename %s to %s", genName, reName) + } + if err := cborgen.WriteTupleEncodersToFile( + genName, + "retrievalimpl", + RetParams{}, + OldQuery{}, + OldQueryResponse{}, + Unixfs0Offer{}, + OldDealProposal{}, + OldDealResponse{}, + Block{}, + ); err != nil { + return err + } + if err := os.Remove(reName); err != nil { + return err + } + return nil +} diff --git a/retrieval/impl/types.go b/retrieval/impl/types.go new file mode 100644 index 000000000..3af74f993 --- /dev/null +++ b/retrieval/impl/types.go @@ -0,0 +1,67 @@ +package retrievalimpl + +import ( + "github.com/filecoin-project/lotus/api" + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" +) + +/* These types are all the types provided by Lotus, which diverge even from +spec V0 -- prior to the "update to spec epic", we are using these types internally +and switching to spec at the boundaries of the module */ + +type OldQueryResponseStatus uint64 + +const ( + Available OldQueryResponseStatus = iota + Unavailable +) + +const ( + Accepted = iota + Error + Rejected + Unsealing +) + +type OldQuery struct { + Piece cid.Cid + // TODO: payment +} + +type OldQueryResponse struct { + Status OldQueryResponseStatus + + Size uint64 // TODO: spec + // TODO: unseal price (+spec) + // TODO: sectors to unseal + // TODO: address to send money for the deal? + MinPrice types.BigInt +} + +type Unixfs0Offer struct { + Offset uint64 + Size uint64 +} + +type RetParams struct { + Unixfs0 *Unixfs0Offer +} + +type OldDealProposal struct { + Payment api.PaymentInfo + + Ref cid.Cid + Params RetParams +} + +type OldDealResponse struct { + Status uint64 + Message string +} + +type Block struct { // TODO: put in spec + Prefix []byte // TODO: fix cid.Prefix marshaling somehow + Data []byte +} diff --git a/retrieval/verify.go b/retrieval/impl/verify.go similarity index 83% rename from retrieval/verify.go rename to retrieval/impl/verify.go index 88fa0dfed..892876ef5 100644 --- a/retrieval/verify.go +++ b/retrieval/impl/verify.go @@ -1,8 +1,7 @@ -package retrieval +package retrievalimpl import ( "context" - "io" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -16,13 +15,13 @@ import ( ) type BlockVerifier interface { - Verify(context.Context, blocks.Block, io.Writer) (internal bool, err error) + Verify(context.Context, blocks.Block) (internal bool, err error) } type OptimisticVerifier struct { } -func (o *OptimisticVerifier) Verify(context.Context, blocks.Block, io.Writer) (bool, error) { +func (o *OptimisticVerifier) Verify(context.Context, blocks.Block) (bool, error) { // It's probably fine return false, nil } @@ -37,11 +36,11 @@ type UnixFs0Verifier struct { sub *UnixFs0Verifier } -func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.Writer) (last bool, internal bool, err error) { +func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block) (last bool, internal bool, err error) { if b.sub != nil { // TODO: check links here (iff b.sub.sub == nil) - subLast, internal, err := b.sub.verify(ctx, blk, out) + subLast, internal, err := b.sub.verify(ctx, blk) if err != nil { return false, false, err } @@ -57,7 +56,7 @@ func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.W return false, false, xerrors.New("unixfs verifier: too many nodes in level") } - links, err := b.checkInternal(blk, out) + links, err := b.checkInternal(blk) if err != nil { return false, false, err } @@ -85,7 +84,7 @@ func (b *UnixFs0Verifier) verify(ctx context.Context, blk blocks.Block, out io.W return b.seen == b.expect, false, nil } -func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, error) { +func (b *UnixFs0Verifier) checkInternal(blk blocks.Block) (int, error) { nd, err := ipld.Decode(blk) if err != nil { log.Warnf("IPLD Decode failed: %s", err) @@ -112,21 +111,20 @@ func (b *UnixFs0Verifier) checkInternal(blk blocks.Block, out io.Writer) (int, e return len(nd.Links()), nil case *merkledag.RawNode: - _, err := out.Write(nd.RawData()) - return 0, err + return 0, nil default: return 0, xerrors.New("verifier: unknown node type") } } -func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Writer) (bool, error) { +func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block) (bool, error) { // root is special if b.rootBlk == nil { if !b.Root.Equals(blk.Cid()) { return false, xerrors.Errorf("unixfs verifier: root block CID didn't match: valid %s, got %s", b.Root, blk.Cid()) } b.rootBlk = blk - links, err := b.checkInternal(blk, w) + links, err := b.checkInternal(blk) if err != nil { return false, err } @@ -135,7 +133,7 @@ func (b *UnixFs0Verifier) Verify(ctx context.Context, blk blocks.Block, w io.Wri return links != 0, nil } - _, internal, err := b.verify(ctx, blk, w) + _, internal, err := b.verify(ctx, blk) return internal, err } diff --git a/retrieval/types.go b/retrieval/types.go index 28343e6b2..f82279a9d 100644 --- a/retrieval/types.go +++ b/retrieval/types.go @@ -1,66 +1,364 @@ -package retrieval +package retrievalmarket import ( - "github.com/filecoin-project/lotus/api" - "github.com/ipfs/go-cid" + "context" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" ) -const ProtocolID = "/fil/retrieval/-1.0.0" // TODO: spec -const QueryProtocolID = "/fil/retrieval/qry/-1.0.0" // TODO: spec +// type aliases +// TODO: Remove and use native types or extract for +// https://github.com/filecoin-project/go-retrieval-market-project/issues/5 +// BigInt is used for token amounts in retrieval deals +type BigInt = types.BigInt + +// Address is an address in the filecoin network +type Address = address.Address + +// SignedVoucher is a signed payment voucher +type SignedVoucher = types.SignedVoucher + +// ProtocolID is the protocol for proposing / responding to retrieval deals +const ProtocolID = "/fil/retrieval/0.0.1" + +// QueryProtocolID is the protocol for querying infromation about retrieval +// deal parameters +const QueryProtocolID = "/fil/retrieval/qry/0.0.1" // TODO: spec + +// Unsubscribe is a function that unsubscribes a subscriber for either the +// client or the provider +type Unsubscribe func() + +// ClientDealState is the current state of a deal from the point of view +// of a retrieval client +type ClientDealState struct { + DealProposal + Status DealStatus + Sender peer.ID + TotalReceived uint64 + FundsSpent BigInt +} + +// ClientEvent is an event that occurs in a deal lifecycle on the client +type ClientEvent uint64 + +const ( + // ClientEventOpen indicates a deal was initiated + ClientEventOpen ClientEvent = iota + + // ClientEventFundsExpended indicates a deal has run out of funds in the payment channel + // forcing the client to add more funds to continue the deal + ClientEventFundsExpended // when totalFunds is expended + + // ClientEventProgress indicates more data was received for a retrieval + ClientEventProgress + + // ClientEventError indicates an error occurred during a deal + ClientEventError + + // ClientEventComplete indicates a deal has completed + ClientEventComplete +) + +// ClientSubscriber is a callback that is registered to listen for retrieval events +type ClientSubscriber func(event ClientEvent, state ClientDealState) + +// RetrievalClient is a client interface for making retrieval deals +type RetrievalClient interface { + // V0 + + // Find Providers finds retrieval providers who may be storing a given piece + FindProviders(pieceCID []byte) []RetrievalPeer + + // Query asks a provider for information about a piece it is storing + Query( + ctx context.Context, + p RetrievalPeer, + pieceCID []byte, + params QueryParams, + ) (QueryResponse, error) + + // Retrieve retrieves all or part of a piece with the given retrieval parameters + Retrieve( + ctx context.Context, + pieceCID []byte, + params Params, + totalFunds BigInt, + miner peer.ID, + clientWallet Address, + minerWallet Address, + ) DealID + + // SubscribeToEvents listens for events that happen related to client retrievals + SubscribeToEvents(subscriber ClientSubscriber) Unsubscribe + + // V1 + AddMoreFunds(id DealID, amount BigInt) error + CancelDeal(id DealID) error + RetrievalStatus(id DealID) + ListDeals() map[DealID]ClientDealState +} + +// RetrievalClientNode are the node depedencies for a RetrevalClient +type RetrievalClientNode interface { + + // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist + // between a client and a miner and insures the client has the given amount of funds available in the channel + GetOrCreatePaymentChannel(ctx context.Context, clientAddress Address, minerAddress Address, clientFundsAvailable BigInt) (Address, error) + + // Allocate late creates a lane within a payment channel so that calls to + // CreatePaymentVoucher will automatically make vouchers only for the difference + // in total + AllocateLane(paymentChannel Address) (uint64, error) + + // CreatePaymentVoucher creates a new payment voucher in the given lane for a + // given payment channel so that all the payment vouchers in the lane add up + // to the given amount (so the payment voucher will be for the difference) + CreatePaymentVoucher(ctx context.Context, paymentChannel Address, amount BigInt, lane uint64) (*SignedVoucher, error) +} + +// ProviderDealState is the current state of a deal from the point of view +// of a retrieval provider +type ProviderDealState struct { + DealProposal + Status DealStatus + Receiver peer.ID + TotalSent uint64 + FundsReceived BigInt +} + +// ProviderEvent is an event that occurs in a deal lifecycle on the provider +type ProviderEvent uint64 + +const ( + + // ProviderEventOpen indicates a new deal was received from a client + ProviderEventOpen ProviderEvent = iota + + // ProviderEventProgress indicates more data was sent to a client + ProviderEventProgress + + // ProviderEventError indicates an error occurred in processing a deal for a client + ProviderEventError + + // ProviderEventComplete indicates a retrieval deal was completed for a client + ProviderEventComplete +) + +// ProviderDealID is a unique identifier for a deal on a provider -- it is +// a combination of DealID set by the client and the peer ID of the client +type ProviderDealID struct { + From peer.ID + ID DealID +} + +// ProviderSubscriber is a callback that is registered to listen for retrieval events on a provider +type ProviderSubscriber func(event ProviderEvent, state ProviderDealState) + +// RetrievalProvider is an interface by which a provider configures their +// retrieval operations and monitors deals received and process +type RetrievalProvider interface { + // Start begins listening for deals on the given host + Start(host.Host) + + // V0 + + // SetPricePerByte sets the price per byte a miner charges for retrievals + SetPricePerByte(price BigInt) + + // SetPaymentInterval sets the maximum number of bytes a a provider will send before + // requesting further payment, and the rate at which that value increases + SetPaymentInterval(paymentInterval uint64, paymentIntervalIncrease uint64) + + // SubscribeToEvents listens for events that happen related to client retrievals + SubscribeToEvents(subscriber ProviderSubscriber) Unsubscribe + + // V1 + SetPricePerUnseal(price BigInt) + ListDeals() map[ProviderDealID]ProviderDealState +} + +// RetrievalProviderNode are the node depedencies for a RetrevalProvider +type RetrievalProviderNode interface { + SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *SignedVoucher, proof []byte, expectedAmount BigInt) (BigInt, error) +} + +// PeerResolver is an interface for looking up providers that may have a piece +type PeerResolver interface { + GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel +} + +// RetrievalPeer is a provider address/peer.ID pair (everything needed to make +// deals for with a miner) +type RetrievalPeer struct { + Address Address + ID peer.ID // optional +} + +// QueryResponseStatus indicates whether a queried piece is available type QueryResponseStatus uint64 const ( - Available QueryResponseStatus = iota - Unavailable + // QueryResponseAvailable indicates a provider has a piece and is prepared to + // return it + QueryResponseAvailable QueryResponseStatus = iota + + // QueryResponseUnavailable indicates a provider either does not have or cannot + // serve the queried piece to the client + QueryResponseUnavailable ) +// QueryItemStatus (V1) indicates whether the requested part of a piece (payload or selector) +// is available for retrieval +type QueryItemStatus uint64 + const ( - Accepted = iota - Error - Rejected - Unsealing + // QueryItemAvailable indicates requested part of the piece is available to be + // served + QueryItemAvailable QueryItemStatus = iota + + // QueryItemUnavailable indicates the piece either does not contain the requested + // item or it cannot be served + QueryItemUnavailable + + // QueryItemUnknown indicates the provider cannot determine if the given item + // is part of the requested piece (for example, if the piece is sealed and the + // miner does not maintain a payload CID index) + QueryItemUnknown ) -type Query struct { - Piece cid.Cid - // TODO: payment +// QueryParams indicate what specific information about a piece that a retrieval +// client is interested in, as well as specific parameters the client is seeking +// for the retrieval deal +type QueryParams struct { + PayloadCID cid.Cid // optional, query if miner has this cid in this piece. some miners may not be able to respond. + Selector ipld.Node // optional, query if miner has this cid in this piece. some miners may not be able to respond. + MaxPricePerByte BigInt // optional, tell miner uninterested if more expensive than this + MinPaymentInterval uint64 // optional, tell miner uninterested unless payment interval is greater than this + MinPaymentIntervalIncrease uint64 // optional, tell miner uninterested unless payment interval increase is greater than this } +// Query is a query to a given provider to determine information about a piece +// they may have available for retrieval +type Query struct { + PieceCID []byte // V0 + // QueryParams // V1 +} + +// QueryResponse is a miners response to a given retrieval query type QueryResponse struct { Status QueryResponseStatus + //PayloadCIDFound QueryItemStatus // V1 - if a PayloadCid was requested, the result + //SelectorFound QueryItemStatus // V1 - if a Selector was requested, the result - Size uint64 // TODO: spec - // TODO: unseal price (+spec) - // TODO: sectors to unseal - // TODO: address to send money for the deal? - MinPrice types.BigInt + Size uint64 // Total size of piece in bytes + //ExpectedPayloadSize uint64 // V1 - optional, if PayloadCID + selector are specified and miner knows, can offer an expected size + + PaymentAddress Address // address to send funds to -- may be different than miner addr + MinPricePerByte BigInt + MaxPaymentInterval uint64 + MaxPaymentIntervalIncrease uint64 } -type Unixfs0Offer struct { - Offset uint64 - Size uint64 +// QueryResponseUndefined is an empty QueryResponse +var QueryResponseUndefined = QueryResponse{} + +// PieceRetrievalPrice is the total price to retrieve the piece (size * MinPricePerByte) +func (qr QueryResponse) PieceRetrievalPrice() BigInt { + return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.Size)) } -type RetParams struct { - Unixfs0 *Unixfs0Offer +// PayloadRetrievalPrice is the expected price to retrieve just the given payload +// & selector (V1) +//func (qr QueryResponse) PayloadRetrievalPrice() BigInt { +// return types.BigMul(qr.MinPricePerByte, types.NewInt(qr.ExpectedPayloadSize)) +//} + +// DealStatus is the status of a retrieval deal returned by a provider +// in a DealResponse +type DealStatus uint64 + +const ( + // DealStatusAccepted means a deal has been accepted by a provider + // and its is ready to proceed with retrieval + DealStatusAccepted DealStatus = iota + + // DealStatusFailed indicates something went wrong during a retrieval + DealStatusFailed + + // DealStatusRejected indicates the provider rejected a client's deal proposal + // for some reason + DealStatusRejected + + // DealStatusUnsealing indicates the provider is currently unsealing the sector + // needed to serve the retrieval deal + DealStatusUnsealing + + // DealStatusFundsNeeded indicates the provider is awaiting a payment voucher to + // continue processing the deal + DealStatusFundsNeeded + + // DealStatusOngoing indicates the provider is continuing to process a deal + DealStatusOngoing + + // DealStatusFundsNeededLastPayment indicates the provider is awaiting funds for + // a final payment in order to complete a deal + DealStatusFundsNeededLastPayment + + // DealStatusCompleted indicates a deal is complete + DealStatusCompleted + + // DealStatusDealNotFound indicates an update was received for a deal that could + // not be identified + DealStatusDealNotFound +) + +// Params are the parameters requested for a retrieval deal proposal +type Params struct { + //PayloadCID cid.Cid // V1 + //Selector ipld.Node // V1 + PricePerByte BigInt + PaymentInterval uint64 + PaymentIntervalIncrease uint64 } +// DealID is an identifier for a retrieval deal (unique to a client) +type DealID uint64 + +// DealProposal is a proposal for a new retrieval deal type DealProposal struct { - Payment api.PaymentInfo - - Ref cid.Cid - Params RetParams + PieceCID []byte + ID DealID + Params } -type DealResponse struct { - Status uint64 - Message string -} - -type Block struct { // TODO: put in spec - Prefix []byte // TODO: fix cid.Prefix marshaling somehow +// Block is an IPLD block in bitswap format +type Block struct { + Prefix []byte Data []byte } + +// DealResponse is a response to a retrieval deal proposal +type DealResponse struct { + Status DealStatus + ID DealID + + // payment required to proceed + PaymentOwed BigInt + + Message string + Blocks []Block // V0 only +} + +// DealPayment is a payment for an in progress retrieval deal +type DealPayment struct { + ID DealID + PaymentChannel address.Address + PaymentVoucher *types.SignedVoucher +} diff --git a/retrievaladapter/client.go b/retrievaladapter/client.go new file mode 100644 index 000000000..ff7d2f0fb --- /dev/null +++ b/retrievaladapter/client.go @@ -0,0 +1,41 @@ +package retrievaladapter + +import ( + "context" + + payapi "github.com/filecoin-project/lotus/node/impl/paych" + "github.com/filecoin-project/lotus/paych" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" +) + +type retrievalClientNode struct { + pmgr *paych.Manager + payapi payapi.PaychAPI +} + +// NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the +// Lotus Node +func NewRetrievalClientNode(pmgr *paych.Manager, payapi payapi.PaychAPI) retrievalmarket.RetrievalClientNode { + return &retrievalClientNode{pmgr: pmgr, payapi: payapi} +} + +// GetOrCreatePaymentChannel sets up a new payment channel if one does not exist +// between a client and a miner and insures the client has the given amount of funds available in the channel +func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress retrievalmarket.Address, minerAddress retrievalmarket.Address, clientFundsAvailable retrievalmarket.BigInt) (retrievalmarket.Address, error) { + paych, _, err := rcn.pmgr.GetPaych(ctx, clientAddress, minerAddress, clientFundsAvailable) + return paych, err +} + +// Allocate late creates a lane within a payment channel so that calls to +// CreatePaymentVoucher will automatically make vouchers only for the difference +// in total +func (rcn *retrievalClientNode) AllocateLane(paymentChannel retrievalmarket.Address) (uint64, error) { + return rcn.pmgr.AllocateLane(paymentChannel) +} + +// CreatePaymentVoucher creates a new payment voucher in the given lane for a +// given payment channel so that all the payment vouchers in the lane add up +// to the given amount (so the payment voucher will be for the difference) +func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel retrievalmarket.Address, amount retrievalmarket.BigInt, lane uint64) (*retrievalmarket.SignedVoucher, error) { + return rcn.payapi.PaychVoucherCreate(ctx, paymentChannel, amount, lane) +} diff --git a/retrievaladapter/provider.go b/retrievaladapter/provider.go new file mode 100644 index 000000000..f8d188a95 --- /dev/null +++ b/retrievaladapter/provider.go @@ -0,0 +1,23 @@ +package retrievaladapter + +import ( + "context" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/go-address" + retrievalmarket "github.com/filecoin-project/lotus/retrieval" +) + +type retrievalProviderNode struct { + full api.FullNode +} + +// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the +// Lotus Node +func NewRetrievalProviderNode(full api.FullNode) retrievalmarket.RetrievalProviderNode { + return &retrievalProviderNode{full} +} + +func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *retrievalmarket.SignedVoucher, proof []byte, expectedAmount retrievalmarket.BigInt) (retrievalmarket.BigInt, error) { + return rpn.full.PaychVoucherAdd(ctx, paymentChannel, voucher, proof, expectedAmount) +}