lotus/retrieval/client.go

220 lines
4.9 KiB
Go
Raw Normal View History

2019-08-26 13:45:36 +00:00
package retrieval
import (
"context"
"io/ioutil"
2019-08-26 18:23:11 +00:00
pb "github.com/ipfs/go-bitswap/message/pb"
2019-08-27 18:45:21 +00:00
blocks "github.com/ipfs/go-block-format"
2019-08-26 13:45:36 +00:00
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
2019-08-27 18:45:21 +00:00
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-msgio"
"golang.org/x/xerrors"
2019-08-26 13:45:36 +00:00
"github.com/filecoin-project/go-lotus/api"
2019-08-26 18:23:11 +00:00
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
2019-08-26 13:45:36 +00:00
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
// log is the package-level logger, registered under the "retrieval" name.
var log = logging.Logger("retrieval")
// Client is the client side of the retrieval protocols in this package. It
// issues queries and retrievals by opening streams on the wrapped host.
type Client struct {
	// h is the host used to open a new stream for every query or retrieval.
	h host.Host
}
// NewClient returns a Client that opens its streams on h.
func NewClient(h host.Host) *Client {
	client := new(Client)
	client.h = h
	return client
}
2019-08-26 18:23:11 +00:00
func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer {
2019-08-26 13:45:36 +00:00
s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID)
if err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
defer s.Close()
2019-08-26 18:23:11 +00:00
err = cborrpc.WriteCborRPC(s, Query{
2019-08-26 13:45:36 +00:00
Piece: data,
})
if err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
// TODO: read deadline
rawResp, err := ioutil.ReadAll(s)
if err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
2019-08-26 18:23:11 +00:00
var resp QueryResponse
2019-08-26 13:45:36 +00:00
if err := cbor.DecodeInto(rawResp, &resp); err != nil {
log.Warn(err)
2019-08-26 18:23:11 +00:00
return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
2019-08-26 13:45:36 +00:00
}
2019-08-26 18:23:11 +00:00
return api.QueryOffer{
2019-08-27 19:54:39 +00:00
Root: data,
2019-08-26 13:45:36 +00:00
Size: resp.Size,
MinPrice: resp.MinPrice,
Miner: p.Address, // TODO: check
MinerPeerID: p.ID,
}
}
2019-08-26 18:23:11 +00:00
// clientStream carries the state of one retrieval as it progresses over a
// single stream.
type clientStream struct {
	stream network.Stream // stream that deals are written to and responses/blocks are read from
	root   cid.Cid        // root CID placed in every deal offer
	offset uint64         // offset sent in the next deal offer

	windowSize uint64        // how much we "trust" the peer
	verifier   BlockVerifier // consulted for every received block
}
// C > S
//
// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size
//
// > Deal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)}
// < Resp{Accept}
// < ..(Intermediate Block)
// < ..Blocks
// < ..(Intermediate Block)
// < ..Blocks
// > Deal(...)
// < ...
func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address) error {
s, err := c.h.NewStream(ctx, miner, ProtocolID)
if err != nil {
return err
}
defer s.Close()
cst := clientStream{
stream: s,
root: root,
2019-08-27 18:45:21 +00:00
offset: 0, // TODO: Check how much data we have locally
2019-08-27 19:54:39 +00:00
// TODO: Support in handler
// TODO: Allow client to specify this
2019-08-26 18:23:11 +00:00
windowSize: build.UnixfsChunkSize,
verifier: &OptimisticVerifier{}, // TODO: Use a real verifier
}
2019-08-27 18:45:21 +00:00
for cst.offset != size {
2019-08-26 18:23:11 +00:00
toFetch := cst.windowSize
if toFetch+cst.offset > size {
toFetch = size - cst.offset
}
err := cst.doOneExchange(toFetch)
if err != nil {
return err
}
2019-08-27 18:45:21 +00:00
cst.offset += toFetch
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
log.Info("RETRIEVE SUCCESSFUL")
return nil
2019-08-26 18:23:11 +00:00
}
func (cst *clientStream) doOneExchange(toFetch uint64) error {
deal := Deal{Unixfs0: &Unixfs0Offer{
Root: cst.root,
Offset: cst.offset,
Size: toFetch,
}}
if err := cborrpc.WriteCborRPC(cst.stream, deal); err != nil {
return err
}
var resp DealResponse
if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil {
2019-08-27 18:45:21 +00:00
log.Error(err)
2019-08-26 18:23:11 +00:00
return err
}
2019-08-27 18:45:21 +00:00
if resp.Status != Accepted {
2019-08-26 18:23:11 +00:00
cst.windowSize = build.UnixfsChunkSize
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)
2019-08-27 18:45:21 +00:00
if resp.Status == Error {
return xerrors.Errorf("storage deal error: %s", resp.Message)
2019-08-26 18:23:11 +00:00
}
2019-08-27 18:45:21 +00:00
if resp.Status == Rejected {
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
2019-08-26 18:23:11 +00:00
}
return xerrors.New("storage deal response had no Accepted section")
}
return cst.fetchBlocks(toFetch)
// TODO: maybe increase miner window size after success
}
// fetchBlocks reads block messages from the stream until enough data blocks
// have arrived to cover toFetch bytes.
//
// Messages are read through a msgio varint reader capped at
// network.MessageSizeMax. Each message is unmarshalled and passed to
// consumeBlockMessage, whose result says how many data blocks it counts for.
// The first read, decode or consume error ends the loop and is returned.
func (cst *clientStream) fetchBlocks(toFetch uint64) error {
	// Round up: a partially filled trailing chunk still takes one block.
	want := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize

	// TODO: put msgio into spec
	rd := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax)

	var got uint64
	for got < want {
		raw, err := rd.ReadMsg()
		if err != nil {
			return err
		}

		var blockMsg pb.Message_Block
		if err := blockMsg.Unmarshal(raw); err != nil {
			return err
		}

		n, err := cst.consumeBlockMessage(blockMsg)
		if err != nil {
			return err
		}
		got += n

		// Hand the buffer back to the reader once the message is consumed.
		rd.ReleaseMsg(raw)
	}

	return nil
}
// consumeBlockMessage rebuilds a block from a received message and passes it
// to the stream's verifier.
//
// The CID is not taken from the wire: it is recomputed by applying the
// message's CID prefix to the message's data. The return value is the number
// of data blocks the message accounts for: 0 when the verifier reports the
// block as internal, 1 otherwise. An error is returned when the prefix cannot
// be parsed, the CID cannot be computed, the block cannot be constructed, or
// verification fails.
func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error) {
	prefix, err := cid.PrefixFromBytes(pb.GetPrefix())
	if err != nil {
		return 0, err
	}

	// Named blkCid so the local does not shadow the cid package.
	blkCid, err := prefix.Sum(pb.GetData())
	if err != nil {
		// Sum reports an error when it cannot hash the data; stop here rather
		// than building a block around an undefined CID.
		return 0, err
	}

	blk, err := blocks.NewBlockWithCid(pb.GetData(), blkCid)
	if err != nil {
		return 0, err
	}

	internal, err := cst.verifier.Verify(blk)
	if err != nil {
		return 0, err
	}

	// TODO: Persist block

	if internal {
		// Internal blocks do not count towards the data blocks fetched.
		return 0, nil
	}

	return 1, nil
}