Merge pull request #5695 from filecoin-project/feat/segregate-blockstores
segregate chain and state blockstores
This commit is contained in:
commit
448813d2fe
@ -110,10 +110,7 @@ func Open(opts Options) (*Blockstore, error) {
|
||||
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
|
||||
}
|
||||
|
||||
bs := &Blockstore{
|
||||
DB: db,
|
||||
}
|
||||
|
||||
bs := &Blockstore{DB: db}
|
||||
if p := opts.Prefix; p != "" {
|
||||
bs.prefixing = true
|
||||
bs.prefix = []byte(p)
|
||||
|
@ -61,8 +61,8 @@ func TestStorageKey(t *testing.T) {
|
||||
require.Equal(t, k3, k2)
|
||||
}
|
||||
|
||||
func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.Blockstore, path string) {
|
||||
return func(tb testing.TB) (bs blockstore.Blockstore, path string) {
|
||||
func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) {
|
||||
return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) {
|
||||
tb.Helper()
|
||||
|
||||
path, err := ioutil.TempDir("", "")
|
||||
@ -83,8 +83,8 @@ func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (
|
||||
}
|
||||
}
|
||||
|
||||
func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
|
||||
return func(tb testing.TB, path string) (bs blockstore.Blockstore, err error) {
|
||||
func openBlockstore(optsSupplier func(path string) Options) func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) {
|
||||
return func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error) {
|
||||
tb.Helper()
|
||||
return Open(optsSupplier(path))
|
||||
}
|
||||
|
@ -8,18 +8,19 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TODO: move this to go-ipfs-blockstore.
|
||||
type Suite struct {
|
||||
NewBlockstore func(tb testing.TB) (bs blockstore.Blockstore, path string)
|
||||
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.Blockstore, err error)
|
||||
NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string)
|
||||
OpenBlockstore func(tb testing.TB, path string) (bs blockstore.BasicBlockstore, err error)
|
||||
}
|
||||
|
||||
func (s *Suite) RunTests(t *testing.T, prefix string) {
|
||||
@ -290,7 +291,7 @@ func (s *Suite) TestDelete(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func insertBlocks(t *testing.T, bs blockstore.Blockstore, count int) []cid.Cid {
|
||||
func insertBlocks(t *testing.T, bs blockstore.BasicBlockstore, count int) []cid.Cid {
|
||||
keys := make([]cid.Cid, count)
|
||||
for i := 0; i < count; i++ {
|
||||
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
|
||||
|
@ -11,6 +11,19 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// UnwrapFallbackStore takes a blockstore, and returns the underlying blockstore
|
||||
// if it was a FallbackStore. Otherwise, it just returns the supplied store
|
||||
// unmodified.
|
||||
func UnwrapFallbackStore(bs Blockstore) (Blockstore, bool) {
|
||||
if fbs, ok := bs.(*FallbackStore); ok {
|
||||
return fbs.Blockstore, true
|
||||
}
|
||||
return bs, false
|
||||
}
|
||||
|
||||
// FallbackStore is a read-through store that queries another (potentially
|
||||
// remote) source if the block is not found locally. If the block is found
|
||||
// during the fallback, it stores it in the local store.
|
||||
type FallbackStore struct {
|
||||
Blockstore
|
||||
|
||||
@ -30,7 +43,7 @@ func (fbs *FallbackStore) SetFallback(missFn func(context.Context, cid.Cid) (blo
|
||||
}
|
||||
|
||||
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
|
||||
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
|
||||
log.Warnf("fallbackstore: block not found locally, fetching from the network; cid: %s", c)
|
||||
fbs.lk.RLock()
|
||||
defer fbs.lk.RUnlock()
|
||||
|
||||
|
154
blockstore/metrics.go
Normal file
154
blockstore/metrics.go
Normal file
@ -0,0 +1,154 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
//
|
||||
// Currently unused, but kept in repo in case we introduce one of the candidate
|
||||
// cache implementations (Freecache, Ristretto), both of which report these
|
||||
// metrics.
|
||||
//
|
||||
|
||||
// CacheMetricsEmitInterval is the interval at which metrics are emitted onto
|
||||
// OpenCensus.
|
||||
var CacheMetricsEmitInterval = 5 * time.Second
|
||||
|
||||
var (
|
||||
CacheName, _ = tag.NewKey("cache_name")
|
||||
)
|
||||
|
||||
// CacheMeasures groups all metrics emitted by the blockstore caches.
|
||||
var CacheMeasures = struct {
|
||||
HitRatio *stats.Float64Measure
|
||||
Hits *stats.Int64Measure
|
||||
Misses *stats.Int64Measure
|
||||
Entries *stats.Int64Measure
|
||||
QueriesServed *stats.Int64Measure
|
||||
Adds *stats.Int64Measure
|
||||
Updates *stats.Int64Measure
|
||||
Evictions *stats.Int64Measure
|
||||
CostAdded *stats.Int64Measure
|
||||
CostEvicted *stats.Int64Measure
|
||||
SetsDropped *stats.Int64Measure
|
||||
SetsRejected *stats.Int64Measure
|
||||
QueriesDropped *stats.Int64Measure
|
||||
}{
|
||||
HitRatio: stats.Float64("blockstore/cache/hit_ratio", "Hit ratio of blockstore cache", stats.UnitDimensionless),
|
||||
Hits: stats.Int64("blockstore/cache/hits", "Total number of hits at blockstore cache", stats.UnitDimensionless),
|
||||
Misses: stats.Int64("blockstore/cache/misses", "Total number of misses at blockstore cache", stats.UnitDimensionless),
|
||||
Entries: stats.Int64("blockstore/cache/entry_count", "Total number of entries currently in the blockstore cache", stats.UnitDimensionless),
|
||||
QueriesServed: stats.Int64("blockstore/cache/queries_served", "Total number of queries served by the blockstore cache", stats.UnitDimensionless),
|
||||
Adds: stats.Int64("blockstore/cache/adds", "Total number of adds to blockstore cache", stats.UnitDimensionless),
|
||||
Updates: stats.Int64("blockstore/cache/updates", "Total number of updates in blockstore cache", stats.UnitDimensionless),
|
||||
Evictions: stats.Int64("blockstore/cache/evictions", "Total number of evictions from blockstore cache", stats.UnitDimensionless),
|
||||
CostAdded: stats.Int64("blockstore/cache/cost_added", "Total cost (byte size) of entries added into blockstore cache", stats.UnitBytes),
|
||||
CostEvicted: stats.Int64("blockstore/cache/cost_evicted", "Total cost (byte size) of entries evicted by blockstore cache", stats.UnitBytes),
|
||||
SetsDropped: stats.Int64("blockstore/cache/sets_dropped", "Total number of sets dropped by blockstore cache", stats.UnitDimensionless),
|
||||
SetsRejected: stats.Int64("blockstore/cache/sets_rejected", "Total number of sets rejected by blockstore cache", stats.UnitDimensionless),
|
||||
QueriesDropped: stats.Int64("blockstore/cache/queries_dropped", "Total number of queries dropped by blockstore cache", stats.UnitDimensionless),
|
||||
}
|
||||
|
||||
// CacheViews groups all cache-related default views.
|
||||
var CacheViews = struct {
|
||||
HitRatio *view.View
|
||||
Hits *view.View
|
||||
Misses *view.View
|
||||
Entries *view.View
|
||||
QueriesServed *view.View
|
||||
Adds *view.View
|
||||
Updates *view.View
|
||||
Evictions *view.View
|
||||
CostAdded *view.View
|
||||
CostEvicted *view.View
|
||||
SetsDropped *view.View
|
||||
SetsRejected *view.View
|
||||
QueriesDropped *view.View
|
||||
}{
|
||||
HitRatio: &view.View{
|
||||
Measure: CacheMeasures.HitRatio,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Hits: &view.View{
|
||||
Measure: CacheMeasures.Hits,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Misses: &view.View{
|
||||
Measure: CacheMeasures.Misses,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Entries: &view.View{
|
||||
Measure: CacheMeasures.Entries,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
QueriesServed: &view.View{
|
||||
Measure: CacheMeasures.QueriesServed,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Adds: &view.View{
|
||||
Measure: CacheMeasures.Adds,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Updates: &view.View{
|
||||
Measure: CacheMeasures.Updates,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
Evictions: &view.View{
|
||||
Measure: CacheMeasures.Evictions,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
CostAdded: &view.View{
|
||||
Measure: CacheMeasures.CostAdded,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
CostEvicted: &view.View{
|
||||
Measure: CacheMeasures.CostEvicted,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
SetsDropped: &view.View{
|
||||
Measure: CacheMeasures.SetsDropped,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
SetsRejected: &view.View{
|
||||
Measure: CacheMeasures.SetsRejected,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
QueriesDropped: &view.View{
|
||||
Measure: CacheMeasures.QueriesDropped,
|
||||
Aggregation: view.LastValue(),
|
||||
TagKeys: []tag.Key{CacheName},
|
||||
},
|
||||
}
|
||||
|
||||
// DefaultViews exports all default views for this package.
|
||||
var DefaultViews = []*view.View{
|
||||
CacheViews.HitRatio,
|
||||
CacheViews.Hits,
|
||||
CacheViews.Misses,
|
||||
CacheViews.Entries,
|
||||
CacheViews.QueriesServed,
|
||||
CacheViews.Adds,
|
||||
CacheViews.Updates,
|
||||
CacheViews.Evictions,
|
||||
CacheViews.CostAdded,
|
||||
CacheViews.CostEvicted,
|
||||
CacheViews.SetsDropped,
|
||||
CacheViews.SetsRejected,
|
||||
CacheViews.QueriesDropped,
|
||||
}
|
110
blockstore/union.go
Normal file
110
blockstore/union.go
Normal file
@ -0,0 +1,110 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
type unionBlockstore []Blockstore
|
||||
|
||||
// Union returns an unioned blockstore.
|
||||
//
|
||||
// * Reads return from the first blockstore that has the value, querying in the
|
||||
// supplied order.
|
||||
// * Writes (puts and deltes) are broadcast to all stores.
|
||||
//
|
||||
func Union(stores ...Blockstore) Blockstore {
|
||||
return unionBlockstore(stores)
|
||||
}
|
||||
|
||||
func (m unionBlockstore) Has(cid cid.Cid) (has bool, err error) {
|
||||
for _, bs := range m {
|
||||
if has, err = bs.Has(cid); has || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return has, err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) Get(cid cid.Cid) (blk blocks.Block, err error) {
|
||||
for _, bs := range m {
|
||||
if blk, err = bs.Get(cid); err == nil || err != ErrNotFound {
|
||||
break
|
||||
}
|
||||
}
|
||||
return blk, err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) View(cid cid.Cid, callback func([]byte) error) (err error) {
|
||||
for _, bs := range m {
|
||||
if err = bs.View(cid, callback); err == nil || err != ErrNotFound {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) GetSize(cid cid.Cid) (size int, err error) {
|
||||
for _, bs := range m {
|
||||
if size, err = bs.GetSize(cid); err == nil || err != ErrNotFound {
|
||||
break
|
||||
}
|
||||
}
|
||||
return size, err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) Put(block blocks.Block) (err error) {
|
||||
for _, bs := range m {
|
||||
if err = bs.Put(block); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) PutMany(blks []blocks.Block) (err error) {
|
||||
for _, bs := range m {
|
||||
if err = bs.PutMany(blks); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) DeleteBlock(cid cid.Cid) (err error) {
|
||||
for _, bs := range m {
|
||||
if err = bs.DeleteBlock(cid); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
|
||||
// this does not deduplicate; this interface needs to be revisited.
|
||||
outCh := make(chan cid.Cid)
|
||||
|
||||
go func() {
|
||||
defer close(outCh)
|
||||
|
||||
for _, bs := range m {
|
||||
ch, err := bs.AllKeysChan(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for cid := range ch {
|
||||
outCh <- cid
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return outCh, nil
|
||||
}
|
||||
|
||||
func (m unionBlockstore) HashOnRead(enabled bool) {
|
||||
for _, bs := range m {
|
||||
bs.HashOnRead(enabled)
|
||||
}
|
||||
}
|
102
blockstore/union_test.go
Normal file
102
blockstore/union_test.go
Normal file
@ -0,0 +1,102 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
b0 = blocks.NewBlock([]byte("abc"))
|
||||
b1 = blocks.NewBlock([]byte("foo"))
|
||||
b2 = blocks.NewBlock([]byte("bar"))
|
||||
)
|
||||
|
||||
func TestUnionBlockstore_Get(t *testing.T) {
|
||||
m1 := NewMemory()
|
||||
m2 := NewMemory()
|
||||
|
||||
_ = m1.Put(b1)
|
||||
_ = m2.Put(b2)
|
||||
|
||||
u := Union(m1, m2)
|
||||
|
||||
v1, err := u.Get(b1.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b1.RawData(), v1.RawData())
|
||||
|
||||
v2, err := u.Get(b2.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, b2.RawData(), v2.RawData())
|
||||
}
|
||||
|
||||
func TestUnionBlockstore_Put_PutMany_Delete_AllKeysChan(t *testing.T) {
|
||||
m1 := NewMemory()
|
||||
m2 := NewMemory()
|
||||
|
||||
u := Union(m1, m2)
|
||||
|
||||
err := u.Put(b0)
|
||||
require.NoError(t, err)
|
||||
|
||||
var has bool
|
||||
|
||||
// write was broadcasted to all stores.
|
||||
has, _ = m1.Has(b0.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = m2.Has(b0.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = u.Has(b0.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
// put many.
|
||||
err = u.PutMany([]blocks.Block{b1, b2})
|
||||
require.NoError(t, err)
|
||||
|
||||
// write was broadcasted to all stores.
|
||||
has, _ = m1.Has(b1.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = m1.Has(b2.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = m2.Has(b1.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = m2.Has(b2.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
// also in the union store.
|
||||
has, _ = u.Has(b1.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
has, _ = u.Has(b2.Cid())
|
||||
require.True(t, has)
|
||||
|
||||
// deleted from all stores.
|
||||
err = u.DeleteBlock(b1.Cid())
|
||||
require.NoError(t, err)
|
||||
|
||||
has, _ = u.Has(b1.Cid())
|
||||
require.False(t, has)
|
||||
|
||||
has, _ = m1.Has(b1.Cid())
|
||||
require.False(t, has)
|
||||
|
||||
has, _ = m2.Has(b1.Cid())
|
||||
require.False(t, has)
|
||||
|
||||
// check that AllKeysChan returns b0 and b2, twice (once per backing store)
|
||||
ch, err := u.AllKeysChan(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
var i int
|
||||
for range ch {
|
||||
i++
|
||||
}
|
||||
require.Equal(t, 4, i)
|
||||
}
|
@ -125,7 +125,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
|
||||
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
|
||||
}
|
||||
|
||||
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -406,7 +406,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, stateroot ci
|
||||
StateBase: stateroot,
|
||||
Epoch: 0,
|
||||
Rand: &fakeRand{},
|
||||
Bstore: cs.Blockstore(),
|
||||
Bstore: cs.StateBlockstore(),
|
||||
Syscalls: mkFakedSigSyscalls(cs.VMSys()),
|
||||
CircSupplyCalc: nil,
|
||||
NtwkVersion: genesisNetworkVersion,
|
||||
|
@ -70,7 +70,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
|
||||
StateBase: sroot,
|
||||
Epoch: 0,
|
||||
Rand: &fakeRand{},
|
||||
Bstore: cs.Blockstore(),
|
||||
Bstore: cs.StateBlockstore(),
|
||||
Syscalls: mkFakedSigSyscalls(cs.VMSys()),
|
||||
CircSupplyCalc: csc,
|
||||
NtwkVersion: genesisNetworkVersion,
|
||||
|
@ -79,7 +79,7 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w api.WalletA
|
||||
}
|
||||
}
|
||||
|
||||
store := sm.ChainStore().Store(ctx)
|
||||
store := sm.ChainStore().ActorStore(ctx)
|
||||
blsmsgroot, err := toArray(store, blsMsgCids)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("building bls amt: %w", err)
|
||||
|
@ -59,7 +59,7 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.
|
||||
StateBase: bstate,
|
||||
Epoch: bheight,
|
||||
Rand: store.NewChainRand(sm.cs, ts.Cids()),
|
||||
Bstore: sm.cs.Blockstore(),
|
||||
Bstore: sm.cs.StateBlockstore(),
|
||||
Syscalls: sm.cs.VMSys(),
|
||||
CircSupplyCalc: sm.GetVMCirculatingSupply,
|
||||
NtwkVersion: sm.GetNtwkVersion,
|
||||
@ -174,7 +174,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
|
||||
StateBase: state,
|
||||
Epoch: ts.Height() + 1,
|
||||
Rand: r,
|
||||
Bstore: sm.cs.Blockstore(),
|
||||
Bstore: sm.cs.StateBlockstore(),
|
||||
Syscalls: sm.cs.VMSys(),
|
||||
CircSupplyCalc: sm.GetVMCirculatingSupply,
|
||||
NtwkVersion: sm.GetNtwkVersion,
|
||||
|
@ -504,7 +504,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
}
|
||||
case builtin0.StorageMinerActorCodeID:
|
||||
var st miner0.State
|
||||
if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil {
|
||||
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil {
|
||||
return xerrors.Errorf("failed to load miner state: %w", err)
|
||||
}
|
||||
|
||||
@ -548,7 +548,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
return cid.Undef, xerrors.Errorf("failed to load power actor: %w", err)
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(sm.ChainStore().Blockstore())
|
||||
cst := cbor.NewCborStore(sm.ChainStore().StateBlockstore())
|
||||
if err := cst.Get(ctx, powAct.Head, &ps); err != nil {
|
||||
return cid.Undef, xerrors.Errorf("failed to get power actor state: %w", err)
|
||||
}
|
||||
@ -582,7 +582,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
}
|
||||
case builtin0.StorageMinerActorCodeID:
|
||||
var st miner0.State
|
||||
if err := sm.ChainStore().Store(ctx).Get(ctx, act.Head, &st); err != nil {
|
||||
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, act.Head, &st); err != nil {
|
||||
return xerrors.Errorf("failed to load miner state: %w", err)
|
||||
}
|
||||
|
||||
@ -591,7 +591,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
return xerrors.Errorf("failed to get miner info: %w", err)
|
||||
}
|
||||
|
||||
sectorsArr, err := adt0.AsArray(sm.ChainStore().Store(ctx), st.Sectors)
|
||||
sectorsArr, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), st.Sectors)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to load sectors array: %w", err)
|
||||
}
|
||||
@ -611,11 +611,11 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
lbact, err := lbtree.GetActor(addr)
|
||||
if err == nil {
|
||||
var lbst miner0.State
|
||||
if err := sm.ChainStore().Store(ctx).Get(ctx, lbact.Head, &lbst); err != nil {
|
||||
if err := sm.ChainStore().ActorStore(ctx).Get(ctx, lbact.Head, &lbst); err != nil {
|
||||
return xerrors.Errorf("failed to load miner state: %w", err)
|
||||
}
|
||||
|
||||
lbsectors, err := adt0.AsArray(sm.ChainStore().Store(ctx), lbst.Sectors)
|
||||
lbsectors, err := adt0.AsArray(sm.ChainStore().ActorStore(ctx), lbst.Sectors)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to load lb sectors array: %w", err)
|
||||
}
|
||||
@ -711,7 +711,7 @@ func UpgradeFaucetBurnRecovery(ctx context.Context, sm *StateManager, _ Migratio
|
||||
}
|
||||
|
||||
func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
|
||||
store := sm.cs.Store(ctx)
|
||||
store := sm.cs.ActorStore(ctx)
|
||||
|
||||
if build.UpgradeLiftoffHeight <= epoch {
|
||||
return cid.Undef, xerrors.Errorf("liftoff height must be beyond ignition height")
|
||||
@ -767,7 +767,7 @@ func UpgradeIgnition(ctx context.Context, sm *StateManager, _ MigrationCache, cb
|
||||
|
||||
func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
|
||||
|
||||
store := sm.cs.Store(ctx)
|
||||
store := sm.cs.ActorStore(ctx)
|
||||
tree, err := sm.StateTree(root)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
|
||||
@ -792,7 +792,7 @@ func UpgradeRefuel(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
|
||||
}
|
||||
|
||||
func UpgradeActorsV2(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
|
||||
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
|
||||
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
|
||||
store := store.ActorStore(ctx, buf)
|
||||
|
||||
info, err := store.Put(ctx, new(types.StateInfo0))
|
||||
@ -843,7 +843,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
|
||||
return cid.Undef, xerrors.Errorf("getting state tree: %w", err)
|
||||
}
|
||||
|
||||
err = setNetworkName(ctx, sm.cs.Store(ctx), tree, "mainnet")
|
||||
err = setNetworkName(ctx, sm.cs.ActorStore(ctx), tree, "mainnet")
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("setting network name: %w", err)
|
||||
}
|
||||
@ -852,7 +852,7 @@ func UpgradeLiftoff(ctx context.Context, sm *StateManager, _ MigrationCache, cb
|
||||
}
|
||||
|
||||
func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
|
||||
store := sm.cs.Store(ctx)
|
||||
store := sm.cs.ActorStore(ctx)
|
||||
var stateRoot types.StateRoot
|
||||
if err := store.Get(ctx, root, &stateRoot); err != nil {
|
||||
return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err)
|
||||
@ -1009,7 +1009,7 @@ func upgradeActorsV3Common(
|
||||
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
|
||||
config nv10.Config,
|
||||
) (cid.Cid, error) {
|
||||
buf := blockstore.NewTieredBstore(sm.cs.Blockstore(), blockstore.NewMemorySync())
|
||||
buf := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
|
||||
store := store.ActorStore(ctx, buf)
|
||||
|
||||
// Load the state root.
|
||||
@ -1239,7 +1239,7 @@ func resetGenesisMsigs0(ctx context.Context, sm *StateManager, store adt0.Store,
|
||||
return xerrors.Errorf("getting genesis tipset: %w", err)
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
genesisTree, err := state.LoadStateTree(cst, gts.ParentState())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("loading state tree: %w", err)
|
||||
|
@ -125,7 +125,7 @@ func TestForkHeightTriggers(t *testing.T) {
|
||||
Height: testForkHeight,
|
||||
Migration: func(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback,
|
||||
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
|
||||
cst := ipldcbor.NewCborStore(sm.ChainStore().Blockstore())
|
||||
cst := ipldcbor.NewCborStore(sm.ChainStore().StateBlockstore())
|
||||
|
||||
st, err := sm.StateTree(root)
|
||||
if err != nil {
|
||||
|
@ -22,7 +22,7 @@ func (sm *StateManager) ParentStateTsk(tsk types.TipSetKey) (*state.StateTree, e
|
||||
}
|
||||
|
||||
func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error) {
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
state, err := state.LoadStateTree(cst, sm.parentState(ts))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("load state tree: %w", err)
|
||||
@ -32,7 +32,7 @@ func (sm *StateManager) ParentState(ts *types.TipSet) (*state.StateTree, error)
|
||||
}
|
||||
|
||||
func (sm *StateManager) StateTree(st cid.Cid) (*state.StateTree, error) {
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
state, err := state.LoadStateTree(cst, st)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("load state tree: %w", err)
|
||||
|
@ -286,7 +286,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
StateBase: base,
|
||||
Epoch: epoch,
|
||||
Rand: r,
|
||||
Bstore: sm.cs.Blockstore(),
|
||||
Bstore: sm.cs.StateBlockstore(),
|
||||
Syscalls: sm.cs.VMSys(),
|
||||
CircSupplyCalc: sm.GetVMCirculatingSupply,
|
||||
NtwkVersion: sm.GetNtwkVersion,
|
||||
@ -430,7 +430,7 @@ func (sm *StateManager) ApplyBlocks(ctx context.Context, parentEpoch abi.ChainEp
|
||||
return cid.Cid{}, cid.Cid{}, err
|
||||
}
|
||||
|
||||
rectarr := blockadt.MakeEmptyArray(sm.cs.Store(ctx))
|
||||
rectarr := blockadt.MakeEmptyArray(sm.cs.ActorStore(ctx))
|
||||
for i, receipt := range receipts {
|
||||
if err := rectarr.Set(uint64(i), receipt); err != nil {
|
||||
return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err)
|
||||
@ -515,7 +515,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
|
||||
ts = sm.cs.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
|
||||
// First try to resolve the actor in the parent state, so we don't have to compute anything.
|
||||
tree, err := state.LoadStateTree(cst, ts.ParentState())
|
||||
@ -556,7 +556,7 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres
|
||||
}
|
||||
|
||||
func (sm *StateManager) LookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
state, err := state.LoadStateTree(cst, sm.parentState(ts))
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("load state tree: %w", err)
|
||||
@ -882,7 +882,7 @@ func (sm *StateManager) MarketBalance(ctx context.Context, addr address.Address,
|
||||
return api.MarketBalance{}, err
|
||||
}
|
||||
|
||||
mstate, err := market.Load(sm.cs.Store(ctx), act)
|
||||
mstate, err := market.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return api.MarketBalance{}, err
|
||||
}
|
||||
@ -966,7 +966,7 @@ func (sm *StateManager) setupGenesisVestingSchedule(ctx context.Context) error {
|
||||
return xerrors.Errorf("getting genesis tipset state: %w", err)
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(sm.cs.Blockstore())
|
||||
cst := cbor.NewCborStore(sm.cs.StateBlockstore())
|
||||
sTree, err := state.LoadStateTree(cst, st)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("loading state tree: %w", err)
|
||||
@ -1325,7 +1325,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
|
||||
unCirc = big.Add(unCirc, actor.Balance)
|
||||
|
||||
case a == market.Address:
|
||||
mst, err := market.Load(sm.cs.Store(ctx), actor)
|
||||
mst, err := market.Load(sm.cs.ActorStore(ctx), actor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1342,7 +1342,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
|
||||
circ = big.Add(circ, actor.Balance)
|
||||
|
||||
case builtin.IsStorageMinerActor(actor.Code):
|
||||
mst, err := miner.Load(sm.cs.Store(ctx), actor)
|
||||
mst, err := miner.Load(sm.cs.ActorStore(ctx), actor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1359,7 +1359,7 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
|
||||
}
|
||||
|
||||
case builtin.IsMultisigActor(actor.Code):
|
||||
mst, err := multisig.Load(sm.cs.Store(ctx), actor)
|
||||
mst, err := multisig.Load(sm.cs.ActorStore(ctx), actor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1413,7 +1413,7 @@ func (sm *StateManager) GetPaychState(ctx context.Context, addr address.Address,
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
actState, err := paych.Load(sm.cs.Store(ctx), act)
|
||||
actState, err := paych.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -1431,7 +1431,7 @@ func (sm *StateManager) GetMarketState(ctx context.Context, ts *types.TipSet) (m
|
||||
return nil, err
|
||||
}
|
||||
|
||||
actState, err := market.Load(sm.cs.Store(ctx), act)
|
||||
actState, err := market.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ func GetNetworkName(ctx context.Context, sm *StateManager, st cid.Cid) (dtypes.N
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
ias, err := init_.Load(sm.cs.Store(ctx), act)
|
||||
ias, err := init_.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -65,7 +65,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
|
||||
}
|
||||
mas, err := miner.Load(sm.cs.Store(ctx), act)
|
||||
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return address.Undef, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -75,7 +75,7 @@ func GetMinerWorkerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr
|
||||
return address.Undef, xerrors.Errorf("failed to load actor info: %w", err)
|
||||
}
|
||||
|
||||
return vm.ResolveToKeyAddr(state, sm.cs.Store(ctx), info.Worker)
|
||||
return vm.ResolveToKeyAddr(state, sm.cs.ActorStore(ctx), info.Worker)
|
||||
}
|
||||
|
||||
func GetPower(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) (power.Claim, power.Claim, bool, error) {
|
||||
@ -88,7 +88,7 @@ func GetPowerRaw(ctx context.Context, sm *StateManager, st cid.Cid, maddr addres
|
||||
return power.Claim{}, power.Claim{}, false, xerrors.Errorf("(get sset) failed to load power actor state: %w", err)
|
||||
}
|
||||
|
||||
pas, err := power.Load(sm.cs.Store(ctx), act)
|
||||
pas, err := power.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return power.Claim{}, power.Claim{}, false, err
|
||||
}
|
||||
@ -123,7 +123,7 @@ func PreCommitInfo(ctx context.Context, sm *StateManager, maddr address.Address,
|
||||
return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(sm.cs.Store(ctx), act)
|
||||
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -137,7 +137,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres
|
||||
return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(sm.cs.Store(ctx), act)
|
||||
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -151,7 +151,7 @@ func GetSectorsForWinningPoSt(ctx context.Context, nv network.Version, pv ffiwra
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(sm.cs.Store(ctx), act)
|
||||
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -249,7 +249,7 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma
|
||||
return false, xerrors.Errorf("failed to load power actor: %w", err)
|
||||
}
|
||||
|
||||
spas, err := power.Load(sm.cs.Store(ctx), act)
|
||||
spas, err := power.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to load power actor state: %w", err)
|
||||
}
|
||||
@ -272,7 +272,7 @@ func GetStorageDeal(ctx context.Context, sm *StateManager, dealID abi.DealID, ts
|
||||
return nil, xerrors.Errorf("failed to load market actor: %w", err)
|
||||
}
|
||||
|
||||
state, err := market.Load(sm.cs.Store(ctx), act)
|
||||
state, err := market.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load market actor state: %w", err)
|
||||
}
|
||||
@ -320,7 +320,7 @@ func ListMinerActors(ctx context.Context, sm *StateManager, ts *types.TipSet) ([
|
||||
return nil, xerrors.Errorf("failed to load power actor: %w", err)
|
||||
}
|
||||
|
||||
powState, err := power.Load(sm.cs.Store(ctx), act)
|
||||
powState, err := power.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load power actor state: %w", err)
|
||||
}
|
||||
@ -353,7 +353,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
|
||||
StateBase: base,
|
||||
Epoch: height,
|
||||
Rand: r,
|
||||
Bstore: sm.cs.Blockstore(),
|
||||
Bstore: sm.cs.StateBlockstore(),
|
||||
Syscalls: sm.cs.VMSys(),
|
||||
CircSupplyCalc: sm.GetVMCirculatingSupply,
|
||||
NtwkVersion: sm.GetNtwkVersion,
|
||||
@ -474,7 +474,7 @@ func MinerGetBaseInfo(ctx context.Context, sm *StateManager, bcs beacon.Schedule
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(sm.cs.Store(ctx), act)
|
||||
mas, err := miner.Load(sm.cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -623,7 +623,7 @@ func minerHasMinPower(ctx context.Context, sm *StateManager, addr address.Addres
|
||||
return false, xerrors.Errorf("loading power actor state: %w", err)
|
||||
}
|
||||
|
||||
ps, err := power.Load(sm.cs.Store(ctx), pact)
|
||||
ps, err := power.Load(sm.cs.ActorStore(ctx), pact)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -654,7 +654,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
|
||||
return false, xerrors.Errorf("loading power actor state: %w", err)
|
||||
}
|
||||
|
||||
pstate, err := power.Load(sm.cs.Store(ctx), pact)
|
||||
pstate, err := power.Load(sm.cs.ActorStore(ctx), pact)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -664,7 +664,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
|
||||
return false, xerrors.Errorf("loading miner actor state: %w", err)
|
||||
}
|
||||
|
||||
mstate, err := miner.Load(sm.cs.Store(ctx), mact)
|
||||
mstate, err := miner.Load(sm.cs.ActorStore(ctx), mact)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -696,7 +696,7 @@ func MinerEligibleToMine(ctx context.Context, sm *StateManager, addr address.Add
|
||||
}
|
||||
|
||||
func CheckTotalFIL(ctx context.Context, sm *StateManager, ts *types.TipSet) (abi.TokenAmount, error) {
|
||||
str, err := state.LoadStateTree(sm.ChainStore().Store(ctx), ts.ParentState())
|
||||
str, err := state.LoadStateTree(sm.ChainStore().ActorStore(ctx), ts.ParentState())
|
||||
if err != nil {
|
||||
return abi.TokenAmount{}, err
|
||||
}
|
||||
|
@ -107,11 +107,11 @@ type HeadChangeEvt struct {
|
||||
// 1. a tipset cache
|
||||
// 2. a block => messages references cache.
|
||||
type ChainStore struct {
|
||||
bs bstore.Blockstore
|
||||
localbs bstore.Blockstore
|
||||
ds dstore.Batching
|
||||
chainBlockstore bstore.Blockstore
|
||||
stateBlockstore bstore.Blockstore
|
||||
metadataDs dstore.Batching
|
||||
|
||||
localviewer bstore.Viewer
|
||||
chainLocalBlockstore bstore.Blockstore
|
||||
|
||||
heaviestLk sync.Mutex
|
||||
heaviest *types.TipSet
|
||||
@ -139,30 +139,29 @@ type ChainStore struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// localbs is guaranteed to fail Get* if requested block isn't stored locally
|
||||
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||
mmCache, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||
tsCache, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||
func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
|
||||
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
||||
if j == nil {
|
||||
j = journal.NilJournal()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// unwraps the fallback store in case one is configured.
|
||||
// some methods _need_ to operate on a local blockstore only.
|
||||
localbs, _ := bstore.UnwrapFallbackStore(chainBs)
|
||||
cs := &ChainStore{
|
||||
bs: bs,
|
||||
localbs: localbs,
|
||||
ds: ds,
|
||||
bestTips: pubsub.New(64),
|
||||
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
|
||||
mmCache: mmCache,
|
||||
tsCache: tsCache,
|
||||
vmcalls: vmcalls,
|
||||
cancelFn: cancel,
|
||||
journal: j,
|
||||
}
|
||||
|
||||
if v, ok := localbs.(bstore.Viewer); ok {
|
||||
cs.localviewer = v
|
||||
chainBlockstore: chainBs,
|
||||
stateBlockstore: stateBs,
|
||||
chainLocalBlockstore: localbs,
|
||||
metadataDs: ds,
|
||||
bestTips: pubsub.New(64),
|
||||
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
|
||||
mmCache: c,
|
||||
tsCache: tsc,
|
||||
vmcalls: vmcalls,
|
||||
cancelFn: cancel,
|
||||
journal: j,
|
||||
}
|
||||
|
||||
cs.evtTypes = [1]journal.EventType{
|
||||
@ -216,7 +215,7 @@ func (cs *ChainStore) Close() error {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Load() error {
|
||||
head, err := cs.ds.Get(chainHeadKey)
|
||||
head, err := cs.metadataDs.Get(chainHeadKey)
|
||||
if err == dstore.ErrNotFound {
|
||||
log.Warn("no previous chain state found")
|
||||
return nil
|
||||
@ -246,7 +245,7 @@ func (cs *ChainStore) writeHead(ts *types.TipSet) error {
|
||||
return xerrors.Errorf("failed to marshal tipset: %w", err)
|
||||
}
|
||||
|
||||
if err := cs.ds.Put(chainHeadKey, data); err != nil {
|
||||
if err := cs.metadataDs.Put(chainHeadKey, data); err != nil {
|
||||
return xerrors.Errorf("failed to write chain head to datastore: %w", err)
|
||||
}
|
||||
|
||||
@ -306,13 +305,13 @@ func (cs *ChainStore) SubscribeHeadChanges(f ReorgNotifee) {
|
||||
func (cs *ChainStore) IsBlockValidated(ctx context.Context, blkid cid.Cid) (bool, error) {
|
||||
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
|
||||
|
||||
return cs.ds.Has(key)
|
||||
return cs.metadataDs.Has(key)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error {
|
||||
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
|
||||
|
||||
if err := cs.ds.Put(key, []byte{0}); err != nil {
|
||||
if err := cs.metadataDs.Put(key, []byte{0}); err != nil {
|
||||
return xerrors.Errorf("cache block validation: %w", err)
|
||||
}
|
||||
|
||||
@ -322,7 +321,7 @@ func (cs *ChainStore) MarkBlockAsValidated(ctx context.Context, blkid cid.Cid) e
|
||||
func (cs *ChainStore) UnmarkBlockAsValidated(ctx context.Context, blkid cid.Cid) error {
|
||||
key := blockValidationCacheKeyPrefix.Instance(blkid.String())
|
||||
|
||||
if err := cs.ds.Delete(key); err != nil {
|
||||
if err := cs.metadataDs.Delete(key); err != nil {
|
||||
return xerrors.Errorf("removing from valid block cache: %w", err)
|
||||
}
|
||||
|
||||
@ -339,7 +338,7 @@ func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
|
||||
return err
|
||||
}
|
||||
|
||||
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
|
||||
return cs.metadataDs.Put(dstore.NewKey("0"), b.Cid().Bytes())
|
||||
}
|
||||
|
||||
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
|
||||
@ -594,7 +593,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
|
||||
// FlushValidationCache removes all results of block validation from the
|
||||
// chain metadata store. Usually the first step after a new chain import.
|
||||
func (cs *ChainStore) FlushValidationCache() error {
|
||||
return FlushValidationCache(cs.ds)
|
||||
return FlushValidationCache(cs.metadataDs)
|
||||
}
|
||||
|
||||
func FlushValidationCache(ds datastore.Batching) error {
|
||||
@ -653,7 +652,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error {
|
||||
// Contains returns whether our BlockStore has all blocks in the supplied TipSet.
|
||||
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
|
||||
for _, c := range ts.Cids() {
|
||||
has, err := cs.bs.Has(c)
|
||||
has, err := cs.chainBlockstore.Has(c)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -668,16 +667,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
|
||||
// GetBlock fetches a BlockHeader with the supplied CID. It returns
|
||||
// blockstore.ErrNotFound if the block was not found in the BlockStore.
|
||||
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
|
||||
if cs.localviewer == nil {
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return types.DecodeBlock(sb.RawData())
|
||||
}
|
||||
|
||||
var blk *types.BlockHeader
|
||||
err := cs.localviewer.View(c, func(b []byte) (err error) {
|
||||
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
|
||||
blk, err = types.DecodeBlock(b)
|
||||
return err
|
||||
})
|
||||
@ -851,7 +842,7 @@ func (cs *ChainStore) PersistBlockHeaders(b ...*types.BlockHeader) error {
|
||||
end = len(b)
|
||||
}
|
||||
|
||||
err = multierr.Append(err, cs.bs.PutMany(sbs[start:end]))
|
||||
err = multierr.Append(err, cs.chainLocalBlockstore.PutMany(sbs[start:end]))
|
||||
}
|
||||
|
||||
return err
|
||||
@ -875,7 +866,7 @@ func PutMessage(bs bstore.Blockstore, m storable) (cid.Cid, error) {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) PutMessage(m storable) (cid.Cid, error) {
|
||||
return PutMessage(cs.bs, m)
|
||||
return PutMessage(cs.chainBlockstore, m)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) expandTipset(b *types.BlockHeader) (*types.TipSet, error) {
|
||||
@ -936,7 +927,7 @@ func (cs *ChainStore) AddBlock(ctx context.Context, b *types.BlockHeader) error
|
||||
}
|
||||
|
||||
func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
|
||||
data, err := cs.ds.Get(dstore.NewKey("0"))
|
||||
data, err := cs.metadataDs.Get(dstore.NewKey("0"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -962,17 +953,8 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
|
||||
if cs.localviewer == nil {
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("get message get failed: %s: %s", c, err)
|
||||
return nil, err
|
||||
}
|
||||
return types.DecodeMessage(sb.RawData())
|
||||
}
|
||||
|
||||
var msg *types.Message
|
||||
err := cs.localviewer.View(c, func(b []byte) (err error) {
|
||||
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
|
||||
msg, err = types.DecodeMessage(b)
|
||||
return err
|
||||
})
|
||||
@ -980,17 +962,8 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
|
||||
}
|
||||
|
||||
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
|
||||
if cs.localviewer == nil {
|
||||
sb, err := cs.localbs.Get(c)
|
||||
if err != nil {
|
||||
log.Errorf("get message get failed: %s: %s", c, err)
|
||||
return nil, err
|
||||
}
|
||||
return types.DecodeSignedMessage(sb.RawData())
|
||||
}
|
||||
|
||||
var msg *types.SignedMessage
|
||||
err := cs.localviewer.View(c, func(b []byte) (err error) {
|
||||
err := cs.chainLocalBlockstore.View(c, func(b []byte) (err error) {
|
||||
msg, err = types.DecodeSignedMessage(b)
|
||||
return err
|
||||
})
|
||||
@ -1000,7 +973,7 @@ func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error)
|
||||
func (cs *ChainStore) readAMTCids(root cid.Cid) ([]cid.Cid, error) {
|
||||
ctx := context.TODO()
|
||||
// block headers use adt0, for now.
|
||||
a, err := blockadt.AsArray(cs.Store(ctx), root)
|
||||
a, err := blockadt.AsArray(cs.ActorStore(ctx), root)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("amt load: %w", err)
|
||||
}
|
||||
@ -1124,7 +1097,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
|
||||
return mmcids.bls, mmcids.secpk, nil
|
||||
}
|
||||
|
||||
cst := cbor.NewCborStore(cs.localbs)
|
||||
cst := cbor.NewCborStore(cs.chainLocalBlockstore)
|
||||
var msgmeta types.MsgMeta
|
||||
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
|
||||
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
|
||||
@ -1194,7 +1167,7 @@ func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message,
|
||||
func (cs *ChainStore) GetParentReceipt(b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
|
||||
ctx := context.TODO()
|
||||
// block headers use adt0, for now.
|
||||
a, err := blockadt.AsArray(cs.Store(ctx), b.ParentMessageReceipts)
|
||||
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("amt load: %w", err)
|
||||
}
|
||||
@ -1237,16 +1210,26 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Blockstore() bstore.Blockstore {
|
||||
return cs.bs
|
||||
// ChainBlockstore returns the chain blockstore. Currently the chain and state
|
||||
// // stores are both backed by the same physical store, albeit with different
|
||||
// // caching policies, but in the future they will segregate.
|
||||
func (cs *ChainStore) ChainBlockstore() bstore.Blockstore {
|
||||
return cs.chainBlockstore
|
||||
}
|
||||
|
||||
// StateBlockstore returns the state blockstore. Currently the chain and state
|
||||
// stores are both backed by the same physical store, albeit with different
|
||||
// caching policies, but in the future they will segregate.
|
||||
func (cs *ChainStore) StateBlockstore() bstore.Blockstore {
|
||||
return cs.stateBlockstore
|
||||
}
|
||||
|
||||
func ActorStore(ctx context.Context, bs bstore.Blockstore) adt.Store {
|
||||
return adt.WrapStore(ctx, cbor.NewCborStore(bs))
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Store(ctx context.Context) adt.Store {
|
||||
return ActorStore(ctx, cs.bs)
|
||||
func (cs *ChainStore) ActorStore(ctx context.Context) adt.Store {
|
||||
return ActorStore(ctx, cs.stateBlockstore)
|
||||
}
|
||||
|
||||
func (cs *ChainStore) VMSys() vm.SyscallBuilder {
|
||||
@ -1444,8 +1427,9 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
return xerrors.Errorf("failed to write car header: %s", err)
|
||||
}
|
||||
|
||||
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
|
||||
blk, err := cs.bs.Get(c)
|
||||
unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
|
||||
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
|
||||
blk, err := unionBs.Get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
||||
}
|
||||
@ -1458,7 +1442,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
|
||||
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
|
||||
if ts == nil {
|
||||
ts = cs.GetHeaviestTipSet()
|
||||
}
|
||||
@ -1478,7 +1462,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := cs.bs.Get(blk)
|
||||
data, err := cs.chainBlockstore.Get(blk)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting block: %w", err)
|
||||
}
|
||||
@ -1498,7 +1482,7 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
|
||||
var cids []cid.Cid
|
||||
if !skipOldMsgs || b.Height > ts.Height()-inclRecentRoots {
|
||||
if walked.Visit(b.Messages) {
|
||||
mcids, err := recurseLinks(cs.bs, walked, b.Messages, []cid.Cid{b.Messages})
|
||||
mcids, err := recurseLinks(cs.chainBlockstore, walked, b.Messages, []cid.Cid{b.Messages})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("recursing messages failed: %w", err)
|
||||
}
|
||||
@ -1519,13 +1503,17 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
|
||||
|
||||
if b.Height == 0 || b.Height > ts.Height()-inclRecentRoots {
|
||||
if walked.Visit(b.ParentStateRoot) {
|
||||
cids, err := recurseLinks(cs.bs, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
|
||||
cids, err := recurseLinks(cs.stateBlockstore, walked, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("recursing genesis state failed: %w", err)
|
||||
}
|
||||
|
||||
out = append(out, cids...)
|
||||
}
|
||||
|
||||
if !skipMsgReceipts && walked.Visit(b.ParentMessageReceipts) {
|
||||
out = append(out, b.ParentMessageReceipts)
|
||||
}
|
||||
}
|
||||
|
||||
for _, c := range out {
|
||||
@ -1561,7 +1549,12 @@ func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRe
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
|
||||
header, err := car.LoadCar(cs.Blockstore(), r)
|
||||
// TODO: writing only to the state blockstore is incorrect.
|
||||
// At this time, both the state and chain blockstores are backed by the
|
||||
// universal store. When we physically segregate the stores, we will need
|
||||
// to route state objects to the state blockstore, and chain objects to
|
||||
// the chain blockstore.
|
||||
header, err := car.LoadCar(cs.StateBlockstore(), r)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loadcar failed: %w", err)
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func BenchmarkGetRandomness(b *testing.B) {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
bs, err := lr.Blockstore(context.TODO(), repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn
|
||||
|
||||
tpow := big2.Zero()
|
||||
{
|
||||
cst := cbor.NewCborStore(cs.Blockstore())
|
||||
cst := cbor.NewCborStore(cs.StateBlockstore())
|
||||
state, err := state.LoadStateTree(cst, ts.ParentState())
|
||||
if err != nil {
|
||||
return types.NewInt(0), xerrors.Errorf("load state tree: %w", err)
|
||||
@ -39,7 +39,7 @@ func (cs *ChainStore) Weight(ctx context.Context, ts *types.TipSet) (types.BigIn
|
||||
return types.NewInt(0), xerrors.Errorf("get power actor: %w", err)
|
||||
}
|
||||
|
||||
powState, err := power.Load(cs.Store(ctx), act)
|
||||
powState, err := power.Load(cs.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return types.NewInt(0), xerrors.Errorf("failed to load power actor state: %w", err)
|
||||
}
|
||||
|
@ -354,7 +354,7 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
|
||||
}
|
||||
|
||||
// Finally, flush.
|
||||
return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot)
|
||||
return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot)
|
||||
}
|
||||
|
||||
func (syncer *Syncer) LocalPeer() peer.ID {
|
||||
@ -640,7 +640,7 @@ func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, b
|
||||
return xerrors.Errorf("failed to load power actor: %w", err)
|
||||
}
|
||||
|
||||
powState, err := power.Load(syncer.store.Store(ctx), act)
|
||||
powState, err := power.Load(syncer.store.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to load power actor state: %w", err)
|
||||
}
|
||||
@ -1055,7 +1055,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
|
||||
return err
|
||||
}
|
||||
|
||||
st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot)
|
||||
st, err := state.LoadStateTree(syncer.store.ActorStore(ctx), stateroot)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to load base state tree: %w", err)
|
||||
}
|
||||
@ -1172,7 +1172,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
|
||||
}
|
||||
|
||||
// Finally, flush.
|
||||
return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid)
|
||||
return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid)
|
||||
}
|
||||
|
||||
func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {
|
||||
@ -1574,7 +1574,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
||||
return err
|
||||
}
|
||||
|
||||
if err := copyBlockstore(ctx, bs, syncer.store.Blockstore()); err != nil {
|
||||
if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil {
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/cockroachdb/pebble/bloom"
|
||||
"github.com/ipfs/go-cid"
|
||||
metricsi "github.com/ipfs/go-metrics-interface"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
|
||||
@ -204,7 +203,7 @@ var importBenchCmd = &cli.Command{
|
||||
case cctx.Bool("use-native-badger"):
|
||||
log.Info("using native badger")
|
||||
var opts badgerbs.Options
|
||||
if opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, tdir, false); err != nil {
|
||||
if opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, tdir, false); err != nil {
|
||||
return err
|
||||
}
|
||||
opts.SyncWrites = false
|
||||
@ -236,14 +235,6 @@ var importBenchCmd = &cli.Command{
|
||||
defer c.Close() //nolint:errcheck
|
||||
}
|
||||
|
||||
ctx := metricsi.CtxScope(context.Background(), "lotus")
|
||||
cacheOpts := blockstore.DefaultCacheOpts()
|
||||
cacheOpts.HasBloomFilterSize = 0
|
||||
bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var verifier ffiwrapper.Verifier = ffiwrapper.ProofVerifier
|
||||
if cctx.IsSet("syscall-cache") {
|
||||
scds, err := badger.NewDatastore(cctx.String("syscall-cache"), &badger.DefaultOptions)
|
||||
@ -267,6 +258,15 @@ var importBenchCmd = &cli.Command{
|
||||
|
||||
stm := stmgr.NewStateManager(cs)
|
||||
|
||||
var carFile *os.File
|
||||
// open the CAR file if one is provided.
|
||||
if path := cctx.String("car"); path != "" {
|
||||
var err error
|
||||
if carFile, err = os.Open(path); err != nil {
|
||||
return xerrors.Errorf("failed to open provided CAR file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// register a gauge that reports how long since the measurable
|
||||
@ -308,18 +308,7 @@ var importBenchCmd = &cli.Command{
|
||||
writeProfile("allocs")
|
||||
}()
|
||||
|
||||
var carFile *os.File
|
||||
|
||||
// open the CAR file if one is provided.
|
||||
if path := cctx.String("car"); path != "" {
|
||||
var err error
|
||||
if carFile, err = os.Open(path); err != nil {
|
||||
return xerrors.Errorf("failed to open provided CAR file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var head *types.TipSet
|
||||
|
||||
// --- IMPORT ---
|
||||
if !cctx.Bool("no-import") {
|
||||
if cctx.Bool("global-profile") {
|
||||
|
@ -175,7 +175,7 @@ var chainBalanceStateCmd = &cli.Command{
|
||||
|
||||
defer lkrepo.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
@ -396,7 +396,7 @@ var chainPledgeCmd = &cli.Command{
|
||||
|
||||
defer lkrepo.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
|
@ -319,7 +319,7 @@ var datastoreRewriteCmd = &cli.Command{
|
||||
)
|
||||
|
||||
// open the destination (to) store.
|
||||
opts, err := repo.BadgerBlockstoreOptions(repo.BlockstoreChain, toPath, false)
|
||||
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, toPath, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get badger options: %w", err)
|
||||
}
|
||||
@ -329,7 +329,7 @@ var datastoreRewriteCmd = &cli.Command{
|
||||
}
|
||||
|
||||
// open the source (from) store.
|
||||
opts, err = repo.BadgerBlockstoreOptions(repo.BlockstoreChain, fromPath, true)
|
||||
opts, err = repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, fromPath, true)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get badger options: %w", err)
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ var exportChainCmd = &cli.Command{
|
||||
|
||||
defer fi.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{
|
||||
return xerrors.Errorf("opening the car file: %w", err)
|
||||
}
|
||||
|
||||
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{
|
||||
}
|
||||
defer lr.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{
|
||||
|
||||
defer lkrepo.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
@ -191,7 +191,7 @@ var stateTreePruneCmd = &cli.Command{
|
||||
|
||||
rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))
|
||||
|
||||
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
|
||||
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, true, func(c cid.Cid) error {
|
||||
if goodSet.Len()%20 == 0 {
|
||||
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
|
||||
}
|
||||
|
@ -432,7 +432,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
||||
}
|
||||
defer lr.Close() //nolint:errcheck
|
||||
|
||||
bs, err := lr.Blockstore(ctx, repo.BlockstoreChain)
|
||||
bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to open blockstore: %w", err)
|
||||
}
|
||||
|
@ -311,7 +311,7 @@ FIXME: Maybe mention the `Batching` interface as the developer will stumble upon
|
||||
|
||||
FIXME: IPFS blocks vs Filecoin blocks ideally happens before this / here
|
||||
|
||||
The [`Blockstore` interface](`github.com/filecoin-project/lotus/lib/blockstore.go`) structures the key-value pair
|
||||
The [`Blockstore` interface](`github.com/filecoin-project/lotus/blockstore/blockstore.go`) structures the key-value pair
|
||||
into the CID format for the key and the [`Block` interface](`github.com/ipfs/go-block-format/blocks.go`) for the value.
|
||||
The `Block` value is just a raw string of bytes addressed by its hash, which is included in the CID key.
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -144,7 +144,7 @@ require (
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
|
||||
|
3
go.sum
3
go.sum
@ -1754,8 +1754,9 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
@ -9,10 +9,12 @@ import (
|
||||
"go.opencensus.io/tag"
|
||||
|
||||
rpcmetrics "github.com/filecoin-project/go-jsonrpc/metrics"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
)
|
||||
|
||||
// Distribution
|
||||
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
|
||||
var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 3000, 4000, 5000, 7500, 10000, 20000, 50000, 100000)
|
||||
|
||||
// Global Tags
|
||||
var (
|
||||
@ -179,33 +181,37 @@ var (
|
||||
)
|
||||
|
||||
// DefaultViews is an array of OpenCensus views for metric gathering purposes
|
||||
var DefaultViews = append([]*view.View{
|
||||
InfoView,
|
||||
ChainNodeHeightView,
|
||||
ChainNodeHeightExpectedView,
|
||||
ChainNodeWorkerHeightView,
|
||||
BlockReceivedView,
|
||||
BlockValidationFailureView,
|
||||
BlockValidationSuccessView,
|
||||
BlockValidationDurationView,
|
||||
BlockDelayView,
|
||||
MessagePublishedView,
|
||||
MessageReceivedView,
|
||||
MessageValidationFailureView,
|
||||
MessageValidationSuccessView,
|
||||
PeerCountView,
|
||||
PubsubPublishMessageView,
|
||||
PubsubDeliverMessageView,
|
||||
PubsubRejectMessageView,
|
||||
PubsubDuplicateMessageView,
|
||||
PubsubRecvRPCView,
|
||||
PubsubSendRPCView,
|
||||
PubsubDropRPCView,
|
||||
APIRequestDurationView,
|
||||
VMFlushCopyCountView,
|
||||
VMFlushCopyDurationView,
|
||||
},
|
||||
rpcmetrics.DefaultViews...)
|
||||
var DefaultViews = func() []*view.View {
|
||||
views := []*view.View{
|
||||
InfoView,
|
||||
ChainNodeHeightView,
|
||||
ChainNodeHeightExpectedView,
|
||||
ChainNodeWorkerHeightView,
|
||||
BlockReceivedView,
|
||||
BlockValidationFailureView,
|
||||
BlockValidationSuccessView,
|
||||
BlockValidationDurationView,
|
||||
BlockDelayView,
|
||||
MessagePublishedView,
|
||||
MessageReceivedView,
|
||||
MessageValidationFailureView,
|
||||
MessageValidationSuccessView,
|
||||
PeerCountView,
|
||||
PubsubPublishMessageView,
|
||||
PubsubDeliverMessageView,
|
||||
PubsubRejectMessageView,
|
||||
PubsubDuplicateMessageView,
|
||||
PubsubRecvRPCView,
|
||||
PubsubSendRPCView,
|
||||
PubsubDropRPCView,
|
||||
APIRequestDurationView,
|
||||
VMFlushCopyCountView,
|
||||
VMFlushCopyDurationView,
|
||||
}
|
||||
views = append(views, blockstore.DefaultViews...)
|
||||
views = append(views, rpcmetrics.DefaultViews...)
|
||||
return views
|
||||
}()
|
||||
|
||||
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
|
||||
func SinceInMilliseconds(startTime time.Time) float64 {
|
||||
|
@ -145,7 +145,7 @@ const (
|
||||
HeadMetricsKey
|
||||
SettlePaymentChannelsKey
|
||||
RunPeerTaggerKey
|
||||
SetupFallbackBlockstoreKey
|
||||
SetupFallbackBlockstoresKey
|
||||
|
||||
SetApiEndpointKey
|
||||
|
||||
@ -590,12 +590,15 @@ func Repo(r repo.Repo) Option {
|
||||
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
|
||||
|
||||
Override(new(dtypes.MetadataDS), modules.Datastore),
|
||||
Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore),
|
||||
Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))),
|
||||
Override(new(dtypes.UniversalBlockstore), modules.UniversalBlockstore),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateBlockstore),
|
||||
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
|
||||
|
||||
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
|
||||
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
|
||||
Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.FallbackStateBlockstore),
|
||||
Override(SetupFallbackBlockstoresKey, modules.InitFallbackBlockstores),
|
||||
),
|
||||
|
||||
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
|
||||
|
@ -35,6 +35,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
var log = logging.Logger("fullnode")
|
||||
@ -57,6 +58,11 @@ type ChainModule struct {
|
||||
fx.In
|
||||
|
||||
Chain *store.ChainStore
|
||||
|
||||
// ExposedBlockstore is the global monolith blockstore that is safe to
|
||||
// expose externally. In the future, this will be segregated into two
|
||||
// blockstores.
|
||||
ExposedBlockstore dtypes.ExposedBlockstore
|
||||
}
|
||||
|
||||
var _ ChainModuleAPI = (*ChainModule)(nil)
|
||||
@ -68,6 +74,11 @@ type ChainAPI struct {
|
||||
ChainModuleAPI
|
||||
|
||||
Chain *store.ChainStore
|
||||
|
||||
// ExposedBlockstore is the global monolith blockstore that is safe to
|
||||
// expose externally. In the future, this will be segregated into two
|
||||
// blockstores.
|
||||
ExposedBlockstore dtypes.ExposedBlockstore
|
||||
}
|
||||
|
||||
func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
|
||||
@ -212,7 +223,7 @@ func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpo
|
||||
}
|
||||
|
||||
func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
|
||||
blk, err := m.Chain.Blockstore().Get(obj)
|
||||
blk, err := m.ExposedBlockstore.Get(obj)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("blockstore get: %w", err)
|
||||
}
|
||||
@ -221,15 +232,15 @@ func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, er
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
|
||||
return a.Chain.Blockstore().DeleteBlock(obj)
|
||||
return a.ExposedBlockstore.DeleteBlock(obj)
|
||||
}
|
||||
|
||||
func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) {
|
||||
return m.Chain.Blockstore().Has(obj)
|
||||
return m.ExposedBlockstore.Has(obj)
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) {
|
||||
bs := a.Chain.Blockstore()
|
||||
bs := a.ExposedBlockstore
|
||||
bsvc := blockservice.New(bs, offline.Exchange(bs))
|
||||
|
||||
dag := merkledag.NewDAGService(bsvc)
|
||||
@ -514,7 +525,7 @@ func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject,
|
||||
return nil, xerrors.Errorf("parsing path: %w", err)
|
||||
}
|
||||
|
||||
bs := a.Chain.Blockstore()
|
||||
bs := a.ExposedBlockstore
|
||||
bsvc := blockservice.New(bs, offline.Exchange(bs))
|
||||
|
||||
dag := merkledag.NewDAGService(bsvc)
|
||||
|
@ -97,7 +97,7 @@ func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address,
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -111,7 +111,7 @@ func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Ad
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -135,7 +135,7 @@ func (m *StateModule) StateMinerInfo(ctx context.Context, actor address.Address,
|
||||
return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return miner.MinerInfo{}, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -153,7 +153,7 @@ func (a *StateAPI) StateMinerDeadlines(ctx context.Context, m address.Address, t
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -192,7 +192,7 @@ func (a *StateAPI) StateMinerPartitions(ctx context.Context, m address.Address,
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -253,7 +253,7 @@ func (m *StateModule) StateMinerProvingDeadline(ctx context.Context, addr addres
|
||||
return nil, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(m.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -272,7 +272,7 @@ func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, t
|
||||
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -329,7 +329,7 @@ func (a *StateAPI) StateMinerRecoveries(ctx context.Context, addr address.Addres
|
||||
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return bitfield.BitField{}, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -461,7 +461,7 @@ func (a *StateAPI) StateReadState(ctx context.Context, actor address.Address, ts
|
||||
return nil, xerrors.Errorf("getting actor: %w", err)
|
||||
}
|
||||
|
||||
blk, err := a.Chain.Blockstore().Get(act.Head)
|
||||
blk, err := a.Chain.StateBlockstore().Get(act.Head)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting actor head: %w", err)
|
||||
}
|
||||
@ -707,7 +707,7 @@ func (m *StateModule) StateMarketStorageDeal(ctx context.Context, dealId abi.Dea
|
||||
}
|
||||
|
||||
func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) {
|
||||
store := a.Chain.Store(ctx)
|
||||
store := a.Chain.ActorStore(ctx)
|
||||
|
||||
oldTree, err := state.LoadStateTree(store, old)
|
||||
if err != nil {
|
||||
@ -727,7 +727,7 @@ func (a *StateAPI) StateMinerSectorCount(ctx context.Context, addr address.Addre
|
||||
if err != nil {
|
||||
return api.MinerSectors{}, err
|
||||
}
|
||||
mas, err := miner.Load(a.Chain.Store(ctx), act)
|
||||
mas, err := miner.Load(a.Chain.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return api.MinerSectors{}, err
|
||||
}
|
||||
@ -792,7 +792,7 @@ func (a *StateAPI) StateSectorExpiration(ctx context.Context, maddr address.Addr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -804,7 +804,7 @@ func (a *StateAPI) StateSectorPartition(ctx context.Context, maddr address.Addre
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -890,7 +890,7 @@ func (m *StateModule) MsigGetAvailableBalance(ctx context.Context, addr address.
|
||||
if err != nil {
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor: %w", err)
|
||||
}
|
||||
msas, err := multisig.Load(m.Chain.Store(ctx), act)
|
||||
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
|
||||
}
|
||||
@ -912,7 +912,7 @@ func (a *StateAPI) MsigGetVestingSchedule(ctx context.Context, addr address.Addr
|
||||
return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor: %w", err)
|
||||
}
|
||||
|
||||
msas, err := multisig.Load(a.Chain.Store(ctx), act)
|
||||
msas, err := multisig.Load(a.Chain.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return api.EmptyVesting, xerrors.Errorf("failed to load multisig actor state: %w", err)
|
||||
}
|
||||
@ -961,7 +961,7 @@ func (m *StateModule) MsigGetVested(ctx context.Context, addr address.Address, s
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor at end epoch: %w", err)
|
||||
}
|
||||
|
||||
msas, err := multisig.Load(m.Chain.Store(ctx), act)
|
||||
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load multisig actor state: %w", err)
|
||||
}
|
||||
@ -989,7 +989,7 @@ func (m *StateModule) MsigGetPending(ctx context.Context, addr address.Address,
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load multisig actor: %w", err)
|
||||
}
|
||||
msas, err := multisig.Load(m.Chain.Store(ctx), act)
|
||||
msas, err := multisig.Load(m.Chain.ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load multisig actor state: %w", err)
|
||||
}
|
||||
@ -1032,7 +1032,7 @@ func (a *StateAPI) StateMinerPreCommitDepositForPower(ctx context.Context, maddr
|
||||
return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err)
|
||||
}
|
||||
|
||||
store := a.Chain.Store(ctx)
|
||||
store := a.Chain.ActorStore(ctx)
|
||||
|
||||
var sectorWeight abi.StoragePower
|
||||
if act, err := state.GetActor(market.Address); err != nil {
|
||||
@ -1093,7 +1093,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr
|
||||
return types.EmptyInt, xerrors.Errorf("failed to get resolve size: %w", err)
|
||||
}
|
||||
|
||||
store := a.Chain.Store(ctx)
|
||||
store := a.Chain.ActorStore(ctx)
|
||||
|
||||
var sectorWeight abi.StoragePower
|
||||
if act, err := state.GetActor(market.Address); err != nil {
|
||||
@ -1164,7 +1164,7 @@ func (a *StateAPI) StateMinerAvailableBalance(ctx context.Context, maddr address
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return types.EmptyInt, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -1193,7 +1193,7 @@ func (a *StateAPI) StateMinerSectorAllocated(ctx context.Context, maddr address.
|
||||
return false, xerrors.Errorf("failed to load miner actor: %w", err)
|
||||
}
|
||||
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
mas, err := miner.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("failed to load miner actor state: %w", err)
|
||||
}
|
||||
@ -1216,7 +1216,7 @@ func (a *StateAPI) StateVerifierStatus(ctx context.Context, addr address.Address
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vrs, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), act)
|
||||
vrs, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load verified registry state: %w", err)
|
||||
}
|
||||
@ -1247,7 +1247,7 @@ func (m *StateModule) StateVerifiedClientStatus(ctx context.Context, addr addres
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vrs, err := verifreg.Load(m.StateManager.ChainStore().Store(ctx), act)
|
||||
vrs, err := verifreg.Load(m.StateManager.ChainStore().ActorStore(ctx), act)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load verified registry state: %w", err)
|
||||
}
|
||||
@ -1269,7 +1269,7 @@ func (a *StateAPI) StateVerifiedRegistryRootKey(ctx context.Context, tsk types.T
|
||||
return address.Undef, err
|
||||
}
|
||||
|
||||
vst, err := verifreg.Load(a.StateManager.ChainStore().Store(ctx), vact)
|
||||
vst, err := verifreg.Load(a.StateManager.ChainStore().ActorStore(ctx), vact)
|
||||
if err != nil {
|
||||
return address.Undef, err
|
||||
}
|
||||
@ -1298,12 +1298,12 @@ func (m *StateModule) StateDealProviderCollateralBounds(ctx context.Context, siz
|
||||
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor: %w", err)
|
||||
}
|
||||
|
||||
pst, err := power.Load(m.StateManager.ChainStore().Store(ctx), pact)
|
||||
pst, err := power.Load(m.StateManager.ChainStore().ActorStore(ctx), pact)
|
||||
if err != nil {
|
||||
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load power actor state: %w", err)
|
||||
}
|
||||
|
||||
rst, err := reward.Load(m.StateManager.ChainStore().Store(ctx), ract)
|
||||
rst, err := reward.Load(m.StateManager.ChainStore().ActorStore(ctx), ract)
|
||||
if err != nil {
|
||||
return api.DealCollateralBounds{}, xerrors.Errorf("failed to load reward actor state: %w", err)
|
||||
}
|
||||
|
65
node/modules/blockstore.go
Normal file
65
node/modules/blockstore.go
Normal file
@ -0,0 +1,65 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
// UniversalBlockstore returns a single universal blockstore that stores both
|
||||
// chain data and state data.
|
||||
func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
|
||||
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c, ok := bs.(io.Closer); ok {
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error {
|
||||
return c.Close()
|
||||
},
|
||||
})
|
||||
}
|
||||
return bs, err
|
||||
}
|
||||
|
||||
// StateBlockstore is a hook to overlay caches for state objects, or in the
|
||||
// future, to segregate the universal blockstore into different physical state
|
||||
// and chain stores.
|
||||
func StateBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
// ChainBlockstore is a hook to overlay caches for state objects, or in the
|
||||
// future, to segregate the universal blockstore into different physical state
|
||||
// and chain stores.
|
||||
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func FallbackChainBlockstore(cbs dtypes.ChainBlockstore) dtypes.ChainBlockstore {
|
||||
return &blockstore.FallbackStore{Blockstore: cbs}
|
||||
}
|
||||
|
||||
func FallbackStateBlockstore(sbs dtypes.StateBlockstore) dtypes.StateBlockstore {
|
||||
return &blockstore.FallbackStore{Blockstore: sbs}
|
||||
}
|
||||
|
||||
func InitFallbackBlockstores(cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, rem dtypes.ChainBitswap) error {
|
||||
for _, bs := range []bstore.Blockstore{cbs, sbs} {
|
||||
if fbs, ok := bs.(*blockstore.FallbackStore); ok {
|
||||
fbs.SetFallback(rem.GetBlock)
|
||||
continue
|
||||
}
|
||||
return xerrors.Errorf("expected a FallbackStore")
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,25 +1,18 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-bitswap"
|
||||
"github.com/ipfs/go-bitswap/network"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipld/go-car"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/routing"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain"
|
||||
@ -29,14 +22,15 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/journal"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainBlockstore) dtypes.ChainBitswap {
|
||||
// ChainBitswap uses a blockstore that bypasses all caches.
|
||||
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
|
||||
// prefix protocol for chain bitswap
|
||||
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
|
||||
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
|
||||
@ -60,6 +54,10 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
|
||||
return exch
|
||||
}
|
||||
|
||||
func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
||||
return blockservice.New(bs, rem)
|
||||
}
|
||||
|
||||
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
|
||||
mpp := messagepool.NewProvider(sm, ps)
|
||||
mp, err := messagepool.New(mpp, ds, nn, j)
|
||||
@ -74,43 +72,8 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
|
||||
return mp, nil
|
||||
}
|
||||
|
||||
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
|
||||
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.BlockstoreChain)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO potentially replace this cached blockstore by a CBOR cache.
|
||||
cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cbs, nil
|
||||
}
|
||||
|
||||
func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
||||
return blockservice.New(bs, rem)
|
||||
}
|
||||
|
||||
func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore {
|
||||
return &blockstore.FallbackStore{
|
||||
Blockstore: rbs,
|
||||
}
|
||||
}
|
||||
|
||||
func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error {
|
||||
fbs, ok := cbs.(*blockstore.FallbackStore)
|
||||
if !ok {
|
||||
return xerrors.Errorf("expected a FallbackStore")
|
||||
}
|
||||
|
||||
fbs.SetFallback(rem.GetBlock)
|
||||
return nil
|
||||
}
|
||||
|
||||
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
|
||||
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
||||
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
|
||||
|
||||
if err := chain.Load(); err != nil {
|
||||
log.Warnf("loading chain state from disk: %s", err)
|
||||
@ -125,65 +88,6 @@ func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawB
|
||||
return chain
|
||||
}
|
||||
|
||||
func ErrorGenesis() Genesis {
|
||||
return func() (header *types.BlockHeader, e error) {
|
||||
return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
|
||||
}
|
||||
}
|
||||
|
||||
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
|
||||
return func(bs dtypes.ChainBlockstore) Genesis {
|
||||
return func() (header *types.BlockHeader, e error) {
|
||||
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
|
||||
}
|
||||
if len(c.Roots) != 1 {
|
||||
return nil, xerrors.New("expected genesis file to have one root")
|
||||
}
|
||||
root, err := bs.Get(c.Roots[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h, err := types.DecodeBlock(root.RawData())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("decoding block failed: %w", err)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
|
||||
|
||||
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
|
||||
genFromRepo, err := cs.GetGenesis()
|
||||
if err == nil {
|
||||
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
|
||||
expectedGenesis, err := g()
|
||||
if err != nil {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
|
||||
}
|
||||
|
||||
if genFromRepo.Cid() != expectedGenesis.Cid() {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
|
||||
}
|
||||
}
|
||||
return dtypes.AfterGenesisSet{}, nil // already set, noop
|
||||
}
|
||||
if err != datastore.ErrNotFound {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
|
||||
}
|
||||
|
||||
genesis, err := g()
|
||||
if err != nil {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
|
||||
}
|
||||
|
||||
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
|
||||
}
|
||||
|
||||
func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
|
||||
if !build.Devnet {
|
||||
return "testnetnet", nil
|
||||
|
@ -83,7 +83,7 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
ds, err := r.Datastore(ctx, "/client")
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting datastore out of reop: %w", err)
|
||||
return nil, xerrors.Errorf("getting datastore out of repo: %w", err)
|
||||
}
|
||||
|
||||
mds, err := multistore.NewMultiDstore(ds)
|
||||
|
@ -19,19 +19,41 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
|
||||
)
|
||||
|
||||
// MetadataDS stores metadata
|
||||
// dy default it's namespaced under /metadata in main repo datastore
|
||||
// MetadataDS stores metadata. By default it's namespaced under /metadata in
|
||||
// main repo datastore.
|
||||
type MetadataDS datastore.Batching
|
||||
|
||||
type ChainRawBlockstore blockstore.Blockstore
|
||||
type ChainBlockstore blockstore.Blockstore // optionally bitswap backed
|
||||
type (
|
||||
// UniversalBlockstore is the cold blockstore.
|
||||
UniversalBlockstore blockstore.Blockstore
|
||||
|
||||
// ChainBlockstore is a blockstore to store chain data (tipsets, blocks,
|
||||
// messages). It is physically backed by the BareMonolithBlockstore, but it
|
||||
// has a cache on top that is specially tuned for chain data access
|
||||
// patterns.
|
||||
ChainBlockstore blockstore.Blockstore
|
||||
|
||||
// StateBlockstore is a blockstore to store state data (state tree). It is
|
||||
// physically backed by the BareMonolithBlockstore, but it has a cache on
|
||||
// top that is specially tuned for state data access patterns.
|
||||
StateBlockstore blockstore.Blockstore
|
||||
|
||||
// ExposedBlockstore is a blockstore that interfaces directly with the
|
||||
// network or with users, from which queries are served, and where incoming
|
||||
// data is deposited. For security reasons, this store is disconnected from
|
||||
// any internal caches. If blocks are added to this store in a way that
|
||||
// could render caches dirty (e.g. a block is added when an existence cache
|
||||
// holds a 'false' for that block), the process should signal so by calling
|
||||
// blockstore.AllCaches.Dirty(cid).
|
||||
ExposedBlockstore blockstore.Blockstore
|
||||
)
|
||||
|
||||
type ChainBitswap exchange.Interface
|
||||
type ChainBlockService bserv.BlockService
|
||||
|
||||
type ClientMultiDstore *multistore.MultiStore
|
||||
type ClientImportMgr *importmgr.Mgr
|
||||
type ClientBlockstore blockstore.Blockstore
|
||||
type ClientBlockstore blockstore.BasicBlockstore
|
||||
type ClientDealStore *statestore.StateStore
|
||||
type ClientRequestValidator *requestvalidation.UnifiedRequestValidator
|
||||
type ClientDatastore datastore.Batching
|
||||
@ -50,6 +72,6 @@ type ProviderRequestValidator *requestvalidation.UnifiedRequestValidator
|
||||
type ProviderDataTransfer datatransfer.Manager
|
||||
|
||||
type StagingDAG format.DAGService
|
||||
type StagingBlockstore blockstore.Blockstore
|
||||
type StagingBlockstore blockstore.BasicBlockstore
|
||||
type StagingGraphsync graphsync.GraphExchange
|
||||
type StagingMultiDstore *multistore.MultiStore
|
||||
|
73
node/modules/genesis.go
Normal file
73
node/modules/genesis.go
Normal file
@ -0,0 +1,73 @@
|
||||
package modules
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipld/go-car"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
)
|
||||
|
||||
func ErrorGenesis() Genesis {
|
||||
return func() (header *types.BlockHeader, e error) {
|
||||
return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
|
||||
}
|
||||
}
|
||||
|
||||
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
|
||||
return func(bs dtypes.ChainBlockstore) Genesis {
|
||||
return func() (header *types.BlockHeader, e error) {
|
||||
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
|
||||
}
|
||||
if len(c.Roots) != 1 {
|
||||
return nil, xerrors.New("expected genesis file to have one root")
|
||||
}
|
||||
root, err := bs.Get(c.Roots[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h, err := types.DecodeBlock(root.RawData())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("decoding block failed: %w", err)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
|
||||
|
||||
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
|
||||
genFromRepo, err := cs.GetGenesis()
|
||||
if err == nil {
|
||||
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
|
||||
expectedGenesis, err := g()
|
||||
if err != nil {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
|
||||
}
|
||||
|
||||
if genFromRepo.Cid() != expectedGenesis.Cid() {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
|
||||
}
|
||||
}
|
||||
return dtypes.AfterGenesisSet{}, nil // already set, noop
|
||||
}
|
||||
if err != datastore.ErrNotFound {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
|
||||
}
|
||||
|
||||
genesis, err := g()
|
||||
if err != nil {
|
||||
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
|
||||
}
|
||||
|
||||
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
|
||||
}
|
@ -14,8 +14,8 @@ import (
|
||||
)
|
||||
|
||||
// Graphsync creates a graphsync instance from the given loader and storer
|
||||
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
||||
func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ExposedBlockstore, h host.Host) (dtypes.Graphsync, error) {
|
||||
graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
|
||||
loader := storeutil.LoaderForBlockstore(clientBs)
|
||||
storer := storeutil.StorerForBlockstore(clientBs)
|
||||
|
@ -18,7 +18,7 @@ import (
|
||||
func IpfsClientBlockstore(ipfsMaddr string, onlineMode bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
|
||||
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, localStore dtypes.ClientImportMgr) (dtypes.ClientBlockstore, error) {
|
||||
var err error
|
||||
var ipfsbs blockstore.Blockstore
|
||||
var ipfsbs blockstore.BasicBlockstore
|
||||
if ipfsMaddr != "" {
|
||||
var ma multiaddr.Multiaddr
|
||||
ma, err = multiaddr.NewMultiaddr(ipfsMaddr)
|
||||
|
@ -5,10 +5,6 @@ import badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
// BadgerBlockstoreOptions returns the badger options to apply for the provided
|
||||
// domain.
|
||||
func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool) (badgerbs.Options, error) {
|
||||
if domain != BlockstoreChain {
|
||||
return badgerbs.Options{}, ErrInvalidBlockstoreDomain
|
||||
}
|
||||
|
||||
opts := badgerbs.DefaultOptions(path)
|
||||
|
||||
// Due to legacy usage of blockstore.Blockstore, over a datastore, all
|
||||
|
@ -13,7 +13,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
fslock "github.com/ipfs/go-fs-lock"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -22,7 +22,7 @@ import (
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
lblockstore "github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
@ -264,11 +264,18 @@ type fsLockedRepo struct {
|
||||
bs blockstore.Blockstore
|
||||
bsErr error
|
||||
bsOnce sync.Once
|
||||
ssPath string
|
||||
ssErr error
|
||||
ssOnce sync.Once
|
||||
|
||||
storageLk sync.Mutex
|
||||
configLk sync.Mutex
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) Readonly() bool {
|
||||
return fsr.readonly
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) Path() string {
|
||||
return fsr.path
|
||||
}
|
||||
@ -301,7 +308,7 @@ func (fsr *fsLockedRepo) Close() error {
|
||||
|
||||
// Blockstore returns a blockstore for the provided data domain.
|
||||
func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
||||
if domain != BlockstoreChain {
|
||||
if domain != UniversalBlockstore {
|
||||
return nil, ErrInvalidBlockstoreDomain
|
||||
}
|
||||
|
||||
@ -325,12 +332,27 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain
|
||||
fsr.bsErr = err
|
||||
return
|
||||
}
|
||||
fsr.bs = lblockstore.WrapIDStore(bs)
|
||||
fsr.bs = blockstore.WrapIDStore(bs)
|
||||
})
|
||||
|
||||
return fsr.bs, fsr.bsErr
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) SplitstorePath() (string, error) {
|
||||
fsr.ssOnce.Do(func() {
|
||||
path := fsr.join(filepath.Join(fsDatastore, "splitstore"))
|
||||
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
fsr.ssErr = err
|
||||
return
|
||||
}
|
||||
|
||||
fsr.ssPath = path
|
||||
})
|
||||
|
||||
return fsr.ssPath, fsr.ssErr
|
||||
}
|
||||
|
||||
// join joins path elements with fsr.path
|
||||
func (fsr *fsLockedRepo) join(paths ...string) string {
|
||||
return filepath.Join(append([]string{fsr.path}, paths...)...)
|
||||
|
@ -16,7 +16,7 @@ type Mgr struct {
|
||||
mds *multistore.MultiStore
|
||||
ds datastore.Batching
|
||||
|
||||
Blockstore blockstore.Blockstore
|
||||
Blockstore blockstore.BasicBlockstore
|
||||
}
|
||||
|
||||
type Label string
|
||||
|
@ -4,10 +4,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
|
||||
@ -18,11 +18,11 @@ import (
|
||||
type BlockstoreDomain string
|
||||
|
||||
const (
|
||||
// BlockstoreChain represents the blockstore domain for chain data.
|
||||
// UniversalBlockstore represents the blockstore domain for all data.
|
||||
// Right now, this includes chain objects (tipsets, blocks, messages), as
|
||||
// well as state. In the future, they may get segregated into different
|
||||
// domains.
|
||||
BlockstoreChain = BlockstoreDomain("chain")
|
||||
UniversalBlockstore = BlockstoreDomain("universal")
|
||||
)
|
||||
|
||||
var (
|
||||
@ -63,6 +63,9 @@ type LockedRepo interface {
|
||||
// the lifecycle.
|
||||
Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error)
|
||||
|
||||
// SplitstorePath returns the path for the SplitStore
|
||||
SplitstorePath() (string, error)
|
||||
|
||||
// Returns config in this repo
|
||||
Config() (interface{}, error)
|
||||
SetConfig(func(interface{})) error
|
||||
@ -84,4 +87,7 @@ type LockedRepo interface {
|
||||
|
||||
// Path returns absolute path of the repo
|
||||
Path() string
|
||||
|
||||
// Readonly returns true if the repo is readonly
|
||||
Readonly() bool
|
||||
}
|
||||
|
@ -201,6 +201,10 @@ func (mem *MemRepo) Lock(t RepoType) (LockedRepo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) Readonly() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) checkToken() error {
|
||||
lmem.RLock()
|
||||
defer lmem.RUnlock()
|
||||
@ -246,12 +250,16 @@ func (lmem *lockedMemRepo) Datastore(_ context.Context, ns string) (datastore.Ba
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) Blockstore(ctx context.Context, domain BlockstoreDomain) (blockstore.Blockstore, error) {
|
||||
if domain != BlockstoreChain {
|
||||
if domain != UniversalBlockstore {
|
||||
return nil, ErrInvalidBlockstoreDomain
|
||||
}
|
||||
return lmem.mem.blockstore, nil
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) SplitstorePath() (string, error) {
|
||||
return ioutil.TempDir("", "splitstore.*")
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) ListDatastores(ns string) ([]int64, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -73,13 +73,13 @@ func (mrs *multiStoreRetrievalStore) DAGService() ipldformat.DAGService {
|
||||
|
||||
// BlockstoreRetrievalStoreManager manages a single blockstore as if it were multiple stores
|
||||
type BlockstoreRetrievalStoreManager struct {
|
||||
bs blockstore.Blockstore
|
||||
bs blockstore.BasicBlockstore
|
||||
}
|
||||
|
||||
var _ RetrievalStoreManager = &BlockstoreRetrievalStoreManager{}
|
||||
|
||||
// NewBlockstoreRetrievalStoreManager returns a new blockstore based RetrievalStoreManager
|
||||
func NewBlockstoreRetrievalStoreManager(bs blockstore.Blockstore) RetrievalStoreManager {
|
||||
func NewBlockstoreRetrievalStoreManager(bs blockstore.BasicBlockstore) RetrievalStoreManager {
|
||||
return &BlockstoreRetrievalStoreManager{
|
||||
bs: bs,
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user