support out-of-chain reference protection

This commit is contained in:
vyzo 2021-07-17 20:33:56 +03:00
parent 1c30d07d68
commit ebbaf23af8
8 changed files with 89 additions and 1 deletions

View File

@ -128,6 +128,9 @@ type SplitStore struct {
txnRefsMx sync.Mutex txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{} txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{}
// registered protectors
protectors []func(func(cid.Cid) error) error
} }
var _ bstore.Blockstore = (*SplitStore)(nil) var _ bstore.Blockstore = (*SplitStore)(nil)
@ -520,6 +523,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return nil return nil
} }
func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) {
s.mx.Lock()
defer s.mx.Unlock()
s.protectors = append(s.protectors, protector)
}
func (s *SplitStore) Close() error { func (s *SplitStore) Close() error {
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) { if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing // already closing

View File

@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
}) })
} }
func (s *SplitStore) applyProtectors() error {
s.mx.Lock()
defer s.mx.Unlock()
count := 0
for _, protect := range s.protectors {
err := protect(func(c cid.Cid) error {
s.trackTxnRef(c)
count++
return nil
})
if err != nil {
return xerrors.Errorf("error applynig protector: %w", err)
}
}
if count > 0 {
log.Infof("protected %d references through %d protectors", count, len(s.protectors))
}
return nil
}
// --- Compaction --- // --- Compaction ---
// Compaction works transactionally with the following algorithm: // Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked. // - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
@ -392,6 +416,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are ready for concurrent marking // we are ready for concurrent marking
s.beginTxnMarking(markSet) s.beginTxnMarking(markSet)
// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
err = s.applyProtectors()
if err != nil {
return err
}
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch. // and messages until the boundary epoch.
log.Info("marking reachable objects") log.Info("marking reachable objects")

View File

@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
return mp, nil return mp, nil
} }
func (mp *MessagePool) ProtectMessages(protect func(cid.Cid) error) error {
mp.lk.Lock()
defer mp.lk.Unlock()
for _, mset := range mp.pending {
for _, m := range mset.msgs {
err := protect(m.Cid())
if err != nil {
return err
}
err = protect(m.Message.Cid())
if err != nil {
return err
}
}
}
return nil
}
func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) { func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache // check the cache
a, f := mp.keyCache[addr] a, f := mp.keyCache[addr]

View File

@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil return cid.Undef, nil
} }
func (tma *testMpoolAPI) IsLite() bool { func (tma *testMpoolAPI) IsLite() bool {
return false return false
} }

View File

@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore), Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))), Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore), Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore),
Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector),
), ),
If(!cfg.EnableSplitstore, If(!cfg.EnableSplitstore,
Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore), Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore), Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))), Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector),
), ),
Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))), Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))),

View File

@ -95,6 +95,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
} }
} }
func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector {
return s.(dtypes.GCReferenceProtector)
}
func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector {
return dtypes.NoopGCReferenceProtector{}
}
func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore { func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore {
return s.(*splitstore.SplitStore).Expose() return s.(*splitstore.SplitStore).Expose()
} }

View File

@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem) return blockservice.New(bs, rem)
} }
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(mpp, ds, nn, j) mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil { if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err) return nil, xerrors.Errorf("constructing mpool: %w", err)
@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS
return mp.Close() return mp.Close()
}, },
}) })
protector.AddProtector(mp.ProtectMessages)
return mp, nil return mp, nil
} }

View File

@ -0,0 +1,13 @@
package dtypes
import (
cid "github.com/ipfs/go-cid"
)
type GCReferenceProtector interface {
AddProtector(func(func(cid.Cid) error) error)
}
type NoopGCReferenceProtector struct{}
func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}