diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index d5de3fb82..5ac7a1c94 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -128,6 +128,9 @@ type SplitStore struct { txnRefsMx sync.Mutex txnRefs map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{} + + // registered protectors + protectors []func(func(cid.Cid) error) error } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -520,6 +523,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error { return nil } +func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) { + s.mx.Lock() + defer s.mx.Unlock() + + s.protectors = append(s.protectors, protector) +} + func (s *SplitStore) Close() error { if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) { // already closing diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 1c70ce973..d3d87bca1 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { }) } +func (s *SplitStore) applyProtectors() error { + s.mx.Lock() + defer s.mx.Unlock() + + count := 0 + for _, protect := range s.protectors { + err := protect(func(c cid.Cid) error { + s.trackTxnRef(c) + count++ + return nil + }) + + if err != nil { + return xerrors.Errorf("error applynig protector: %w", err) + } + } + + if count > 0 { + log.Infof("protected %d references through %d protectors", count, len(s.protectors)) + } + + return nil +} + // --- Compaction --- // Compaction works transactionally with the following algorithm: // - We prepare a transaction, whereby all i/o referenced objects through the API are tracked. @@ -392,6 +416,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // we are ready for concurrent marking s.beginTxnMarking(markSet) + // 0. track all protected references at beginning of compaction; anything added later should + // be transactionally protected by the write + log.Info("protecting references with registered protectors") + err = s.applyProtectors() + if err != nil { + return err + } + // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // and messages until the boundary epoch. log.Info("marking reachable objects") diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 865c18a3a..b2ed89cc4 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ return mp, nil } +func (mp *MessagePool) ProtectMessages(protect func(cid.Cid) error) error { + mp.lk.Lock() + defer mp.lk.Unlock() + + for _, mset := range mp.pending { + for _, m := range mset.msgs { + err := protect(m.Cid()) + if err != nil { + return err + } + + err = protect(m.Message.Cid()) + if err != nil { + return err + } + } + } + + return nil +} + func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) { // check the cache a, f := mp.keyCache[addr] diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index f271249df..2ea8fdec0 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { return cid.Undef, nil } + func (tma *testMpoolAPI) IsLite() bool { return false } diff --git a/node/builder.go b/node/builder.go index dc9e1f8b7..771368917 100644 --- a/node/builder.go +++ b/node/builder.go @@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore), Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))), Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore), + Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector), ), If(!cfg.EnableSplitstore, Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore), Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore), Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))), Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), + Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector), ), Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))), diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index 50692c9f0..d113e9037 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -95,6 +95,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked } } +func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector { + return s.(dtypes.GCReferenceProtector) +} + +func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector { + return dtypes.NoopGCReferenceProtector{} +} + func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore { return s.(*splitstore.SplitStore).Expose() } diff --git a/node/modules/chain.go b/node/modules/chain.go index 954322948..e81b2e5b7 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty return blockservice.New(bs, rem) } -func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { +func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) { mp, err := messagepool.New(mpp, ds, nn, j) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) @@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS return mp.Close() }, }) + protector.AddProtector(mp.ProtectMessages) return mp, nil } diff --git a/node/modules/dtypes/protector.go b/node/modules/dtypes/protector.go new file mode 100644 index 000000000..0d9625fc1 --- /dev/null +++ b/node/modules/dtypes/protector.go @@ -0,0 +1,13 @@ +package dtypes + +import ( + cid "github.com/ipfs/go-cid" +) + +type GCReferenceProtector interface { + AddProtector(func(func(cid.Cid) error) error) +} + +type NoopGCReferenceProtector struct{} + +func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}