remove dependency from blockstore/splitstore => chain/store.

This commit is contained in:
Raúl Kripalani 2021-03-01 17:38:02 +00:00 committed by vyzo
parent b9400c590f
commit b1b452bc0f
4 changed files with 36 additions and 22 deletions

View File

@ -19,7 +19,6 @@ import (
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -59,6 +58,15 @@ type Config struct {
Archival bool
}
// ChainAccessor allows the Splitstore to access the chain. It will most likely
// be a ChainStore at runtime.
type ChainAccessor interface {
GetTipsetByHeight(context.Context, abi.ChainEpoch, *types.TipSet, bool) (*types.TipSet, error)
GetHeaviestTipSet() *types.TipSet
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
WalkSnapshot(context.Context, *types.TipSet, abi.ChainEpoch, bool, bool, func(cid.Cid) error) error
}
type SplitStore struct {
compacting int32
@ -73,7 +81,7 @@ type SplitStore struct {
mx sync.Mutex
curTs *types.TipSet
cs *store.ChainStore
chain ChainAccessor
ds dstore.Datastore
hot bstore.Blockstore
cold bstore.Blockstore
@ -260,9 +268,9 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
}
// State tracking
func (s *SplitStore) Start(cs *store.ChainStore) error {
s.cs = cs
s.curTs = cs.GetHeaviestTipSet()
func (s *SplitStore) Start(chain ChainAccessor) error {
s.chain = chain
s.curTs = chain.GetHeaviestTipSet()
// load base epoch from metadata ds
// if none, then use current epoch because it's a fresh start
@ -301,7 +309,7 @@ func (s *SplitStore) Start(cs *store.ChainStore) error {
}
// watch the chain
cs.SubscribeHeadChanges(s.HeadChange)
chain.SubscribeHeadChanges(s.HeadChange)
return nil
}
@ -317,7 +325,7 @@ func (s *SplitStore) Close() error {
return s.env.Close()
}
func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
s.mx.Lock()
curTs := apply[len(apply)-1]
epoch := curTs.Height()
@ -432,7 +440,7 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
func (s *SplitStore) estimateLiveSetSize(curTs *types.TipSet) {
s.liveSetSize = 0
err := s.cs.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
err := s.chain.WalkSnapshot(context.Background(), curTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
s.liveSetSize++
return nil
@ -460,14 +468,14 @@ func (s *SplitStore) compactSimple(curTs *types.TipSet) {
log.Info("marking reachable cold objects")
startMark := time.Now()
coldTs, err := s.cs.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
if err != nil {
// TODO do something better here
panic(err)
}
count := int64(0)
err = s.cs.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
err = s.chain.WalkSnapshot(context.Background(), coldTs, 1, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return coldSet.Mark(cid)
@ -646,7 +654,7 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
// Phase 1a: mark all reachable CIDs in the hot range
count := int64(0)
err = s.cs.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
err = s.chain.WalkSnapshot(context.Background(), curTs, epoch-coldEpoch, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return hotSet.Mark(cid)
@ -662,14 +670,14 @@ func (s *SplitStore) compactFull(curTs *types.TipSet) {
}
// Phase 1b: mark all reachable CIDs in the cold range
coldTs, err := s.cs.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
coldTs, err := s.chain.GetTipsetByHeight(context.Background(), coldEpoch, curTs, true)
if err != nil {
// TODO do something better here
panic(err)
}
count = 0
err = s.cs.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts,
err = s.chain.WalkSnapshot(context.Background(), coldTs, CompactionCold, s.skipOldMsgs, s.skipMsgReceipts,
func(cid cid.Cid) error {
count++
return coldSet.Mark(cid)

View File

@ -81,7 +81,7 @@ func init() {
}
// ReorgNotifee represents a callback that gets called upon reorgs.
type ReorgNotifee func(rev, app []*types.TipSet) error
type ReorgNotifee = func(rev, app []*types.TipSet) error
// Journal event types.
const (

View File

@ -20,7 +20,8 @@ import (
)
// UniversalBlockstore returns a single universal blockstore that stores both
// chain data and state data.
// chain data and state data. It can be backed by a blockstore directly
// (e.g. Badger), or by a Splitstore.
func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
if err != nil {
@ -94,19 +95,19 @@ func SplitBlockstore(cfg *config.Blockstore) func(lc fx.Lifecycle, r repo.Locked
}
}
func StateFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
func StateFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.StateBlockstore, error) {
return bs, nil
}
func StateSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
func StateSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.StateBlockstore, error) {
return bs, nil
}
func ChainFlatBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
func ChainFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.ChainBlockstore, error) {
return bs, nil
}
func ChainSplitBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
func ChainSplitBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.SplitBlockstore) (dtypes.ChainBlockstore, error) {
return bs, nil
}

View File

@ -80,14 +80,19 @@ func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlo
log.Warnf("loading chain state from disk: %s", err)
}
var startHook func(context.Context) error
if ss, ok := basebs.(*splitstore.SplitStore); ok {
err := ss.Start(chain)
if err != nil {
log.Errorf("error starting splitstore: %s", err)
startHook = func(_ context.Context) error {
err := ss.Start(chain)
if err != nil {
err = xerrors.Errorf("error starting splitstore: %w", err)
}
return err
}
}
lc.Append(fx.Hook{
OnStart: startHook,
OnStop: func(_ context.Context) error {
return chain.Close()
},