lotus/retrieval/client.go

273 lines
6.8 KiB
Go
Raw Normal View History

2019-08-26 13:45:36 +00:00
package retrieval
import (
"context"
2019-08-27 22:10:23 +00:00
"io"
2019-08-26 13:45:36 +00:00
2019-08-27 18:45:21 +00:00
blocks "github.com/ipfs/go-block-format"
2019-08-26 13:45:36 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
2019-08-27 18:45:21 +00:00
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
2019-08-27 18:45:21 +00:00
"golang.org/x/xerrors"
2019-08-26 13:45:36 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
2019-11-07 14:11:39 +00:00
"github.com/filecoin-project/lotus/lib/cborutil"
payapi "github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/paych"
"github.com/filecoin-project/lotus/retrieval/discovery"
2019-08-26 13:45:36 +00:00
)
var log = logging.Logger("retrieval")
type Client struct {
h host.Host
2019-09-16 13:46:05 +00:00
pmgr *paych.Manager
payapi payapi.PaychAPI
2019-08-26 13:45:36 +00:00
}
2019-09-16 20:11:17 +00:00
func NewClient(h host.Host, pmgr *paych.Manager, payapi payapi.PaychAPI) *Client {
return &Client{h: h, pmgr: pmgr, payapi: payapi}
2019-08-26 13:45:36 +00:00
}
2019-08-26 18:23:11 +00:00
func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer {
2019-08-26 13:45:36 +00:00
s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID)
if err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
defer s.Close()
2019-11-07 14:11:39 +00:00
err = cborutil.WriteCborRPC(s, &Query{
2019-08-26 13:45:36 +00:00
Piece: data,
})
if err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
2019-08-26 18:23:11 +00:00
var resp QueryResponse
if err := resp.UnmarshalCBOR(s); err != nil {
2019-08-26 13:45:36 +00:00
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
2019-08-26 18:23:11 +00:00
return api.QueryOffer{
2019-08-27 19:54:39 +00:00
Root: data,
2019-08-26 13:45:36 +00:00
Size: resp.Size,
MinPrice: resp.MinPrice,
Miner: p.Address, // TODO: check
MinerPeerID: p.ID,
}
}
2019-08-26 18:23:11 +00:00
type clientStream struct {
2019-09-16 13:46:05 +00:00
payapi payapi.PaychAPI
2019-08-26 18:23:11 +00:00
stream network.Stream
peeker cbg.BytePeeker
2019-08-26 18:23:11 +00:00
root cid.Cid
2019-09-16 13:46:05 +00:00
size types.BigInt
2019-08-26 18:23:11 +00:00
offset uint64
2019-09-16 13:46:05 +00:00
paych address.Address
lane uint64
total types.BigInt
transferred types.BigInt
2019-08-26 18:23:11 +00:00
windowSize uint64 // how much we "trust" the peer
verifier BlockVerifier
}
// C > S
//
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
//
// > DealProposal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
2019-08-26 18:23:11 +00:00
// < Resp{Accept}
// < ..(Intermediate Block)
// < ..Blocks
// < ..(Intermediate Block)
// < ..Blocks
// > DealProposal(...)
2019-08-26 18:23:11 +00:00
// < ...
2019-09-16 13:46:05 +00:00
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total types.BigInt, miner peer.ID, client, minerAddr address.Address, out io.Writer) error {
2019-08-26 18:23:11 +00:00
s, err := c.h.NewStream(ctx, miner, ProtocolID)
if err != nil {
2019-12-11 13:59:15 +00:00
return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err)
2019-08-26 18:23:11 +00:00
}
defer s.Close()
2019-08-28 23:01:28 +00:00
initialOffset := uint64(0) // TODO: Check how much data we have locally
// TODO: Support in handler
// TODO: Allow client to specify this
2019-09-16 13:46:05 +00:00
paych, _, err := c.pmgr.GetPaych(ctx, client, minerAddr, total)
if err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("getting payment channel: %w", err)
2019-09-16 13:46:05 +00:00
}
lane, err := c.pmgr.AllocateLane(paych)
if err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("allocating payment lane: %w", err)
2019-09-16 13:46:05 +00:00
}
2019-08-26 18:23:11 +00:00
cst := clientStream{
2019-09-16 13:46:05 +00:00
payapi: c.payapi,
2019-08-26 18:23:11 +00:00
stream: s,
peeker: cbg.GetPeeker(s),
2019-08-26 18:23:11 +00:00
root: root,
2019-09-16 13:46:05 +00:00
size: types.NewInt(size),
2019-08-28 23:01:28 +00:00
offset: initialOffset,
2019-08-26 18:23:11 +00:00
2019-09-16 13:46:05 +00:00
paych: paych,
lane: lane,
total: total,
transferred: types.NewInt(0),
2019-08-26 18:23:11 +00:00
windowSize: build.UnixfsChunkSize,
2019-08-27 22:10:23 +00:00
verifier: &UnixFs0Verifier{Root: root},
2019-08-26 18:23:11 +00:00
}
2019-08-28 23:01:38 +00:00
for cst.offset != size+initialOffset {
2019-08-26 18:23:11 +00:00
toFetch := cst.windowSize
if toFetch+cst.offset > size {
toFetch = size - cst.offset
}
2019-08-27 22:10:23 +00:00
log.Infof("Retrieve %dB @%d", toFetch, cst.offset)
2019-08-26 18:23:11 +00:00
2019-09-16 13:46:05 +00:00
err := cst.doOneExchange(ctx, toFetch, out)
2019-08-26 18:23:11 +00:00
if err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("retrieval exchange: %w", err)
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
cst.offset += toFetch
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
return nil
2019-08-26 18:23:11 +00:00
}
2019-09-16 13:46:05 +00:00
func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64, out io.Writer) error {
payAmount := types.BigDiv(types.BigMul(cst.total, types.NewInt(toFetch)), cst.size)
payment, err := cst.setupPayment(ctx, payAmount)
if err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("setting up retrieval payment: %w", err)
2019-09-16 13:46:05 +00:00
}
deal := &DealProposal{
2019-09-16 13:46:05 +00:00
Payment: payment,
Ref: cst.root,
Params: RetParams{
Unixfs0: &Unixfs0Offer{
Offset: cst.offset,
Size: toFetch,
},
},
}
2019-08-26 18:23:11 +00:00
2019-11-07 14:11:39 +00:00
if err := cborutil.WriteCborRPC(cst.stream, deal); err != nil {
2019-12-11 13:59:15 +00:00
return xerrors.Errorf("sending incremental retrieval request: %w", err)
2019-08-26 18:23:11 +00:00
}
var resp DealResponse
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil {
2019-12-11 13:59:15 +00:00
return xerrors.Errorf("reading retrieval response: %w", err)
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
if resp.Status != Accepted {
2019-08-26 18:23:11 +00:00
cst.windowSize = build.UnixfsChunkSize
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
2019-08-27 18:45:21 +00:00
if resp.Status == Error {
return xerrors.Errorf("storage deal error: %s", resp.Message)
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
if resp.Status == Rejected {
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
2019-08-26 18:23:11 +00:00
}
return xerrors.New("storage deal response had no Accepted section")
}
2019-08-27 22:10:23 +00:00
log.Info("Retrieval accepted, fetching blocks")
return cst.fetchBlocks(toFetch, out)
2019-08-26 18:23:11 +00:00
// TODO: maybe increase miner window size after success
}
2019-08-27 22:10:23 +00:00
func (cst *clientStream) fetchBlocks(toFetch uint64, out io.Writer) error {
2019-08-26 18:23:11 +00:00
blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize
for i := uint64(0); i < blocksToFetch; {
2019-08-27 22:10:23 +00:00
log.Infof("block %d of %d", i+1, blocksToFetch)
2019-08-26 18:23:11 +00:00
2019-08-29 11:31:25 +00:00
var block Block
2019-11-07 14:11:39 +00:00
if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("reading fetchBlock response: %w", err)
2019-08-26 18:23:11 +00:00
}
2019-08-29 11:31:25 +00:00
dataBlocks, err := cst.consumeBlockMessage(block, out)
2019-08-26 18:23:11 +00:00
if err != nil {
2019-09-17 08:34:41 +00:00
return xerrors.Errorf("consuming retrieved blocks: %w", err)
2019-08-26 18:23:11 +00:00
}
i += dataBlocks
}
return nil
}
2019-08-29 11:31:25 +00:00
func (cst *clientStream) consumeBlockMessage(block Block, out io.Writer) (uint64, error) {
prefix, err := cid.PrefixFromBytes(block.Prefix)
2019-08-26 18:23:11 +00:00
if err != nil {
return 0, err
}
2019-08-29 11:31:25 +00:00
cid, err := prefix.Sum(block.Data)
if err != nil {
return 0, err
}
2019-08-29 11:31:25 +00:00
blk, err := blocks.NewBlockWithCid(block.Data, cid)
2019-08-26 18:23:11 +00:00
if err != nil {
return 0, err
}
2019-08-27 22:10:23 +00:00
internal, err := cst.verifier.Verify(context.TODO(), blk, out)
2019-08-26 18:23:11 +00:00
if err != nil {
2019-08-28 21:11:29 +00:00
log.Warnf("block verify failed: %s", err)
2019-08-26 18:23:11 +00:00
return 0, err
}
2019-08-27 22:10:23 +00:00
// TODO: Smarter out, maybe add to filestore automagically
// (Also, persist intermediate nodes)
2019-08-26 18:23:11 +00:00
if internal {
return 0, nil
}
2019-08-27 22:10:23 +00:00
2019-08-26 18:23:11 +00:00
return 1, nil
}
2019-09-16 13:46:05 +00:00
func (cst *clientStream) setupPayment(ctx context.Context, toSend types.BigInt) (api.PaymentInfo, error) {
amount := types.BigAdd(cst.transferred, toSend)
sv, err := cst.payapi.PaychVoucherCreate(ctx, cst.paych, amount, cst.lane)
if err != nil {
return api.PaymentInfo{}, err
}
cst.transferred = amount
return api.PaymentInfo{
Channel: cst.paych,
ChannelMessage: nil,
2019-09-24 21:13:47 +00:00
Vouchers: []*types.SignedVoucher{sv},
2019-09-16 13:46:05 +00:00
}, nil
}