basic retrieval content discovery

This commit is contained in:
Łukasz Magiera 2019-08-26 15:45:36 +02:00
parent cad3efb9ba
commit 28d3eb38eb
16 changed files with 434 additions and 20 deletions

View File

@ -91,6 +91,8 @@ type FullNode interface {
// ClientImport imports file under the specified path into filestore
ClientImport(ctx context.Context, path string) (cid.Cid, error)
ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) // TODO: specify serialization mode we want (defaults to unixfs for now)
// ClientUnimport removes references to the specified file from filestore
//ClientUnimport(path string)
@ -190,3 +192,13 @@ type SealedRef struct {
Offset uint64
Size uint32
}
type RetrievalOffer struct {
Err string
Size uint64
MinPrice types.BigInt
Miner address.Address
MinerPeerID peer.ID
}

View File

@ -69,7 +69,9 @@ type FullNodeStruct struct {
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"write"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) `perm:"admin"`
StateMinerSectors func(context.Context, address.Address) ([]*SectorInfo, error) `perm:"read"`
@ -152,6 +154,14 @@ func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid
return c.Internal.ClientImport(ctx, path)
}
func (c *FullNodeStruct) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
return c.Internal.ClientHasLocal(ctx, root)
}
func (c *FullNodeStruct) ClientFindData(ctx context.Context, root cid.Cid) ([]RetrievalOffer, error) {
return c.Internal.ClientFindData(ctx, root)
}
func (c *FullNodeStruct) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
return c.Internal.ClientStartDeal(ctx, data, miner, price, blocksDuration)
}

View File

@ -4,8 +4,6 @@ import (
"context"
"math"
"github.com/filecoin-project/go-lotus/chain/actors"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -19,12 +17,14 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
func init() {
@ -48,10 +48,11 @@ type ClientDeal struct {
}
type Client struct {
cs *store.ChainStore
h host.Host
w *wallet.Wallet
dag dtypes.ClientDAG
cs *store.ChainStore
h host.Host
w *wallet.Wallet
dag dtypes.ClientDAG
discovery *discovery.Local
deals StateStore
@ -61,12 +62,13 @@ type Client struct {
stopped chan struct{}
}
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG) *Client {
func NewClient(cs *store.ChainStore, h host.Host, w *wallet.Wallet, ds dtypes.MetadataDS, dag dtypes.ClientDAG, discovery *discovery.Local) *Client {
c := &Client{
cs: cs,
h: h,
w: w,
dag: dag,
cs: cs,
h: h,
w: w,
dag: dag,
discovery: discovery,
deals: StateStore{ds: namespace.Wrap(ds, datastore.NewKey("/deals/client"))},
@ -242,7 +244,12 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal, vd *actors.Pie
// TODO: actually care about what happens with the deal after it was accepted
//c.incoming <- deal
return deal.ProposalCid, nil
// TODO: start tracking after the deal is sealed
return deal.ProposalCid, c.discovery.AddPeer(p.Data, discovery.RetrievalPeer{
Address: proposal.MinerAddress,
ID: deal.Miner,
})
}
func (c *Client) Stop() {

View File

@ -19,6 +19,8 @@ var clientCmd = &cli.Command{
clientImportCmd,
clientLocalCmd,
clientDealCmd,
clientFindCmd,
clientRetrieveCmd,
},
}
@ -108,3 +110,99 @@ var clientDealCmd = &cli.Command{
return nil
},
}
var clientFindCmd = &cli.Command{
Name: "find",
Usage: "find data in the network",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("Usage: retrieve [CID]")
return nil
}
file, err := cid.Parse(cctx.Args().First())
if err != nil {
return err
}
api, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
ctx := ReqContext(cctx)
// Check if we already have this data locally
has, err := api.ClientHasLocal(ctx, file)
if err != nil {
return err
}
if has {
fmt.Println("LOCAL")
}
offers, err := api.ClientFindData(ctx, file)
if err != nil {
return err
}
for _, offer := range offers {
if offer.Err != "" {
fmt.Printf("ERR %s@%s: %s\n", offer.Miner, offer.MinerPeerID, offer.Err)
continue
}
fmt.Printf("RETRIEVAL %s@%s-%sfil-%db\n", offer.Miner, offer.MinerPeerID, offer.MinPrice, offer.Size)
}
return nil
},
}
var clientRetrieveCmd = &cli.Command{
Name: "retrieve",
Usage: "retrieve data from network",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
fmt.Println("Usage: retrieve [CID]")
return nil
}
file, err := cid.Parse(cctx.Args().First())
if err != nil {
return err
}
api, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
ctx := ReqContext(cctx)
// Check if we already have this data locally
has, err := api.ClientHasLocal(ctx, file)
if err != nil {
return err
}
if has {
fmt.Println("Success: Already in local storage")
return nil
}
_, err = api.ClientFindData(ctx, file)
if err != nil {
return err
}
// Find miner which may have this data
// Get merkle proofs (intermediate nodes)
// if acceptable, make retrieval deals to get data
// done
panic("TODO")
},
}

View File

@ -3,7 +3,7 @@ package node
import (
"context"
"errors"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
"github.com/filecoin-project/go-lotus/retrieval"
"reflect"
"time"
@ -36,8 +36,10 @@ import (
"github.com/filecoin-project/go-lotus/node/modules/testing"
"github.com/filecoin-project/go-lotus/node/repo"
"github.com/filecoin-project/go-lotus/paych"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
"github.com/filecoin-project/go-lotus/storage"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
// special is a type used to give keys to modules which
@ -80,6 +82,7 @@ const (
// storage miner
HandleDealsKey
HandleRetrievalKey
RunSectorServiceKey
RegisterMinerKey
@ -220,6 +223,10 @@ func Online() Option {
Override(RunBlockSyncKey, modules.RunBlockSync),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(new(*discovery.Local), discovery.NewLocal),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
Override(new(*retrieval.Client), retrieval.NewClient),
Override(new(*deals.Client), deals.NewClient),
Override(RunDealClientKey, modules.RunDealClient),
@ -238,7 +245,9 @@ func Online() Option {
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(*retrieval.Miner), retrieval.NewMiner),
Override(new(*deals.Handler), deals.NewHandler),
Override(HandleRetrievalKey, modules.HandleRetrieval),
Override(HandleDealsKey, modules.HandleDeals),
Override(RunSectorServiceKey, modules.RunSectorService),
Override(RegisterMinerKey, modules.RegisterMiner),
@ -302,6 +311,7 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.ClientFilestore), modules.ClientFstore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientDAG), modules.ClientDAG),
Override(new(ci.PrivKey), pk),

View File

@ -3,6 +3,11 @@ package full
import (
"context"
"errors"
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
"os"
"github.com/filecoin-project/go-lotus/api"
@ -31,10 +36,13 @@ type ClientAPI struct {
WalletAPI
PaychAPI
DealClient *deals.Client
DealClient *deals.Client
RetDiscovery discovery.PeerResolver
Retrieval *retrieval.Client
LocalDAG dtypes.ClientDAG
Filestore dtypes.ClientFilestore `optional:"true"`
LocalDAG dtypes.ClientDAG
Blockstore dtypes.ClientBlockstore
Filestore dtypes.ClientFilestore `optional:"true"`
}
func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner address.Address, price types.BigInt, blocksDuration uint64) (*cid.Cid, error) {
@ -116,6 +124,34 @@ func (a *ClientAPI) ClientStartDeal(ctx context.Context, data cid.Cid, miner add
return &c, err
}
func (a *ClientAPI) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
// TODO: check if we have the ENTIRE dag
offExch := merkledag.NewDAGService(blockservice.New(a.Blockstore, offline.Exchange(a.Blockstore)))
_, err := offExch.Get(ctx, root)
if err == ipld.ErrNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
func (a *ClientAPI) ClientFindData(ctx context.Context, root cid.Cid) ([]api.RetrievalOffer, error) {
peers, err := a.RetDiscovery.GetPeers(root)
if err != nil {
return nil, err
}
out := make([]api.RetrievalOffer, len(peers))
for k, p := range peers {
out[k] = a.Retrieval.Query(ctx, p, root)
}
return out, nil
}
func (a *ClientAPI) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
f, err := os.Open(path)
if err != nil {

View File

@ -36,9 +36,11 @@ func ClientFstore(r repo.LockedRepo) (dtypes.ClientFilestore, error) {
return filestore.NewFilestore(bs, fm), nil
}
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
ibs := blockstore.NewIdStore((*filestore.Filestore)(fstore))
func ClientBlockstore(fstore dtypes.ClientFilestore) dtypes.ClientBlockstore {
return blockstore.NewIdStore((*filestore.Filestore)(fstore))
}
func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, rt routing.Routing, h host.Host) dtypes.ClientDAG {
bitswapNetwork := network.NewFromIpfsHost(h, rt)
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, ibs)

View File

@ -21,6 +21,7 @@ type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService
type ClientFilestore *filestore.Filestore
type ClientBlockstore blockstore.Blockstore
type ClientDAG ipld.DAGService
type StagingDAG ipld.DAGService

View File

@ -2,6 +2,7 @@ package modules
import (
"context"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
"github.com/filecoin-project/go-lotus/storage/sector"
"github.com/libp2p/go-libp2p-core/host"
@ -83,3 +84,7 @@ func RunSectorService(lc fx.Lifecycle, secst *sector.Store) {
},
})
}
func RetrievalResolver(l *discovery.Local) discovery.PeerResolver {
return discovery.Multi(l)
}

View File

@ -2,6 +2,7 @@ package modules
import (
"context"
"github.com/filecoin-project/go-lotus/retrieval"
"github.com/filecoin-project/go-lotus/storage/sector"
"path/filepath"
@ -86,6 +87,15 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
return sm, nil
}
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m *retrieval.Miner) {
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
host.SetStreamHandler(retrieval.QueryProtocolID, m.HandleStream)
return nil
},
})
}
func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h *deals.Handler) {
ctx := helpers.LifecycleCtx(mctx, lc)

62
retrieval/client.go Normal file
View File

@ -0,0 +1,62 @@
package retrieval
import (
"context"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"io/ioutil"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/retrieval/discovery"
)
var log = logging.Logger("retrieval")
type Client struct {
h host.Host
}
func NewClient(h host.Host) *Client {
return &Client{h: h}
}
func (c *Client) Query(ctx context.Context, p discovery.RetrievalPeer, data cid.Cid) api.RetrievalOffer {
s, err := c.h.NewStream(ctx, p.ID, QueryProtocolID)
if err != nil {
log.Warn(err)
return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
}
defer s.Close()
err = cborrpc.WriteCborRPC(s, RetQuery{
Piece: data,
})
if err != nil {
log.Warn(err)
return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
}
// TODO: read deadline
rawResp, err := ioutil.ReadAll(s)
if err != nil {
log.Warn(err)
return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
}
var resp RetQueryResponse
if err := cbor.DecodeInto(rawResp, &resp); err != nil {
log.Warn(err)
return api.RetrievalOffer{Err: err.Error(), Miner: p.Address, MinerPeerID: p.ID}
}
return api.RetrievalOffer{
Size: resp.Size,
MinPrice: resp.MinPrice,
Miner: p.Address, // TODO: check
MinerPeerID: p.ID,
}
}

View File

@ -0,0 +1,25 @@
package discovery
import (
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
)
func init() {
cbor.RegisterCborType(RetrievalPeer{})
}
type RetrievalPeer struct {
Address address.Address
ID peer.ID // optional
}
type PeerResolver interface {
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
}
func Multi(r PeerResolver) PeerResolver { // TODO: actually support multiple mechanisms
return r
}

View File

@ -0,0 +1,53 @@
package discovery
import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"github.com/filecoin-project/go-lotus/node/modules/dtypes"
)
var log = logging.Logger("ret-discovery")
type Local struct {
ds datastore.Datastore
}
func NewLocal(ds dtypes.MetadataDS) *Local {
return &Local{ds: namespace.Wrap(ds, datastore.NewKey("/deals/local"))}
}
func (l *Local) AddPeer(cid cid.Cid, peer RetrievalPeer) error {
// TODO: allow multiple peers here
// (implement an util for tracking map[thing][]otherThing, use in sectorBlockstore too)
log.Warn("Tracking multiple retrieval peers not implemented")
entry, err := cbor.DumpObject(peer)
if err != nil {
return err
}
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
}
func (l *Local) GetPeers(data cid.Cid) ([]RetrievalPeer, error) {
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
if err == datastore.ErrNotFound {
return []RetrievalPeer{}, nil
}
if err != nil {
return nil, err
}
var peer RetrievalPeer
if err := cbor.DecodeInto(entry, &peer); err != nil {
return nil, err
}
return []RetrievalPeer{peer}, nil
}
var _ PeerResolver = &Local{}

51
retrieval/miner.go Normal file
View File

@ -0,0 +1,51 @@
package retrieval
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/storage/sectorblocks"
)
type Miner struct {
sectorBlocks *sectorblocks.SectorBlocks
}
func NewMiner(sblks *sectorblocks.SectorBlocks) *Miner {
return &Miner{
sectorBlocks: sblks,
}
}
func (m *Miner) HandleStream(stream network.Stream) {
defer stream.Close()
var query RetQuery
if err := cborrpc.ReadCborRPC(stream, &query); err != nil {
log.Errorf("Retrieval query: ReadCborRPC: %s", err)
return
}
refs, err := m.sectorBlocks.GetRefs(query.Piece)
if err != nil {
log.Errorf("Retrieval query: GetRefs: %s", err)
return
}
answer := RetQueryResponse{
Status: Unavailable,
}
if len(refs) > 0 {
answer.Status = Available
// TODO: get price, look for already unsealed ref to reduce work
answer.MinPrice = types.NewInt(uint64(refs[0].Size)) // TODO: Get this from somewhere
answer.Size = uint64(refs[0].Size)
}
if err := cborrpc.WriteCborRPC(stream, answer); err != nil {
log.Errorf("Retrieval query: WriteCborRPC: %s", err)
return
}
}

View File

@ -2,10 +2,14 @@ package retrieval
import (
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/go-lotus/chain/types"
)
const ProtocolID = "/fil/retrieval/-1.0.0" // TODO: spec
const QueryProtocolID = "/fil/retrieval/qry/-1.0.0" // TODO: spec
type QueryResponse int
const (
@ -13,6 +17,13 @@ const (
Unavailable
)
func init() {
cbor.RegisterCborType(RetDealProposal{})
cbor.RegisterCborType(RetQuery{})
cbor.RegisterCborType(RetQueryResponse{})
}
type RetDealProposal struct {
Piece cid.Cid
Price types.BigInt
@ -26,5 +37,8 @@ type RetQuery struct {
type RetQueryResponse struct {
Status QueryResponse
MinPricePerMiB types.BigInt // TODO: check units used for sector size
Size uint64 // TODO: spec
// TODO: unseal price (+spec)
// TODO: address to send money for the deal?
MinPrice types.BigInt
}

View File

@ -106,6 +106,10 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
return 0, err
}
if len(data) == 0 {
panic("Handle intermediate nodes") // TODO: !
}
if err := r.writeRef(cid, offset, uint32(len(data))); err != nil {
return 0, err
}
@ -158,3 +162,17 @@ func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
return out, nil
}
func (st *SectorBlocks) GetRefs(k cid.Cid) ([]api.SealedRef, error) { // TODO: track unsealed sectors
ent, err := st.keys.Get(dshelp.CidToDsKey(k))
if err != nil {
return nil, err
}
var refs []api.SealedRef
if err := cbor.DecodeInto(ent, &refs); err != nil {
return nil, err
}
return refs, nil
}