diff --git a/chain/gen/gen.go b/chain/gen/gen.go index d56f285a0..93e090ac8 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -236,7 +236,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("make genesis block failed: %w", err) } - cs := store.NewChainStore(bs, ds, sys, j) + cs := store.NewChainStore(bs, bs, ds, sys, j) genfb := &types.FullBlock{Header: genb.Genesis} gents := store.NewFullTipSet([]*types.FullBlock{genfb}) diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 6a1090784..e441af7ae 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto } // temp chainstore - cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j) + cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j) // Verify PreSealed Data stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs) diff --git a/chain/store/index_test.go b/chain/store/index_test.go index 5283d10dc..11ff4371f 100644 --- a/chain/store/index_test.go +++ b/chain/store/index_test.go @@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) { ctx := context.TODO() nbs := blockstore.NewTemporarySync() - cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) + cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil) _, err = cs.Import(bytes.NewReader(gencar)) if err != nil { diff --git a/chain/store/store.go b/chain/store/store.go index 00a78500e..f9df20d4f 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -104,8 +104,9 @@ type HeadChangeEvt struct { // 1. a tipset cache // 2. a block => messages references cache. type ChainStore struct { - bs bstore.Blockstore - ds dstore.Batching + bs bstore.Blockstore + localbs bstore.Blockstore + ds dstore.Batching heaviestLk sync.Mutex heaviest *types.TipSet @@ -130,7 +131,8 @@ type ChainStore struct { journal journal.Journal } -func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { +// localbs is guaranteed to fail Get* if requested block isn't stored locally +func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { c, _ := lru.NewARC(DefaultMsgMetaCacheSize) tsc, _ := lru.NewARC(DefaultTipSetCacheSize) if j == nil { @@ -138,6 +140,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB } cs := &ChainStore{ bs: bs, + localbs: localbs, ds: ds, bestTips: pubsub.New(64), tipsets: make(map[abi.ChainEpoch][]cid.Cid), @@ -522,7 +525,7 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { // GetBlock fetches a BlockHeader with the supplied CID. It returns // blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { - sb, err := cs.bs.Get(c) + sb, err := cs.localbs.Get(c) if err != nil { return nil, err } @@ -793,7 +796,7 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) { } func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { - sb, err := cs.bs.Get(c) + sb, err := cs.localbs.Get(c) if err != nil { log.Errorf("get message get failed: %s: %s", c, err) return nil, err @@ -803,7 +806,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { } func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { - sb, err := cs.bs.Get(c) + sb, err := cs.localbs.Get(c) if err != nil { log.Errorf("get message get failed: %s: %s", c, err) return nil, err @@ -939,7 +942,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) return mmcids.bls, mmcids.secpk, nil } - cst := cbor.NewCborStore(cs.bs) + cst := cbor.NewCborStore(cs.localbs) var msgmeta types.MsgMeta if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil { return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err) diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 160527104..61ff98620 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -63,7 +63,7 @@ func BenchmarkGetRandomness(b *testing.B) { bs := blockstore.NewBlockstore(bds) - cs := store.NewChainStore(bs, mds, nil, nil) + cs := store.NewChainStore(bs, bs, mds, nil, nil) b.ResetTimer() @@ -97,7 +97,7 @@ func TestChainExportImport(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) root, err := cs.Import(buf) if err != nil { @@ -131,7 +131,7 @@ func TestChainExportImportFull(t *testing.T) { } nbs := blockstore.NewTemporary() - cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil) root, err := cs.Import(buf) if err != nil { t.Fatal(err) diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index acbf9ebdc..bb7baf2b8 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -193,7 +193,7 @@ var importBenchCmd = &cli.Command{ return nil } - cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier), nil) + cs := store.NewChainStore(bs, bs, ds, vm.Syscalls(verifier), nil) stm := stmgr.NewStateManager(cs) if cctx.Bool("global-profile") { diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index b12c069f5..8f3c9574e 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -180,7 +180,7 @@ var chainBalanceStateCmd = &cli.Command{ bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) @@ -394,7 +394,7 @@ var chainPledgeCmd = &cli.Command{ bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cst := cbor.NewCborStore(bs) store := adt.WrapStore(ctx, cst) diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 3be49f0e0..dcf45e9a8 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -83,7 +83,7 @@ var exportChainCmd = &cli.Command{ bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, mds, nil, nil) + cs := store.NewChainStore(bs, bs, mds, nil, nil) if err := cs.Load(); err != nil { return err } diff --git a/cmd/lotus-shed/genesis-verify.go b/cmd/lotus-shed/genesis-verify.go index 4b197c58f..e15a42374 100644 --- a/cmd/lotus-shed/genesis-verify.go +++ b/cmd/lotus-shed/genesis-verify.go @@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{ } bs := blockstore.NewBlockstore(datastore.NewMapDatastore()) - cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil) + cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil) cf := cctx.Args().Get(0) f, err := os.Open(cf) diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index 6cf4f8c6f..f61c8d8ea 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -162,7 +162,7 @@ var stateTreePruneCmd = &cli.Command{ bs := blockstore.NewBlockstore(ds) - cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) + cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) if err := cs.Load(); err != nil { return fmt.Errorf("loading chainstore: %w", err) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 7d078407a..42fee736b 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -422,7 +422,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) { if err != nil { return xerrors.Errorf("failed to open journal: %w", err) } - cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) + cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j) log.Infof("importing chain from %s...", fname) diff --git a/conformance/driver.go b/conformance/driver.go index 95b6f2659..91f461722 100644 --- a/conformance/driver.go +++ b/conformance/driver.go @@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot syscalls = vm.Syscalls(ffiwrapper.ProofVerifier) vmRand = NewFixedRand() - cs = store.NewChainStore(bs, ds, syscalls, nil) + cs = store.NewChainStore(bs, bs, ds, syscalls, nil) sm = stmgr.NewStateManager(cs) ) diff --git a/lib/blockstore/fallbackstore.go b/lib/blockstore/fallbackstore.go new file mode 100644 index 000000000..0ce397d44 --- /dev/null +++ b/lib/blockstore/fallbackstore.go @@ -0,0 +1,95 @@ +package blockstore + +import ( + "context" + "sync" + "time" + + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("blockstore") + +type FallbackStore struct { + blockstore.Blockstore + + fallbackGetBlock func(context.Context, cid.Cid) (blocks.Block, error) + lk sync.RWMutex +} + +func (fbs *FallbackStore) SetFallback(fg func(context.Context, cid.Cid) (blocks.Block, error)) { + fbs.lk.Lock() + defer fbs.lk.Unlock() + + fbs.fallbackGetBlock = fg +} + +func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) { + log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c) + fbs.lk.RLock() + defer fbs.lk.RUnlock() + + if fbs.fallbackGetBlock == nil { + // FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet) + // Wait for a bit and retry + fbs.lk.RUnlock() + time.Sleep(5 * time.Second) + fbs.lk.RLock() + + if fbs.fallbackGetBlock == nil { + log.Errorw("fallbackstore: fallbackGetBlock not configured yet") + return nil, blockstore.ErrNotFound + } + } + + ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Second) + defer cancel() + + b, err := fbs.fallbackGetBlock(ctx, c) + if err != nil { + return nil, err + } + + // chain bitswap puts blocks in temp blockstore which is cleaned up + // every few min (to drop any messages we fetched but don't want) + // in this case we want to keep this block around + if err := fbs.Put(b); err != nil { + return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err) + } + return b, nil +} + +func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) { + b, err := fbs.Blockstore.Get(c) + switch err { + case nil: + return b, nil + case blockstore.ErrNotFound: + return fbs.getFallback(c) + default: + return b, err + } +} + +func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) { + sz, err := fbs.Blockstore.GetSize(c) + switch err { + case nil: + return sz, nil + case blockstore.ErrNotFound: + b, err := fbs.getFallback(c) + if err != nil { + return 0, err + } + return len(b.RawData()), nil + default: + return sz, err + } +} + +var _ blockstore.Blockstore = &FallbackStore{} diff --git a/node/builder.go b/node/builder.go index d797ebb2f..1ab27f486 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,6 +3,7 @@ package node import ( "context" "errors" + "os" "time" metricsi "github.com/ipfs/go-metrics-interface" @@ -138,6 +139,7 @@ const ( HeadMetricsKey SettlePaymentChannelsKey RunPeerTaggerKey + SetupFallbackBlockstoreKey SetApiEndpointKey @@ -521,7 +523,13 @@ func Repo(r repo.Repo) Option { Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(dtypes.MetadataDS), modules.Datastore), - Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), + Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore), + Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))), + + If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1", + Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore), + Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore), + ), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore), diff --git a/node/modules/chain.go b/node/modules/chain.go index d1414b307..e5a0f7412 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -76,7 +76,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds return mp, nil } -func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) { +func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { blocks, err := r.Datastore("/chain") if err != nil { return nil, err @@ -91,16 +91,32 @@ func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo return cbs, nil } -func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore { +func ChainGCBlockstore(bs dtypes.ChainRawBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore { return blockstore.NewGCBlockstore(bs, gcl) } -func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { +func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { return blockservice.New(bs, rem) } -func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { - chain := store.NewChainStore(bs, ds, syscalls, j) +func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore { + return &blockstore.FallbackStore{ + Blockstore: rbs, + } +} + +func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error { + fbs, ok := cbs.(*blockstore.FallbackStore) + if !ok { + return xerrors.Errorf("expected a FallbackStore") + } + + fbs.SetFallback(rem.GetBlock) + return nil +} + +func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { + chain := store.NewChainStore(bs, lbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 13defda8d..9d7364577 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -23,7 +23,8 @@ import ( // dy default it's namespaced under /metadata in main repo datastore type MetadataDS datastore.Batching -type ChainBlockstore blockstore.Blockstore +type ChainRawBlockstore blockstore.Blockstore +type ChainBlockstore blockstore.Blockstore // optionally bitswap backed type ChainGCLocker blockstore.GCLocker type ChainGCBlockstore blockstore.GCBlockstore