Optionally allow bitswap for chainstore
This commit is contained in:
parent
b0824ada15
commit
a1e1b03ca4
@ -236,7 +236,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
||||
return nil, xerrors.Errorf("make genesis block failed: %w", err)
|
||||
}
|
||||
|
||||
cs := store.NewChainStore(bs, ds, sys, j)
|
||||
cs := store.NewChainStore(bs, bs, ds, sys, j)
|
||||
|
||||
genfb := &types.FullBlock{Header: genb.Genesis}
|
||||
gents := store.NewFullTipSet([]*types.FullBlock{genfb})
|
||||
|
@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto
|
||||
}
|
||||
|
||||
// temp chainstore
|
||||
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j)
|
||||
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j)
|
||||
|
||||
// Verify PreSealed Data
|
||||
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)
|
||||
|
@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
nbs := blockstore.NewTemporarySync()
|
||||
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
|
||||
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
|
||||
|
||||
_, err = cs.Import(bytes.NewReader(gencar))
|
||||
if err != nil {
|
||||
|
@ -104,8 +104,9 @@ type HeadChangeEvt struct {
|
||||
// 1. a tipset cache
|
||||
// 2. a block => messages references cache.
|
||||
type ChainStore struct {
|
||||
bs bstore.Blockstore
|
||||
ds dstore.Batching
|
||||
bs bstore.Blockstore
|
||||
localbs bstore.Blockstore
|
||||
ds dstore.Batching
|
||||
|
||||
heaviestLk sync.Mutex
|
||||
heaviest *types.TipSet
|
||||
@ -130,7 +131,8 @@ type ChainStore struct {
|
||||
journal journal.Journal
|
||||
}
|
||||
|
||||
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||
// localbs is guaranteed to fail Get* if requested block isn't stored locally
|
||||
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||
if j == nil {
|
||||
@ -138,6 +140,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
|
||||
}
|
||||
cs := &ChainStore{
|
||||
bs: bs,
|
||||
localbs: localbs,
|
||||
ds: ds,
|
||||
bestTips: pubsub.New(64),
|
||||
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
|
||||
@ -522,7 +525,7 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
|
||||
// GetBlock fetches a BlockHeader with the supplied CID. It returns
|
||||
// blockstore.ErrNotFound if the block was not found in the BlockStore.
|
||||
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
|
||||
sb, err := cs.bs.Get(c)
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -793,7 +796,7 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
|
||||
sb, err := cs.bs.Get(c)
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("get message get failed: %s: %s", c, err)
|
||||
return nil, err
|
||||
@ -803,7 +806,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
|
||||
sb, err := cs.bs.Get(c)
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("get message get failed: %s: %s", c, err)
|
||||
return nil, err
|
||||
@ -939,7 +942,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
|
||||
return mmcids.bls, mmcids.secpk, nil
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(cs.bs)
|
||||
cst := cbor.NewCborStore(cs.localbs)
|
||||
var msgmeta types.MsgMeta
|
||||
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
|
||||
|
@ -63,7 +63,7 @@ func BenchmarkGetRandomness(b *testing.B) {
|
||||
|
||||
bs := blockstore.NewBlockstore(bds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, nil, nil)
|
||||
cs := store.NewChainStore(bs, bs, mds, nil, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
@ -97,7 +97,7 @@ func TestChainExportImport(t *testing.T) {
|
||||
}
|
||||
|
||||
nbs := blockstore.NewTemporary()
|
||||
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
|
||||
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
||||
|
||||
root, err := cs.Import(buf)
|
||||
if err != nil {
|
||||
@ -131,7 +131,7 @@ func TestChainExportImportFull(t *testing.T) {
|
||||
}
|
||||
|
||||
nbs := blockstore.NewTemporary()
|
||||
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
|
||||
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
|
||||
root, err := cs.Import(buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -193,7 +193,7 @@ var importBenchCmd = &cli.Command{
|
||||
return nil
|
||||
}
|
||||
|
||||
cs := store.NewChainStore(bs, ds, vm.Syscalls(verifier), nil)
|
||||
cs := store.NewChainStore(bs, bs, ds, vm.Syscalls(verifier), nil)
|
||||
stm := stmgr.NewStateManager(cs)
|
||||
|
||||
if cctx.Bool("global-profile") {
|
||||
|
@ -180,7 +180,7 @@ var chainBalanceStateCmd = &cli.Command{
|
||||
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
|
||||
cst := cbor.NewCborStore(bs)
|
||||
store := adt.WrapStore(ctx, cst)
|
||||
@ -394,7 +394,7 @@ var chainPledgeCmd = &cli.Command{
|
||||
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
|
||||
cst := cbor.NewCborStore(bs)
|
||||
store := adt.WrapStore(ctx, cst)
|
||||
|
@ -83,7 +83,7 @@ var exportChainCmd = &cli.Command{
|
||||
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, nil, nil)
|
||||
cs := store.NewChainStore(bs, bs, mds, nil, nil)
|
||||
if err := cs.Load(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{
|
||||
}
|
||||
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
|
||||
|
||||
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil)
|
||||
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
|
||||
|
||||
cf := cctx.Args().Get(0)
|
||||
f, err := os.Open(cf)
|
||||
|
@ -162,7 +162,7 @@ var stateTreePruneCmd = &cli.Command{
|
||||
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
|
||||
if err := cs.Load(); err != nil {
|
||||
return fmt.Errorf("loading chainstore: %w", err)
|
||||
}
|
||||
|
@ -422,7 +422,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open journal: %w", err)
|
||||
}
|
||||
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
|
||||
cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
|
||||
|
||||
log.Infof("importing chain from %s...", fname)
|
||||
|
||||
|
@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
|
||||
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
|
||||
vmRand = NewFixedRand()
|
||||
|
||||
cs = store.NewChainStore(bs, ds, syscalls, nil)
|
||||
cs = store.NewChainStore(bs, bs, ds, syscalls, nil)
|
||||
sm = stmgr.NewStateManager(cs)
|
||||
)
|
||||
|
||||
|
95
lib/blockstore/fallbackstore.go
Normal file
95
lib/blockstore/fallbackstore.go
Normal file
@ -0,0 +1,95 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
)
|
||||
|
||||
var log = logging.Logger("blockstore")
|
||||
|
||||
type FallbackStore struct {
|
||||
blockstore.Blockstore
|
||||
|
||||
fallbackGetBlock func(context.Context, cid.Cid) (blocks.Block, error)
|
||||
lk sync.RWMutex
|
||||
}
|
||||
|
||||
func (fbs *FallbackStore) SetFallback(fg func(context.Context, cid.Cid) (blocks.Block, error)) {
|
||||
fbs.lk.Lock()
|
||||
defer fbs.lk.Unlock()
|
||||
|
||||
fbs.fallbackGetBlock = fg
|
||||
}
|
||||
|
||||
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
|
||||
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
|
||||
fbs.lk.RLock()
|
||||
defer fbs.lk.RUnlock()
|
||||
|
||||
if fbs.fallbackGetBlock == nil {
|
||||
// FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet)
|
||||
// Wait for a bit and retry
|
||||
fbs.lk.RUnlock()
|
||||
time.Sleep(5 * time.Second)
|
||||
fbs.lk.RLock()
|
||||
|
||||
if fbs.fallbackGetBlock == nil {
|
||||
log.Errorw("fallbackstore: fallbackGetBlock not configured yet")
|
||||
return nil, blockstore.ErrNotFound
|
||||
}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Second)
|
||||
defer cancel()
|
||||
|
||||
b, err := fbs.fallbackGetBlock(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// chain bitswap puts blocks in temp blockstore which is cleaned up
|
||||
// every few min (to drop any messages we fetched but don't want)
|
||||
// in this case we want to keep this block around
|
||||
if err := fbs.Put(b); err != nil {
|
||||
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
|
||||
b, err := fbs.Blockstore.Get(c)
|
||||
switch err {
|
||||
case nil:
|
||||
return b, nil
|
||||
case blockstore.ErrNotFound:
|
||||
return fbs.getFallback(c)
|
||||
default:
|
||||
return b, err
|
||||
}
|
||||
}
|
||||
|
||||
func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) {
|
||||
sz, err := fbs.Blockstore.GetSize(c)
|
||||
switch err {
|
||||
case nil:
|
||||
return sz, nil
|
||||
case blockstore.ErrNotFound:
|
||||
b, err := fbs.getFallback(c)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(b.RawData()), nil
|
||||
default:
|
||||
return sz, err
|
||||
}
|
||||
}
|
||||
|
||||
var _ blockstore.Blockstore = &FallbackStore{}
|
@ -3,6 +3,7 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
metricsi "github.com/ipfs/go-metrics-interface"
|
||||
@ -138,6 +139,7 @@ const (
|
||||
HeadMetricsKey
|
||||
SettlePaymentChannelsKey
|
||||
RunPeerTaggerKey
|
||||
SetupFallbackBlockstoreKey
|
||||
|
||||
SetApiEndpointKey
|
||||
|
||||
@ -521,7 +523,13 @@ func Repo(r repo.Repo) Option {
|
||||
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
|
||||
|
||||
Override(new(dtypes.MetadataDS), modules.Datastore),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
|
||||
Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore),
|
||||
Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))),
|
||||
|
||||
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
|
||||
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
|
||||
Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore),
|
||||
),
|
||||
|
||||
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
|
||||
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
|
||||
|
@ -76,7 +76,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
|
||||
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
|
||||
blocks, err := r.Datastore("/chain")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -91,16 +91,32 @@ func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo
|
||||
return cbs, nil
|
||||
}
|
||||
|
||||
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
|
||||
func ChainGCBlockstore(bs dtypes.ChainRawBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
|
||||
return blockstore.NewGCBlockstore(bs, gcl)
|
||||
}
|
||||
|
||||
func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
||||
func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
||||
return blockservice.New(bs, rem)
|
||||
}
|
||||
|
||||
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
chain := store.NewChainStore(bs, ds, syscalls, j)
|
||||
func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore {
|
||||
return &blockstore.FallbackStore{
|
||||
Blockstore: rbs,
|
||||
}
|
||||
}
|
||||
|
||||
func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error {
|
||||
fbs, ok := cbs.(*blockstore.FallbackStore)
|
||||
if !ok {
|
||||
return xerrors.Errorf("expected a FallbackStore")
|
||||
}
|
||||
|
||||
fbs.SetFallback(rem.GetBlock)
|
||||
return nil
|
||||
}
|
||||
|
||||
func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
|
||||
|
||||
if err := chain.Load(); err != nil {
|
||||
log.Warnf("loading chain state from disk: %s", err)
|
||||
|
@ -23,7 +23,8 @@ import (
|
||||
// dy default it's namespaced under /metadata in main repo datastore
|
||||
type MetadataDS datastore.Batching
|
||||
|
||||
type ChainBlockstore blockstore.Blockstore
|
||||
type ChainRawBlockstore blockstore.Blockstore
|
||||
type ChainBlockstore blockstore.Blockstore // optionally bitswap backed
|
||||
|
||||
type ChainGCLocker blockstore.GCLocker
|
||||
type ChainGCBlockstore blockstore.GCBlockstore
|
||||
|
Loading…
Reference in New Issue
Block a user