Merge pull request #3876 from filecoin-project/feat/pruning-command
add command to (slowly) prune lotus chain datastore
This commit is contained in:
commit
44e352cd8e
@ -1191,13 +1191,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid.
|
||||
}
|
||||
|
||||
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
|
||||
if ts == nil {
|
||||
ts = cs.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
seen := cid.NewSet()
|
||||
walked := cid.NewSet()
|
||||
|
||||
h := &car.CarHeader{
|
||||
Roots: ts.Cids(),
|
||||
Version: 1,
|
||||
@ -1207,6 +1200,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
return xerrors.Errorf("failed to write car header: %s", err)
|
||||
}
|
||||
|
||||
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error {
|
||||
blk, err := cs.bs.Get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing object to car, bs.Get: %w", err)
|
||||
}
|
||||
|
||||
if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
|
||||
return xerrors.Errorf("failed to write block to car output: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error {
|
||||
if ts == nil {
|
||||
ts = cs.GetHeaviestTipSet()
|
||||
}
|
||||
|
||||
seen := cid.NewSet()
|
||||
walked := cid.NewSet()
|
||||
|
||||
blocksToWalk := ts.Cids()
|
||||
currentMinHeight := ts.Height()
|
||||
|
||||
@ -1215,15 +1230,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cb(blk); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := cs.bs.Get(blk)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting block: %w", err)
|
||||
}
|
||||
|
||||
if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
|
||||
return xerrors.Errorf("failed to write block to car output: %w", err)
|
||||
}
|
||||
|
||||
var b types.BlockHeader
|
||||
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
|
||||
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
|
||||
@ -1270,14 +1285,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
|
||||
if c.Prefix().Codec != cid.DagCBOR {
|
||||
continue
|
||||
}
|
||||
data, err := cs.bs.Get(c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
|
||||
|
||||
if err := cb(c); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
|
||||
return xerrors.Errorf("failed to write out car object: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@ func main() {
|
||||
consensusCmd,
|
||||
serveDealStatsCmd,
|
||||
syncCmd,
|
||||
stateTreePruneCmd,
|
||||
}
|
||||
|
||||
app := &cli.App{
|
||||
|
289
cmd/lotus-shed/pruning.go
Normal file
289
cmd/lotus-shed/pruning.go
Normal file
@ -0,0 +1,289 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/vm"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/lib/blockstore"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/ipfs/bbloom"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/query"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type cidSet interface {
|
||||
Add(cid.Cid)
|
||||
Has(cid.Cid) bool
|
||||
HasRaw([]byte) bool
|
||||
Len() int
|
||||
}
|
||||
|
||||
type bloomSet struct {
|
||||
bloom *bbloom.Bloom
|
||||
}
|
||||
|
||||
func newBloomSet(size int64) (*bloomSet, error) {
|
||||
b, err := bbloom.New(float64(size), 3)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &bloomSet{bloom: b}, nil
|
||||
}
|
||||
|
||||
func (bs *bloomSet) Add(c cid.Cid) {
|
||||
bs.bloom.Add(c.Hash())
|
||||
|
||||
}
|
||||
|
||||
func (bs *bloomSet) Has(c cid.Cid) bool {
|
||||
return bs.bloom.Has(c.Hash())
|
||||
}
|
||||
|
||||
func (bs *bloomSet) HasRaw(b []byte) bool {
|
||||
return bs.bloom.Has(b)
|
||||
}
|
||||
|
||||
func (bs *bloomSet) Len() int {
|
||||
return int(bs.bloom.ElementsAdded())
|
||||
}
|
||||
|
||||
type mapSet struct {
|
||||
m map[string]struct{}
|
||||
}
|
||||
|
||||
func newMapSet() *mapSet {
|
||||
return &mapSet{m: make(map[string]struct{})}
|
||||
}
|
||||
|
||||
func (bs *mapSet) Add(c cid.Cid) {
|
||||
bs.m[string(c.Hash())] = struct{}{}
|
||||
}
|
||||
|
||||
func (bs *mapSet) Has(c cid.Cid) bool {
|
||||
_, ok := bs.m[string(c.Hash())]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (bs *mapSet) HasRaw(b []byte) bool {
|
||||
_, ok := bs.m[string(b)]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (bs *mapSet) Len() int {
|
||||
return len(bs.m)
|
||||
}
|
||||
|
||||
var stateTreePruneCmd = &cli.Command{
|
||||
Name: "state-prune",
|
||||
Description: "Deletes old state root data from local chainstore",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "repo",
|
||||
Value: "~/.lotus",
|
||||
},
|
||||
&cli.Int64Flag{
|
||||
Name: "keep-from-lookback",
|
||||
Usage: "keep stateroots at or newer than the current height minus this lookback",
|
||||
Value: 1800, // 2 x finality
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "delete-up-to",
|
||||
Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "use-bloom-set",
|
||||
Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "dry-run",
|
||||
Usage: "only enumerate the good set, don't do any deletions",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "only-ds-gc",
|
||||
Usage: "Only run datastore GC",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "gc-count",
|
||||
Usage: "number of times to run gc",
|
||||
Value: 20,
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
ctx := context.TODO()
|
||||
|
||||
fsrepo, err := repo.NewFS(cctx.String("repo"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lkrepo, err := fsrepo.Lock(repo.FullNode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer lkrepo.Close() //nolint:errcheck
|
||||
|
||||
ds, err := lkrepo.Datastore("/chain")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer ds.Close() //nolint:errcheck
|
||||
|
||||
mds, err := lkrepo.Datastore("/metadata")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mds.Close() //nolint:errcheck
|
||||
|
||||
if cctx.Bool("only-ds-gc") {
|
||||
gcds, ok := ds.(datastore.GCDatastore)
|
||||
if ok {
|
||||
fmt.Println("running datastore gc....")
|
||||
for i := 0; i < cctx.Int("gc-count"); i++ {
|
||||
if err := gcds.CollectGarbage(); err != nil {
|
||||
return xerrors.Errorf("datastore GC failed: %w", err)
|
||||
}
|
||||
}
|
||||
fmt.Println("gc complete!")
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("datastore doesnt support gc")
|
||||
}
|
||||
|
||||
bs := blockstore.NewBlockstore(ds)
|
||||
|
||||
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier))
|
||||
if err := cs.Load(); err != nil {
|
||||
return fmt.Errorf("loading chainstore: %w", err)
|
||||
}
|
||||
|
||||
var goodSet cidSet
|
||||
if cctx.Bool("use-bloom-set") {
|
||||
bset, err := newBloomSet(10000000)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
goodSet = bset
|
||||
} else {
|
||||
goodSet = newMapSet()
|
||||
}
|
||||
|
||||
ts := cs.GetHeaviestTipSet()
|
||||
|
||||
rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback"))
|
||||
|
||||
if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error {
|
||||
if goodSet.Len()%20 == 0 {
|
||||
fmt.Printf("\renumerating keep set: %d ", goodSet.Len())
|
||||
}
|
||||
goodSet.Add(c)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("snapshot walk failed: %w", err)
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len())
|
||||
|
||||
if cctx.Bool("dry-run") {
|
||||
return nil
|
||||
}
|
||||
|
||||
var b datastore.Batch
|
||||
var batchCount int
|
||||
markForRemoval := func(c cid.Cid) error {
|
||||
if b == nil {
|
||||
nb, err := ds.Batch()
|
||||
if err != nil {
|
||||
return fmt.Errorf("opening batch: %w", err)
|
||||
}
|
||||
|
||||
b = nb
|
||||
}
|
||||
batchCount++
|
||||
|
||||
if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if batchCount > 100 {
|
||||
if err := b.Commit(); err != nil {
|
||||
return xerrors.Errorf("failed to commit batch deletes: %w", err)
|
||||
}
|
||||
b = nil
|
||||
batchCount = 0
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
res, err := ds.Query(query.Query{KeysOnly: true})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to query datastore: %w", err)
|
||||
}
|
||||
|
||||
dupTo := cctx.Int("delete-up-to")
|
||||
|
||||
var deleteCount int
|
||||
var goodHits int
|
||||
for {
|
||||
v, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):]))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to parse key: %w", err)
|
||||
}
|
||||
|
||||
if goodSet.HasRaw(bk) {
|
||||
goodHits++
|
||||
continue
|
||||
}
|
||||
|
||||
nc := cid.NewCidV1(cid.Raw, bk)
|
||||
|
||||
deleteCount++
|
||||
if err := markForRemoval(nc); err != nil {
|
||||
return fmt.Errorf("failed to remove cid %s: %w", nc, err)
|
||||
}
|
||||
|
||||
if deleteCount%20 == 0 {
|
||||
fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits)
|
||||
}
|
||||
|
||||
if dupTo != 0 && deleteCount > dupTo {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if b != nil {
|
||||
if err := b.Commit(); err != nil {
|
||||
return xerrors.Errorf("failed to commit final batch delete: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
gcds, ok := ds.(datastore.GCDatastore)
|
||||
if ok {
|
||||
fmt.Println("running datastore gc....")
|
||||
for i := 0; i < cctx.Int("gc-count"); i++ {
|
||||
if err := gcds.CollectGarbage(); err != nil {
|
||||
return xerrors.Errorf("datastore GC failed: %w", err)
|
||||
}
|
||||
}
|
||||
fmt.Println("gc complete!")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
1
go.mod
1
go.mod
@ -51,6 +51,7 @@ require (
|
||||
github.com/hashicorp/go-multierror v1.1.0
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d
|
||||
github.com/ipfs/bbloom v0.0.4
|
||||
github.com/ipfs/go-bitswap v0.2.20
|
||||
github.com/ipfs/go-block-format v0.0.2
|
||||
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
|
||||
|
Loading…
Reference in New Issue
Block a user