From 3795cc2bd20ef13fa256ac9c71de23bc3d3de42d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sun, 28 Feb 2021 22:48:36 +0000 Subject: [PATCH] segregate chain and state blockstores. This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - having the same datastore library, but using different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specifying different retention policies for chain and state data. This separation is important because: - access patterns/frequency of chain and state data are different. - state is derivable from chain, so one could never expunge the chain store, and only retain state objects reachable from the last finality in the state store. --- blockstore/badger/blockstore.go | 5 +- blockstore/badger/blockstore_test_suite.go | 9 +- blockstore/fallback.go | 15 +- blockstore/metrics.go | 154 ++++++++++++++++++ chain/gen/gen.go | 2 +- chain/gen/genesis/genesis.go | 2 +- chain/gen/genesis/miners.go | 2 +- chain/gen/mining.go | 2 +- chain/stmgr/call.go | 4 +- chain/stmgr/forks.go | 26 +-- chain/stmgr/forks_test.go | 2 +- chain/stmgr/read.go | 4 +- chain/stmgr/stmgr.go | 23 +-- chain/stmgr/utils.go | 32 ++-- chain/store/store.go | 140 ++++++++-------- chain/store/store_test.go | 2 +- chain/store/weight.go | 4 +- chain/sync.go | 10 +- cmd/lotus-bench/import.go | 31 ++-- cmd/lotus-shed/balances.go | 4 +- cmd/lotus-shed/datastore.go | 4 +- cmd/lotus-shed/export.go | 2 +- cmd/lotus-shed/import-car.go | 4 +- cmd/lotus-shed/pruning.go | 4 +- cmd/lotus/daemon.go | 2 +- documentation/en/architecture/architecture.md | 2 +- go.mod | 2 +- go.sum | 3 +- metrics/metrics.go | 62 +++---- node/builder.go | 11 +- node/impl/full/chain.go | 21 ++- node/impl/full/state.go | 52 +++--- node/modules/blockstore.go | 65 ++++++++ node/modules/chain.go | 116 ++----------- node/modules/client.go | 2 +- node/modules/dtypes/storage.go | 34 +++- node/modules/genesis.go | 73 +++++++++ node/modules/graphsync.go | 4 +- node/modules/ipfsclient.go | 2 +- node/repo/blockstore_opts.go | 4 - node/repo/fsrepo.go | 30 +++- node/repo/importmgr/mgr.go | 2 +- node/repo/interface.go | 12 +- node/repo/memrepo.go | 10 +- .../retrievalstoremgr/retrievalstoremgr.go | 4 +- 45 files changed, 630 insertions(+), 370 deletions(-) create mode 100644 blockstore/metrics.go create mode 100644 node/modules/blockstore.go create mode 100644 node/modules/genesis.go diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index 47dbf98d3..22f9036e3 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -110,10 +110,7 @@ func Open(opts Options) (*Blockstore, error) { return nil, fmt.Errorf("failed to open badger blockstore: %w", err) } - bs := &Blockstore{ - DB: db, - } - + bs := &Blockstore{DB: db} if p := opts.Prefix; p != "" { bs.prefixing = true bs.prefix = []byte(p) diff --git a/blockstore/badger/blockstore_test_suite.go b/blockstore/badger/blockstore_test_suite.go index ebf1be80a..93be82ac8 100644 --- a/blockstore/badger/blockstore_test_suite.go +++ b/blockstore/badger/blockstore_test_suite.go @@ -8,18 +8,19 @@ import ( "strings" "testing" - "github.com/filecoin-project/lotus/blockstore" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" + "github.com/filecoin-project/lotus/blockstore" + "github.com/stretchr/testify/require" ) // TODO: move this to go-ipfs-blockstore. type Suite struct { - NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string) - OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) + NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) + OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) } func (s *Suite) RunTests(t *testing.T, prefix string) { @@ -290,7 +291,7 @@ func (s *Suite) TestDelete(t *testing.T) { } -func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid { +func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid { keys := make([]cid.Cid, count) for i := 0; i < count; i++ { block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) diff --git a/blockstore/fallback.go b/blockstore/fallback.go index 3a71913d4..5f220f941 100644 --- a/blockstore/fallback.go +++ b/blockstore/fallback.go @@ -11,6 +11,19 @@ import ( "github.com/ipfs/go-cid" ) +// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore +// if it was a FallbackStore. Otherwise, it just returns the supplied store +// unmodified. +func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) { + if fbs, ok := bs.(*FallbackStore); ok { + return fbs.Blockstore, true + } + return bs, false +} + +// FallbackStore is a read-through store that queries another (potentially +// remote) source if the block is not found locally. If the block is found +// during the fallback, it stores it in the local store. type FallbackStore struct { Blockstore @@ -30,7 +43,7 @@ func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blo } func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) { - log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c) + log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c) fbs.lk.RLock() defer fbs.lk.RUnlock() diff --git a/blockstore/metrics.go b/blockstore/metrics.go new file mode 100644 index 000000000..737690a11 --- /dev/null +++ b/blockstore/metrics.go @@ -0,0 +1,154 @@ +package blockstore + +import ( + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +// +// Currently unused, but kept in repo in case we introduce one of the candidate +// cache implementations (Freecache, Ristretto), both of which report these +// metrics. +// + +// CacheMetricsEmitInterval is the interval at which metrics are emitted onto +// OpenCensus. +var CacheMetricsEmitInterval = 5 * time.Second + +var ( + CacheName, _ = tag.NewKey("cache_name") +) + +// CacheMeasures groups all metrics emitted by the blockstore caches. +var CacheMeasures = struct { + HitRatio *stats.Float64Measure + Hits *stats.Int64Measure + Misses *stats.Int64Measure + Entries *stats.Int64Measure + QueriesServed *stats.Int64Measure + Adds *stats.Int64Measure + Updates *stats.Int64Measure + Evictions *stats.Int64Measure + CostAdded *stats.Int64Measure + CostEvicted *stats.Int64Measure + SetsDropped *stats.Int64Measure + SetsRejected *stats.Int64Measure + QueriesDropped *stats.Int64Measure +}{ + HitRatio: stats.Float64("blockstore/cache/hit_ratio", "Hit ratio of blockstore cache", stats.UnitDimensionless), + Hits: stats.Int64("blockstore/cache/hits", "Total number of hits at blockstore cache", stats.UnitDimensionless), + Misses: stats.Int64("blockstore/cache/misses", "Total number of misses at blockstore cache", stats.UnitDimensionless), + Entries: stats.Int64("blockstore/cache/entry_count", "Total number of entries currently in the blockstore cache", stats.UnitDimensionless), + QueriesServed: stats.Int64("blockstore/cache/queries_served", "Total number of queries served by the blockstore cache", stats.UnitDimensionless), + Adds: stats.Int64("blockstore/cache/adds", "Total number of adds to blockstore cache", stats.UnitDimensionless), + Updates: stats.Int64("blockstore/cache/updates", "Total number of updates in blockstore cache", stats.UnitDimensionless), + Evictions: stats.Int64("blockstore/cache/evictions", "Total number of evictions from blockstore cache", stats.UnitDimensionless), + CostAdded: stats.Int64("blockstore/cache/cost_added", "Total cost (byte size) of entries added into blockstore cache", stats.UnitBytes), + CostEvicted: stats.Int64("blockstore/cache/cost_evicted", "Total cost (byte size) of entries evicted by blockstore cache", stats.UnitBytes), + SetsDropped: stats.Int64("blockstore/cache/sets_dropped", "Total number of sets dropped by blockstore cache", stats.UnitDimensionless), + SetsRejected: stats.Int64("blockstore/cache/sets_rejected", "Total number of sets rejected by blockstore cache", stats.UnitDimensionless), + QueriesDropped: stats.Int64("blockstore/cache/queries_dropped", "Total number of queries dropped by blockstore cache", stats.UnitDimensionless), +} + +// CacheViews groups all cache-related default views. +var CacheViews = struct { + HitRatio *view.View + Hits *view.View + Misses *view.View + Entries *view.View + QueriesServed *view.View + Adds *view.View + Updates *view.View + Evictions *view.View + CostAdded *view.View + CostEvicted *view.View + SetsDropped *view.View + SetsRejected *view.View + QueriesDropped *view.View +}{ + HitRatio: &view.View{ + Measure: CacheMeasures.HitRatio, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Hits: &view.View{ + Measure: CacheMeasures.Hits, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Misses: &view.View{ + Measure: CacheMeasures.Misses, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Entries: &view.View{ + Measure: CacheMeasures.Entries, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + QueriesServed: &view.View{ + Measure: CacheMeasures.QueriesServed, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Adds: &view.View{ + Measure: CacheMeasures.Adds, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Updates: &view.View{ + Measure: CacheMeasures.Updates, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + Evictions: &view.View{ + Measure: CacheMeasures.Evictions, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + CostAdded: &view.View{ + Measure: CacheMeasures.CostAdded, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + CostEvicted: &view.View{ + Measure: CacheMeasures.CostEvicted, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + SetsDropped: &view.View{ + Measure: CacheMeasures.SetsDropped, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + SetsRejected: &view.View{ + Measure: CacheMeasures.SetsRejected, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, + QueriesDropped: &view.View{ + Measure: CacheMeasures.QueriesDropped, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{CacheName}, + }, +} + +// DefaultViews exports all default views for this package. +var DefaultViews = []*view.View{ + CacheViews.HitRatio, + CacheViews.Hits, + CacheViews.Misses, + CacheViews.Entries, + CacheViews.QueriesServed, + CacheViews.Adds, + CacheViews.Updates, + CacheViews.Evictions, + CacheViews.CostAdded, + CacheViews.CostEvicted, + CacheViews.SetsDropped, + CacheViews.SetsRejected, + CacheViews.QueriesDropped, +} diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 18cbb64f7..d06c755fa 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -125,7 +125,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain) + bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { return nil, err } diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 92b4919c1..3a4e317a5 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -406,7 +406,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci StateBase: stateroot, Epoch: 0, Rand: &fakeRand{}, - Bstore: cs.Blockstore(), + Bstore: cs.StateBlockstore(), Syscalls: mkFakedSigSyscalls(cs.VMSys()), CircSupplyCalc: nil, NtwkVersion: genesisNetworkVersion, diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 850c2f39f..297543886 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -70,7 +70,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid StateBase: sroot, Epoch: 0, Rand: &fakeRand{}, - Bstore: cs.Blockstore(), + Bstore: cs.StateBlockstore(), Syscalls: mkFakedSigSyscalls(cs.VMSys()), CircSupplyCalc: csc, NtwkVersion: genesisNetworkVersion, diff --git a/chain/gen/mining.go b/chain/gen/mining.go index 5de0fec0e..3c6a89873 100644 --- a/chain/gen/mining.go +++ b/chain/gen/mining.go @@ -79,7 +79,7 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w api.WalletA } } - store := sm.ChainStore().Store(ctx) + store := sm.ChainStore().ActorStore(ctx) blsmsgroot, err := toArray(store, blsMsgCids) if err != nil { return nil, xerrors.Errorf("building bls amt: %w", err) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index bb0f0e5ec..89f91b0b7 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -59,7 +59,7 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types. StateBase: bstate, Epoch: bheight, Rand: store.NewChainRand(sm.cs, ts.Cids()), - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -174,7 +174,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri StateBase: state, Epoch: ts.Height() + 1, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, diff --git a/chain/stmgr/forks.go b/chain/stmgr/forks.go index 90dcaf729..899397940 100644 --- a/chain/stmgr/forks.go +++ b/chain/stmgr/forks.go @@ -504,7 +504,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } case builtin0.StorageMinerActorCodeID: var st miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } @@ -548,7 +548,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio return cid.Undef, xerrors.Errorf("failed to load power actor: %w", err) } - cst := cbor.NewCborStore(sm.ChainStore().Blockstore()) + cst := cbor.NewCborStore(sm.ChainStore().StateBlockstore()) if err := cst.Get(ctx, powAct.Head, &ps); err != nil { return cid.Undef, xerrors.Errorf("failed to get power actor state: %w", err) } @@ -582,7 +582,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } case builtin0.StorageMinerActorCodeID: var st miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } @@ -591,7 +591,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio return xerrors.Errorf("failed to get miner info: %w", err) } - sectorsArr, err := adt0.AsArray(sm.ChainStore().Store(ctx), st.Sectors) + sectorsArr, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), st.Sectors) if err != nil { return xerrors.Errorf("failed to load sectors array: %w", err) } @@ -611,11 +611,11 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio lbact, err := lbtree.GetActor(addr) if err == nil { var lbst miner0.State - if err := sm.ChainStore().Store(ctx).Get(ctx, lbact.Head, &lbst); err != nil { + if err := sm.ChainStore().ActorStore(ctx).Get(ctx, lbact.Head, &lbst); err != nil { return xerrors.Errorf("failed to load miner state: %w", err) } - lbsectors, err := adt0.AsArray(sm.ChainStore().Store(ctx), lbst.Sectors) + lbsectors, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), lbst.Sectors) if err != nil { return xerrors.Errorf("failed to load lb sectors array: %w", err) } @@ -711,7 +711,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio } func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) if build.UpgradeLiftoffHeight <= epoch { return cid.Undef, xerrors.Errorf("liftoff height must be beyond ignition height") @@ -767,7 +767,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) tree, err := sm.StateTree(root) if err != nil { return cid.Undef, xerrors.Errorf("getting state tree: %w", err) @@ -792,7 +792,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E } func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync()) + buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync()) store := store.ActorStore(ctx, buf) info, err := store.Put(ctx, new(types.StateInfo0)) @@ -843,7 +843,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb return cid.Undef, xerrors.Errorf("getting state tree: %w", err) } - err = setNetworkName(ctx, sm.cs.Store(ctx), tree, "mainnet") + err = setNetworkName(ctx, sm.cs.ActorStore(ctx), tree, "mainnet") if err != nil { return cid.Undef, xerrors.Errorf("setting network name: %w", err) } @@ -852,7 +852,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb } func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - store := sm.cs.Store(ctx) + store := sm.cs.ActorStore(ctx) var stateRoot types.StateRoot if err := store.Get(ctx, root, &stateRoot); err != nil { return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err) @@ -1009,7 +1009,7 @@ func upgradeActorsV3Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv10.Config, ) (cid.Cid, error) { - buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync()) + buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync()) store := store.ActorStore(ctx, buf) // Load the state root. @@ -1239,7 +1239,7 @@ func resetGenesisMsigs0(ctx context.Context, sm *StateManager, store adt0.Store, return xerrors.Errorf("getting genesis tipset: %w", err) } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) genesisTree, err := state.LoadStateTree(cst, gts.ParentState()) if err != nil { return xerrors.Errorf("loading state tree: %w", err) diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go index 95e7ef699..e456dc436 100644 --- a/chain/stmgr/forks_test.go +++ b/chain/stmgr/forks_test.go @@ -125,7 +125,7 @@ func TestForkHeightTriggers(t *testing.T) { Height: testForkHeight, Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { - cst := ipldcbor.NewCborStore(sm.ChainStore().Blockstore()) + cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore()) st, err := sm.StateTree(root) if err != nil { diff --git a/chain/stmgr/read.go b/chain/stmgr/read.go index 9a9b80265..3c7fb5d91 100644 --- a/chain/stmgr/read.go +++ b/chain/stmgr/read.go @@ -22,7 +22,7 @@ func (sm *StateManager) ParentStateTsk(tsk types.TipSetKey) (*state.StateTree, e } func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, sm.parentState(ts)) if err != nil { return nil, xerrors.Errorf("load state tree: %w", err) @@ -32,7 +32,7 @@ func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) } func (sm *StateManager) StateTree(st cid.Cid) (*state.StateTree, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, st) if err != nil { return nil, xerrors.Errorf("load state tree: %w", err) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 73088ba2a..d6c984280 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -286,7 +286,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp StateBase: base, Epoch: epoch, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -430,7 +430,8 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp return cid.Cid{}, cid.Cid{}, err } - rectarr := blockadt.MakeEmptyArray(sm.cs.Store(ctx)) + // XXX: Is the height correct? Or should it be epoch-1? + rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx)) for i, receipt := range receipts { if err := rectarr.Set(uint64(i), receipt); err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err) @@ -515,7 +516,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad ts = sm.cs.GetHeaviestTipSet() } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) // First try to resolve the actor in the parent state, so we don't have to compute anything. tree, err := state.LoadStateTree(cst, ts.ParentState()) @@ -556,7 +557,7 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres } func (sm *StateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) state, err := state.LoadStateTree(cst, sm.parentState(ts)) if err != nil { return address.Undef, xerrors.Errorf("load state tree: %w", err) @@ -882,7 +883,7 @@ func (sm *StateManager) MarketBalance(ctx context.Context, addr address.Address, return api.MarketBalance{}, err } - mstate, err := market.Load(sm.cs.Store(ctx), act) + mstate, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return api.MarketBalance{}, err } @@ -966,7 +967,7 @@ func (sm *StateManager) setupGenesisVestingSchedule(ctx context.Context) error { return xerrors.Errorf("getting genesis tipset state: %w", err) } - cst := cbor.NewCborStore(sm.cs.Blockstore()) + cst := cbor.NewCborStore(sm.cs.StateBlockstore()) sTree, err := state.LoadStateTree(cst, st) if err != nil { return xerrors.Errorf("loading state tree: %w", err) @@ -1325,7 +1326,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha unCirc = big.Add(unCirc, actor.Balance) case a == market.Address: - mst, err := market.Load(sm.cs.Store(ctx), actor) + mst, err := market.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1342,7 +1343,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha circ = big.Add(circ, actor.Balance) case builtin.IsStorageMinerActor(actor.Code): - mst, err := miner.Load(sm.cs.Store(ctx), actor) + mst, err := miner.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1359,7 +1360,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha } case builtin.IsMultisigActor(actor.Code): - mst, err := multisig.Load(sm.cs.Store(ctx), actor) + mst, err := multisig.Load(sm.cs.ActorStore(ctx), actor) if err != nil { return err } @@ -1413,7 +1414,7 @@ func (sm *StateManager) GetPaychState(ctx context.Context, addr address.Address, return nil, nil, err } - actState, err := paych.Load(sm.cs.Store(ctx), act) + actState, err := paych.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, nil, err } @@ -1431,7 +1432,7 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m return nil, err } - actState, err := market.Load(sm.cs.Store(ctx), act) + actState, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, err } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 86bb3a6e0..947310c75 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -48,7 +48,7 @@ func GetNetworkName(ctx context.Context, sm *StateManager, st cid.Cid) (dtypes.N if err != nil { return "", err } - ias, err := init_.Load(sm.cs.Store(ctx), act) + ias, err := init_.Load(sm.cs.ActorStore(ctx), act) if err != nil { return "", err } @@ -65,7 +65,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr if err != nil { return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -75,7 +75,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr return address.Undef, xerrors.Errorf("failed to load actor info: %w", err) } - return vm.ResolveToKeyAddr(state, sm.cs.Store(ctx), info.Worker) + return vm.ResolveToKeyAddr(state, sm.cs.ActorStore(ctx), info.Worker) } func GetPower(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (power.Claim, power.Claim, bool, error) { @@ -88,7 +88,7 @@ func GetPowerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr addres return power.Claim{}, power.Claim{}, false, xerrors.Errorf("(get sset) failed to load power actor state: %w", err) } - pas, err := power.Load(sm.cs.Store(ctx), act) + pas, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return power.Claim{}, power.Claim{}, false, err } @@ -123,7 +123,7 @@ func PreCommitInfo(ctx context.Context, sm *StateManager, maddr address.Address, return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -137,7 +137,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err) } @@ -151,7 +151,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -249,7 +249,7 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma return false, xerrors.Errorf("failed to load power actor: %w", err) } - spas, err := power.Load(sm.cs.Store(ctx), act) + spas, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return false, xerrors.Errorf("failed to load power actor state: %w", err) } @@ -272,7 +272,7 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts return nil, xerrors.Errorf("failed to load market actor: %w", err) } - state, err := market.Load(sm.cs.Store(ctx), act) + state, err := market.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load market actor state: %w", err) } @@ -320,7 +320,7 @@ func ListMinerActors(ctx context.Context, sm *StateManager, ts *types.TipSet) ([ return nil, xerrors.Errorf("failed to load power actor: %w", err) } - powState, err := power.Load(sm.cs.Store(ctx), act) + powState, err := power.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load power actor state: %w", err) } @@ -353,7 +353,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch, StateBase: base, Epoch: height, Rand: r, - Bstore: sm.cs.Blockstore(), + Bstore: sm.cs.StateBlockstore(), Syscalls: sm.cs.VMSys(), CircSupplyCalc: sm.GetVMCirculatingSupply, NtwkVersion: sm.GetNtwkVersion, @@ -474,7 +474,7 @@ func MinerGetBaseInfo(ctx context.Context, sm *StateManager, bcs beacon.Schedule return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(sm.cs.Store(ctx), act) + mas, err := miner.Load(sm.cs.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -623,7 +623,7 @@ func minerHasMinPower(ctx context.Context, sm *StateManager, addr address.Addres return false, xerrors.Errorf("loading power actor state: %w", err) } - ps, err := power.Load(sm.cs.Store(ctx), pact) + ps, err := power.Load(sm.cs.ActorStore(ctx), pact) if err != nil { return false, err } @@ -654,7 +654,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add return false, xerrors.Errorf("loading power actor state: %w", err) } - pstate, err := power.Load(sm.cs.Store(ctx), pact) + pstate, err := power.Load(sm.cs.ActorStore(ctx), pact) if err != nil { return false, err } @@ -664,7 +664,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add return false, xerrors.Errorf("loading miner actor state: %w", err) } - mstate, err := miner.Load(sm.cs.Store(ctx), mact) + mstate, err := miner.Load(sm.cs.ActorStore(ctx), mact) if err != nil { return false, err } @@ -696,7 +696,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add } func CheckTotalFIL(ctx context.Context, sm *StateManager, ts *types.TipSet) (abi.TokenAmount, error) { - str, err := state.LoadStateTree(sm.ChainStore().Store(ctx), ts.ParentState()) + str, err := state.LoadStateTree(sm.ChainStore().ActorStore(ctx), ts.ParentState()) if err != nil { return abi.TokenAmount{}, err } diff --git a/chain/store/store.go b/chain/store/store.go index 8cbd5da37..1244995fc 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -107,11 +107,11 @@ type HeadChangeEvt struct { // 1. a tipset cache // 2. a block => messages references cache. type ChainStore struct { - bs bstore.Blockstore - localbs bstore.Blockstore - ds dstore.Batching + chainBlockstore bstore.Blockstore + stateBlockstore bstore.Blockstore + metadataDs dstore.Batching - localviewer bstore.Viewer + chainLocalBlockstore bstore.Blockstore heaviestLk sync.Mutex heaviest *types.TipSet @@ -139,30 +139,30 @@ type ChainStore struct { wg sync.WaitGroup } -// localbs is guaranteed to fail Get* if requested block isn't stored locally -func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { - mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize) - tsCache, _ := lru.NewARC(DefaultTipSetCacheSize) +// chainLocalBlockstore is guaranteed to fail Get* if requested block isn't stored locally +func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore { + c, _ := lru.NewARC(DefaultMsgMetaCacheSize) + tsc, _ := lru.NewARC(DefaultTipSetCacheSize) if j == nil { j = journal.NilJournal() } ctx, cancel := context.WithCancel(context.Background()) + // unwraps the fallback store in case one is configured. + // some methods _need_ to operate on a local blockstore only. + localbs, _ := bstore.UnwrapFallbackStore(chainBs) cs := &ChainStore{ - bs: bs, - localbs: localbs, - ds: ds, - bestTips: pubsub.New(64), - tipsets: make(map[abi.ChainEpoch][]cid.Cid), - mmCache: mmCache, - tsCache: tsCache, - vmcalls: vmcalls, - cancelFn: cancel, - journal: j, - } - - if v, ok := localbs.(bstore.Viewer); ok { - cs.localviewer = v + chainBlockstore: chainBs, + stateBlockstore: stateBs, + chainLocalBlockstore: localbs, + metadataDs: ds, + bestTips: pubsub.New(64), + tipsets: make(map[abi.ChainEpoch][]cid.Cid), + mmCache: c, + tsCache: tsc, + vmcalls: vmcalls, + cancelFn: cancel, + journal: j, } cs.evtTypes = [1]journal.EventType{ @@ -216,7 +216,7 @@ func (cs *ChainStore) Close() error { } func (cs *ChainStore) Load() error { - head, err := cs.ds.Get(chainHeadKey) + head, err := cs.metadataDs.Get(chainHeadKey) if err == dstore.ErrNotFound { log.Warn("no previous chain state found") return nil @@ -246,7 +246,7 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error { return xerrors.Errorf("failed to marshal tipset: %w", err) } - if err := cs.ds.Put(chainHeadKey, data); err != nil { + if err := cs.metadataDs.Put(chainHeadKey, data); err != nil { return xerrors.Errorf("failed to write chain head to datastore: %w", err) } @@ -306,13 +306,13 @@ func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) { func (cs *ChainStore) IsBlockValidated(ctx context.Context, blkid cid.Cid) (bool, error) { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - return cs.ds.Has(key) + return cs.metadataDs.Has(key) } func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - if err := cs.ds.Put(key, []byte{0}); err != nil { + if err := cs.metadataDs.Put(key, []byte{0}); err != nil { return xerrors.Errorf("cache block validation: %w", err) } @@ -322,7 +322,7 @@ func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) e func (cs *ChainStore) UnmarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error { key := blockValidationCacheKeyPrefix.Instance(blkid.String()) - if err := cs.ds.Delete(key); err != nil { + if err := cs.metadataDs.Delete(key); err != nil { return xerrors.Errorf("removing from valid block cache: %w", err) } @@ -339,7 +339,7 @@ func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error { return err } - return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes()) + return cs.metadataDs.Put(dstore.NewKey("0"), b.Cid().Bytes()) } func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error { @@ -594,7 +594,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) // FlushValidationCache removes all results of block validation from the // chain metadata store. Usually the first step after a new chain import. func (cs *ChainStore) FlushValidationCache() error { - return FlushValidationCache(cs.ds) + return FlushValidationCache(cs.metadataDs) } func FlushValidationCache(ds datastore.Batching) error { @@ -653,7 +653,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error { // Contains returns whether our BlockStore has all blocks in the supplied TipSet. func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { for _, c := range ts.Cids() { - has, err := cs.bs.Has(c) + has, err := cs.chainBlockstore.Has(c) if err != nil { return false, err } @@ -668,16 +668,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { // GetBlock fetches a BlockHeader with the supplied CID. It returns // blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - return nil, err - } - return types.DecodeBlock(sb.RawData()) - } - var blk *types.BlockHeader - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { blk, err = types.DecodeBlock(b) return err }) @@ -851,7 +843,7 @@ func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) error { end = len(b) } - err = multierr.Append(err, cs.bs.PutMany(sbs[start:end])) + err = multierr.Append(err, cs.chainLocalBlockstore.PutMany(sbs[start:end])) } return err @@ -875,7 +867,7 @@ func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) { } func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) { - return PutMessage(cs.bs, m) + return PutMessage(cs.chainBlockstore, m) } func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error) { @@ -936,7 +928,7 @@ func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error } func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) { - data, err := cs.ds.Get(dstore.NewKey("0")) + data, err := cs.metadataDs.Get(dstore.NewKey("0")) if err != nil { return nil, err } @@ -962,17 +954,8 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) { } func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err - } - return types.DecodeMessage(sb.RawData()) - } - var msg *types.Message - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { msg, err = types.DecodeMessage(b) return err }) @@ -980,17 +963,8 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { } func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) { - if cs.localviewer == nil { - sb, err := cs.localbs.Get(c) - if err != nil { - log.Errorf("get message get failed: %s: %s", c, err) - return nil, err - } - return types.DecodeSignedMessage(sb.RawData()) - } - var msg *types.SignedMessage - err := cs.localviewer.View(c, func(b []byte) (err error) { + err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) { msg, err = types.DecodeSignedMessage(b) return err }) @@ -1000,7 +974,7 @@ func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) { ctx := context.TODO() // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.Store(ctx), root) + a, err := blockadt.AsArray(cs.ActorStore(ctx), root) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } @@ -1124,7 +1098,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) return mmcids.bls, mmcids.secpk, nil } - cst := cbor.NewCborStore(cs.localbs) + cst := cbor.NewCborStore(cs.chainLocalBlockstore) var msgmeta types.MsgMeta if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil { return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err) @@ -1194,7 +1168,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) { ctx := context.TODO() // block headers use adt0, for now. - a, err := blockadt.AsArray(cs.Store(ctx), b.ParentMessageReceipts) + a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts) if err != nil { return nil, xerrors.Errorf("amt load: %w", err) } @@ -1237,16 +1211,26 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe return msgs, nil } -func (cs *ChainStore) Blockstore() bstore.Blockstore { - return cs.bs +// ChainBlockstore returns the chain blockstore. Currently the chain and state +// // stores are both backed by the same physical store, albeit with different +// // caching policies, but in the future they will segregate. +func (cs *ChainStore) ChainBlockstore() bstore.Blockstore { + return cs.chainBlockstore +} + +// StateBlockstore returns the state blockstore. Currently the chain and state +// stores are both backed by the same physical store, albeit with different +// caching policies, but in the future they will segregate. +func (cs *ChainStore) StateBlockstore() bstore.Blockstore { + return cs.stateBlockstore } func ActorStore(ctx context.Context, bs bstore.Blockstore) adt.Store { return adt.WrapStore(ctx, cbor.NewCborStore(bs)) } -func (cs *ChainStore) Store(ctx context.Context) adt.Store { - return ActorStore(ctx, cs.bs) +func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store { + return ActorStore(ctx, cs.stateBlockstore) } func (cs *ChainStore) VMSys() vm.SyscallBuilder { @@ -1444,8 +1428,8 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } - return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error { - blk, err := cs.bs.Get(c) + return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error { + blk, err := cs.chainBlockstore.Get(c) if err != nil { return xerrors.Errorf("writing object to car, bs.Get: %w", err) } @@ -1458,7 +1442,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo }) } -func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error { +func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error { if ts == nil { ts = cs.GetHeaviestTipSet() } @@ -1478,7 +1462,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe return err } - data, err := cs.bs.Get(blk) + data, err := cs.chainBlockstore.Get(blk) if err != nil { return xerrors.Errorf("getting block: %w", err) } @@ -1498,7 +1482,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe var cids []cid.Cid if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots { if walked.Visit(b.Messages) { - mcids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages}) + mcids, err := recurseLinks(cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages}) if err != nil { return xerrors.Errorf("recursing messages failed: %w", err) } @@ -1519,13 +1503,17 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots { if walked.Visit(b.ParentStateRoot) { - cids, err := recurseLinks(cs.bs, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) + cids, err := recurseLinks(cs.chainBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) if err != nil { return xerrors.Errorf("recursing genesis state failed: %w", err) } out = append(out, cids...) } + + if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) { + out = append(out, b.ParentMessageReceipts) + } } for _, c := range out { @@ -1561,7 +1549,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe } func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) { - header, err := car.LoadCar(cs.Blockstore(), r) + header, err := car.LoadCar(cs.StateBlockstore(), r) if err != nil { return nil, xerrors.Errorf("loadcar failed: %w", err) } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 9afe6ba79..51e2e08d0 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -52,7 +52,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain) + bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore) if err != nil { b.Fatal(err) } diff --git a/chain/store/weight.go b/chain/store/weight.go index 9100df315..42546d5e3 100644 --- a/chain/store/weight.go +++ b/chain/store/weight.go @@ -28,7 +28,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn tpow := big2.Zero() { - cst := cbor.NewCborStore(cs.Blockstore()) + cst := cbor.NewCborStore(cs.StateBlockstore()) state, err := state.LoadStateTree(cst, ts.ParentState()) if err != nil { return types.NewInt(0), xerrors.Errorf("load state tree: %w", err) @@ -39,7 +39,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn return types.NewInt(0), xerrors.Errorf("get power actor: %w", err) } - powState, err := power.Load(cs.Store(ctx), act) + powState, err := power.Load(cs.ActorStore(ctx), act) if err != nil { return types.NewInt(0), xerrors.Errorf("failed to load power actor state: %w", err) } diff --git a/chain/sync.go b/chain/sync.go index 1743a3033..88237eb5a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -354,7 +354,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { } // Finally, flush. - return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot) + return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot) } func (syncer *Syncer) LocalPeer() peer.ID { @@ -640,7 +640,7 @@ func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, b return xerrors.Errorf("failed to load power actor: %w", err) } - powState, err := power.Load(syncer.store.Store(ctx), act) + powState, err := power.Load(syncer.store.ActorStore(ctx), act) if err != nil { return xerrors.Errorf("failed to load power actor state: %w", err) } @@ -1055,7 +1055,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock return err } - st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot) + st, err := state.LoadStateTree(syncer.store.ActorStore(ctx), stateroot) if err != nil { return xerrors.Errorf("failed to load base state tree: %w", err) } @@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock } // Finally, flush. - return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid) + return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid) } func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error { @@ -1574,7 +1574,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return err } - if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil { + if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil { return xerrors.Errorf("message processing failed: %w", err) } } diff --git a/cmd/lotus-bench/import.go b/cmd/lotus-bench/import.go index 1ded9b30a..4b464bebe 100644 --- a/cmd/lotus-bench/import.go +++ b/cmd/lotus-bench/import.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/bloom" "github.com/ipfs/go-cid" - metricsi "github.com/ipfs/go-metrics-interface" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -204,7 +203,7 @@ var importBenchCmd = &cli.Command{ case cctx.Bool("use-native-badger"): log.Info("using native badger") var opts badgerbs.Options - if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil { + if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil { return err } opts.SyncWrites = false @@ -236,14 +235,6 @@ var importBenchCmd = &cli.Command{ defer c.Close() //nolint:errcheck } - ctx := metricsi.CtxScope(context.Background(), "lotus") - cacheOpts := blockstore.DefaultCacheOpts() - cacheOpts.HasBloomFilterSize = 0 - bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts) - if err != nil { - return err - } - var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier if cctx.IsSet("syscall-cache") { scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions) @@ -267,6 +258,15 @@ var importBenchCmd = &cli.Command{ stm := stmgr.NewStateManager(cs) + var carFile *os.File + // open the CAR file if one is provided. + if path := cctx.String("car"); path != "" { + var err error + if carFile, err = os.Open(path); err != nil { + return xerrors.Errorf("failed to open provided CAR file: %w", err) + } + } + startTime := time.Now() // register a gauge that reports how long since the measurable @@ -308,18 +308,7 @@ var importBenchCmd = &cli.Command{ writeProfile("allocs") }() - var carFile *os.File - - // open the CAR file if one is provided. - if path := cctx.String("car"); path != "" { - var err error - if carFile, err = os.Open(path); err != nil { - return xerrors.Errorf("failed to open provided CAR file: %w", err) - } - } - var head *types.TipSet - // --- IMPORT --- if !cctx.Bool("no-import") { if cctx.Bool("global-profile") { diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 140effb3d..8c5bfefb8 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 1189b5a3a..1086e8260 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -319,7 +319,7 @@ var datastoreRewriteCmd = &cli.Command{ ) // open the destination (to) store. - opts, err := repo.BadgerBlockstoreOptions(repo.BlockstoreChain, toPath, false) + opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } @@ -329,7 +329,7 @@ var datastoreRewriteCmd = &cli.Command{ } // open the source (from) store. - opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, fromPath, true) + opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true) if err != nil { return xerrors.Errorf("failed to get badger options: %w", err) } diff --git a/cmd/lotus-shed/export.go b/cmd/lotus-shed/export.go index 4820381b5..e711ba2bb 100644 --- a/cmd/lotus-shed/export.go +++ b/cmd/lotus-shed/export.go @@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{ defer fi.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index 7f3fa7c89..4e465029f 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return err } @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{ } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index aea548bbe..1afe76c4d 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{ defer lkrepo.Close() //nolint:errcheck - bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return fmt.Errorf("failed to open blockstore: %w", err) } @@ -191,7 +191,7 @@ var stateTreePruneCmd = &cli.Command{ rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback")) - if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error { + if err := cs.WalkSnapshot(ctx, ts, rrLb, true, true, func(c cid.Cid) error { if goodSet.Len()%20 == 0 { fmt.Printf("\renumerating keep set: %d ", goodSet.Len()) } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 4226c33f7..5546ac376 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -432,7 +432,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) } defer lr.Close() //nolint:errcheck - bs, err := lr.Blockstore(ctx, repo.BlockstoreChain) + bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore) if err != nil { return xerrors.Errorf("failed to open blockstore: %w", err) } diff --git a/documentation/en/architecture/architecture.md b/documentation/en/architecture/architecture.md index 5a9eee3c2..64914d539 100644 --- a/documentation/en/architecture/architecture.md +++ b/documentation/en/architecture/architecture.md @@ -311,7 +311,7 @@ FIXME: Maybe mention the `Batching` interface as the developer will stumble upon FIXME: IPFS blocks vs Filecoin blocks ideally happens before this / here -The [`Blockstore` interface](`github.com/filecoin-project/lotus/lib/blockstore.go`) structures the key-value pair +The [`Blockstore` interface](`github.com/filecoin-project/lotus/blockstore/blockstore.go`) structures the key-value pair into the CID format for the key and the [`Block` interface](`github.com/ipfs/go-block-format/blocks.go`) for the value. The `Block` value is just a raw string of bytes addressed by its hash, which is included in the CID key. diff --git a/go.mod b/go.mod index 93af360f8..5f11ea0a2 100644 --- a/go.mod +++ b/go.mod @@ -143,7 +143,7 @@ require ( go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20201021035429-f5854403a974 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a - golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f + golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect diff --git a/go.sum b/go.sum index 4d0ecd0e4..c1e5494ee 100644 --- a/go.sum +++ b/go.sum @@ -1754,8 +1754,9 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/metrics/metrics.go b/metrics/metrics.go index 996fa95b9..cb909d639 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -9,10 +9,12 @@ import ( "go.opencensus.io/tag" rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics" + + "github.com/filecoin-project/lotus/blockstore" ) // Distribution -var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000) +var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000) // Global Tags var ( @@ -179,33 +181,37 @@ var ( ) // DefaultViews is an array of OpenCensus views for metric gathering purposes -var DefaultViews = append([]*view.View{ - InfoView, - ChainNodeHeightView, - ChainNodeHeightExpectedView, - ChainNodeWorkerHeightView, - BlockReceivedView, - BlockValidationFailureView, - BlockValidationSuccessView, - BlockValidationDurationView, - BlockDelayView, - MessagePublishedView, - MessageReceivedView, - MessageValidationFailureView, - MessageValidationSuccessView, - PeerCountView, - PubsubPublishMessageView, - PubsubDeliverMessageView, - PubsubRejectMessageView, - PubsubDuplicateMessageView, - PubsubRecvRPCView, - PubsubSendRPCView, - PubsubDropRPCView, - APIRequestDurationView, - VMFlushCopyCountView, - VMFlushCopyDurationView, -}, - rpcmetrics.DefaultViews...) +var DefaultViews = func() []*view.View { + views := []*view.View{ + InfoView, + ChainNodeHeightView, + ChainNodeHeightExpectedView, + ChainNodeWorkerHeightView, + BlockReceivedView, + BlockValidationFailureView, + BlockValidationSuccessView, + BlockValidationDurationView, + BlockDelayView, + MessagePublishedView, + MessageReceivedView, + MessageValidationFailureView, + MessageValidationSuccessView, + PeerCountView, + PubsubPublishMessageView, + PubsubDeliverMessageView, + PubsubRejectMessageView, + PubsubDuplicateMessageView, + PubsubRecvRPCView, + PubsubSendRPCView, + PubsubDropRPCView, + APIRequestDurationView, + VMFlushCopyCountView, + VMFlushCopyDurationView, + } + views = append(views, blockstore.DefaultViews...) + views = append(views, rpcmetrics.DefaultViews...) + return views +}() // SinceInMilliseconds returns the duration of time since the provide time as a float64. func SinceInMilliseconds(startTime time.Time) float64 { diff --git a/node/builder.go b/node/builder.go index 0766d934a..b9f2e85bf 100644 --- a/node/builder.go +++ b/node/builder.go @@ -145,7 +145,7 @@ const ( HeadMetricsKey SettlePaymentChannelsKey RunPeerTaggerKey - SetupFallbackBlockstoreKey + SetupFallbackBlockstoresKey SetApiEndpointKey @@ -590,12 +590,15 @@ func Repo(r repo.Repo) Option { Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(dtypes.MetadataDS), modules.Datastore), - Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore), - Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))), + Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore), + Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), + Override(new(dtypes.StateBlockstore), modules.StateBlockstore), + Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1", Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore), - Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore), + Override(new(dtypes.StateBlockstore), modules.FallbackStateBlockstore), + Override(SetupFallbackBlockstoresKey, modules.InitFallbackBlockstores), ), Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr), diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 46467a358..25d366a87 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -35,6 +35,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/node/modules/dtypes" ) var log = logging.Logger("fullnode") @@ -57,6 +58,11 @@ type ChainModule struct { fx.In Chain *store.ChainStore + + // ExposedBlockstore is the global monolith blockstore that is safe to + // expose externally. In the future, this will be segregated into two + // blockstores. + ExposedBlockstore dtypes.ExposedBlockstore } var _ ChainModuleAPI = (*ChainModule)(nil) @@ -68,6 +74,11 @@ type ChainAPI struct { ChainModuleAPI Chain *store.ChainStore + + // ExposedBlockstore is the global monolith blockstore that is safe to + // expose externally. In the future, this will be segregated into two + // blockstores. + ExposedBlockstore dtypes.ExposedBlockstore } func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) { @@ -212,7 +223,7 @@ func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpo } func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) { - blk, err := m.Chain.Blockstore().Get(obj) + blk, err := m.ExposedBlockstore.Get(obj) if err != nil { return nil, xerrors.Errorf("blockstore get: %w", err) } @@ -221,15 +232,15 @@ func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, er } func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error { - return a.Chain.Blockstore().DeleteBlock(obj) + return a.ExposedBlockstore.DeleteBlock(obj) } func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) { - return m.Chain.Blockstore().Has(obj) + return m.ExposedBlockstore.Has(obj) } func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) { - bs := a.Chain.Blockstore() + bs := a.ExposedBlockstore bsvc := blockservice.New(bs, offline.Exchange(bs)) dag := merkledag.NewDAGService(bsvc) @@ -514,7 +525,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject, return nil, xerrors.Errorf("parsing path: %w", err) } - bs := a.Chain.Blockstore() + bs := a.ExposedBlockstore bsvc := blockservice.New(bs, offline.Exchange(bs)) dag := merkledag.NewDAGService(bsvc) diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 0f5d16ab2..f09f484f7 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -97,7 +97,7 @@ func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address, return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -111,7 +111,7 @@ func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Ad return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -135,7 +135,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address, return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -153,7 +153,7 @@ func (a *StateAPI) StateMinerDeadlines(ctx context.Context, m address.Address, t return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -192,7 +192,7 @@ func (a *StateAPI) StateMinerPartitions(ctx context.Context, m address.Address, return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -253,7 +253,7 @@ func (m *StateModule) StateMinerProvingDeadline(ctx context.Context, addr addres return nil, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -272,7 +272,7 @@ func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, t return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -329,7 +329,7 @@ func (a *StateAPI) StateMinerRecoveries(ctx context.Context, addr address.Addres return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -461,7 +461,7 @@ func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, ts return nil, xerrors.Errorf("getting actor: %w", err) } - blk, err := a.Chain.Blockstore().Get(act.Head) + blk, err := a.Chain.StateBlockstore().Get(act.Head) if err != nil { return nil, xerrors.Errorf("getting actor head: %w", err) } @@ -707,7 +707,7 @@ func (m *StateModule) StateMarketStorageDeal(ctx context.Context, dealId abi.Dea } func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) { - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) oldTree, err := state.LoadStateTree(store, old) if err != nil { @@ -727,7 +727,7 @@ func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Addre if err != nil { return api.MinerSectors{}, err } - mas, err := miner.Load(a.Chain.Store(ctx), act) + mas, err := miner.Load(a.Chain.ActorStore(ctx), act) if err != nil { return api.MinerSectors{}, err } @@ -792,7 +792,7 @@ func (a *StateAPI) StateSectorExpiration(ctx context.Context, maddr address.Addr if err != nil { return nil, err } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, err } @@ -804,7 +804,7 @@ func (a *StateAPI) StateSectorPartition(ctx context.Context, maddr address.Addre if err != nil { return nil, err } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, err } @@ -890,7 +890,7 @@ func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address. if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -912,7 +912,7 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(a.Chain.Store(ctx), act) + msas, err := multisig.Load(a.Chain.ActorStore(ctx), act) if err != nil { return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -961,7 +961,7 @@ func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, s return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -989,7 +989,7 @@ func (m *StateModule) MsigGetPending(ctx context.Context, addr address.Address, if err != nil { return nil, xerrors.Errorf("failed to load multisig actor: %w", err) } - msas, err := multisig.Load(m.Chain.Store(ctx), act) + msas, err := multisig.Load(m.Chain.ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load multisig actor state: %w", err) } @@ -1032,7 +1032,7 @@ func (a *StateAPI) StateMinerPreCommitDepositForPower(ctx context.Context, maddr return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err) } - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) var sectorWeight abi.StoragePower if act, err := state.GetActor(market.Address); err != nil { @@ -1093,7 +1093,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err) } - store := a.Chain.Store(ctx) + store := a.Chain.ActorStore(ctx) var sectorWeight abi.StoragePower if act, err := state.GetActor(market.Address); err != nil { @@ -1164,7 +1164,7 @@ func (a *StateAPI) StateMinerAvailableBalance(ctx context.Context, maddr address return types.EmptyInt, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return types.EmptyInt, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -1193,7 +1193,7 @@ func (a *StateAPI) StateMinerSectorAllocated(ctx context.Context, maddr address. return false, xerrors.Errorf("failed to load miner actor: %w", err) } - mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act) + mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return false, xerrors.Errorf("failed to load miner actor state: %w", err) } @@ -1216,7 +1216,7 @@ func (a *StateAPI) StateVerifierStatus(ctx context.Context, addr address.Address return nil, err } - vrs, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), act) + vrs, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load verified registry state: %w", err) } @@ -1247,7 +1247,7 @@ func (m *StateModule) StateVerifiedClientStatus(ctx context.Context, addr addres return nil, err } - vrs, err := verifreg.Load(m.StateManager.ChainStore().Store(ctx), act) + vrs, err := verifreg.Load(m.StateManager.ChainStore().ActorStore(ctx), act) if err != nil { return nil, xerrors.Errorf("failed to load verified registry state: %w", err) } @@ -1269,7 +1269,7 @@ func (a *StateAPI) StateVerifiedRegistryRootKey(ctx context.Context, tsk types.T return address.Undef, err } - vst, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), vact) + vst, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), vact) if err != nil { return address.Undef, err } @@ -1298,12 +1298,12 @@ func (m *StateModule) StateDealProviderCollateralBounds(ctx context.Context, siz return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor: %w", err) } - pst, err := power.Load(m.StateManager.ChainStore().Store(ctx), pact) + pst, err := power.Load(m.StateManager.ChainStore().ActorStore(ctx), pact) if err != nil { return api.DealCollateralBounds{}, xerrors.Errorf("failed to load power actor state: %w", err) } - rst, err := reward.Load(m.StateManager.ChainStore().Store(ctx), ract) + rst, err := reward.Load(m.StateManager.ChainStore().ActorStore(ctx), ract) if err != nil { return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor state: %w", err) } diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go new file mode 100644 index 000000000..5b1d2ee63 --- /dev/null +++ b/node/modules/blockstore.go @@ -0,0 +1,65 @@ +package modules + +import ( + "context" + "io" + + bstore "github.com/ipfs/go-ipfs-blockstore" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/node/repo" +) + +// UniversalBlockstore returns a single universal blockstore that stores both +// chain data and state data. +func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) { + bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore) + if err != nil { + return nil, err + } + if c, ok := bs.(io.Closer); ok { + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return c.Close() + }, + }) + } + return bs, err +} + +// StateBlockstore is a hook to overlay caches for state objects, or in the +// future, to segregate the universal blockstore into different physical state +// and chain stores. +func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) { + return bs, nil +} + +// ChainBlockstore is a hook to overlay caches for state objects, or in the +// future, to segregate the universal blockstore into different physical state +// and chain stores. +func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) { + return bs, nil +} + +func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore { + return &blockstore.FallbackStore{Blockstore: cbs} +} + +func FallbackStateBlockstore(sbs dtypes.StateBlockstore) dtypes.StateBlockstore { + return &blockstore.FallbackStore{Blockstore: sbs} +} + +func InitFallbackBlockstores(cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, rem dtypes.ChainBitswap) error { + for _, bs := range []bstore.Blockstore{cbs, sbs} { + if fbs, ok := bs.(*blockstore.FallbackStore); ok { + fbs.SetFallback(rem.GetBlock) + continue + } + return xerrors.Errorf("expected a FallbackStore") + } + return nil +} diff --git a/node/modules/chain.go b/node/modules/chain.go index fcb5bea21..029064b97 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -1,25 +1,18 @@ package modules import ( - "bytes" "context" - "os" "time" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-datastore" - "github.com/ipld/go-car" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" @@ -29,14 +22,15 @@ import ( "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" - "github.com/filecoin-project/lotus/node/repo" ) -func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainBlockstore) dtypes.ChainBitswap { +// ChainBitswap uses a blockstore that bypasses all caches. +func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap { // prefix protocol for chain bitswap // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) @@ -60,6 +54,10 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r return exch } +func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { + return blockservice.New(bs, rem) +} + func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) mp, err := messagepool.New(mpp, ds, nn, j) @@ -74,43 +72,8 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds return mp, nil } -func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { - bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain) - if err != nil { - return nil, err - } - - // TODO potentially replace this cached blockstore by a CBOR cache. - cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts()) - if err != nil { - return nil, err - } - - return cbs, nil -} - -func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { - return blockservice.New(bs, rem) -} - -func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore { - return &blockstore.FallbackStore{ - Blockstore: rbs, - } -} - -func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error { - fbs, ok := cbs.(*blockstore.FallbackStore) - if !ok { - return xerrors.Errorf("expected a FallbackStore") - } - - fbs.SetFallback(rem.GetBlock) - return nil -} - -func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { - chain := store.NewChainStore(bs, lbs, ds, syscalls, j) +func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { + chain := store.NewChainStore(cbs, sbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) @@ -125,65 +88,6 @@ func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawB return chain } -func ErrorGenesis() Genesis { - return func() (header *types.BlockHeader, e error) { - return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'") - } -} - -func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis { - return func(bs dtypes.ChainBlockstore) Genesis { - return func() (header *types.BlockHeader, e error) { - c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) - if err != nil { - return nil, xerrors.Errorf("loading genesis car file failed: %w", err) - } - if len(c.Roots) != 1 { - return nil, xerrors.New("expected genesis file to have one root") - } - root, err := bs.Get(c.Roots[0]) - if err != nil { - return nil, err - } - - h, err := types.DecodeBlock(root.RawData()) - if err != nil { - return nil, xerrors.Errorf("decoding block failed: %w", err) - } - return h, nil - } - } -} - -func DoSetGenesis(_ dtypes.AfterGenesisSet) {} - -func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) { - genFromRepo, err := cs.GetGenesis() - if err == nil { - if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" { - expectedGenesis, err := g() - if err != nil { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err) - } - - if genFromRepo.Cid() != expectedGenesis.Cid() { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!") - } - } - return dtypes.AfterGenesisSet{}, nil // already set, noop - } - if err != datastore.ErrNotFound { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err) - } - - genesis, err := g() - if err != nil { - return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err) - } - - return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis) -} - func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) { if !build.Devnet { return "testnetnet", nil diff --git a/node/modules/client.go b/node/modules/client.go index ede36b4c9..da6a4cd83 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -83,7 +83,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke ctx := helpers.LifecycleCtx(mctx, lc) ds, err := r.Datastore(ctx, "/client") if err != nil { - return nil, xerrors.Errorf("getting datastore out of reop: %w", err) + return nil, xerrors.Errorf("getting datastore out of repo: %w", err) } mds, err := multistore.NewMultiDstore(ds) diff --git a/node/modules/dtypes/storage.go b/node/modules/dtypes/storage.go index 87366647f..c6963e1e2 100644 --- a/node/modules/dtypes/storage.go +++ b/node/modules/dtypes/storage.go @@ -19,19 +19,41 @@ import ( "github.com/filecoin-project/lotus/node/repo/retrievalstoremgr" ) -// MetadataDS stores metadata -// dy default it's namespaced under /metadata in main repo datastore +// MetadataDS stores metadata. By default it's namespaced under /metadata in +// main repo datastore. type MetadataDS datastore.Batching -type ChainRawBlockstore blockstore.Blockstore -type ChainBlockstore blockstore.Blockstore // optionally bitswap backed +type ( + // UniversalBlockstore is the cold blockstore. + UniversalBlockstore blockstore.Blockstore + + // ChainBlockstore is a blockstore to store chain data (tipsets, blocks, + // messages). It is physically backed by the BareMonolithBlockstore, but it + // has a cache on top that is specially tuned for chain data access + // patterns. + ChainBlockstore blockstore.Blockstore + + // StateBlockstore is a blockstore to store state data (state tree). It is + // physically backed by the BareMonolithBlockstore, but it has a cache on + // top that is specially tuned for state data access patterns. + StateBlockstore blockstore.Blockstore + + // ExposedBlockstore is a blockstore that interfaces directly with the + // network or with users, from which queries are served, and where incoming + // data is deposited. For security reasons, this store is disconnected from + // any internal caches. If blocks are added to this store in a way that + // could render caches dirty (e.g. a block is added when an existence cache + // holds a 'false' for that block), the process should signal so by calling + // blockstore.AllCaches.Dirty(cid). + ExposedBlockstore blockstore.Blockstore +) type ChainBitswap exchange.Interface type ChainBlockService bserv.BlockService type ClientMultiDstore *multistore.MultiStore type ClientImportMgr *importmgr.Mgr -type ClientBlockstore blockstore.Blockstore +type ClientBlockstore blockstore.BasicBlockstore type ClientDealStore *statestore.StateStore type ClientRequestValidator *requestvalidation.UnifiedRequestValidator type ClientDatastore datastore.Batching @@ -50,6 +72,6 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator type ProviderDataTransfer datatransfer.Manager type StagingDAG format.DAGService -type StagingBlockstore blockstore.Blockstore +type StagingBlockstore blockstore.BasicBlockstore type StagingGraphsync graphsync.GraphExchange type StagingMultiDstore *multistore.MultiStore diff --git a/node/modules/genesis.go b/node/modules/genesis.go new file mode 100644 index 000000000..43443b125 --- /dev/null +++ b/node/modules/genesis.go @@ -0,0 +1,73 @@ +package modules + +import ( + "bytes" + "os" + + "github.com/ipfs/go-datastore" + "github.com/ipld/go-car" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/modules/dtypes" +) + +func ErrorGenesis() Genesis { + return func() (header *types.BlockHeader, e error) { + return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'") + } +} + +func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis { + return func(bs dtypes.ChainBlockstore) Genesis { + return func() (header *types.BlockHeader, e error) { + c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) + if err != nil { + return nil, xerrors.Errorf("loading genesis car file failed: %w", err) + } + if len(c.Roots) != 1 { + return nil, xerrors.New("expected genesis file to have one root") + } + root, err := bs.Get(c.Roots[0]) + if err != nil { + return nil, err + } + + h, err := types.DecodeBlock(root.RawData()) + if err != nil { + return nil, xerrors.Errorf("decoding block failed: %w", err) + } + return h, nil + } + } +} + +func DoSetGenesis(_ dtypes.AfterGenesisSet) {} + +func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) { + genFromRepo, err := cs.GetGenesis() + if err == nil { + if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" { + expectedGenesis, err := g() + if err != nil { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err) + } + + if genFromRepo.Cid() != expectedGenesis.Cid() { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!") + } + } + return dtypes.AfterGenesisSet{}, nil // already set, noop + } + if err != datastore.ErrNotFound { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err) + } + + genesis, err := g() + if err != nil { + return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err) + } + + return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis) +} diff --git a/node/modules/graphsync.go b/node/modules/graphsync.go index bbb039957..a7f62db76 100644 --- a/node/modules/graphsync.go +++ b/node/modules/graphsync.go @@ -14,8 +14,8 @@ import ( ) // Graphsync creates a graphsync instance from the given loader and storer -func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) { +func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) { graphsyncNetwork := gsnet.NewFromLibp2pHost(h) loader := storeutil.LoaderForBlockstore(clientBs) storer := storeutil.StorerForBlockstore(clientBs) diff --git a/node/modules/ipfsclient.go b/node/modules/ipfsclient.go index 99fcc4180..24c5c9678 100644 --- a/node/modules/ipfsclient.go +++ b/node/modules/ipfsclient.go @@ -18,7 +18,7 @@ import ( func IpfsClientBlockstore(ipfsMaddr string, onlineMode bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) { var err error - var ipfsbs blockstore.Blockstore + var ipfsbs blockstore.BasicBlockstore if ipfsMaddr != "" { var ma multiaddr.Multiaddr ma, err = multiaddr.NewMultiaddr(ipfsMaddr) diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go index 5c2c4b367..1705217d3 100644 --- a/node/repo/blockstore_opts.go +++ b/node/repo/blockstore_opts.go @@ -5,10 +5,6 @@ import badgerbs "github.com/filecoin-project/lotus/blockstore/badger" // BadgerBlockstoreOptions returns the badger options to apply for the provided // domain. func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) { - if domain != BlockstoreChain { - return badgerbs.Options{}, ErrInvalidBlockstoreDomain - } - opts := badgerbs.DefaultOptions(path) // Due to legacy usage of blockstore.Blockstore, over a datastore, all diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 1aeaf9aa0..d96a5e645 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -13,7 +13,7 @@ import ( "sync" "github.com/BurntSushi/toml" - "github.com/filecoin-project/lotus/blockstore" + "github.com/ipfs/go-datastore" fslock "github.com/ipfs/go-fs-lock" logging "github.com/ipfs/go-log/v2" @@ -22,7 +22,7 @@ import ( "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" - lblockstore "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/blockstore" badgerbs "github.com/filecoin-project/lotus/blockstore/badger" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -264,11 +264,18 @@ type fsLockedRepo struct { bs blockstore.Blockstore bsErr error bsOnce sync.Once + ssPath string + ssErr error + ssOnce sync.Once storageLk sync.Mutex configLk sync.Mutex } +func (fsr *fsLockedRepo) Readonly() bool { + return fsr.readonly +} + func (fsr *fsLockedRepo) Path() string { return fsr.path } @@ -301,7 +308,7 @@ func (fsr *fsLockedRepo) Close() error { // Blockstore returns a blockstore for the provided data domain. func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != BlockstoreChain { + if domain != UniversalBlockstore { return nil, ErrInvalidBlockstoreDomain } @@ -325,12 +332,27 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain fsr.bsErr = err return } - fsr.bs = lblockstore.WrapIDStore(bs) + fsr.bs = blockstore.WrapIDStore(bs) }) return fsr.bs, fsr.bsErr } +func (fsr *fsLockedRepo) SplitstorePath() (string, error) { + fsr.ssOnce.Do(func() { + path := fsr.join(filepath.Join(fsDatastore, "splitstore")) + + if err := os.MkdirAll(path, 0755); err != nil { + fsr.ssErr = err + return + } + + fsr.ssPath = path + }) + + return fsr.ssPath, fsr.ssErr +} + // join joins path elements with fsr.path func (fsr *fsLockedRepo) join(paths ...string) string { return filepath.Join(append([]string{fsr.path}, paths...)...) diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index 0108c8224..936d9b606 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -16,7 +16,7 @@ type Mgr struct { mds *multistore.MultiStore ds datastore.Batching - Blockstore blockstore.Blockstore + Blockstore blockstore.BasicBlockstore } type Label string diff --git a/node/repo/interface.go b/node/repo/interface.go index 1dabc0bda..33979c8de 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -4,10 +4,10 @@ import ( "context" "errors" - "github.com/filecoin-project/lotus/blockstore" "github.com/ipfs/go-datastore" "github.com/multiformats/go-multiaddr" + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/stores" @@ -18,11 +18,11 @@ import ( type BlockstoreDomain string const ( - // BlockstoreChain represents the blockstore domain for chain data. + // UniversalBlockstore represents the blockstore domain for all data. // Right now, this includes chain objects (tipsets, blocks, messages), as // well as state. In the future, they may get segregated into different // domains. - BlockstoreChain = BlockstoreDomain("chain") + UniversalBlockstore = BlockstoreDomain("universal") ) var ( @@ -63,6 +63,9 @@ type LockedRepo interface { // the lifecycle. Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) + // SplitstorePath returns the path for the SplitStore + SplitstorePath() (string, error) + // Returns config in this repo Config() (interface{}, error) SetConfig(func(interface{})) error @@ -84,4 +87,7 @@ type LockedRepo interface { // Path returns absolute path of the repo Path() string + + // Readonly returns true if the repo is readonly + Readonly() bool } diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index bcbc239c0..00ea32b88 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -201,6 +201,10 @@ func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) { }, nil } +func (lmem *lockedMemRepo) Readonly() bool { + return false +} + func (lmem *lockedMemRepo) checkToken() error { lmem.RLock() defer lmem.RUnlock() @@ -246,12 +250,16 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba } func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) { - if domain != BlockstoreChain { + if domain != UniversalBlockstore { return nil, ErrInvalidBlockstoreDomain } return lmem.mem.blockstore, nil } +func (lmem *lockedMemRepo) SplitstorePath() (string, error) { + return ioutil.TempDir("", "splitstore.*") +} + func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) { return nil, nil } diff --git a/node/repo/retrievalstoremgr/retrievalstoremgr.go b/node/repo/retrievalstoremgr/retrievalstoremgr.go index 0f6c98e6b..ba86ccee5 100644 --- a/node/repo/retrievalstoremgr/retrievalstoremgr.go +++ b/node/repo/retrievalstoremgr/retrievalstoremgr.go @@ -73,13 +73,13 @@ func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService { // BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores type BlockstoreRetrievalStoreManager struct { - bs blockstore.Blockstore + bs blockstore.BasicBlockstore } var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{} // NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager -func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager { +func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager { return &BlockstoreRetrievalStoreManager{ bs: bs, }