segregate chain and state blockstores.

This paves the way for better object lifetime management.

Concretely, it makes it possible to:
- have different stores backing chain and state data.
- having the same datastore library, but using different parameters.
- attach different caching layers/policies to each class of data, e.g.
  sizing caches differently.
- specifying different retention policies for chain and state data.

This separation is important because:
- access patterns/frequency of chain and state data are different.
- state is derivable from chain, so one could never expunge the chain
  store, and only retain state objects reachable from the last finality
  in the state store.
This commit is contained in:
Raúl Kripalani 2021-02-28 22:48:36 +00:00
parent 853de3daf7
commit 3795cc2bd2
45 changed files with 630 additions and 370 deletions

View File

@ -110,10 +110,7 @@ func Open(opts Options) (*Blockstore, error) {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
bs := &Blockstore{
DB: db,
}
bs := &Blockstore{DB: db}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)

View File

@ -8,18 +8,19 @@ import (
"strings"
"testing"
"github.com/filecoin-project/lotus/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
"github.com/filecoin-project/lotus/blockstore"
"github.com/stretchr/testify/require"
)
// TODO: move this to go-ipfs-blockstore.
type Suite struct {
NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string)
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error)
NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string)
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error)
}
func (s *Suite) RunTests(t *testing.T, prefix string) {
@ -290,7 +291,7 @@ func (s *Suite) TestDelete(t *testing.T) {
}
func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid {
func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid {
keys := make([]cid.Cid, count)
for i := 0; i < count; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))

View File

@ -11,6 +11,19 @@ import (
"github.com/ipfs/go-cid"
)
// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore
// if it was a FallbackStore. Otherwise, it just returns the supplied store
// unmodified.
func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) {
if fbs, ok := bs.(*FallbackStore); ok {
return fbs.Blockstore, true
}
return bs, false
}
// FallbackStore is a read-through store that queries another (potentially
// remote) source if the block is not found locally. If the block is found
// during the fallback, it stores it in the local store.
type FallbackStore struct {
Blockstore
@ -30,7 +43,7 @@ func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blo
}
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c)
fbs.lk.RLock()
defer fbs.lk.RUnlock()

154
blockstore/metrics.go Normal file
View File

@ -0,0 +1,154 @@
package blockstore
import (
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
//
// Currently unused, but kept in repo in case we introduce one of the candidate
// cache implementations (Freecache, Ristretto), both of which report these
// metrics.
//
// CacheMetricsEmitInterval is the interval at which metrics are emitted onto
// OpenCensus.
var CacheMetricsEmitInterval = 5 * time.Second
var (
CacheName, _ = tag.NewKey("cache_name")
)
// CacheMeasures groups all metrics emitted by the blockstore caches.
var CacheMeasures = struct {
HitRatio *stats.Float64Measure
Hits *stats.Int64Measure
Misses *stats.Int64Measure
Entries *stats.Int64Measure
QueriesServed *stats.Int64Measure
Adds *stats.Int64Measure
Updates *stats.Int64Measure
Evictions *stats.Int64Measure
CostAdded *stats.Int64Measure
CostEvicted *stats.Int64Measure
SetsDropped *stats.Int64Measure
SetsRejected *stats.Int64Measure
QueriesDropped *stats.Int64Measure
}{
HitRatio: stats.Float64("blockstore/cache/hit_ratio", "Hit ratio of blockstore cache", stats.UnitDimensionless),
Hits: stats.Int64("blockstore/cache/hits", "Total number of hits at blockstore cache", stats.UnitDimensionless),
Misses: stats.Int64("blockstore/cache/misses", "Total number of misses at blockstore cache", stats.UnitDimensionless),
Entries: stats.Int64("blockstore/cache/entry_count", "Total number of entries currently in the blockstore cache", stats.UnitDimensionless),
QueriesServed: stats.Int64("blockstore/cache/queries_served", "Total number of queries served by the blockstore cache", stats.UnitDimensionless),
Adds: stats.Int64("blockstore/cache/adds", "Total number of adds to blockstore cache", stats.UnitDimensionless),
Updates: stats.Int64("blockstore/cache/updates", "Total number of updates in blockstore cache", stats.UnitDimensionless),
Evictions: stats.Int64("blockstore/cache/evictions", "Total number of evictions from blockstore cache", stats.UnitDimensionless),
CostAdded: stats.Int64("blockstore/cache/cost_added", "Total cost (byte size) of entries added into blockstore cache", stats.UnitBytes),
CostEvicted: stats.Int64("blockstore/cache/cost_evicted", "Total cost (byte size) of entries evicted by blockstore cache", stats.UnitBytes),
SetsDropped: stats.Int64("blockstore/cache/sets_dropped", "Total number of sets dropped by blockstore cache", stats.UnitDimensionless),
SetsRejected: stats.Int64("blockstore/cache/sets_rejected", "Total number of sets rejected by blockstore cache", stats.UnitDimensionless),
QueriesDropped: stats.Int64("blockstore/cache/queries_dropped", "Total number of queries dropped by blockstore cache", stats.UnitDimensionless),
}
// CacheViews groups all cache-related default views.
var CacheViews = struct {
HitRatio *view.View
Hits *view.View
Misses *view.View
Entries *view.View
QueriesServed *view.View
Adds *view.View
Updates *view.View
Evictions *view.View
CostAdded *view.View
CostEvicted *view.View
SetsDropped *view.View
SetsRejected *view.View
QueriesDropped *view.View
}{
HitRatio: &view.View{
Measure: CacheMeasures.HitRatio,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Hits: &view.View{
Measure: CacheMeasures.Hits,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Misses: &view.View{
Measure: CacheMeasures.Misses,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Entries: &view.View{
Measure: CacheMeasures.Entries,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
QueriesServed: &view.View{
Measure: CacheMeasures.QueriesServed,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Adds: &view.View{
Measure: CacheMeasures.Adds,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Updates: &view.View{
Measure: CacheMeasures.Updates,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
Evictions: &view.View{
Measure: CacheMeasures.Evictions,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
CostAdded: &view.View{
Measure: CacheMeasures.CostAdded,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
CostEvicted: &view.View{
Measure: CacheMeasures.CostEvicted,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
SetsDropped: &view.View{
Measure: CacheMeasures.SetsDropped,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
SetsRejected: &view.View{
Measure: CacheMeasures.SetsRejected,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
QueriesDropped: &view.View{
Measure: CacheMeasures.QueriesDropped,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{CacheName},
},
}
// DefaultViews exports all default views for this package.
var DefaultViews = []*view.View{
CacheViews.HitRatio,
CacheViews.Hits,
CacheViews.Misses,
CacheViews.Entries,
CacheViews.QueriesServed,
CacheViews.Adds,
CacheViews.Updates,
CacheViews.Evictions,
CacheViews.CostAdded,
CacheViews.CostEvicted,
CacheViews.SetsDropped,
CacheViews.SetsRejected,
CacheViews.QueriesDropped,
}

View File

@ -125,7 +125,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
}
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
return nil, err
}

View File

@ -406,7 +406,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
StateBase: stateroot,
Epoch: 0,
Rand: &fakeRand{},
Bstore: cs.Blockstore(),
Bstore: cs.StateBlockstore(),
Syscalls: mkFakedSigSyscalls(cs.VMSys()),
CircSupplyCalc: nil,
NtwkVersion: genesisNetworkVersion,

View File

@ -70,7 +70,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
StateBase: sroot,
Epoch: 0,
Rand: &fakeRand{},
Bstore: cs.Blockstore(),
Bstore: cs.StateBlockstore(),
Syscalls: mkFakedSigSyscalls(cs.VMSys()),
CircSupplyCalc: csc,
NtwkVersion: genesisNetworkVersion,

View File

@ -79,7 +79,7 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w api.WalletA
}
}
store := sm.ChainStore().Store(ctx)
store := sm.ChainStore().ActorStore(ctx)
blsmsgroot, err := toArray(store, blsMsgCids)
if err != nil {
return nil, xerrors.Errorf("building bls amt: %w", err)

View File

@ -59,7 +59,7 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.
StateBase: bstate,
Epoch: bheight,
Rand: store.NewChainRand(sm.cs, ts.Cids()),
Bstore: sm.cs.Blockstore(),
Bstore: sm.cs.StateBlockstore(),
Syscalls: sm.cs.VMSys(),
CircSupplyCalc: sm.GetVMCirculatingSupply,
NtwkVersion: sm.GetNtwkVersion,
@ -174,7 +174,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
StateBase: state,
Epoch: ts.Height() + 1,
Rand: r,
Bstore: sm.cs.Blockstore(),
Bstore: sm.cs.StateBlockstore(),
Syscalls: sm.cs.VMSys(),
CircSupplyCalc: sm.GetVMCirculatingSupply,
NtwkVersion: sm.GetNtwkVersion,

View File

@ -504,7 +504,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
}
case builtin0.StorageMinerActorCodeID:
var st miner0.State
if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil {
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil {
return xerrors.Errorf("failed to load miner state: %w", err)
}
@ -548,7 +548,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
return cid.Undef, xerrors.Errorf("failed to load power actor: %w", err)
}
cst := cbor.NewCborStore(sm.ChainStore().Blockstore())
cst := cbor.NewCborStore(sm.ChainStore().StateBlockstore())
if err := cst.Get(ctx, powAct.Head, &ps); err != nil {
return cid.Undef, xerrors.Errorf("failed to get power actor state: %w", err)
}
@ -582,7 +582,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
}
case builtin0.StorageMinerActorCodeID:
var st miner0.State
if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil {
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil {
return xerrors.Errorf("failed to load miner state: %w", err)
}
@ -591,7 +591,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
return xerrors.Errorf("failed to get miner info: %w", err)
}
sectorsArr, err := adt0.AsArray(sm.ChainStore().Store(ctx), st.Sectors)
sectorsArr, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), st.Sectors)
if err != nil {
return xerrors.Errorf("failed to load sectors array: %w", err)
}
@ -611,11 +611,11 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
lbact, err := lbtree.GetActor(addr)
if err == nil {
var lbst miner0.State
if err := sm.ChainStore().Store(ctx).Get(ctx, lbact.Head, &lbst); err != nil {
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, lbact.Head, &lbst); err != nil {
return xerrors.Errorf("failed to load miner state: %w", err)
}
lbsectors, err := adt0.AsArray(sm.ChainStore().Store(ctx), lbst.Sectors)
lbsectors, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), lbst.Sectors)
if err != nil {
return xerrors.Errorf("failed to load lb sectors array: %w", err)
}
@ -711,7 +711,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
}
func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.Store(ctx)
store := sm.cs.ActorStore(ctx)
if build.UpgradeLiftoffHeight <= epoch {
return cid.Undef, xerrors.Errorf("liftoff height must be beyond ignition height")
@ -767,7 +767,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.Store(ctx)
store := sm.cs.ActorStore(ctx)
tree, err := sm.StateTree(root)
if err != nil {
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
@ -792,7 +792,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
}
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)
info, err := store.Put(ctx, new(types.StateInfo0))
@ -843,7 +843,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
}
err = setNetworkName(ctx, sm.cs.Store(ctx), tree, "mainnet")
err = setNetworkName(ctx, sm.cs.ActorStore(ctx), tree, "mainnet")
if err != nil {
return cid.Undef, xerrors.Errorf("setting network name: %w", err)
}
@ -852,7 +852,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
}
func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
store := sm.cs.Store(ctx)
store := sm.cs.ActorStore(ctx)
var stateRoot types.StateRoot
if err := store.Get(ctx, root, &stateRoot); err != nil {
return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err)
@ -1009,7 +1009,7 @@ func upgradeActorsV3Common(
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
config nv10.Config,
) (cid.Cid, error) {
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
store := store.ActorStore(ctx, buf)
// Load the state root.
@ -1239,7 +1239,7 @@ func resetGenesisMsigs0(ctx context.Context, sm *StateManager, store adt0.Store,
return xerrors.Errorf("getting genesis tipset: %w", err)
}
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
genesisTree, err := state.LoadStateTree(cst, gts.ParentState())
if err != nil {
return xerrors.Errorf("loading state tree: %w", err)

View File

@ -125,7 +125,7 @@ func TestForkHeightTriggers(t *testing.T) {
Height: testForkHeight,
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
cst := ipldcbor.NewCborStore(sm.ChainStore().Blockstore())
cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore())
st, err := sm.StateTree(root)
if err != nil {

View File

@ -22,7 +22,7 @@ func (sm *StateManager) ParentStateTsk(tsk types.TipSetKey) (*state.StateTree, e
}
func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) {
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
state, err := state.LoadStateTree(cst, sm.parentState(ts))
if err != nil {
return nil, xerrors.Errorf("load state tree: %w", err)
@ -32,7 +32,7 @@ func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error)
}
func (sm *StateManager) StateTree(st cid.Cid) (*state.StateTree, error) {
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
state, err := state.LoadStateTree(cst, st)
if err != nil {
return nil, xerrors.Errorf("load state tree: %w", err)

View File

@ -286,7 +286,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
StateBase: base,
Epoch: epoch,
Rand: r,
Bstore: sm.cs.Blockstore(),
Bstore: sm.cs.StateBlockstore(),
Syscalls: sm.cs.VMSys(),
CircSupplyCalc: sm.GetVMCirculatingSupply,
NtwkVersion: sm.GetNtwkVersion,
@ -430,7 +430,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
return cid.Cid{}, cid.Cid{}, err
}
rectarr := blockadt.MakeEmptyArray(sm.cs.Store(ctx))
// XXX: Is the height correct? Or should it be epoch-1?
rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx))
for i, receipt := range receipts {
if err := rectarr.Set(uint64(i), receipt); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err)
@ -515,7 +516,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
ts = sm.cs.GetHeaviestTipSet()
}
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
// First try to resolve the actor in the parent state, so we don't have to compute anything.
tree, err := state.LoadStateTree(cst, ts.ParentState())
@ -556,7 +557,7 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres
}
func (sm *StateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
state, err := state.LoadStateTree(cst, sm.parentState(ts))
if err != nil {
return address.Undef, xerrors.Errorf("load state tree: %w", err)
@ -882,7 +883,7 @@ func (sm *StateManager) MarketBalance(ctx context.Context, addr address.Address,
return api.MarketBalance{}, err
}
mstate, err := market.Load(sm.cs.Store(ctx), act)
mstate, err := market.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return api.MarketBalance{}, err
}
@ -966,7 +967,7 @@ func (sm *StateManager) setupGenesisVestingSchedule(ctx context.Context) error {
return xerrors.Errorf("getting genesis tipset state: %w", err)
}
cst := cbor.NewCborStore(sm.cs.Blockstore())
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
sTree, err := state.LoadStateTree(cst, st)
if err != nil {
return xerrors.Errorf("loading state tree: %w", err)
@ -1325,7 +1326,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
unCirc = big.Add(unCirc, actor.Balance)
case a == market.Address:
mst, err := market.Load(sm.cs.Store(ctx), actor)
mst, err := market.Load(sm.cs.ActorStore(ctx), actor)
if err != nil {
return err
}
@ -1342,7 +1343,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
circ = big.Add(circ, actor.Balance)
case builtin.IsStorageMinerActor(actor.Code):
mst, err := miner.Load(sm.cs.Store(ctx), actor)
mst, err := miner.Load(sm.cs.ActorStore(ctx), actor)
if err != nil {
return err
}
@ -1359,7 +1360,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
}
case builtin.IsMultisigActor(actor.Code):
mst, err := multisig.Load(sm.cs.Store(ctx), actor)
mst, err := multisig.Load(sm.cs.ActorStore(ctx), actor)
if err != nil {
return err
}
@ -1413,7 +1414,7 @@ func (sm *StateManager) GetPaychState(ctx context.Context, addr address.Address,
return nil, nil, err
}
actState, err := paych.Load(sm.cs.Store(ctx), act)
actState, err := paych.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, nil, err
}
@ -1431,7 +1432,7 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m
return nil, err
}
actState, err := market.Load(sm.cs.Store(ctx), act)
actState, err := market.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, err
}

View File

@ -48,7 +48,7 @@ func GetNetworkName(ctx context.Context, sm *StateManager, st cid.Cid) (dtypes.N
if err != nil {
return "", err
}
ias, err := init_.Load(sm.cs.Store(ctx), act)
ias, err := init_.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return "", err
}
@ -65,7 +65,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr
if err != nil {
return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
}
mas, err := miner.Load(sm.cs.Store(ctx), act)
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
}
@ -75,7 +75,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr
return address.Undef, xerrors.Errorf("failed to load actor info: %w", err)
}
return vm.ResolveToKeyAddr(state, sm.cs.Store(ctx), info.Worker)
return vm.ResolveToKeyAddr(state, sm.cs.ActorStore(ctx), info.Worker)
}
func GetPower(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (power.Claim, power.Claim, bool, error) {
@ -88,7 +88,7 @@ func GetPowerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr addres
return power.Claim{}, power.Claim{}, false, xerrors.Errorf("(get sset) failed to load power actor state: %w", err)
}
pas, err := power.Load(sm.cs.Store(ctx), act)
pas, err := power.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return power.Claim{}, power.Claim{}, false, err
}
@ -123,7 +123,7 @@ func PreCommitInfo(ctx context.Context, sm *StateManager, maddr address.Address,
return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
}
mas, err := miner.Load(sm.cs.Store(ctx), act)
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
}
@ -137,7 +137,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres
return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
}
mas, err := miner.Load(sm.cs.Store(ctx), act)
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
}
@ -151,7 +151,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(sm.cs.Store(ctx), act)
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -249,7 +249,7 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma
return false, xerrors.Errorf("failed to load power actor: %w", err)
}
spas, err := power.Load(sm.cs.Store(ctx), act)
spas, err := power.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return false, xerrors.Errorf("failed to load power actor state: %w", err)
}
@ -272,7 +272,7 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts
return nil, xerrors.Errorf("failed to load market actor: %w", err)
}
state, err := market.Load(sm.cs.Store(ctx), act)
state, err := market.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load market actor state: %w", err)
}
@ -320,7 +320,7 @@ func ListMinerActors(ctx context.Context, sm *StateManager, ts *types.TipSet) ([
return nil, xerrors.Errorf("failed to load power actor: %w", err)
}
powState, err := power.Load(sm.cs.Store(ctx), act)
powState, err := power.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load power actor state: %w", err)
}
@ -353,7 +353,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
StateBase: base,
Epoch: height,
Rand: r,
Bstore: sm.cs.Blockstore(),
Bstore: sm.cs.StateBlockstore(),
Syscalls: sm.cs.VMSys(),
CircSupplyCalc: sm.GetVMCirculatingSupply,
NtwkVersion: sm.GetNtwkVersion,
@ -474,7 +474,7 @@ func MinerGetBaseInfo(ctx context.Context, sm *StateManager, bcs beacon.Schedule
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(sm.cs.Store(ctx), act)
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -623,7 +623,7 @@ func minerHasMinPower(ctx context.Context, sm *StateManager, addr address.Addres
return false, xerrors.Errorf("loading power actor state: %w", err)
}
ps, err := power.Load(sm.cs.Store(ctx), pact)
ps, err := power.Load(sm.cs.ActorStore(ctx), pact)
if err != nil {
return false, err
}
@ -654,7 +654,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
return false, xerrors.Errorf("loading power actor state: %w", err)
}
pstate, err := power.Load(sm.cs.Store(ctx), pact)
pstate, err := power.Load(sm.cs.ActorStore(ctx), pact)
if err != nil {
return false, err
}
@ -664,7 +664,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
return false, xerrors.Errorf("loading miner actor state: %w", err)
}
mstate, err := miner.Load(sm.cs.Store(ctx), mact)
mstate, err := miner.Load(sm.cs.ActorStore(ctx), mact)
if err != nil {
return false, err
}
@ -696,7 +696,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
}
func CheckTotalFIL(ctx context.Context, sm *StateManager, ts *types.TipSet) (abi.TokenAmount, error) {
str, err := state.LoadStateTree(sm.ChainStore().Store(ctx), ts.ParentState())
str, err := state.LoadStateTree(sm.ChainStore().ActorStore(ctx), ts.ParentState())
if err != nil {
return abi.TokenAmount{}, err
}

View File

@ -107,11 +107,11 @@ type HeadChangeEvt struct {
// 1. a tipset cache
// 2. a block => messages references cache.
type ChainStore struct {
bs bstore.Blockstore
localbs bstore.Blockstore
ds dstore.Batching
chainBlockstore bstore.Blockstore
stateBlockstore bstore.Blockstore
metadataDs dstore.Batching
localviewer bstore.Viewer
chainLocalBlockstore bstore.Blockstore
heaviestLk sync.Mutex
heaviest *types.TipSet
@ -139,30 +139,30 @@ type ChainStore struct {
wg sync.WaitGroup
}
// localbs is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsCache, _ := lru.NewARC(DefaultTipSetCacheSize)
// chainLocalBlockstore is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
j = journal.NilJournal()
}
ctx, cancel := context.WithCancel(context.Background())
// unwraps the fallback store in case one is configured.
// some methods _need_ to operate on a local blockstore only.
localbs, _ := bstore.UnwrapFallbackStore(chainBs)
cs := &ChainStore{
bs: bs,
localbs: localbs,
ds: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
mmCache: mmCache,
tsCache: tsCache,
vmcalls: vmcalls,
cancelFn: cancel,
journal: j,
}
if v, ok := localbs.(bstore.Viewer); ok {
cs.localviewer = v
chainBlockstore: chainBs,
stateBlockstore: stateBs,
chainLocalBlockstore: localbs,
metadataDs: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
mmCache: c,
tsCache: tsc,
vmcalls: vmcalls,
cancelFn: cancel,
journal: j,
}
cs.evtTypes = [1]journal.EventType{
@ -216,7 +216,7 @@ func (cs *ChainStore) Close() error {
}
func (cs *ChainStore) Load() error {
head, err := cs.ds.Get(chainHeadKey)
head, err := cs.metadataDs.Get(chainHeadKey)
if err == dstore.ErrNotFound {
log.Warn("no previous chain state found")
return nil
@ -246,7 +246,7 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error {
return xerrors.Errorf("failed to marshal tipset: %w", err)
}
if err := cs.ds.Put(chainHeadKey, data); err != nil {
if err := cs.metadataDs.Put(chainHeadKey, data); err != nil {
return xerrors.Errorf("failed to write chain head to datastore: %w", err)
}
@ -306,13 +306,13 @@ func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) {
func (cs *ChainStore) IsBlockValidated(ctx context.Context, blkid cid.Cid) (bool, error) {
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
return cs.ds.Has(key)
return cs.metadataDs.Has(key)
}
func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error {
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
if err := cs.ds.Put(key, []byte{0}); err != nil {
if err := cs.metadataDs.Put(key, []byte{0}); err != nil {
return xerrors.Errorf("cache block validation: %w", err)
}
@ -322,7 +322,7 @@ func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) e
func (cs *ChainStore) UnmarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error {
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
if err := cs.ds.Delete(key); err != nil {
if err := cs.metadataDs.Delete(key); err != nil {
return xerrors.Errorf("removing from valid block cache: %w", err)
}
@ -339,7 +339,7 @@ func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
return err
}
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
return cs.metadataDs.Put(dstore.NewKey("0"), b.Cid().Bytes())
}
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
@ -594,7 +594,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
// FlushValidationCache removes all results of block validation from the
// chain metadata store. Usually the first step after a new chain import.
func (cs *ChainStore) FlushValidationCache() error {
return FlushValidationCache(cs.ds)
return FlushValidationCache(cs.metadataDs)
}
func FlushValidationCache(ds datastore.Batching) error {
@ -653,7 +653,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error {
// Contains returns whether our BlockStore has all blocks in the supplied TipSet.
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
for _, c := range ts.Cids() {
has, err := cs.bs.Has(c)
has, err := cs.chainBlockstore.Has(c)
if err != nil {
return false, err
}
@ -668,16 +668,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
// GetBlock fetches a BlockHeader with the supplied CID. It returns
// blockstore.ErrNotFound if the block was not found in the BlockStore.
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
return nil, err
}
return types.DecodeBlock(sb.RawData())
}
var blk *types.BlockHeader
err := cs.localviewer.View(c, func(b []byte) (err error) {
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
blk, err = types.DecodeBlock(b)
return err
})
@ -851,7 +843,7 @@ func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) error {
end = len(b)
}
err = multierr.Append(err, cs.bs.PutMany(sbs[start:end]))
err = multierr.Append(err, cs.chainLocalBlockstore.PutMany(sbs[start:end]))
}
return err
@ -875,7 +867,7 @@ func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) {
}
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
return PutMessage(cs.bs, m)
return PutMessage(cs.chainBlockstore, m)
}
func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error) {
@ -936,7 +928,7 @@ func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error
}
func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
data, err := cs.ds.Get(dstore.NewKey("0"))
data, err := cs.metadataDs.Get(dstore.NewKey("0"))
if err != nil {
return nil, err
}
@ -962,17 +954,8 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeMessage(sb.RawData())
}
var msg *types.Message
err := cs.localviewer.View(c, func(b []byte) (err error) {
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
msg, err = types.DecodeMessage(b)
return err
})
@ -980,17 +963,8 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
if cs.localviewer == nil {
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
}
return types.DecodeSignedMessage(sb.RawData())
}
var msg *types.SignedMessage
err := cs.localviewer.View(c, func(b []byte) (err error) {
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
msg, err = types.DecodeSignedMessage(b)
return err
})
@ -1000,7 +974,7 @@ func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error)
func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
ctx := context.TODO()
// block headers use adt0, for now.
a, err := blockadt.AsArray(cs.Store(ctx), root)
a, err := blockadt.AsArray(cs.ActorStore(ctx), root)
if err != nil {
return nil, xerrors.Errorf("amt load: %w", err)
}
@ -1124,7 +1098,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return mmcids.bls, mmcids.secpk, nil
}
cst := cbor.NewCborStore(cs.localbs)
cst := cbor.NewCborStore(cs.chainLocalBlockstore)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
@ -1194,7 +1168,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message,
func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
ctx := context.TODO()
// block headers use adt0, for now.
a, err := blockadt.AsArray(cs.Store(ctx), b.ParentMessageReceipts)
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)
if err != nil {
return nil, xerrors.Errorf("amt load: %w", err)
}
@ -1237,16 +1211,26 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe
return msgs, nil
}
func (cs *ChainStore) Blockstore() bstore.Blockstore {
return cs.bs
// ChainBlockstore returns the chain blockstore. Currently the chain and state
// // stores are both backed by the same physical store, albeit with different
// // caching policies, but in the future they will segregate.
func (cs *ChainStore) ChainBlockstore() bstore.Blockstore {
return cs.chainBlockstore
}
// StateBlockstore returns the state blockstore. Currently the chain and state
// stores are both backed by the same physical store, albeit with different
// caching policies, but in the future they will segregate.
func (cs *ChainStore) StateBlockstore() bstore.Blockstore {
return cs.stateBlockstore
}
func ActorStore(ctx context.Context, bs bstore.Blockstore) adt.Store {
return adt.WrapStore(ctx, cbor.NewCborStore(bs))
}
func (cs *ChainStore) Store(ctx context.Context) adt.Store {
return ActorStore(ctx, cs.bs)
func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store {
return ActorStore(ctx, cs.stateBlockstore)
}
func (cs *ChainStore) VMSys() vm.SyscallBuilder {
@ -1444,8 +1428,8 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
blk, err := cs.bs.Get(c)
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
blk, err := cs.chainBlockstore.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
}
@ -1458,7 +1442,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
})
}
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
@ -1478,7 +1462,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
return err
}
data, err := cs.bs.Get(blk)
data, err := cs.chainBlockstore.Get(blk)
if err != nil {
return xerrors.Errorf("getting block: %w", err)
}
@ -1498,7 +1482,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
var cids []cid.Cid
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
if walked.Visit(b.Messages) {
mcids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages})
mcids, err := recurseLinks(cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
}
@ -1519,13 +1503,17 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
if walked.Visit(b.ParentStateRoot) {
cids, err := recurseLinks(cs.bs, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
cids, err := recurseLinks(cs.chainBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil {
return xerrors.Errorf("recursing genesis state failed: %w", err)
}
out = append(out, cids...)
}
if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) {
out = append(out, b.ParentMessageReceipts)
}
}
for _, c := range out {
@ -1561,7 +1549,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
}
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
header, err := car.LoadCar(cs.Blockstore(), r)
header, err := car.LoadCar(cs.StateBlockstore(), r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
}

View File

@ -52,7 +52,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
b.Fatal(err)
}

View File

@ -28,7 +28,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn
tpow := big2.Zero()
{
cst := cbor.NewCborStore(cs.Blockstore())
cst := cbor.NewCborStore(cs.StateBlockstore())
state, err := state.LoadStateTree(cst, ts.ParentState())
if err != nil {
return types.NewInt(0), xerrors.Errorf("load state tree: %w", err)
@ -39,7 +39,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn
return types.NewInt(0), xerrors.Errorf("get power actor: %w", err)
}
powState, err := power.Load(cs.Store(ctx), act)
powState, err := power.Load(cs.ActorStore(ctx), act)
if err != nil {
return types.NewInt(0), xerrors.Errorf("failed to load power actor state: %w", err)
}

View File

@ -354,7 +354,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
}
// Finally, flush.
return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot)
return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot)
}
func (syncer *Syncer) LocalPeer() peer.ID {
@ -640,7 +640,7 @@ func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, b
return xerrors.Errorf("failed to load power actor: %w", err)
}
powState, err := power.Load(syncer.store.Store(ctx), act)
powState, err := power.Load(syncer.store.ActorStore(ctx), act)
if err != nil {
return xerrors.Errorf("failed to load power actor state: %w", err)
}
@ -1055,7 +1055,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return err
}
st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot)
st, err := state.LoadStateTree(syncer.store.ActorStore(ctx), stateroot)
if err != nil {
return xerrors.Errorf("failed to load base state tree: %w", err)
}
@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
}
// Finally, flush.
return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid)
return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid)
}
func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {
@ -1574,7 +1574,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
return err
}
if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil {
if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
}

View File

@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
"github.com/ipfs/go-cid"
metricsi "github.com/ipfs/go-metrics-interface"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -204,7 +203,7 @@ var importBenchCmd = &cli.Command{
case cctx.Bool("use-native-badger"):
log.Info("using native badger")
var opts badgerbs.Options
if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil {
if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil {
return err
}
opts.SyncWrites = false
@ -236,14 +235,6 @@ var importBenchCmd = &cli.Command{
defer c.Close() //nolint:errcheck
}
ctx := metricsi.CtxScope(context.Background(), "lotus")
cacheOpts := blockstore.DefaultCacheOpts()
cacheOpts.HasBloomFilterSize = 0
bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
if err != nil {
return err
}
var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier
if cctx.IsSet("syscall-cache") {
scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions)
@ -267,6 +258,15 @@ var importBenchCmd = &cli.Command{
stm := stmgr.NewStateManager(cs)
var carFile *os.File
// open the CAR file if one is provided.
if path := cctx.String("car"); path != "" {
var err error
if carFile, err = os.Open(path); err != nil {
return xerrors.Errorf("failed to open provided CAR file: %w", err)
}
}
startTime := time.Now()
// register a gauge that reports how long since the measurable
@ -308,18 +308,7 @@ var importBenchCmd = &cli.Command{
writeProfile("allocs")
}()
var carFile *os.File
// open the CAR file if one is provided.
if path := cctx.String("car"); path != "" {
var err error
if carFile, err = os.Open(path); err != nil {
return xerrors.Errorf("failed to open provided CAR file: %w", err)
}
}
var head *types.TipSet
// --- IMPORT ---
if !cctx.Bool("no-import") {
if cctx.Bool("global-profile") {

View File

@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}

View File

@ -319,7 +319,7 @@ var datastoreRewriteCmd = &cli.Command{
)
// open the destination (to) store.
opts, err := repo.BadgerBlockstoreOptions(repo.BlockstoreChain, toPath, false)
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false)
if err != nil {
return xerrors.Errorf("failed to get badger options: %w", err)
}
@ -329,7 +329,7 @@ var datastoreRewriteCmd = &cli.Command{
}
// open the source (from) store.
opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, fromPath, true)
opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true)
if err != nil {
return xerrors.Errorf("failed to get badger options: %w", err)
}

View File

@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{
defer fi.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}

View File

@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err)
}
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return err
}
@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{
}
defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}

View File

@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
@ -191,7 +191,7 @@ var stateTreePruneCmd = &cli.Command{
rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, true, func(c cid.Cid) error {
if goodSet.Len()%20 == 0 {
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
}

View File

@ -432,7 +432,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}
defer lr.Close() //nolint:errcheck
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}

View File

@ -311,7 +311,7 @@ FIXME: Maybe mention the `Batching` interface as the developer will stumble upon
FIXME: IPFS blocks vs Filecoin blocks ideally happens before this / here
The [`Blockstore` interface](`github.com/filecoin-project/lotus/lib/blockstore.go`) structures the key-value pair
The [`Blockstore` interface](`github.com/filecoin-project/lotus/blockstore/blockstore.go`) structures the key-value pair
into the CID format for the key and the [`Block` interface](`github.com/ipfs/go-block-format/blocks.go`) for the value.
The `Block` value is just a raw string of bytes addressed by its hash, which is included in the CID key.

2
go.mod
View File

@ -143,7 +143,7 @@ require (
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect

3
go.sum
View File

@ -1754,8 +1754,9 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -9,10 +9,12 @@ import (
"go.opencensus.io/tag"
rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics"
"github.com/filecoin-project/lotus/blockstore"
)
// Distribution
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000)
// Global Tags
var (
@ -179,33 +181,37 @@ var (
)
// DefaultViews is an array of OpenCensus views for metric gathering purposes
var DefaultViews = append([]*view.View{
InfoView,
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PeerCountView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
},
rpcmetrics.DefaultViews...)
var DefaultViews = func() []*view.View {
views := []*view.View{
InfoView,
ChainNodeHeightView,
ChainNodeHeightExpectedView,
ChainNodeWorkerHeightView,
BlockReceivedView,
BlockValidationFailureView,
BlockValidationSuccessView,
BlockValidationDurationView,
BlockDelayView,
MessagePublishedView,
MessageReceivedView,
MessageValidationFailureView,
MessageValidationSuccessView,
PeerCountView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
APIRequestDurationView,
VMFlushCopyCountView,
VMFlushCopyDurationView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)
return views
}()
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
func SinceInMilliseconds(startTime time.Time) float64 {

View File

@ -145,7 +145,7 @@ const (
HeadMetricsKey
SettlePaymentChannelsKey
RunPeerTaggerKey
SetupFallbackBlockstoreKey
SetupFallbackBlockstoresKey
SetApiEndpointKey
@ -590,12 +590,15 @@ func Repo(r repo.Repo) Option {
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore),
Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))),
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore),
Override(new(dtypes.StateBlockstore), modules.FallbackStateBlockstore),
Override(SetupFallbackBlockstoresKey, modules.InitFallbackBlockstores),
),
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),

View File

@ -35,6 +35,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var log = logging.Logger("fullnode")
@ -57,6 +58,11 @@ type ChainModule struct {
fx.In
Chain *store.ChainStore
// ExposedBlockstore is the global monolith blockstore that is safe to
// expose externally. In the future, this will be segregated into two
// blockstores.
ExposedBlockstore dtypes.ExposedBlockstore
}
var _ ChainModuleAPI = (*ChainModule)(nil)
@ -68,6 +74,11 @@ type ChainAPI struct {
ChainModuleAPI
Chain *store.ChainStore
// ExposedBlockstore is the global monolith blockstore that is safe to
// expose externally. In the future, this will be segregated into two
// blockstores.
ExposedBlockstore dtypes.ExposedBlockstore
}
func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
@ -212,7 +223,7 @@ func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpo
}
func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
blk, err := m.Chain.Blockstore().Get(obj)
blk, err := m.ExposedBlockstore.Get(obj)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}
@ -221,15 +232,15 @@ func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, er
}
func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
return a.Chain.Blockstore().DeleteBlock(obj)
return a.ExposedBlockstore.DeleteBlock(obj)
}
func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) {
return m.Chain.Blockstore().Has(obj)
return m.ExposedBlockstore.Has(obj)
}
func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) {
bs := a.Chain.Blockstore()
bs := a.ExposedBlockstore
bsvc := blockservice.New(bs, offline.Exchange(bs))
dag := merkledag.NewDAGService(bsvc)
@ -514,7 +525,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject,
return nil, xerrors.Errorf("parsing path: %w", err)
}
bs := a.Chain.Blockstore()
bs := a.ExposedBlockstore
bsvc := blockservice.New(bs, offline.Exchange(bs))
dag := merkledag.NewDAGService(bsvc)

View File

@ -97,7 +97,7 @@ func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address,
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -111,7 +111,7 @@ func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Ad
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -135,7 +135,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address,
return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -153,7 +153,7 @@ func (a *StateAPI) StateMinerDeadlines(ctx context.Context, m address.Address, t
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -192,7 +192,7 @@ func (a *StateAPI) StateMinerPartitions(ctx context.Context, m address.Address,
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -253,7 +253,7 @@ func (m *StateModule) StateMinerProvingDeadline(ctx context.Context, addr addres
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -272,7 +272,7 @@ func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, t
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -329,7 +329,7 @@ func (a *StateAPI) StateMinerRecoveries(ctx context.Context, addr address.Addres
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -461,7 +461,7 @@ func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, ts
return nil, xerrors.Errorf("getting actor: %w", err)
}
blk, err := a.Chain.Blockstore().Get(act.Head)
blk, err := a.Chain.StateBlockstore().Get(act.Head)
if err != nil {
return nil, xerrors.Errorf("getting actor head: %w", err)
}
@ -707,7 +707,7 @@ func (m *StateModule) StateMarketStorageDeal(ctx context.Context, dealId abi.Dea
}
func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) {
store := a.Chain.Store(ctx)
store := a.Chain.ActorStore(ctx)
oldTree, err := state.LoadStateTree(store, old)
if err != nil {
@ -727,7 +727,7 @@ func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Addre
if err != nil {
return api.MinerSectors{}, err
}
mas, err := miner.Load(a.Chain.Store(ctx), act)
mas, err := miner.Load(a.Chain.ActorStore(ctx), act)
if err != nil {
return api.MinerSectors{}, err
}
@ -792,7 +792,7 @@ func (a *StateAPI) StateSectorExpiration(ctx context.Context, maddr address.Addr
if err != nil {
return nil, err
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, err
}
@ -804,7 +804,7 @@ func (a *StateAPI) StateSectorPartition(ctx context.Context, maddr address.Addre
if err != nil {
return nil, err
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, err
}
@ -890,7 +890,7 @@ func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address.
if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err)
}
msas, err := multisig.Load(m.Chain.Store(ctx), act)
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
}
@ -912,7 +912,7 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr
return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor: %w", err)
}
msas, err := multisig.Load(a.Chain.Store(ctx), act)
msas, err := multisig.Load(a.Chain.ActorStore(ctx), act)
if err != nil {
return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor state: %w", err)
}
@ -961,7 +961,7 @@ func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, s
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err)
}
msas, err := multisig.Load(m.Chain.Store(ctx), act)
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
}
@ -989,7 +989,7 @@ func (m *StateModule) MsigGetPending(ctx context.Context, addr address.Address,
if err != nil {
return nil, xerrors.Errorf("failed to load multisig actor: %w", err)
}
msas, err := multisig.Load(m.Chain.Store(ctx), act)
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load multisig actor state: %w", err)
}
@ -1032,7 +1032,7 @@ func (a *StateAPI) StateMinerPreCommitDepositForPower(ctx context.Context, maddr
return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err)
}
store := a.Chain.Store(ctx)
store := a.Chain.ActorStore(ctx)
var sectorWeight abi.StoragePower
if act, err := state.GetActor(market.Address); err != nil {
@ -1093,7 +1093,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr
return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err)
}
store := a.Chain.Store(ctx)
store := a.Chain.ActorStore(ctx)
var sectorWeight abi.StoragePower
if act, err := state.GetActor(market.Address); err != nil {
@ -1164,7 +1164,7 @@ func (a *StateAPI) StateMinerAvailableBalance(ctx context.Context, maddr address
return types.EmptyInt, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return types.EmptyInt, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -1193,7 +1193,7 @@ func (a *StateAPI) StateMinerSectorAllocated(ctx context.Context, maddr address.
return false, xerrors.Errorf("failed to load miner actor: %w", err)
}
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return false, xerrors.Errorf("failed to load miner actor state: %w", err)
}
@ -1216,7 +1216,7 @@ func (a *StateAPI) StateVerifierStatus(ctx context.Context, addr address.Address
return nil, err
}
vrs, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), act)
vrs, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load verified registry state: %w", err)
}
@ -1247,7 +1247,7 @@ func (m *StateModule) StateVerifiedClientStatus(ctx context.Context, addr addres
return nil, err
}
vrs, err := verifreg.Load(m.StateManager.ChainStore().Store(ctx), act)
vrs, err := verifreg.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
if err != nil {
return nil, xerrors.Errorf("failed to load verified registry state: %w", err)
}
@ -1269,7 +1269,7 @@ func (a *StateAPI) StateVerifiedRegistryRootKey(ctx context.Context, tsk types.T
return address.Undef, err
}
vst, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), vact)
vst, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), vact)
if err != nil {
return address.Undef, err
}
@ -1298,12 +1298,12 @@ func (m *StateModule) StateDealProviderCollateralBounds(ctx context.Context, siz
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor: %w", err)
}
pst, err := power.Load(m.StateManager.ChainStore().Store(ctx), pact)
pst, err := power.Load(m.StateManager.ChainStore().ActorStore(ctx), pact)
if err != nil {
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load power actor state: %w", err)
}
rst, err := reward.Load(m.StateManager.ChainStore().Store(ctx), ract)
rst, err := reward.Load(m.StateManager.ChainStore().ActorStore(ctx), ract)
if err != nil {
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor state: %w", err)
}

View File

@ -0,0 +1,65 @@
package modules
import (
"context"
"io"
bstore "github.com/ipfs/go-ipfs-blockstore"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
// UniversalBlockstore returns a single universal blockstore that stores both
// chain data and state data.
func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
if c, ok := bs.(io.Closer); ok {
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return c.Close()
},
})
}
return bs, err
}
// StateBlockstore is a hook to overlay caches for state objects, or in the
// future, to segregate the universal blockstore into different physical state
// and chain stores.
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
return bs, nil
}
// ChainBlockstore is a hook to overlay caches for state objects, or in the
// future, to segregate the universal blockstore into different physical state
// and chain stores.
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
return bs, nil
}
func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore {
return &blockstore.FallbackStore{Blockstore: cbs}
}
func FallbackStateBlockstore(sbs dtypes.StateBlockstore) dtypes.StateBlockstore {
return &blockstore.FallbackStore{Blockstore: sbs}
}
func InitFallbackBlockstores(cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, rem dtypes.ChainBitswap) error {
for _, bs := range []bstore.Blockstore{cbs, sbs} {
if fbs, ok := bs.(*blockstore.FallbackStore); ok {
fbs.SetFallback(rem.GetBlock)
continue
}
return xerrors.Errorf("expected a FallbackStore")
}
return nil
}

View File

@ -1,25 +1,18 @@
package modules
import (
"bytes"
"context"
"os"
"time"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
@ -29,14 +22,15 @@ import (
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainBlockstore) dtypes.ChainBitswap {
// ChainBitswap uses a blockstore that bypasses all caches.
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
// prefix protocol for chain bitswap
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
@ -60,6 +54,10 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
return exch
}
func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
mpp := messagepool.NewProvider(sm, ps)
mp, err := messagepool.New(mpp, ds, nn, j)
@ -74,43 +72,8 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
return mp, nil
}
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain)
if err != nil {
return nil, err
}
// TODO potentially replace this cached blockstore by a CBOR cache.
cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts())
if err != nil {
return nil, err
}
return cbs, nil
}
func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}
func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore {
return &blockstore.FallbackStore{
Blockstore: rbs,
}
}
func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error {
fbs, ok := cbs.(*blockstore.FallbackStore)
if !ok {
return xerrors.Errorf("expected a FallbackStore")
}
fbs.SetFallback(rem.GetBlock)
return nil
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)
@ -125,65 +88,6 @@ func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawB
return chain
}
func ErrorGenesis() Genesis {
return func() (header *types.BlockHeader, e error) {
return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
}
}
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
return func(bs dtypes.ChainBlockstore) Genesis {
return func() (header *types.BlockHeader, e error) {
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
if err != nil {
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
}
if len(c.Roots) != 1 {
return nil, xerrors.New("expected genesis file to have one root")
}
root, err := bs.Get(c.Roots[0])
if err != nil {
return nil, err
}
h, err := types.DecodeBlock(root.RawData())
if err != nil {
return nil, xerrors.Errorf("decoding block failed: %w", err)
}
return h, nil
}
}
}
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
genFromRepo, err := cs.GetGenesis()
if err == nil {
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
expectedGenesis, err := g()
if err != nil {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
}
if genFromRepo.Cid() != expectedGenesis.Cid() {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
}
}
return dtypes.AfterGenesisSet{}, nil // already set, noop
}
if err != datastore.ErrNotFound {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
}
genesis, err := g()
if err != nil {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
}
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
}
func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
if !build.Devnet {
return "testnetnet", nil

View File

@ -83,7 +83,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke
ctx := helpers.LifecycleCtx(mctx, lc)
ds, err := r.Datastore(ctx, "/client")
if err != nil {
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
}
mds, err := multistore.NewMultiDstore(ds)

View File

@ -19,19 +19,41 @@ import (
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
// MetadataDS stores metadata
// dy default it's namespaced under /metadata in main repo datastore
// MetadataDS stores metadata. By default it's namespaced under /metadata in
// main repo datastore.
type MetadataDS datastore.Batching
type ChainRawBlockstore blockstore.Blockstore
type ChainBlockstore blockstore.Blockstore // optionally bitswap backed
type (
// UniversalBlockstore is the cold blockstore.
UniversalBlockstore blockstore.Blockstore
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
// messages). It is physically backed by the BareMonolithBlockstore, but it
// has a cache on top that is specially tuned for chain data access
// patterns.
ChainBlockstore blockstore.Blockstore
// StateBlockstore is a blockstore to store state data (state tree). It is
// physically backed by the BareMonolithBlockstore, but it has a cache on
// top that is specially tuned for state data access patterns.
StateBlockstore blockstore.Blockstore
// ExposedBlockstore is a blockstore that interfaces directly with the
// network or with users, from which queries are served, and where incoming
// data is deposited. For security reasons, this store is disconnected from
// any internal caches. If blocks are added to this store in a way that
// could render caches dirty (e.g. a block is added when an existence cache
// holds a 'false' for that block), the process should signal so by calling
// blockstore.AllCaches.Dirty(cid).
ExposedBlockstore blockstore.Blockstore
)
type ChainBitswap exchange.Interface
type ChainBlockService bserv.BlockService
type ClientMultiDstore *multistore.MultiStore
type ClientImportMgr *importmgr.Mgr
type ClientBlockstore blockstore.Blockstore
type ClientBlockstore blockstore.BasicBlockstore
type ClientDealStore *statestore.StateStore
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
type ClientDatastore datastore.Batching
@ -50,6 +72,6 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator
type ProviderDataTransfer datatransfer.Manager
type StagingDAG format.DAGService
type StagingBlockstore blockstore.Blockstore
type StagingBlockstore blockstore.BasicBlockstore
type StagingGraphsync graphsync.GraphExchange
type StagingMultiDstore *multistore.MultiStore

73
node/modules/genesis.go Normal file
View File

@ -0,0 +1,73 @@
package modules
import (
"bytes"
"os"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func ErrorGenesis() Genesis {
return func() (header *types.BlockHeader, e error) {
return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
}
}
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
return func(bs dtypes.ChainBlockstore) Genesis {
return func() (header *types.BlockHeader, e error) {
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
if err != nil {
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
}
if len(c.Roots) != 1 {
return nil, xerrors.New("expected genesis file to have one root")
}
root, err := bs.Get(c.Roots[0])
if err != nil {
return nil, err
}
h, err := types.DecodeBlock(root.RawData())
if err != nil {
return nil, xerrors.Errorf("decoding block failed: %w", err)
}
return h, nil
}
}
}
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
genFromRepo, err := cs.GetGenesis()
if err == nil {
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
expectedGenesis, err := g()
if err != nil {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
}
if genFromRepo.Cid() != expectedGenesis.Cid() {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
}
}
return dtypes.AfterGenesisSet{}, nil // already set, noop
}
if err != datastore.ErrNotFound {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
}
genesis, err := g()
if err != nil {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
}
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
}

View File

@ -14,8 +14,8 @@ import (
)
// Graphsync creates a graphsync instance from the given loader and storer
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
loader := storeutil.LoaderForBlockstore(clientBs)
storer := storeutil.StorerForBlockstore(clientBs)

View File

@ -18,7 +18,7 @@ import (
func IpfsClientBlockstore(ipfsMaddr string, onlineMode bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
var err error
var ipfsbs blockstore.Blockstore
var ipfsbs blockstore.BasicBlockstore
if ipfsMaddr != "" {
var ma multiaddr.Multiaddr
ma, err = multiaddr.NewMultiaddr(ipfsMaddr)

View File

@ -5,10 +5,6 @@ import badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
// BadgerBlockstoreOptions returns the badger options to apply for the provided
// domain.
func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) {
if domain != BlockstoreChain {
return badgerbs.Options{}, ErrInvalidBlockstoreDomain
}
opts := badgerbs.DefaultOptions(path)
// Due to legacy usage of blockstore.Blockstore, over a datastore, all

View File

@ -13,7 +13,7 @@ import (
"sync"
"github.com/BurntSushi/toml"
"github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-datastore"
fslock "github.com/ipfs/go-fs-lock"
logging "github.com/ipfs/go-log/v2"
@ -22,7 +22,7 @@ import (
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
lblockstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/blockstore"
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -264,11 +264,18 @@ type fsLockedRepo struct {
bs blockstore.Blockstore
bsErr error
bsOnce sync.Once
ssPath string
ssErr error
ssOnce sync.Once
storageLk sync.Mutex
configLk sync.Mutex
}
func (fsr *fsLockedRepo) Readonly() bool {
return fsr.readonly
}
func (fsr *fsLockedRepo) Path() string {
return fsr.path
}
@ -301,7 +308,7 @@ func (fsr *fsLockedRepo) Close() error {
// Blockstore returns a blockstore for the provided data domain.
func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
if domain != BlockstoreChain {
if domain != UniversalBlockstore {
return nil, ErrInvalidBlockstoreDomain
}
@ -325,12 +332,27 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain
fsr.bsErr = err
return
}
fsr.bs = lblockstore.WrapIDStore(bs)
fsr.bs = blockstore.WrapIDStore(bs)
})
return fsr.bs, fsr.bsErr
}
func (fsr *fsLockedRepo) SplitstorePath() (string, error) {
fsr.ssOnce.Do(func() {
path := fsr.join(filepath.Join(fsDatastore, "splitstore"))
if err := os.MkdirAll(path, 0755); err != nil {
fsr.ssErr = err
return
}
fsr.ssPath = path
})
return fsr.ssPath, fsr.ssErr
}
// join joins path elements with fsr.path
func (fsr *fsLockedRepo) join(paths ...string) string {
return filepath.Join(append([]string{fsr.path}, paths...)...)

View File

@ -16,7 +16,7 @@ type Mgr struct {
mds *multistore.MultiStore
ds datastore.Batching
Blockstore blockstore.Blockstore
Blockstore blockstore.BasicBlockstore
}
type Label string

View File

@ -4,10 +4,10 @@ import (
"context"
"errors"
"github.com/filecoin-project/lotus/blockstore"
"github.com/ipfs/go-datastore"
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@ -18,11 +18,11 @@ import (
type BlockstoreDomain string
const (
// BlockstoreChain represents the blockstore domain for chain data.
// UniversalBlockstore represents the blockstore domain for all data.
// Right now, this includes chain objects (tipsets, blocks, messages), as
// well as state. In the future, they may get segregated into different
// domains.
BlockstoreChain = BlockstoreDomain("chain")
UniversalBlockstore = BlockstoreDomain("universal")
)
var (
@ -63,6 +63,9 @@ type LockedRepo interface {
// the lifecycle.
Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error)
// SplitstorePath returns the path for the SplitStore
SplitstorePath() (string, error)
// Returns config in this repo
Config() (interface{}, error)
SetConfig(func(interface{})) error
@ -84,4 +87,7 @@ type LockedRepo interface {
// Path returns absolute path of the repo
Path() string
// Readonly returns true if the repo is readonly
Readonly() bool
}

View File

@ -201,6 +201,10 @@ func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) {
}, nil
}
func (lmem *lockedMemRepo) Readonly() bool {
return false
}
func (lmem *lockedMemRepo) checkToken() error {
lmem.RLock()
defer lmem.RUnlock()
@ -246,12 +250,16 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba
}
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
if domain != BlockstoreChain {
if domain != UniversalBlockstore {
return nil, ErrInvalidBlockstoreDomain
}
return lmem.mem.blockstore, nil
}
func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
return ioutil.TempDir("", "splitstore.*")
}
func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {
return nil, nil
}

View File

@ -73,13 +73,13 @@ func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService {
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores
type BlockstoreRetrievalStoreManager struct {
bs blockstore.Blockstore
bs blockstore.BasicBlockstore
}
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager {
func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager {
return &BlockstoreRetrievalStoreManager{
bs: bs,
}