diff --git a/api/api_full.go b/api/api_full.go index 99955e537..f0009e4a1 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -183,10 +183,14 @@ type FullNode interface { // nodes. ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin - // ChainPrune prunes the stored chain state and garbage collects; only supported if you + // ChainPrune forces compaction on cold store and garbage collects; only supported if you // are using the splitstore ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin + // ChainHotGC does online (badger) GC on the hot store; only supported if you are using + // the splitstore + ChainHotGC(ctx context.Context, opts HotGCOpts) error //perm:admin + // ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore // if supported by the underlying implementation. ChainCheckBlockstore(context.Context) error //perm:admin @@ -1354,6 +1358,12 @@ type PruneOpts struct { RetainState int64 } +type HotGCOpts struct { + Threshold float64 // fraction of vlog garbage handed to online GC; 0 selects the badger blockstore default (0.125) + Periodic bool // true: online GC via CollectGarbage (loops over vlogs); false: a single GCOnce pass + Moving bool // true: run a full (moving) GC instead; Threshold and Periodic are then not used +} + type EthTxReceipt struct { TransactionHash ethtypes.EthHash `json:"transactionHash"` TransactionIndex ethtypes.EthUint64 `json:"transactionIndex"` diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 87da67179..7567c7171 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -394,6 +394,20 @@ func (mr *MockFullNodeMockRecorder) ChainHead(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockFullNode)(nil).ChainHead), arg0) } +// ChainHotGC mocks base method. +func (m *MockFullNode) ChainHotGC(arg0 context.Context, arg1 api.HotGCOpts) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainHotGC", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ChainHotGC indicates an expected call of ChainHotGC. 
+func (mr *MockFullNodeMockRecorder) ChainHotGC(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHotGC", reflect.TypeOf((*MockFullNode)(nil).ChainHotGC), arg0, arg1) +} + // ChainNotify mocks base method. func (m *MockFullNode) ChainNotify(arg0 context.Context) (<-chan []*api.HeadChange, error) { m.ctrl.T.Helper() diff --git a/api/proxy_gen.go b/api/proxy_gen.go index b5163f558..17be4088d 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -172,6 +172,8 @@ type FullNodeMethods struct { ChainHead func(p0 context.Context) (*types.TipSet, error) `perm:"read"` + ChainHotGC func(p0 context.Context, p1 HotGCOpts) error `perm:"admin"` + ChainNotify func(p0 context.Context) (<-chan []*HeadChange, error) `perm:"read"` ChainPrune func(p0 context.Context, p1 PruneOpts) error `perm:"admin"` @@ -1617,6 +1619,17 @@ func (s *FullNodeStub) ChainHead(p0 context.Context) (*types.TipSet, error) { return nil, ErrNotSupported } +func (s *FullNodeStruct) ChainHotGC(p0 context.Context, p1 HotGCOpts) error { + if s.Internal.ChainHotGC == nil { + return ErrNotSupported + } + return s.Internal.ChainHotGC(p0, p1) +} + +func (s *FullNodeStub) ChainHotGC(p0 context.Context, p1 HotGCOpts) error { + return ErrNotSupported +} + func (s *FullNodeStruct) ChainNotify(p0 context.Context) (<-chan []*HeadChange, error) { if s.Internal.ChainNotify == nil { return nil, ErrNotSupported diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index da4f9f67d..3087c01f7 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -20,6 +20,7 @@ import ( pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-base32" "go.uber.org/zap" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/blockstore" ) @@ -44,7 +45,8 @@ const ( // MemoryMap is equivalent to badger/options.MemoryMap. 
MemoryMap = options.MemoryMap // LoadToRAM is equivalent to badger/options.LoadToRAM. - LoadToRAM = options.LoadToRAM + LoadToRAM = options.LoadToRAM + defaultGCThreshold = 0.125 ) // Options embeds the badger options themselves, and augments them with @@ -439,7 +441,7 @@ func (b *Blockstore) deleteDB(path string) { } } -func (b *Blockstore) onlineGC() error { +func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error { b.lockDB() defer b.unlockDB() @@ -448,6 +450,9 @@ func (b *Blockstore) onlineGC() error { if nworkers < 2 { nworkers = 2 } + if nworkers > 7 { // max out at 1 goroutine per badger level + nworkers = 7 + } err := b.db.Flatten(nworkers) if err != nil { @@ -455,7 +460,12 @@ func (b *Blockstore) onlineGC() error { } for err == nil { - err = b.db.RunValueLogGC(0.125) + select { + case <-ctx.Done(): + err = ctx.Err() + default: + err = b.db.RunValueLogGC(threshold) + } } if err == badger.ErrNoRewrite { @@ -468,7 +478,7 @@ func (b *Blockstore) onlineGC() error { // CollectGarbage compacts and runs garbage collection on the value log; // implements the BlockstoreGC trait -func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error { +func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error { if err := b.access(); err != nil { return err } @@ -485,8 +495,48 @@ func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error if options.FullGC { return b.movingGC() } + threshold := options.Threshold + if threshold == 0 { + threshold = defaultGCThreshold + } + return b.onlineGC(ctx, threshold) +} - return b.onlineGC() +// GCOnce runs garbage collection on the value log; +// implements BlockstoreGCOnce trait +func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error { + if err := b.access(); err != nil { + return err + } + defer b.viewers.Done() + + var options blockstore.BlockstoreGCOptions + for _, opt := range opts { + err := 
opt(&options) + if err != nil { + return err + } + } + if options.FullGC { + return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental") + } + + threshold := options.Threshold + if threshold == 0 { + threshold = defaultGCThreshold + } + + b.lockDB() + defer b.unlockDB() + + // Note no compaction needed before single GC as we will hit at most one vlog anyway + err := b.db.RunValueLogGC(threshold) + if err == badger.ErrNoRewrite { + // not really an error in this case, it signals the end of GC + return nil + } + + return err } // Size returns the aggregate size of the blockstore diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index fc81be43e..bf85104bb 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -145,7 +145,7 @@ func testMove(t *testing.T, optsF func(string) Options) { return nil }) g.Go(func() error { - return db.CollectGarbage(blockstore.WithFullGC(true)) + return db.CollectGarbage(ctx, blockstore.WithFullGC(true)) }) err = g.Wait() @@ -230,7 +230,7 @@ func testMove(t *testing.T, optsF func(string) Options) { checkPath() // now do another FullGC to test the double move and following of symlinks - if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil { + if err := db.CollectGarbage(ctx, blockstore.WithFullGC(true)); err != nil { t.Fatal(err) } diff --git a/blockstore/blockstore.go b/blockstore/blockstore.go index f2fb00e8a..be1f1199f 100644 --- a/blockstore/blockstore.go +++ b/blockstore/blockstore.go @@ -36,7 +36,12 @@ type BlockstoreIterator interface { // BlockstoreGC is a trait for blockstores that support online garbage collection type BlockstoreGC interface { - CollectGarbage(options ...BlockstoreGCOption) error + CollectGarbage(ctx context.Context, options ...BlockstoreGCOption) error +} + +// BlockstoreGCOnce is a trait for a blockstore that supports incremental online garbage collection +type BlockstoreGCOnce interface { 
+ GCOnce(ctx context.Context, options ...BlockstoreGCOption) error } // BlockstoreGCOption is a functional interface for controlling blockstore GC options @@ -45,6 +50,8 @@ type BlockstoreGCOption = func(*BlockstoreGCOptions) error // BlockstoreGCOptions is a struct with GC options type BlockstoreGCOptions struct { FullGC bool + // fraction of garbage in badger vlog before it is worth processing in online GC + Threshold float64 } func WithFullGC(fullgc bool) BlockstoreGCOption { @@ -54,6 +61,13 @@ func WithFullGC(fullgc bool) BlockstoreGCOption { } } +func WithThreshold(threshold float64) BlockstoreGCOption { + return func(opts *BlockstoreGCOptions) error { + opts.Threshold = threshold + return nil + } +} + // BlockstoreSize is a trait for on-disk blockstores that can report their size type BlockstoreSize interface { Size() (int64, error) diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index cb24a3991..59bdd515d 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -794,7 +794,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // we are done; do some housekeeping s.endTxnProtect() - s.gcHotstore() + s.gcHotAfterCompaction() err = s.setBaseEpoch(boundaryEpoch) if err != nil { diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go index c9be94d2e..3b53b8042 100644 --- a/blockstore/splitstore/splitstore_gc.go +++ b/blockstore/splitstore/splitstore_gc.go @@ -7,7 +7,7 @@ import ( bstore "github.com/filecoin-project/lotus/blockstore" ) -func (s *SplitStore) gcHotstore() { +func (s *SplitStore) gcHotAfterCompaction() { var opts []bstore.BlockstoreGCOption if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 { opts = append(opts, bstore.WithFullGC(true)) @@ -23,7 +23,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG log.Info("garbage 
collecting blockstore") startGC := time.Now() - if err := gc.CollectGarbage(opts...); err != nil { + if err := gc.CollectGarbage(s.ctx, opts...); err != nil { return err } @@ -33,3 +33,19 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG return fmt.Errorf("blockstore doesn't support garbage collection: %T", b) } + +func (s *SplitStore) gcBlockstoreOnce(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error { + if gc, ok := b.(bstore.BlockstoreGCOnce); ok { + log.Debug("gc blockstore once") + startGC := time.Now() + + if err := gc.GCOnce(s.ctx, opts...); err != nil { + return err + } + + log.Debugw("gc blockstore once done", "took", time.Since(startGC)) + return nil + } + + return fmt.Errorf("blockstore doesn't support gc once: %T", b) +} diff --git a/blockstore/splitstore/splitstore_prune.go b/blockstore/splitstore/splitstore_prune.go index 4a38045e8..08d5b8cca 100644 --- a/blockstore/splitstore/splitstore_prune.go +++ b/blockstore/splitstore/splitstore_prune.go @@ -47,6 +47,23 @@ var ( PruneThreshold = 7 * build.Finality ) +// GCHotStore runs GC on the chain state in the hotstore according to the options specified: a full (moving) GC if opts.Moving is set; otherwise online GC with opts.Threshold, through CollectGarbage if opts.Periodic is set or a single GCOnce if not +func (s *SplitStore) GCHotStore(opts api.HotGCOpts) error { + if opts.Moving { + gcOpts := []bstore.BlockstoreGCOption{bstore.WithFullGC(true)} + return s.gcBlockstore(s.hot, gcOpts) + } + + gcOpts := []bstore.BlockstoreGCOption{bstore.WithThreshold(opts.Threshold)} + var err error + if opts.Periodic { + err = s.gcBlockstore(s.hot, gcOpts) + } else { + err = s.gcBlockstoreOnce(s.hot, gcOpts) + } + return err +} + // PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the // options specified. 
func (s *SplitStore) PruneChain(opts api.PruneOpts) error { diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 9f3cec628..bb1567be5 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index f16755589..101472808 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index f7b102bf1..b23389baf 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 9eaf607f7..1495059f9 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cli/chain.go b/cli/chain.go index c762faa8c..4344b0773 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -1591,7 +1591,64 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) { var ChainPruneCmd = &cli.Command{ Name: "prune", - Usage: "prune the stored chain state and perform garbage collection", + Usage: "splitstore gc", + Subcommands: []*cli.Command{ + chainPruneColdCmd, + chainPruneHotGCCmd, + chainPruneHotMovingGCCmd, + }, +} + +var chainPruneHotGCCmd = &cli.Command{ + Name: "hot", + Usage: "run online (badger vlog) garbage collection on hotstore", + Flags: []cli.Flag{ + &cli.Float64Flag{Name: "threshold", Value: 0.01, Usage: "Threshold of vlog garbage for gc"}, + &cli.BoolFlag{Name: "periodic", Value: false, Usage: "Run periodic gc over multiple vlogs. 
Otherwise run gc once"}, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPIV1(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + opts := lapi.HotGCOpts{} + opts.Periodic = cctx.Bool("periodic") + opts.Threshold = cctx.Float64("threshold") + + gcStart := time.Now() + err = api.ChainHotGC(ctx, opts) + gcTime := time.Since(gcStart) + fmt.Printf("Online GC took %v (periodic <%t> threshold <%f>)\n", gcTime, opts.Periodic, opts.Threshold) // printed even if ChainHotGC failed; err is returned below + return err + }, +} + +var chainPruneHotMovingGCCmd = &cli.Command{ + Name: "hot-moving", + Usage: "run moving gc on hotstore", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPIV1(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + opts := lapi.HotGCOpts{} + opts.Moving = true + + gcStart := time.Now() + err = api.ChainHotGC(ctx, opts) + gcTime := time.Since(gcStart) + fmt.Printf("Moving GC took %v\n", gcTime) // printed even if ChainHotGC failed; err is returned below + return err + }, +} + +var chainPruneColdCmd = &cli.Command{ + Name: "compact-cold", + Usage: "force splitstore compaction on cold store state and run gc", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "online-gc", diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 93f78f573..5516acb44 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -29,6 +29,7 @@ * [ChainGetTipSetByHeight](#ChainGetTipSetByHeight) * [ChainHasObj](#ChainHasObj) * [ChainHead](#ChainHead) + * [ChainHotGC](#ChainHotGC) * [ChainNotify](#ChainNotify) * [ChainPrune](#ChainPrune) * [ChainPutObj](#ChainPutObj) @@ -1074,6 +1075,26 @@ Response: } ``` +### ChainHotGC +ChainHotGC does online (badger) GC on the hot store; only supported if you are using +the splitstore + + +Perms: admin + +Inputs: +```json +[ + { + "Threshold": 12.3, + "Periodic": true, + "Moving": true + } +] +``` + +Response: `{}` + ### ChainNotify ChainNotify returns 
channel with chain head updates. First message is guaranteed to be of len == 1, and type == 'current'. @@ -1098,7 +1119,7 @@ Response: ``` ### ChainPrune -ChainPrune prunes the stored chain state and garbage collects; only supported if you +ChainPrune forces compaction on cold store and garbage collects; only supported if you are using the splitstore diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index 7800503aa..4458599ab 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -2117,7 +2117,7 @@ COMMANDS: decode decode various types encode encode various types disputer interact with the window post disputer - prune prune the stored chain state and perform garbage collection + prune splitstore gc help, h Shows a list of commands or help for one command OPTIONS: @@ -2465,10 +2465,29 @@ OPTIONS: ### lotus chain prune ``` NAME: - lotus chain prune - prune the stored chain state and perform garbage collection + lotus chain prune - splitstore gc USAGE: - lotus chain prune [command options] [arguments...] + lotus chain prune command [command options] [arguments...] + +COMMANDS: + compact-cold force splitstore compaction on cold store state and run gc + hot run online (badger vlog) garbage collection on hotstore + hot-moving run moving gc on hotstore + help, h Shows a list of commands or help for one command + +OPTIONS: + --help, -h show help (default: false) + +``` + +#### lotus chain prune compact-cold +``` +NAME: + lotus chain prune compact-cold - force splitstore compaction on cold store state and run gc + +USAGE: + lotus chain prune compact-cold [command options] [arguments...] OPTIONS: --moving-gc use moving gc for garbage collecting the coldstore (default: false) @@ -2477,6 +2496,33 @@ OPTIONS: ``` +#### lotus chain prune hot +``` +NAME: + lotus chain prune hot - run online (badger vlog) garbage collection on hotstore + +USAGE: + lotus chain prune hot [command options] [arguments...] 
+ +OPTIONS: + --periodic Run periodic gc over multiple vlogs. Otherwise run gc once (default: false) + --threshold value Threshold of vlog garbage for gc (default: 0.01) + +``` + +#### lotus chain prune hot-moving +``` +NAME: + lotus chain prune hot-moving - run moving gc on hotstore + +USAGE: + lotus chain prune hot-moving [command options] [arguments...] + +OPTIONS: + --help, -h show help (default: false) + +``` + ## lotus log ``` NAME: diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index ddaae4c98..465b680ef 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -748,3 +748,14 @@ func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error { return pruner.PruneChain(opts) } + +func (a *ChainAPI) ChainHotGC(ctx context.Context, opts api.HotGCOpts) error { + pruner, ok := a.BaseBlockstore.(interface { + GCHotStore(api.HotGCOpts) error + }) + if !ok { + return xerrors.Errorf("base blockstore does not support hot GC (%T)", a.BaseBlockstore) + } + + return pruner.GCHotStore(opts) +}