From 60eedb699ec58f5a57645f70c622374ef2bf48c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 26 Aug 2019 20:23:11 +0200 Subject: [PATCH] retrieval: Client impl --- api/api.go | 4 +- api/struct.go | 4 +- build/params.go | 7 ++ go.mod | 1 + node/impl/full/client.go | 7 +- retrieval/client.go | 167 ++++++++++++++++++++++++++++++++++++--- retrieval/miner.go | 8 +- retrieval/types.go | 46 +++++++---- retrieval/verify.go | 19 +++++ 9 files changed, 229 insertions(+), 34 deletions(-) create mode 100644 build/params.go create mode 100644 retrieval/verify.go diff --git a/api/api.go b/api/api.go index 9651ee45e..17f633d64 100644 --- a/api/api.go +++ b/api/api.go @@ -92,7 +92,7 @@ type FullNode interface { ClientImport(ctx context.Context, path string) (cid.Cid, error) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) - ClientFindData(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) + ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now) // ClientUnimport removes references to the specified file from filestore //ClientUnimport(path string) @@ -193,7 +193,7 @@ type SealedRef struct { Size uint32 } -type RetrievalOffer struct { +type QueryOffer struct { Err string Size uint64 diff --git a/api/struct.go b/api/struct.go index 0b275e032..f3e6d449c 100644 --- a/api/struct.go +++ b/api/struct.go @@ -71,7 +71,7 @@ type FullNodeStruct struct { ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]Import, error) `perm:"write"` ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"` - ClientFindData func(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) `perm:"read"` + ClientFindData func(ctx context.Context, root cid.Cid) ([]QueryOffer, error) `perm:"read"` ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"` StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"` @@ -158,7 +158,7 @@ func (c *FullNodeStruct) ClientHasLocal(ctx context.Context, root cid.Cid) (bool return c.Internal.ClientHasLocal(ctx, root) } -func (c *FullNodeStruct) ClientFindData(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) { +func (c *FullNodeStruct) ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error) { return c.Internal.ClientFindData(ctx, root) } diff --git a/build/params.go b/build/params.go new file mode 100644 index 000000000..b3b71c563 --- /dev/null +++ b/build/params.go @@ -0,0 +1,7 @@ +package build + +// Core network constants + +const UnixfsChunkSize uint64 = 1 << 20 + +// TODO: Move other important consts here diff --git a/go.mod b/go.mod index cf385f063..10f2bf6e6 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/libp2p/go-libp2p-tls v0.1.0 github.com/libp2p/go-libp2p-yamux v0.2.1 github.com/libp2p/go-maddr-filter v0.0.5 + github.com/libp2p/go-msgio v0.0.4 github.com/miekg/dns v1.1.16 // indirect github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/mitchellh/go-homedir v1.1.0 diff --git a/node/impl/full/client.go b/node/impl/full/client.go index fd909f723..4c5a4e89a 100644 --- a/node/impl/full/client.go +++ b/node/impl/full/client.go @@ -3,6 +3,7 @@ package full import ( "context" "errors" + "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/retrieval" "github.com/filecoin-project/go-lotus/retrieval/discovery" "github.com/ipfs/go-blockservice" @@ -138,13 +139,13 @@ func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, err return true, nil } -func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.RetrievalOffer, error) { +func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) { peers, err := a.RetDiscovery.GetPeers(root) if err != nil { return nil, err } - out := make([]api.RetrievalOffer, len(peers)) + out := make([]api.QueryOffer, len(peers)) for k, p := range peers { out[k] = a.Retrieval.Query(ctx, p, root) } @@ -177,7 +178,7 @@ func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, err NoCopy: true, } - db, err := params.New(chunker.DefaultSplitter(file)) + db, err := params.New(chunker.NewSizeSplitter(file, build.UnixfsChunkSize)) if err != nil { return cid.Undef, err } diff --git a/retrieval/client.go b/retrieval/client.go index 3c6f2bf95..b151bd728 100644 --- a/retrieval/client.go +++ b/retrieval/client.go @@ -2,15 +2,23 @@ package retrieval import ( "context" - "github.com/filecoin-project/go-lotus/lib/cborrpc" "io/ioutil" + blocks "github.com/ipfs/go-block-format" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-msgio" + "golang.org/x/xerrors" + pb "github.com/ipfs/go-bitswap/message/pb" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/retrieval/discovery" ) @@ -24,39 +32,180 @@ func NewClient(h host.Host) *Client { return &Client{h: h} } -func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.RetrievalOffer { +func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.QueryOffer { s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID) if err != nil { log.Warn(err) - return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} + return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} } defer s.Close() - err = cborrpc.WriteCborRPC(s, RetQuery{ + err = cborrpc.WriteCborRPC(s, Query{ Piece: data, }) if err != nil { log.Warn(err) - return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} + return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} } // TODO: read deadline rawResp, err := ioutil.ReadAll(s) if err != nil { log.Warn(err) - return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} + return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} } - var resp RetQueryResponse + var resp QueryResponse if err := cbor.DecodeInto(rawResp, &resp); err != nil { log.Warn(err) - return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} + return api.QueryOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID} } - return api.RetrievalOffer{ + return api.QueryOffer{ Size: resp.Size, MinPrice: resp.MinPrice, Miner: p.Address, // TODO: check MinerPeerID: p.ID, } } + +type clientStream struct { + stream network.Stream + + root cid.Cid + offset uint64 + + windowSize uint64 // how much we "trust" the peer + verifier BlockVerifier +} + +// C > S +// +// Offset MUST be aligned on chunking boundaries, size is rounded up to leaf size +// +// > Deal{Mode: Unixfs0, RootCid, Offset, Size, Payment(nil if free)} +// < Resp{Accept} +// < ..(Intermediate Block) +// < ..Blocks +// < ..(Intermediate Block) +// < ..Blocks +// > Deal(...) +// < ... +func (c *Client) RetrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, miner peer.ID, minerAddr address.Address) error { + s, err := c.h.NewStream(ctx, miner, ProtocolID) + if err != nil { + return err + } + defer s.Close() + + cst := clientStream{ + stream: s, + + root: root, + offset: 0, // TODO: check how much data we have locally + + windowSize: build.UnixfsChunkSize, + verifier: &OptimisticVerifier{}, // TODO: Use a real verifier + } + + for { + toFetch := cst.windowSize + if toFetch+cst.offset > size { + toFetch = size - cst.offset + } + + err := cst.doOneExchange(toFetch) + if err != nil { + return err + } + } +} + +func (cst *clientStream) doOneExchange(toFetch uint64) error { + deal := Deal{Unixfs0: &Unixfs0Offer{ + Root: cst.root, + Offset: cst.offset, + Size: toFetch, + }} + + if err := cborrpc.WriteCborRPC(cst.stream, deal); err != nil { + return err + } + + var resp DealResponse + if err := cborrpc.ReadCborRPC(cst.stream, &resp); err != nil { + return err + } + + if resp.AcceptedResponse == nil { + cst.windowSize = build.UnixfsChunkSize + // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) + + if resp.ErrorResponse != nil { + return xerrors.Errorf("storage deal error: %s", resp.ErrorResponse.Message) + } + if resp.RejectedResponse != nil { + return xerrors.Errorf("storage deal rejected: %s", resp.RejectedResponse.Message) + } + return xerrors.New("storage deal response had no Accepted section") + } + + return cst.fetchBlocks(toFetch) + + // TODO: maybe increase miner window size after success +} + +func (cst *clientStream) fetchBlocks(toFetch uint64) error { + blocksToFetch := (toFetch + build.UnixfsChunkSize - 1) / build.UnixfsChunkSize + + // TODO: put msgio into spec + reader := msgio.NewVarintReaderSize(cst.stream, network.MessageSizeMax) + + for i := uint64(0); i < blocksToFetch; { + msg, err := reader.ReadMsg() + if err != nil { + return err + } + + var pb pb.Message_Block + if err := pb.Unmarshal(msg); err != nil { + return err + } + + dataBlocks, err := cst.consumeBlockMessage(pb) + if err != nil { + return err + } + + i += dataBlocks + + reader.ReleaseMsg(msg) + } + + return nil +} + +func (cst *clientStream) consumeBlockMessage(pb pb.Message_Block) (uint64, error) { + prefix, err := cid.PrefixFromBytes(pb.GetPrefix()) + if err != nil { + return 0, err + } + cid, err := prefix.Sum(pb.GetData()) + + blk, err := blocks.NewBlockWithCid(pb.GetData(), cid) + if err != nil { + return 0, err + } + + internal, err := cst.verifier.Verify(blk) + if err != nil { + return 0, err + } + + // TODO: Persist block + + if internal { + return 0, nil + } + return 1, nil +} diff --git a/retrieval/miner.go b/retrieval/miner.go index 0ca2439b3..5e2525d76 100644 --- a/retrieval/miner.go +++ b/retrieval/miner.go @@ -21,7 +21,7 @@ func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner { func (m *Miner) HandleStream(stream network.Stream) { defer stream.Close() - var query RetQuery + var query Query if err := cborrpc.ReadCborRPC(stream, &query); err != nil { log.Errorf("Retrieval query: ReadCborRPC: %s", err) return @@ -33,15 +33,15 @@ func (m *Miner) HandleStream(stream network.Stream) { return } - answer := RetQueryResponse{ + answer := QueryResponse{ Status: Unavailable, } if len(refs) > 0 { answer.Status = Available // TODO: get price, look for already unsealed ref to reduce work - answer.MinPrice = types.NewInt(uint64(refs[0].Size)) // TODO: Get this from somewhere - answer.Size = uint64(refs[0].Size) + answer.MinPrice = types.NewInt(uint64(refs[0].Size) * 2) // TODO: Get this from somewhere + answer.Size = uint64(refs[0].Size) // TODO: verify on intermediate } if err := cborrpc.WriteCborRPC(stream, answer); err != nil { diff --git a/retrieval/types.go b/retrieval/types.go index 764a9ae6a..31dc07119 100644 --- a/retrieval/types.go +++ b/retrieval/types.go @@ -10,35 +10,53 @@ import ( const ProtocolID = "/fil/retrieval/-1.0.0" // TODO: spec const QueryProtocolID = "/fil/retrieval/qry/-1.0.0" // TODO: spec -type QueryResponse int +type QueryResponseStatus int const ( - Available QueryResponse = iota + Available QueryResponseStatus = iota Unavailable ) func init() { - cbor.RegisterCborType(RetDealProposal{}) + cbor.RegisterCborType(Deal{}) - cbor.RegisterCborType(RetQuery{}) - cbor.RegisterCborType(RetQueryResponse{}) + cbor.RegisterCborType(Query{}) + cbor.RegisterCborType(QueryResponse{}) } -type RetDealProposal struct { - Piece cid.Cid - Price types.BigInt - Payment types.SignedVoucher -} - -type RetQuery struct { +type Query struct { Piece cid.Cid + // TODO: payment } -type RetQueryResponse struct { - Status QueryResponse +type QueryResponse struct { + Status QueryResponseStatus Size uint64 // TODO: spec // TODO: unseal price (+spec) + // TODO: sectors to unseal // TODO: address to send money for the deal? MinPrice types.BigInt } + +type Unixfs0Offer struct { + Root cid.Cid + Offset uint64 + Size uint64 +} + +type Deal struct { + Unixfs0 *Unixfs0Offer +} + +type AcceptedResponse struct{} +type RejectedResponse struct { + Message string +} +type ErrorResponse RejectedResponse + +type DealResponse struct { + *AcceptedResponse + *RejectedResponse + *ErrorResponse +} diff --git a/retrieval/verify.go b/retrieval/verify.go new file mode 100644 index 000000000..0c0dca227 --- /dev/null +++ b/retrieval/verify.go @@ -0,0 +1,19 @@ +package retrieval + +import blocks "github.com/ipfs/go-block-format" + +type BlockVerifier interface { + Verify(blocks.Block) (internal bool, err error) +} + +// TODO: BalancedUnixFs0Verifier + +type OptimisticVerifier struct { +} + +func (o *OptimisticVerifier) Verify(blocks.Block) (bool, error) { + // It's probably fine + return false, nil +} + +var _ BlockVerifier = &OptimisticVerifier{}