Implement yield friendly online GC
This commit is contained in:
parent
fd66db680f
commit
0ce9ae4809
@ -444,7 +444,7 @@ func (b *Blockstore) deleteDB(path string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
|
||||
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq time.Duration, check func() error) error {
|
||||
b.lockDB()
|
||||
defer b.unlockDB()
|
||||
|
||||
@ -461,11 +461,15 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
checkTick := time.NewTimer(checkFreq)
|
||||
defer checkTick.Stop()
|
||||
for err == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
case <-checkTick.C:
|
||||
check()
|
||||
checkTick.Reset(checkFreq)
|
||||
default:
|
||||
err = b.db.RunValueLogGC(threshold)
|
||||
}
|
||||
@ -502,7 +506,17 @@ func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.Bloc
|
||||
if threshold == 0 {
|
||||
threshold = defaultGCThreshold
|
||||
}
|
||||
return b.onlineGC(ctx, threshold)
|
||||
checkFreq := options.CheckFreq
|
||||
if checkFreq < 30*time.Second { // disallow checking more frequently than block time
|
||||
checkFreq = 30 * time.Second
|
||||
}
|
||||
check := options.Check
|
||||
if check == nil {
|
||||
check = func() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return b.onlineGC(ctx, threshold, checkFreq, check)
|
||||
}
|
||||
|
||||
// GCOnce runs garbage collection on the value log;
|
||||
|
@ -2,6 +2,7 @@ package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
@ -57,6 +58,10 @@ type BlockstoreGCOptions struct {
|
||||
FullGC bool
|
||||
// fraction of garbage in badger vlog before its worth processing in online GC
|
||||
Threshold float64
|
||||
// how often to call the check function
|
||||
CheckFreq time.Duration
|
||||
// function to call periodically to pause or early terminate GC
|
||||
Check func() error
|
||||
}
|
||||
|
||||
func WithFullGC(fullgc bool) BlockstoreGCOption {
|
||||
@ -73,6 +78,20 @@ func WithThreshold(threshold float64) BlockstoreGCOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithCheckFreq(f time.Duration) BlockstoreGCOption {
|
||||
return func(opts *BlockstoreGCOptions) error {
|
||||
opts.CheckFreq = f
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithCheck(check func() error) BlockstoreGCOption {
|
||||
return func(opts *BlockstoreGCOptions) error {
|
||||
opts.Check = check
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// BlockstoreSize is a trait for on-disk blockstores that can report their size
|
||||
type BlockstoreSize interface {
|
||||
Size() (int64, error)
|
||||
|
@ -72,6 +72,8 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
|
||||
log.Info("garbage collecting blockstore")
|
||||
startGC := time.Now()
|
||||
|
||||
opts = append(opts, bstore.WithCheckFreq(90*time.Second))
|
||||
opts = append(opts, bstore.WithCheck(s.checkYield))
|
||||
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user