Merge pull request #4717 from filecoin-project/feat/optional-chain-bitswap

Optional chain Bitswap
This commit is contained in:
Łukasz Magiera 2020-11-06 16:49:39 +01:00 committed by GitHub
commit 426c2e8c2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 156 additions and 33 deletions

View File

@ -236,7 +236,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("make genesis block failed: %w", err)
}
cs := store.NewChainStore(bs, ds, sys, j)
cs := store.NewChainStore(bs, bs, ds, sys, j)
genfb := &types.FullBlock{Header: genb.Genesis}
gents := store.NewFullTipSet([]*types.FullBlock{genfb})

View File

@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto
}
// temp chainstore
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j)
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j)
// Verify PreSealed Data
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)

View File

@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) {
ctx := context.TODO()
nbs := blockstore.NewTemporarySync()
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
_, err = cs.Import(bytes.NewReader(gencar))
if err != nil {

View File

@ -104,8 +104,9 @@ type HeadChangeEvt struct {
// 1. a tipset cache
// 2. a block => messages references cache.
type ChainStore struct {
bs bstore.Blockstore
ds dstore.Batching
bs bstore.Blockstore
localbs bstore.Blockstore
ds dstore.Batching
heaviestLk sync.Mutex
heaviest *types.TipSet
@ -130,7 +131,8 @@ type ChainStore struct {
journal journal.Journal
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
// localbs is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
@ -138,6 +140,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
}
cs := &ChainStore{
bs: bs,
localbs: localbs,
ds: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
@ -522,7 +525,7 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
// GetBlock fetches a BlockHeader with the supplied CID. It returns
// blockstore.ErrNotFound if the block was not found in the BlockStore.
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
return nil, err
}
@ -793,7 +796,7 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
@ -803,7 +806,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
@ -939,7 +942,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return mmcids.bls, mmcids.secpk, nil
}
cst := cbor.NewCborStore(cs.bs)
cst := cbor.NewCborStore(cs.localbs)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)

View File

@ -63,7 +63,7 @@ func BenchmarkGetRandomness(b *testing.B) {
bs := blockstore.NewBlockstore(bds)
cs := store.NewChainStore(bs, mds, nil, nil)
cs := store.NewChainStore(bs, bs, mds, nil, nil)
b.ResetTimer()
@ -97,7 +97,7 @@ func TestChainExportImport(t *testing.T) {
}
nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
root, err := cs.Import(buf)
if err != nil {
@ -131,7 +131,7 @@ func TestChainExportImportFull(t *testing.T) {
}
nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
root, err := cs.Import(buf)
if err != nil {
t.Fatal(err)

View File

@ -193,7 +193,7 @@ var importBenchCmd = &cli.Command{
return nil
}
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier), nil)
cs := store.NewChainStore(bs, bs, ds, vm.Syscalls(verifier), nil)
stm := stmgr.NewStateManager(cs)
if cctx.Bool("global-profile") {

View File

@ -180,7 +180,7 @@ var chainBalanceStateCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
@ -394,7 +394,7 @@ var chainPledgeCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

View File

@ -83,7 +83,7 @@ var exportChainCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, nil, nil)
cs := store.NewChainStore(bs, bs, mds, nil, nil)
if err := cs.Load(); err != nil {
return err
}

View File

@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{
}
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
cf := cctx.Args().Get(0)
f, err := os.Open(cf)

View File

@ -162,7 +162,7 @@ var stateTreePruneCmd = &cli.Command{
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}

View File

@ -422,7 +422,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
if err != nil {
return xerrors.Errorf("failed to open journal: %w", err)
}
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
log.Infof("importing chain from %s...", fname)

View File

@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
vmRand = NewFixedRand()
cs = store.NewChainStore(bs, ds, syscalls, nil)
cs = store.NewChainStore(bs, bs, ds, syscalls, nil)
sm = stmgr.NewStateManager(cs)
)

View File

@ -0,0 +1,95 @@
package blockstore
import (
"context"
"sync"
"time"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("blockstore")
type FallbackStore struct {
blockstore.Blockstore
fallbackGetBlock func(context.Context, cid.Cid) (blocks.Block, error)
lk sync.RWMutex
}
func (fbs *FallbackStore) SetFallback(fg func(context.Context, cid.Cid) (blocks.Block, error)) {
fbs.lk.Lock()
defer fbs.lk.Unlock()
fbs.fallbackGetBlock = fg
}
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
fbs.lk.RLock()
defer fbs.lk.RUnlock()
if fbs.fallbackGetBlock == nil {
// FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet)
// Wait for a bit and retry
fbs.lk.RUnlock()
time.Sleep(5 * time.Second)
fbs.lk.RLock()
if fbs.fallbackGetBlock == nil {
log.Errorw("fallbackstore: fallbackGetBlock not configured yet")
return nil, blockstore.ErrNotFound
}
}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Second)
defer cancel()
b, err := fbs.fallbackGetBlock(ctx, c)
if err != nil {
return nil, err
}
// chain bitswap puts blocks in temp blockstore which is cleaned up
// every few min (to drop any messages we fetched but don't want)
// in this case we want to keep this block around
if err := fbs.Put(b); err != nil {
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
}
return b, nil
}
func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
b, err := fbs.Blockstore.Get(c)
switch err {
case nil:
return b, nil
case blockstore.ErrNotFound:
return fbs.getFallback(c)
default:
return b, err
}
}
func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) {
sz, err := fbs.Blockstore.GetSize(c)
switch err {
case nil:
return sz, nil
case blockstore.ErrNotFound:
b, err := fbs.getFallback(c)
if err != nil {
return 0, err
}
return len(b.RawData()), nil
default:
return sz, err
}
}
var _ blockstore.Blockstore = &FallbackStore{}

View File

@ -94,7 +94,7 @@ func (bs *BufferedBS) DeleteBlock(c cid.Cid) error {
}
func (bs *BufferedBS) Get(c cid.Cid) (block.Block, error) {
if out, err := bs.read.Get(c); err != nil {
if out, err := bs.write.Get(c); err != nil {
if err != bstore.ErrNotFound {
return nil, err
}
@ -102,7 +102,7 @@ func (bs *BufferedBS) Get(c cid.Cid) (block.Block, error) {
return out, nil
}
return bs.write.Get(c)
return bs.read.Get(c)
}
func (bs *BufferedBS) GetSize(c cid.Cid) (int, error) {
@ -115,7 +115,7 @@ func (bs *BufferedBS) GetSize(c cid.Cid) (int, error) {
}
func (bs *BufferedBS) Put(blk block.Block) error {
has, err := bs.read.Has(blk.Cid())
has, err := bs.read.Has(blk.Cid()) // TODO: consider dropping this check
if err != nil {
return err
}
@ -128,7 +128,7 @@ func (bs *BufferedBS) Put(blk block.Block) error {
}
func (bs *BufferedBS) Has(c cid.Cid) (bool, error) {
has, err := bs.read.Has(c)
has, err := bs.write.Has(c)
if err != nil {
return false, err
}
@ -136,7 +136,7 @@ func (bs *BufferedBS) Has(c cid.Cid) (bool, error) {
return true, nil
}
return bs.write.Has(c)
return bs.read.Has(c)
}
func (bs *BufferedBS) HashOnRead(hor bool) {

View File

@ -3,6 +3,7 @@ package node
import (
"context"
"errors"
"os"
"time"
metricsi "github.com/ipfs/go-metrics-interface"
@ -138,6 +139,7 @@ const (
HeadMetricsKey
SettlePaymentChannelsKey
RunPeerTaggerKey
SetupFallbackBlockstoreKey
SetApiEndpointKey
@ -521,7 +523,13 @@ func Repo(r repo.Repo) Option {
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore),
Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))),
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore),
),
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),

View File

@ -76,7 +76,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
return mp, nil
}
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
blocks, err := r.Datastore("/chain")
if err != nil {
return nil, err
@ -91,16 +91,32 @@ func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo
return cbs, nil
}
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
func ChainGCBlockstore(bs dtypes.ChainRawBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
return blockstore.NewGCBlockstore(bs, gcl)
}
func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls, j)
func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore {
return &blockstore.FallbackStore{
Blockstore: rbs,
}
}
func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error {
fbs, ok := cbs.(*blockstore.FallbackStore)
if !ok {
return xerrors.Errorf("expected a FallbackStore")
}
fbs.SetFallback(rem.GetBlock)
return nil
}
func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)

View File

@ -23,7 +23,8 @@ import (
// dy default it's namespaced under /metadata in main repo datastore
type MetadataDS datastore.Batching
type ChainBlockstore blockstore.Blockstore
type ChainRawBlockstore blockstore.Blockstore
type ChainBlockstore blockstore.Blockstore // optionally bitswap backed
type ChainGCLocker blockstore.GCLocker
type ChainGCBlockstore blockstore.GCBlockstore