chain prune hot -- hotstore online gc
This commit is contained in:
parent
4588523de7
commit
71b21db0d9
@ -183,10 +183,14 @@ type FullNode interface {
|
||||
// nodes.
|
||||
ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:admin
|
||||
|
||||
// ChainPrune prunes the stored chain state and garbage collects; only supported if you
|
||||
// ChainPrune forces compaction on cold store and garbage collects; only supported if you
|
||||
// are using the splitstore
|
||||
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin
|
||||
|
||||
// ChainHotGC does online (badger) GC on the hot store; only supported if you are using
|
||||
// the splitstore
|
||||
ChainHotGC(ctx context.Context, opts HotGCOpts) error //perm:admin
|
||||
|
||||
// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
|
||||
// if supported by the underlying implementation.
|
||||
ChainCheckBlockstore(context.Context) error //perm:admin
|
||||
@ -1354,6 +1358,11 @@ type PruneOpts struct {
|
||||
RetainState int64
|
||||
}
|
||||
|
||||
type HotGCOpts struct {
|
||||
Threshold float64
|
||||
Periodic bool
|
||||
}
|
||||
|
||||
type EthTxReceipt struct {
|
||||
TransactionHash ethtypes.EthHash `json:"transactionHash"`
|
||||
TransactionIndex ethtypes.EthUint64 `json:"transactionIndex"`
|
||||
|
@ -394,6 +394,20 @@ func (mr *MockFullNodeMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockFullNode)(nil).ChainHead), arg0)
|
||||
}
|
||||
|
||||
// ChainHotGC mocks base method.
|
||||
func (m *MockFullNode) ChainHotGC(arg0 context.Context, arg1 api.HotGCOpts) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ChainHotGC", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ChainHotGC indicates an expected call of ChainHotGC.
|
||||
func (mr *MockFullNodeMockRecorder) ChainHotGC(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHotGC", reflect.TypeOf((*MockFullNode)(nil).ChainHotGC), arg0, arg1)
|
||||
}
|
||||
|
||||
// ChainNotify mocks base method.
|
||||
func (m *MockFullNode) ChainNotify(arg0 context.Context) (<-chan []*api.HeadChange, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
@ -172,6 +172,8 @@ type FullNodeMethods struct {
|
||||
|
||||
ChainHead func(p0 context.Context) (*types.TipSet, error) `perm:"read"`
|
||||
|
||||
ChainHotGC func(p0 context.Context, p1 HotGCOpts) error `perm:"admin"`
|
||||
|
||||
ChainNotify func(p0 context.Context) (<-chan []*HeadChange, error) `perm:"read"`
|
||||
|
||||
ChainPrune func(p0 context.Context, p1 PruneOpts) error `perm:"admin"`
|
||||
@ -1617,6 +1619,17 @@ func (s *FullNodeStub) ChainHead(p0 context.Context) (*types.TipSet, error) {
|
||||
return nil, ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainHotGC(p0 context.Context, p1 HotGCOpts) error {
|
||||
if s.Internal.ChainHotGC == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ChainHotGC(p0, p1)
|
||||
}
|
||||
|
||||
func (s *FullNodeStub) ChainHotGC(p0 context.Context, p1 HotGCOpts) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *FullNodeStruct) ChainNotify(p0 context.Context) (<-chan []*HeadChange, error) {
|
||||
if s.Internal.ChainNotify == nil {
|
||||
return nil, ErrNotSupported
|
||||
|
64
api/v0api/gomock_reflect_637439595/prog.go
Normal file
64
api/v0api/gomock_reflect_637439595/prog.go
Normal file
@ -0,0 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/gob"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/mock/mockgen/model"
|
||||
|
||||
pkg_ "github.com/filecoin-project/lotus/api/v0api"
|
||||
)
|
||||
|
||||
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
its := []struct {
|
||||
sym string
|
||||
typ reflect.Type
|
||||
}{
|
||||
|
||||
{"FullNode", reflect.TypeOf((*pkg_.FullNode)(nil)).Elem()},
|
||||
}
|
||||
pkg := &model.Package{
|
||||
// NOTE: This behaves contrary to documented behaviour if the
|
||||
// package name is not the final component of the import path.
|
||||
// The reflect package doesn't expose the package name, though.
|
||||
Name: path.Base("github.com/filecoin-project/lotus/api/v0api"),
|
||||
}
|
||||
|
||||
for _, it := range its {
|
||||
intf, err := model.InterfaceFromInterfaceType(it.typ)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
intf.Name = it.sym
|
||||
pkg.Interfaces = append(pkg.Interfaces, intf)
|
||||
}
|
||||
|
||||
outfile := os.Stdout
|
||||
if len(*output) != 0 {
|
||||
var err error
|
||||
outfile, err = os.Create(*output)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
|
||||
}
|
||||
defer func() {
|
||||
if err := outfile.Close(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
|
||||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
@ -55,7 +55,7 @@ func VersionForType(nodeType NodeType) (Version, error) {
|
||||
// semver versions of the rpc api exposed
|
||||
var (
|
||||
FullAPIVersion0 = newVer(1, 5, 0)
|
||||
FullAPIVersion1 = newVer(2, 3, 0)
|
||||
FullAPIVersion1 = newVer(2, 4, 0)
|
||||
|
||||
MinerAPIVersion0 = newVer(1, 5, 0)
|
||||
WorkerAPIVersion0 = newVer(1, 7, 0)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
pool "github.com/libp2p/go-buffer-pool"
|
||||
"github.com/multiformats/go-base32"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
)
|
||||
@ -45,6 +46,7 @@ const (
|
||||
MemoryMap = options.MemoryMap
|
||||
// LoadToRAM is equivalent to badger/options.LoadToRAM.
|
||||
LoadToRAM = options.LoadToRAM
|
||||
defaultGCThreshold = 0.125
|
||||
)
|
||||
|
||||
// Options embeds the badger options themselves, and augments them with
|
||||
@ -439,7 +441,7 @@ func (b *Blockstore) deleteDB(path string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Blockstore) onlineGC() error {
|
||||
func (b *Blockstore) onlineGC(ctx context.Context, threshold float64) error {
|
||||
b.lockDB()
|
||||
defer b.unlockDB()
|
||||
|
||||
@ -448,6 +450,9 @@ func (b *Blockstore) onlineGC() error {
|
||||
if nworkers < 2 {
|
||||
nworkers = 2
|
||||
}
|
||||
if nworkers > 7 { // max out at 1 goroutine per badger level
|
||||
nworkers = 7
|
||||
}
|
||||
|
||||
err := b.db.Flatten(nworkers)
|
||||
if err != nil {
|
||||
@ -455,7 +460,12 @@ func (b *Blockstore) onlineGC() error {
|
||||
}
|
||||
|
||||
for err == nil {
|
||||
err = b.db.RunValueLogGC(0.125)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
default:
|
||||
err = b.db.RunValueLogGC(threshold)
|
||||
}
|
||||
}
|
||||
|
||||
if err == badger.ErrNoRewrite {
|
||||
@ -468,7 +478,7 @@ func (b *Blockstore) onlineGC() error {
|
||||
|
||||
// CollectGarbage compacts and runs garbage collection on the value log;
|
||||
// implements the BlockstoreGC trait
|
||||
func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error {
|
||||
func (b *Blockstore) CollectGarbage(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
|
||||
if err := b.access(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -485,8 +495,48 @@ func (b *Blockstore) CollectGarbage(opts ...blockstore.BlockstoreGCOption) error
|
||||
if options.FullGC {
|
||||
return b.movingGC()
|
||||
}
|
||||
threshold := options.Threshold
|
||||
if threshold == 0 {
|
||||
threshold = defaultGCThreshold
|
||||
}
|
||||
return b.onlineGC(ctx, threshold)
|
||||
}
|
||||
|
||||
return b.onlineGC()
|
||||
// GCOnce runs garbage collection on the value log;
|
||||
// implements BlockstoreGCOnce trait
|
||||
func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGCOption) error {
|
||||
if err := b.access(); err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.viewers.Done()
|
||||
|
||||
var options blockstore.BlockstoreGCOptions
|
||||
for _, opt := range opts {
|
||||
err := opt(&options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if options.FullGC {
|
||||
return xerrors.Errorf("FullGC option specified for GCOnce but full GC is non incremental")
|
||||
}
|
||||
|
||||
threshold := options.Threshold
|
||||
if threshold == 0 {
|
||||
threshold = defaultGCThreshold
|
||||
}
|
||||
|
||||
b.lockDB()
|
||||
defer b.unlockDB()
|
||||
|
||||
// Note no compaction needed before single GC as we will hit at most one vlog anyway
|
||||
err := b.db.RunValueLogGC(threshold)
|
||||
if err == badger.ErrNoRewrite {
|
||||
// not really an error in this case, it signals the end of GC
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Size returns the aggregate size of the blockstore
|
||||
|
@ -145,7 +145,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
|
||||
return nil
|
||||
})
|
||||
g.Go(func() error {
|
||||
return db.CollectGarbage(blockstore.WithFullGC(true))
|
||||
return db.CollectGarbage(ctx, blockstore.WithFullGC(true))
|
||||
})
|
||||
|
||||
err = g.Wait()
|
||||
@ -230,7 +230,7 @@ func testMove(t *testing.T, optsF func(string) Options) {
|
||||
checkPath()
|
||||
|
||||
// now do another FullGC to test the double move and following of symlinks
|
||||
if err := db.CollectGarbage(blockstore.WithFullGC(true)); err != nil {
|
||||
if err := db.CollectGarbage(ctx, blockstore.WithFullGC(true)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -36,7 +36,12 @@ type BlockstoreIterator interface {
|
||||
|
||||
// BlockstoreGC is a trait for blockstores that support online garbage collection
|
||||
type BlockstoreGC interface {
|
||||
CollectGarbage(options ...BlockstoreGCOption) error
|
||||
CollectGarbage(ctx context.Context, options ...BlockstoreGCOption) error
|
||||
}
|
||||
|
||||
// BlockstoreGCOnce is a trait for a blockstore that supports incremental online garbage collection
|
||||
type BlockstoreGCOnce interface {
|
||||
GCOnce(ctx context.Context, options ...BlockstoreGCOption) error
|
||||
}
|
||||
|
||||
// BlockstoreGCOption is a functional interface for controlling blockstore GC options
|
||||
@ -45,6 +50,8 @@ type BlockstoreGCOption = func(*BlockstoreGCOptions) error
|
||||
// BlockstoreGCOptions is a struct with GC options
|
||||
type BlockstoreGCOptions struct {
|
||||
FullGC bool
|
||||
// fraction of garbage in badger vlog before its worth processing in online GC
|
||||
Threshold float64
|
||||
}
|
||||
|
||||
func WithFullGC(fullgc bool) BlockstoreGCOption {
|
||||
@ -54,6 +61,13 @@ func WithFullGC(fullgc bool) BlockstoreGCOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithThreshold(threshold float64) BlockstoreGCOption {
|
||||
return func(opts *BlockstoreGCOptions) error {
|
||||
opts.Threshold = threshold
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// BlockstoreSize is a trait for on-disk blockstores that can report their size
|
||||
type BlockstoreSize interface {
|
||||
Size() (int64, error)
|
||||
|
@ -794,7 +794,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
|
||||
|
||||
// we are done; do some housekeeping
|
||||
s.endTxnProtect()
|
||||
s.gcHotstore()
|
||||
s.gcHotAfterCompaction()
|
||||
|
||||
err = s.setBaseEpoch(boundaryEpoch)
|
||||
if err != nil {
|
||||
|
@ -7,7 +7,7 @@ import (
|
||||
bstore "github.com/filecoin-project/lotus/blockstore"
|
||||
)
|
||||
|
||||
func (s *SplitStore) gcHotstore() {
|
||||
func (s *SplitStore) gcHotAfterCompaction() {
|
||||
var opts []bstore.BlockstoreGCOption
|
||||
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
|
||||
opts = append(opts, bstore.WithFullGC(true))
|
||||
@ -23,7 +23,7 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
|
||||
log.Info("garbage collecting blockstore")
|
||||
startGC := time.Now()
|
||||
|
||||
if err := gc.CollectGarbage(opts...); err != nil {
|
||||
if err := gc.CollectGarbage(s.ctx, opts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -33,3 +33,19 @@ func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreG
|
||||
|
||||
return fmt.Errorf("blockstore doesn't support garbage collection: %T", b)
|
||||
}
|
||||
|
||||
func (s *SplitStore) gcBlockstoreOnce(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
|
||||
if gc, ok := b.(bstore.BlockstoreGCOnce); ok {
|
||||
log.Debug("gc blockstore once")
|
||||
startGC := time.Now()
|
||||
|
||||
if err := gc.GCOnce(s.ctx, opts...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugw("gc blockstore once done", "took", time.Since(startGC))
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("blockstore doesn't support gc once: %T", b)
|
||||
}
|
||||
|
@ -47,6 +47,19 @@ var (
|
||||
PruneThreshold = 7 * build.Finality
|
||||
)
|
||||
|
||||
// GCHotstore runs online GC on the chain state in the hotstore according the to options specified
|
||||
func (s *SplitStore) GCHotStore(opts api.HotGCOpts) error {
|
||||
gcOpts := []bstore.BlockstoreGCOption{bstore.WithThreshold(opts.Threshold)}
|
||||
|
||||
var err error
|
||||
if opts.Periodic {
|
||||
err = s.gcBlockstore(s.hot, gcOpts)
|
||||
} else {
|
||||
err = s.gcBlockstoreOnce(s.hot, gcOpts)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// PruneChain instructs the SplitStore to prune chain state in the coldstore, according to the
|
||||
// options specified.
|
||||
func (s *SplitStore) PruneChain(opts api.PruneOpts) error {
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
33
cli/chain.go
33
cli/chain.go
@ -1591,7 +1591,38 @@ func createExportFile(app *cli.App, path string) (io.WriteCloser, error) {
|
||||
|
||||
var ChainPruneCmd = &cli.Command{
|
||||
Name: "prune",
|
||||
Usage: "prune the stored chain state and perform garbage collection",
|
||||
Usage: "splitstore gc",
|
||||
Subcommands: []*cli.Command{
|
||||
chainPruneColdCmd,
|
||||
chainPruneHotGCCmd,
|
||||
},
|
||||
}
|
||||
|
||||
var chainPruneHotGCCmd = &cli.Command{
|
||||
Name: "hot",
|
||||
Usage: "run online (badger vlog) garbage collection on hotstore",
|
||||
Flags: []cli.Flag{
|
||||
&cli.Float64Flag{Name: "threshold", Value: 0.01, Usage: "Threshold of vlog garbage for gc"},
|
||||
&cli.BoolFlag{Name: "periodic", Value: false, Usage: "Run periodic gc over multiple vlogs. Otherwise run gc once"},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := GetFullNodeAPIV1(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := ReqContext(cctx)
|
||||
opts := lapi.HotGCOpts{}
|
||||
opts.Periodic = cctx.Bool("periodic")
|
||||
opts.Threshold = cctx.Float64("threshold")
|
||||
|
||||
return api.ChainHotGC(ctx, opts)
|
||||
},
|
||||
}
|
||||
|
||||
var chainPruneColdCmd = &cli.Command{
|
||||
Name: "compact-cold",
|
||||
Usage: "force splitstore compaction on cold store state and run gc",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "online-gc",
|
||||
|
@ -29,6 +29,7 @@
|
||||
* [ChainGetTipSetByHeight](#ChainGetTipSetByHeight)
|
||||
* [ChainHasObj](#ChainHasObj)
|
||||
* [ChainHead](#ChainHead)
|
||||
* [ChainHotGC](#ChainHotGC)
|
||||
* [ChainNotify](#ChainNotify)
|
||||
* [ChainPrune](#ChainPrune)
|
||||
* [ChainPutObj](#ChainPutObj)
|
||||
@ -1074,6 +1075,25 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
### ChainHotGC
|
||||
ChainHotGC does online (badger) GC on the hot store; only supported if you are using
|
||||
the splitstore
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Threshold": 12.3,
|
||||
"Periodic": true
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ChainNotify
|
||||
ChainNotify returns channel with chain head updates.
|
||||
First message is guaranteed to be of len == 1, and type == 'current'.
|
||||
@ -1098,7 +1118,7 @@ Response:
|
||||
```
|
||||
|
||||
### ChainPrune
|
||||
ChainPrune prunes the stored chain state and garbage collects; only supported if you
|
||||
ChainPrune forces compaction on cold store and garbage collects; only supported if you
|
||||
are using the splitstore
|
||||
|
||||
|
||||
|
@ -2117,7 +2117,7 @@ COMMANDS:
|
||||
decode decode various types
|
||||
encode encode various types
|
||||
disputer interact with the window post disputer
|
||||
prune prune the stored chain state and perform garbage collection
|
||||
prune splitstore gc
|
||||
help, h Shows a list of commands or help for one command
|
||||
|
||||
OPTIONS:
|
||||
@ -2465,10 +2465,28 @@ OPTIONS:
|
||||
### lotus chain prune
|
||||
```
|
||||
NAME:
|
||||
lotus chain prune - prune the stored chain state and perform garbage collection
|
||||
lotus chain prune - splitstore gc
|
||||
|
||||
USAGE:
|
||||
lotus chain prune [command options] [arguments...]
|
||||
lotus chain prune command [command options] [arguments...]
|
||||
|
||||
COMMANDS:
|
||||
compact-cold force splitstore compaction on cold store state and run gc
|
||||
hot run online (badger vlog) garbage collection on hotstore
|
||||
help, h Shows a list of commands or help for one command
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
#### lotus chain prune compact-cold
|
||||
```
|
||||
NAME:
|
||||
lotus chain prune compact-cold - force splitstore compaction on cold store state and run gc
|
||||
|
||||
USAGE:
|
||||
lotus chain prune compact-cold [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--moving-gc use moving gc for garbage collecting the coldstore (default: false)
|
||||
@ -2477,6 +2495,20 @@ OPTIONS:
|
||||
|
||||
```
|
||||
|
||||
#### lotus chain prune hot
|
||||
```
|
||||
NAME:
|
||||
lotus chain prune hot - run online (badger vlog) garbage collection on hotstore
|
||||
|
||||
USAGE:
|
||||
lotus chain prune hot [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--periodic Run periodic gc over multiple vlogs. Otherwise run gc once (default: false)
|
||||
--threshold value Threshold of vlog garbage for gc (default: 0.01)
|
||||
|
||||
```
|
||||
|
||||
## lotus log
|
||||
```
|
||||
NAME:
|
||||
|
@ -748,3 +748,14 @@ func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error {
|
||||
|
||||
return pruner.PruneChain(opts)
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainHotGC(ctx context.Context, opts api.HotGCOpts) error {
|
||||
pruner, ok := a.BaseBlockstore.(interface {
|
||||
GCHotStore(api.HotGCOpts) error
|
||||
})
|
||||
if !ok {
|
||||
return xerrors.Errorf("base blockstore does not support hot GC (%T)", a.BaseBlockstore)
|
||||
}
|
||||
|
||||
return pruner.GCHotStore(opts)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user