feat(graphsync): unified graphsync instance

setup a single graphsync that loads from both the chainstore & client blockstore
This commit is contained in:
hannahhoward 2020-03-17 17:25:12 -07:00
parent 9e6ec71520
commit f259bc6a09
6 changed files with 58 additions and 39 deletions

1
go.mod
View File

@ -60,6 +60,7 @@ require (
github.com/ipfs/go-merkledag v0.2.4 github.com/ipfs/go-merkledag v0.2.4
github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-path v0.0.7
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785
github.com/lib/pq v1.2.0 github.com/lib/pq v1.2.0
github.com/libp2p/go-libp2p v0.5.2 github.com/libp2p/go-libp2p v0.5.2
github.com/libp2p/go-libp2p-circuit v0.1.4 github.com/libp2p/go-libp2p-circuit v0.1.4

View File

@ -230,9 +230,12 @@ func Online() Option {
Override(new(*blocksync.BlockSyncService), blocksync.NewBlockSyncService), Override(new(*blocksync.BlockSyncService), blocksync.NewBlockSyncService),
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr), Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
Override(new(dtypes.GraphsyncLoader), modules.GraphsyncLoader),
Override(new(dtypes.GraphsyncStorer), modules.GraphsyncStorer),
Override(new(dtypes.Graphsync), modules.Graphsync),
Override(RunHelloKey, modules.RunHello), Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync), Override(RunBlockSyncKey, modules.RunBlockSync),
Override(RunChainGraphsync, modules.ChainGraphsync),
Override(RunPeerMgrKey, modules.RunPeerMgr), Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
@ -241,7 +244,7 @@ func Online() Option {
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore), Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientDAGServiceDataTransfer), Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
Override(new(*deals.ClientRequestValidator), modules.NewClientRequestValidator), Override(new(*deals.ClientRequestValidator), modules.NewClientRequestValidator),
Override(new(storagemarket.StorageClient), modules.StorageClient), Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter), Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
@ -387,7 +390,6 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.ClientFilestore), modules.ClientFstore), Override(new(dtypes.ClientFilestore), modules.ClientFstore),
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore), Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientDAG), modules.ClientDAG), Override(new(dtypes.ClientDAG), modules.ClientDAG),
Override(new(dtypes.ClientGraphsync), modules.ClientGraphsync),
Override(new(ci.PrivKey), lp2p.PrivKey), Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic), Override(new(ci.PubKey), ci.PrivKey.GetPublic),

View File

@ -11,10 +11,6 @@ import (
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car" "github.com/ipfs/go-car"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
graphsync "github.com/ipfs/go-graphsync/impl"
"github.com/ipfs/go-graphsync/ipldbridge"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
@ -79,15 +75,6 @@ func ChainBlockservice(bs dtypes.ChainBlockstore, rem dtypes.ChainExchange) dtyp
return blockservice.New(bs, rem) return blockservice.New(bs, rem)
} }
func ChainGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ChainGCBlockstore, h host.Host) dtypes.ClientGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
ipldBridge := ipldbridge.NewIPLDBridge()
loader := storeutil.LoaderForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, ipldBridge, loader, nil)
return gs
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls runtime.Syscalls) *store.ChainStore { func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls runtime.Syscalls) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls) chain := store.NewChainStore(bs, ds, syscalls)

View File

@ -22,10 +22,6 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-filestore" "github.com/ipfs/go-filestore"
graphsync "github.com/ipfs/go-graphsync/impl"
"github.com/ipfs/go-graphsync/ipldbridge"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
@ -68,9 +64,9 @@ func RegisterClientValidator(crv *deals.ClientRequestValidator, dtm dtypes.Clien
} }
} }
// NewClientDAGServiceDataTransfer returns a data transfer manager that just // NewClientGraphsyncDataTransfer returns a data transfer manager that just
// uses the clients's Client DAG service for transfers // uses the clients's Client DAG service for transfers
func NewClientDAGServiceDataTransfer(h host.Host, gs dtypes.ClientGraphsync) dtypes.ClientDataTransfer { func NewClientGraphsyncDataTransfer(h host.Host, gs dtypes.Graphsync) dtypes.ClientDataTransfer {
return graphsyncimpl.NewGraphSyncDataTransfer(h, gs) return graphsyncimpl.NewGraphSyncDataTransfer(h, gs)
} }
@ -96,18 +92,6 @@ func ClientDAG(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlocks
return dag return dag
} }
// ClientGraphsync creates a graphsync instance which reads and writes blocks
// to the ClientBlockstore
func ClientGraphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, ibs dtypes.ClientBlockstore, h host.Host) dtypes.ClientGraphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
ipldBridge := ipldbridge.NewIPLDBridge()
loader := storeutil.LoaderForBlockstore(ibs)
storer := storeutil.StorerForBlockstore(ibs)
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, ipldBridge, loader, storer)
return gs
}
func NewClientRequestValidator(deals dtypes.ClientDealStore) *storageimpl.ClientRequestValidator { func NewClientRequestValidator(deals dtypes.ClientDealStore) *storageimpl.ClientRequestValidator {
return storageimpl.NewClientRequestValidator(deals) return storageimpl.NewClientRequestValidator(deals)
} }

View File

@ -9,7 +9,8 @@ import (
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"
ipld "github.com/ipfs/go-ipld-format" format "github.com/ipfs/go-ipld-format"
"github.com/ipld/go-ipld-prime"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
) )
@ -24,14 +25,16 @@ type ChainGCLocker blockstore.GCLocker
type ChainGCBlockstore blockstore.GCBlockstore type ChainGCBlockstore blockstore.GCBlockstore
type ChainExchange exchange.Interface type ChainExchange exchange.Interface
type ChainBlockService bserv.BlockService type ChainBlockService bserv.BlockService
type ChainGraphsync graphsync.GraphExchange
type ClientFilestore *filestore.Filestore type ClientFilestore *filestore.Filestore
type ClientBlockstore blockstore.Blockstore type ClientBlockstore blockstore.Blockstore
type ClientDAG ipld.DAGService type ClientDAG format.DAGService
type ClientGraphsync graphsync.GraphExchange
type ClientDealStore *statestore.StateStore type ClientDealStore *statestore.StateStore
type GraphsyncLoader ipld.Loader
type GraphsyncStorer ipld.Storer
type Graphsync graphsync.GraphExchange
// ClientDataTransfer is a data transfer manager for the client // ClientDataTransfer is a data transfer manager for the client
type ClientDataTransfer datatransfer.Manager type ClientDataTransfer datatransfer.Manager
@ -41,6 +44,6 @@ type ProviderPieceStore piecestore.PieceStore
// ProviderDataTransfer is a data transfer manager for the provider // ProviderDataTransfer is a data transfer manager for the provider
type ProviderDataTransfer datatransfer.Manager type ProviderDataTransfer datatransfer.Manager
type StagingDAG ipld.DAGService type StagingDAG format.DAGService
type StagingBlockstore blockstore.Blockstore type StagingBlockstore blockstore.Blockstore
type StagingGraphsync graphsync.GraphExchange type StagingGraphsync graphsync.GraphExchange

42
node/modules/graphsync.go Normal file
View File

@ -0,0 +1,42 @@
package modules
import (
"io"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
graphsync "github.com/ipfs/go-graphsync/impl"
"github.com/ipfs/go-graphsync/ipldbridge"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/host"
"go.uber.org/fx"
)
// GraphsyncStorer creates a storer that stores data in the client blockstore
func GraphsyncStorer(clientBs dtypes.ClientBlockstore) dtypes.GraphsyncStorer {
return dtypes.GraphsyncStorer(storeutil.StorerForBlockstore(clientBs))
}
// GraphsyncLoader creates a loader that reads from both the chain blockstore and the client blockstore
func GraphsyncLoader(clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore) dtypes.GraphsyncLoader {
clientLoader := storeutil.LoaderForBlockstore(clientBs)
chainLoader := storeutil.LoaderForBlockstore(chainBs)
return func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
reader, err := chainLoader(lnk, lnkCtx)
if err != nil {
return clientLoader(lnk, lnkCtx)
}
return reader, err
}
}
// Graphsync creates a graphsync instance from the given loader and storer
func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, loader dtypes.GraphsyncLoader, storer dtypes.GraphsyncStorer, h host.Host) dtypes.Graphsync {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
ipldBridge := ipldbridge.NewIPLDBridge()
gs := graphsync.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, ipldBridge, ipld.Loader(loader), ipld.Storer(storer))
return gs
}