From 6670d22fb5714f4dfc651323b75a03631b0ddf97 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 15 Sep 2020 20:20:48 -0700 Subject: [PATCH 1/2] add command to (slowly) prune lotus chain datastore --- chain/store/store.go | 46 +++--- cmd/lotus-shed/main.go | 1 + cmd/lotus-shed/pruning.go | 290 ++++++++++++++++++++++++++++++++++++++ go.mod | 1 + 4 files changed, 321 insertions(+), 17 deletions(-) create mode 100644 cmd/lotus-shed/pruning.go diff --git a/chain/store/store.go b/chain/store/store.go index 20a7e3031..404befac7 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -1183,13 +1183,6 @@ func recurseLinks(bs bstore.Blockstore, walked *cid.Set, root cid.Cid, in []cid. } func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error { - if ts == nil { - ts = cs.GetHeaviestTipSet() - } - - seen := cid.NewSet() - walked := cid.NewSet() - h := &car.CarHeader{ Roots: ts.Cids(), Version: 1, @@ -1199,6 +1192,28 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return xerrors.Errorf("failed to write car header: %s", err) } + return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, func(c cid.Cid) error { + blk, err := cs.bs.Get(c) + if err != nil { + return xerrors.Errorf("writing object to car, bs.Get: %w", err) + } + + if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil { + return xerrors.Errorf("failed to write block to car output: %w", err) + } + + return nil + }) +} + +func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, cb func(cid.Cid) error) error { + if ts == nil { + ts = cs.GetHeaviestTipSet() + } + + seen := cid.NewSet() + walked := cid.NewSet() + blocksToWalk := ts.Cids() walkChain := func(blk cid.Cid) error { @@ -1206,15 +1221,15 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo return nil } + if err := cb(blk); err != nil { + return err + } + data, err := cs.bs.Get(blk) if err != nil { return xerrors.Errorf("getting block: %w", err) } - if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil { - return xerrors.Errorf("failed to write block to car output: %w", err) - } - var b types.BlockHeader if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil { return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err) @@ -1254,14 +1269,11 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo if c.Prefix().Codec != cid.DagCBOR { continue } - data, err := cs.bs.Get(c) - if err != nil { - return xerrors.Errorf("writing object to car (get %s): %w", c, err) + + if err := cb(c); err != nil { + return err } - if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil { - return xerrors.Errorf("failed to write out car object: %w", err) - } } } diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index cff3059b6..a64f981a1 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -35,6 +35,7 @@ func main() { mathCmd, mpoolStatsCmd, exportChainCmd, + stateTreePruneCmd, } app := &cli.App{ diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go new file mode 100644 index 000000000..6f0c20541 --- /dev/null +++ b/cmd/lotus-shed/pruning.go @@ -0,0 +1,290 @@ +package main + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/lib/blockstore" + "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/bbloom" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + dshelp "github.com/ipfs/go-ipfs-ds-help" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" +) + +type cidSet interface { + Add(cid.Cid) + Has(cid.Cid) bool + HasRaw([]byte) bool + Len() int +} + +type bloomSet struct { + bloom *bbloom.Bloom +} + +func newBloomSet(size int64) (*bloomSet, error) { + b, err := bbloom.New(float64(size), 3) + if err != nil { + return nil, err + } + + return &bloomSet{bloom: b}, nil +} + +func (bs *bloomSet) Add(c cid.Cid) { + bs.bloom.Add(c.Hash()) + +} + +func (bs *bloomSet) Has(c cid.Cid) bool { + return bs.bloom.Has(c.Hash()) +} + +func (bs *bloomSet) HasRaw(b []byte) bool { + return bs.bloom.Has(b) +} + +func (bs *bloomSet) Len() int { + return int(bs.bloom.ElementsAdded()) +} + +type mapSet struct { + m map[string]struct{} +} + +func newMapSet() *mapSet { + return &mapSet{m: make(map[string]struct{})} +} + +func (bs *mapSet) Add(c cid.Cid) { + bs.m[string(c.Hash())] = struct{}{} +} + +func (bs *mapSet) Has(c cid.Cid) bool { + _, ok := bs.m[string(c.Hash())] + return ok +} + +func (bs *mapSet) HasRaw(b []byte) bool { + _, ok := bs.m[string(b)] + return ok +} + +func (bs *mapSet) Len() int { + return len(bs.m) +} + +var stateTreePruneCmd = &cli.Command{ + Name: "state-prune", + Description: "Deletes old state root data from local chainstore", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + }, + &cli.Int64Flag{ + Name: "keep-from-lookback", + Usage: "keep stateroots at or newer than the current height minus this lookback", + Value: 1800, // 2 x finality + }, + &cli.IntFlag{ + Name: "delete-up-to", + Usage: "delete up to the given number of objects (used to run a faster 'partial' sync)", + }, + &cli.BoolFlag{ + Name: "use-bloom-set", + Usage: "use a bloom filter for the 'good' set instead of a map, reduces memory usage but may not clean up as much", + }, + &cli.BoolFlag{ + Name: "dry-run", + Usage: "only enumerate the good set, don't do any deletions", + }, + &cli.BoolFlag{ + Name: "only-ds-gc", + Usage: "Only run datastore GC", + }, + &cli.IntFlag{ + Name: "gc-count", + Usage: "number of times to run gc", + Value: 20, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.TODO() + + fsrepo, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return err + } + + lkrepo, err := fsrepo.Lock(repo.FullNode) + if err != nil { + return err + } + + defer lkrepo.Close() //nolint:errcheck + + ds, err := lkrepo.Datastore("/chain") + if err != nil { + return err + } + + defer ds.Close() + + mds, err := lkrepo.Datastore("/metadata") + if err != nil { + return err + } + defer mds.Close() + + if cctx.Bool("only-ds-gc") { + gcds, ok := ds.(datastore.GCDatastore) + if ok { + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := gcds.CollectGarbage(); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) + } + } + fmt.Println("gc complete!") + return nil + } else { + return fmt.Errorf("datastore doesnt support gc") + } + } + + bs := blockstore.NewBlockstore(ds) + + cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier)) + if err := cs.Load(); err != nil { + return fmt.Errorf("loading chainstore: %w", err) + } + + var goodSet cidSet + if cctx.Bool("use-bloom-set") { + bset, err := newBloomSet(10000000) + if err != nil { + return err + } + goodSet = bset + } else { + goodSet = newMapSet() + } + + ts := cs.GetHeaviestTipSet() + + rrLb := abi.ChainEpoch(cctx.Int64("keep-from-lookback")) + + if err := cs.WalkSnapshot(ctx, ts, rrLb, true, func(c cid.Cid) error { + if goodSet.Len()%20 == 0 { + fmt.Printf("\renumerating keep set: %d ", goodSet.Len()) + } + goodSet.Add(c) + return nil + }); err != nil { + return fmt.Errorf("snapshot walk failed: %w", err) + } + + fmt.Println() + fmt.Printf("Succesfully marked keep set! (%d objects)\n", goodSet.Len()) + + if cctx.Bool("dry-run") { + return nil + } + + var b datastore.Batch + var batchCount int + markForRemoval := func(c cid.Cid) error { + if b == nil { + nb, err := ds.Batch() + if err != nil { + return fmt.Errorf("opening batch: %w", err) + } + + b = nb + } + batchCount++ + + if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil { + return err + } + + if batchCount > 100 { + if err := b.Commit(); err != nil { + return xerrors.Errorf("failed to commit batch deletes: %w", err) + } + b = nil + batchCount = 0 + } + return nil + } + + res, err := ds.Query(query.Query{KeysOnly: true}) + if err != nil { + return xerrors.Errorf("failed to query datastore: %w", err) + } + + dupTo := cctx.Int("delete-up-to") + + var deleteCount int + var goodHits int + for { + v, ok := res.NextSync() + if !ok { + break + } + + bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):])) + if err != nil { + return xerrors.Errorf("failed to parse key: %w", err) + } + + if goodSet.HasRaw(bk) { + goodHits++ + continue + } + + nc := cid.NewCidV1(cid.Raw, bk) + + deleteCount++ + if err := markForRemoval(nc); err != nil { + return fmt.Errorf("failed to remove cid %s: %w", nc, err) + } + + if deleteCount%20 == 0 { + fmt.Printf("\rdeleting %d objects (good hits: %d)... ", deleteCount, goodHits) + } + + if dupTo != 0 && deleteCount > dupTo { + break + } + } + + if b != nil { + if err := b.Commit(); err != nil { + return xerrors.Errorf("failed to commit final batch delete: %w", err) + } + } + + gcds, ok := ds.(datastore.GCDatastore) + if ok { + fmt.Println("running datastore gc....") + for i := 0; i < cctx.Int("gc-count"); i++ { + if err := gcds.CollectGarbage(); err != nil { + return xerrors.Errorf("datastore GC failed: %w", err) + } + } + fmt.Println("gc complete!") + } + + return nil + }, +} diff --git a/go.mod b/go.mod index 65c2addb8..b82f60dc8 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/golang-lru v0.5.4 github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d + github.com/ipfs/bbloom v0.0.4 github.com/ipfs/go-bitswap v0.2.20 github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834 From 1319d7072ecf9094e6515e2de4bb8e41743a5948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 2 Oct 2020 23:38:54 +0200 Subject: [PATCH 2/2] shed: Fix lint --- cmd/lotus-shed/pruning.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/lotus-shed/pruning.go b/cmd/lotus-shed/pruning.go index 6f0c20541..79158c3a3 100644 --- a/cmd/lotus-shed/pruning.go +++ b/cmd/lotus-shed/pruning.go @@ -137,13 +137,13 @@ var stateTreePruneCmd = &cli.Command{ return err } - defer ds.Close() + defer ds.Close() //nolint:errcheck mds, err := lkrepo.Datastore("/metadata") if err != nil { return err } - defer mds.Close() + defer mds.Close() //nolint:errcheck if cctx.Bool("only-ds-gc") { gcds, ok := ds.(datastore.GCDatastore) @@ -156,9 +156,8 @@ var stateTreePruneCmd = &cli.Command{ } fmt.Println("gc complete!") return nil - } else { - return fmt.Errorf("datastore doesnt support gc") } + return fmt.Errorf("datastore doesnt support gc") } bs := blockstore.NewBlockstore(ds) @@ -194,7 +193,7 @@ var stateTreePruneCmd = &cli.Command{ } fmt.Println() - fmt.Printf("Succesfully marked keep set! (%d objects)\n", goodSet.Len()) + fmt.Printf("Successfully marked keep set! (%d objects)\n", goodSet.Len()) if cctx.Bool("dry-run") { return nil