Merge pull request #10641 from filecoin-project/phi/backport-splitstore-oos
Backport #10392 into v1.21.0

This backports the splitstore out-of-sync handling: while compaction is running, HeadChange now tracks whether chain sync has fallen behind and compaction yields (via the new checkYield) until the chain catches up; badger online GC gains periodic check hooks (WithCheckFreq/WithCheck) that the splitstore uses to pause or abort GC the same way.
commit 5e890165e2
@@ -444,7 +444,7 @@ func (b *Blockstore) deleteDB(path string) {
     }
 }
 
-func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
+func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq time.Duration, check func() error) error {
     b.lockDB()
     defer b.unlockDB()
 
@@ -461,11 +461,15 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
     if err != nil {
         return err
     }
-
+    checkTick := time.NewTimer(checkFreq)
+    defer checkTick.Stop()
     for err == nil {
         select {
         case <-ctx.Done():
             err = ctx.Err()
+        case <-checkTick.C:
+            err = check()
+            checkTick.Reset(checkFreq)
         default:
             err = b.db.RunValueLogGC(threshold)
         }
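For orientation: the loop drains badger's value log one round at a time in the `default` arm and exits when `RunValueLogGC` reports an error (badger returns `ErrNoRewrite` once a round finds nothing to rewrite), while the new `checkTick` arm gives `check()` a chance to pause or abort between rounds. A minimal, runnable sketch of the same pattern with badger stubbed out — `errNoRewrite`, `runGC`, and `gcRound` are illustrative stand-ins, not lotus API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNoRewrite stands in for badger.ErrNoRewrite: the error a GC round
// returns once it finds nothing worth rewriting, ending the loop.
var errNoRewrite = errors.New("no rewrite")

// runGC mirrors the onlineGC loop above: GC rounds run back to back in
// the default arm; every checkFreq the check callback may pause or abort
// the loop; context cancellation wins over both.
func runGC(ctx context.Context, checkFreq time.Duration, check, gcRound func() error) error {
	checkTick := time.NewTimer(checkFreq)
	defer checkTick.Stop()

	var err error
	for err == nil {
		select {
		case <-ctx.Done():
			err = ctx.Err()
		case <-checkTick.C:
			err = check()
			checkTick.Reset(checkFreq)
		default:
			err = gcRound()
		}
	}
	if errors.Is(err, errNoRewrite) {
		return nil // clean completion: nothing left to collect
	}
	return err
}

func main() {
	rounds := 0
	err := runGC(context.Background(), time.Second,
		func() error { return nil }, // no-op check, like the default in CollectGarbage
		func() error {
			rounds++
			if rounds == 3 {
				return errNoRewrite
			}
			return nil
		})
	fmt.Println("rounds:", rounds, "err:", err) // rounds: 3 err: <nil>
}
```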
@@ -502,7 +506,17 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
     if threshold == 0 {
         threshold = defaultGCThreshold
     }
-    return b.onlineGC(ctx, threshold)
+    checkFreq := options.CheckFreq
+    if checkFreq < 30*time.Second { // disallow checking more frequently than block time
+        checkFreq = 30 * time.Second
+    }
+    check := options.Check
+    if check == nil {
+        check = func() error {
+            return nil
+        }
+    }
+    return b.onlineGC(ctx, threshold, checkFreq, check)
 }
 
 // GCOnce runs garbage collection on the value log;
@@ -2,6 +2,7 @@ package blockstore
 
 import (
     "context"
+    "time"
 
     "github.com/ipfs/go-cid"
     ds "github.com/ipfs/go-datastore"
@@ -57,6 +58,10 @@ type BlockstoreGCOptions struct {
     FullGC bool
     // fraction of garbage in badger vlog before its worth processing in online GC
     Threshold float64
+    // how often to call the check function
+    CheckFreq time.Duration
+    // function to call periodically to pause or early terminate GC
+    Check func() error
 }
 
 func WithFullGC(fullgc bool) BlockstoreGCOption {
@@ -73,6 +78,20 @@ func WithThreshold(threshold float64) BlockstoreGCOption {
     }
 }
 
+func WithCheckFreq(f time.Duration) BlockstoreGCOption {
+    return func(opts *BlockstoreGCOptions) error {
+        opts.CheckFreq = f
+        return nil
+    }
+}
+
+func WithCheck(check func() error) BlockstoreGCOption {
+    return func(opts *BlockstoreGCOptions) error {
+        opts.Check = check
+        return nil
+    }
+}
+
 // BlockstoreSize is a trait for on-disk blockstores that can report their size
 type BlockstoreSize interface {
     Size() (int64, error)
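With the options above in place, a caller can wire a yield function into badger GC. A hedged sketch of such a caller, assuming the lotus import paths (`github.com/filecoin-project/lotus/blockstore`, with the badger store under `blockstore/badger` as package `badgerbs`) and an already-opened store; the threshold value and `yield` are illustrative:

```go
package example

import (
	"context"
	"time"

	blockstore "github.com/filecoin-project/lotus/blockstore"
	badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
)

// collectWithYield shows a caller opting in to the new hooks; yield may
// return an error to stop GC early.
func collectWithYield(ctx context.Context, db *badgerbs.Blockstore, yield func() error) error {
	return db.CollectGarbage(ctx,
		blockstore.WithThreshold(0.01),           // vlog garbage fraction worth collecting
		blockstore.WithCheckFreq(10*time.Second), // CollectGarbage clamps this up to 30s
		blockstore.WithCheck(yield),
	)
}
```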
@@ -187,6 +187,11 @@ type SplitStore struct {
     ctx    context.Context
     cancel func()
 
+    outOfSync         int32 // for fast checking
+    chainSyncMx       sync.Mutex
+    chainSyncCond     sync.Cond
+    chainSyncFinished bool // protected by chainSyncMx
+
     debug *debugLog
 
     // transactional protection for concurrent read/writes during compaction
@@ -261,6 +266,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config
 
     ss.txnViewsCond.L = &ss.txnViewsMx
     ss.txnSyncCond.L = &ss.txnSyncMx
+    ss.chainSyncCond.L = &ss.chainSyncMx
     ss.ctx, ss.cancel = context.WithCancel(context.Background())
 
     ss.reifyCond.L = &ss.reifyMx
@@ -822,6 +828,11 @@ func (s *SplitStore) Close() error {
         s.txnSyncCond.Broadcast()
         s.txnSyncMx.Unlock()
 
+        s.chainSyncMx.Lock()
+        s.chainSyncFinished = true
+        s.chainSyncCond.Broadcast()
+        s.chainSyncMx.Unlock()
+
         log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
         for atomic.LoadInt32(&s.compacting) == 1 {
             time.Sleep(time.Second)
@@ -91,7 +91,35 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
     // Regardless, we put a mutex in HeadChange just to be safe
 
     if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
-        // we are currently compacting -- protect the new tipset(s)
+        // we are currently compacting
+        // 1. Signal sync condition to yield compaction when out of sync and resume when in sync
+        timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
+        if CheckSyncGap && time.Since(timestamp) > SyncGapTime {
+            /* Chain out of sync */
+            if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 1) {
+                // transition from in sync to out of sync
+                s.chainSyncMx.Lock()
+                s.chainSyncFinished = false
+                s.chainSyncMx.Unlock()
+            }
+            // already out of sync, no signaling necessary
+
+        }
+        // TODO: ok to use hysteresis with no transitions between 30s and 1m?
+        if time.Since(timestamp) < SyncWaitTime {
+            /* Chain in sync */
+            if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 0) {
+                // already in sync, no signaling necessary
+            } else {
+                // transition from out of sync to in sync
+                s.chainSyncMx.Lock()
+                s.chainSyncFinished = true
+                s.chainSyncCond.Broadcast()
+                s.chainSyncMx.Unlock()
+            }
+
+        }
+        // 2. protect the new tipset(s)
         s.protectTipSets(apply)
         return nil
     }
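The two thresholds form the hysteresis band the TODO above asks about: lag beyond SyncGapTime flips the store to out-of-sync, lag below SyncWaitTime flips it back, and anything in between changes nothing. A condensed, runnable model of just that state machine — it flips the flag directly, whereas the diff records the in-sync transition through chainSyncFinished and the condition variable; the 30s/1m values follow the TODO's wording, the real constants live in the splitstore package:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const (
	syncWaitTime = 30 * time.Second // assumed SyncWaitTime
	syncGapTime  = time.Minute      // assumed SyncGapTime
)

// observe applies one HeadChange-style observation of head lag to the flag.
func observe(outOfSync *int32, lag time.Duration) {
	if lag > syncGapTime {
		atomic.CompareAndSwapInt32(outOfSync, 0, 1) // in sync -> out of sync
	}
	if lag < syncWaitTime {
		atomic.CompareAndSwapInt32(outOfSync, 1, 0) // out of sync -> in sync
	}
	// lag within [30s, 1m]: inside the hysteresis band, no transition
}

func main() {
	var outOfSync int32
	for _, lag := range []time.Duration{10 * time.Second, 2 * time.Minute, 45 * time.Second, 20 * time.Second} {
		observe(&outOfSync, lag)
		fmt.Printf("head lag %-6v -> outOfSync=%d\n", lag, atomic.LoadInt32(&outOfSync))
	}
	// prints 0, 1, 1, 0: the 45s lag sits in the band and changes nothing
}
```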
@@ -427,7 +455,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
 func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return 0, err
     }
 
@@ -545,7 +573,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
     }
     defer coldSet.Close() //nolint:errcheck
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -617,7 +645,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 
     log.Infow("marking done", "took", time.Since(startMark), "marked", *count)
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -627,7 +655,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
         return xerrors.Errorf("error protecting transactional refs: %w", err)
     }
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -704,7 +732,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
     stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
     stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -713,7 +741,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
     // possibly delete objects we didn't have when we were collecting cold objects)
     s.waitForMissingRefs(markSet)
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -733,7 +761,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
     }
     log.Infow("moving done", "took", time.Since(startMove))
 
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return err
     }
 
@@ -764,7 +792,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
     }
 
     // wait for the head to catch up so that the current tipset is marked
-    s.waitForSync()
+    s.waitForTxnSync()
 
     if err := s.checkClosing(); err != nil {
         return err
@@ -865,7 +893,7 @@ func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
     return nil
 }
 
-func (s *SplitStore) waitForSync() {
+func (s *SplitStore) waitForTxnSync() {
     log.Info("waiting for sync")
     if !CheckSyncGap {
         log.Warnf("If you see this outside of test it is a serious splitstore issue")
@@ -884,6 +912,25 @@ func (s *SplitStore) waitForSync() {
     }
 }
 
+// Block compaction operations if chain sync has fallen behind
+func (s *SplitStore) waitForSync() {
+    if atomic.LoadInt32(&s.outOfSync) == 0 {
+        return
+    }
+    s.chainSyncMx.Lock()
+    defer s.chainSyncMx.Unlock()
+
+    for !s.chainSyncFinished {
+        s.chainSyncCond.Wait()
+    }
+}
+
+// Combined sync and closing check
+func (s *SplitStore) checkYield() error {
+    s.waitForSync()
+    return s.checkClosing()
+}
+
 func (s *SplitStore) endTxnProtect() {
     s.txnLk.Lock()
     defer s.txnLk.Unlock()
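waitForSync is written so the common case stays cheap: a single atomic load when in sync, with the mutex and condition variable touched only after the chain has fallen behind; Close (earlier hunk) sets chainSyncFinished and broadcasts so blocked workers always drain. A runnable, condensed model of that gate — syncGate stands in for the new SplitStore fields, everything else is stubbed:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// syncGate condenses the outOfSync / chainSync* fields added to SplitStore.
type syncGate struct {
	outOfSync int32
	mx        sync.Mutex
	cond      sync.Cond
	finished  bool // plays the role of chainSyncFinished
}

// waitForSync mirrors the new method above: a lone atomic load in the
// common (in sync) case, blocking on the condition variable otherwise.
func (g *syncGate) waitForSync() {
	if atomic.LoadInt32(&g.outOfSync) == 0 {
		return
	}
	g.mx.Lock()
	defer g.mx.Unlock()
	for !g.finished {
		g.cond.Wait()
	}
}

func main() {
	g := &syncGate{}
	g.cond.L = &g.mx // same wiring as ss.chainSyncCond.L = &ss.chainSyncMx in Open

	atomic.StoreInt32(&g.outOfSync, 1) // simulate HeadChange seeing a large head lag
	done := make(chan struct{})
	go func() {
		g.waitForSync() // parks until the broadcast below
		close(done)
	}()

	time.Sleep(50 * time.Millisecond)
	g.mx.Lock()
	g.finished = true // simulate catching up (HeadChange) or Close
	g.cond.Broadcast()
	g.mx.Unlock()

	<-done
	fmt.Println("worker resumed")
}
```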
@@ -1037,7 +1084,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 
     for len(toWalk) > 0 {
         // walking can take a while, so check this with every opportunity
-        if err := s.checkClosing(); err != nil {
+        if err := s.checkYield(); err != nil {
             return err
         }
 
@@ -1106,7 +1153,7 @@ func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid
     }
 
     // check this before recursing
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return 0, err
     }
 
@@ -1175,7 +1222,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
     }
 
     // check this before recursing
-    if err := s.checkClosing(); err != nil {
+    if err := s.checkYield(); err != nil {
         return sz, err
     }
 
@@ -1262,7 +1309,7 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
     batch := make([]blocks.Block, 0, batchSize)
 
     err := coldr.ForEach(func(c cid.Cid) error {
-        if err := s.checkClosing(); err != nil {
+        if err := s.checkYield(); err != nil {
             return err
         }
         blk, err := s.hot.Get(s.ctx, c)
@@ -65,10 +65,15 @@ func (s *SplitStore) gcHotAfterCompaction() {
 }
 
 func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
+    if err := s.checkYield(); err != nil {
+        return err
+    }
     if gc, ok := b.(bstore.BlockstoreGC); ok {
         log.Info("garbage collecting blockstore")
         startGC := time.Now()
 
+        opts = append(opts, bstore.WithCheckFreq(90*time.Second))
+        opts = append(opts, bstore.WithCheck(s.checkYield))
         if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
             return err
         }
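This hunk ties the pieces together: GC doesn't start while out of sync, and checkYield rides along as badger's check hook at a 90-second cadence, so a long online GC parks itself when sync falls behind and resumes, or aborts on close, afterward. A hedged sketch of the same composition against the interfaces in this diff (the import path is assumed from the lotus layout):

```go
package example

import (
	"context"
	"fmt"
	"time"

	bstore "github.com/filecoin-project/lotus/blockstore"
)

// gcWithYield mirrors gcBlockstore above for any store implementing
// bstore.BlockstoreGC; yield plays the role of (*SplitStore).checkYield.
func gcWithYield(ctx context.Context, b bstore.Blockstore, yield func() error, opts []bstore.BlockstoreGCOption) error {
	if err := yield(); err != nil { // don't start GC while out of sync or closing
		return err
	}
	gc, ok := b.(bstore.BlockstoreGC)
	if !ok {
		return fmt.Errorf("blockstore does not support garbage collection")
	}
	opts = append(opts,
		bstore.WithCheckFreq(90*time.Second), // badger clamps anything under 30s
		bstore.WithCheck(yield),              // yield between GC rounds
	)
	return gc.CollectGarbage(ctx, opts...)
}
```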