Merge pull request #6811 from filecoin-project/feat/splitstore-shed-utils

splitstore shed utils
This commit is contained in:
Łukasz Magiera 2021-07-26 13:44:27 +02:00 committed by GitHub
commit 5048c3f717
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 625 additions and 3 deletions

View File

@ -164,6 +164,13 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
// ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
// if supported by the underlying implementation.
ChainCheckBlockstore(context.Context) error //perm:admin
// ChainBlockstoreInfo returns some basic information about the blockstore
ChainBlockstoreInfo(context.Context) (map[string]interface{}, error) //perm:read
// MethodGroup: Beacon
// The Beacon method group contains methods for interacting with the random beacon (DRAND)

View File

@ -105,6 +105,35 @@ func (mr *MockFullNodeMockRecorder) BeaconGetEntry(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BeaconGetEntry", reflect.TypeOf((*MockFullNode)(nil).BeaconGetEntry), arg0, arg1)
}
// ChainBlockstoreInfo mocks base method.
func (m *MockFullNode) ChainBlockstoreInfo(arg0 context.Context) (map[string]interface{}, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainBlockstoreInfo", arg0)
ret0, _ := ret[0].(map[string]interface{})
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainBlockstoreInfo indicates an expected call of ChainBlockstoreInfo.
func (mr *MockFullNodeMockRecorder) ChainBlockstoreInfo(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBlockstoreInfo", reflect.TypeOf((*MockFullNode)(nil).ChainBlockstoreInfo), arg0)
}
// ChainCheckBlockstore mocks base method.
func (m *MockFullNode) ChainCheckBlockstore(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainCheckBlockstore", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// ChainCheckBlockstore indicates an expected call of ChainCheckBlockstore.
func (mr *MockFullNodeMockRecorder) ChainCheckBlockstore(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainCheckBlockstore", reflect.TypeOf((*MockFullNode)(nil).ChainCheckBlockstore), arg0)
}
// ChainDeleteObj mocks base method.
func (m *MockFullNode) ChainDeleteObj(arg0 context.Context, arg1 cid.Cid) error {
m.ctrl.T.Helper()

View File

@ -98,6 +98,10 @@ type FullNodeStruct struct {
Internal struct {
BeaconGetEntry func(p0 context.Context, p1 abi.ChainEpoch) (*types.BeaconEntry, error) `perm:"read"`
ChainBlockstoreInfo func(p0 context.Context) (map[string]interface{}, error) `perm:"read"`
ChainCheckBlockstore func(p0 context.Context) error `perm:"admin"`
ChainDeleteObj func(p0 context.Context, p1 cid.Cid) error `perm:"admin"`
ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
@ -951,6 +955,22 @@ func (s *FullNodeStub) BeaconGetEntry(p0 context.Context, p1 abi.ChainEpoch) (*t
return nil, xerrors.New("method not supported")
}
func (s *FullNodeStruct) ChainBlockstoreInfo(p0 context.Context) (map[string]interface{}, error) {
return s.Internal.ChainBlockstoreInfo(p0)
}
func (s *FullNodeStub) ChainBlockstoreInfo(p0 context.Context) (map[string]interface{}, error) {
return *new(map[string]interface{}), xerrors.New("method not supported")
}
func (s *FullNodeStruct) ChainCheckBlockstore(p0 context.Context) error {
return s.Internal.ChainCheckBlockstore(p0)
}
func (s *FullNodeStub) ChainCheckBlockstore(p0 context.Context) error {
return xerrors.New("method not supported")
}
func (s *FullNodeStruct) ChainDeleteObj(p0 context.Context, p1 cid.Cid) error {
return s.Internal.ChainDeleteObj(p0, p1)
}

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"sync"
@ -84,7 +86,8 @@ type Blockstore struct {
state int
viewers sync.WaitGroup
DB *badger.DB
DB *badger.DB
opts Options
prefixing bool
prefix []byte
@ -95,6 +98,7 @@ var _ blockstore.Blockstore = (*Blockstore)(nil)
var _ blockstore.Viewer = (*Blockstore)(nil)
var _ blockstore.BlockstoreIterator = (*Blockstore)(nil)
var _ blockstore.BlockstoreGC = (*Blockstore)(nil)
var _ blockstore.BlockstoreSize = (*Blockstore)(nil)
var _ io.Closer = (*Blockstore)(nil)
// Open creates a new badger-backed blockstore, with the supplied options.
@ -109,7 +113,7 @@ func Open(opts Options) (*Blockstore, error) {
return nil, fmt.Errorf("failed to open badger blockstore: %w", err)
}
bs := &Blockstore{DB: db}
bs := &Blockstore{DB: db, opts: opts}
if p := opts.Prefix; p != "" {
bs.prefixing = true
bs.prefix = []byte(p)
@ -191,6 +195,37 @@ func (b *Blockstore) CollectGarbage() error {
return err
}
// Size returns the aggregate size of the blockstore
func (b *Blockstore) Size() (int64, error) {
if err := b.access(); err != nil {
return 0, err
}
defer b.viewers.Done()
lsm, vlog := b.DB.Size()
size := lsm + vlog
if size == 0 {
// badger reports a 0 size on symlinked directories... sigh
dir := b.opts.Dir
entries, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
for _, e := range entries {
path := filepath.Join(dir, e.Name())
finfo, err := os.Stat(path)
if err != nil {
return 0, err
}
size += finfo.Size()
}
}
return size, nil
}
// View implements blockstore.Viewer, which leverages zero-copy read-only
// access to values.
func (b *Blockstore) View(cid cid.Cid, fn func([]byte) error) error {

View File

@ -40,6 +40,11 @@ type BlockstoreGC interface {
CollectGarbage() error
}
// BlockstoreSize is a trait for on-disk blockstores that can report their size
type BlockstoreSize interface {
Size() (int64, error)
}
// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.
// The ID store filters out all puts for blocks with CIDs using the "identity"
// hash function. It also extracts inlined blocks from CIDs using the identity

View File

@ -99,3 +99,17 @@ Compaction works transactionally with the following algorithm:
## Garbage Collection
TBD -- see [#6577](https://github.com/filecoin-project/lotus/issues/6577)
## Utilities
`lotus-shed` has a `splitstore` command which provides some utilities:
- `rollback` -- rolls back a splitstore installation.
This command copies the hotstore on top of the coldstore, and then deletes the splitstore
directory and associated metadata keys.
It can also optionally compact/gc the coldstore after the copy (with the `--gc-coldstore` flag)
and automatically rewrite the lotus config to disable splitstore (with the `--rewrite-config` flag).
Note: the node *must be stopped* before running this command.
- `check` -- asynchronously runs a basic healthcheck on the splitstore.
The results are appended to `<lotus-repo>/datastore/splitstore/check.txt`.
- `info` -- prints some basic information about the splitstore.

View File

@ -102,7 +102,8 @@ type SplitStore struct {
compacting int32 // compaction/prune/warmup in progress
closing int32 // the splitstore is closing
cfg *Config
cfg *Config
path string
mx sync.Mutex
warmupEpoch abi.ChainEpoch // protected by mx
@ -169,6 +170,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
// and now we can make a SplitStore
ss := &SplitStore{
cfg: cfg,
path: path,
ds: ds,
cold: cold,
hot: hots,

View File

@ -0,0 +1,150 @@
package splitstore
import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"time"
"golang.org/x/xerrors"
cid "github.com/ipfs/go-cid"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
)
// performs an asynchronous health-check on the splitstore; results are appended to
// <splitstore-path>/check.txt
func (s *SplitStore) Check() error {
s.headChangeMx.Lock()
defer s.headChangeMx.Unlock()
// try to take compaction lock and inhibit compaction while the health-check is running
if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
return xerrors.Errorf("can't acquire compaction lock; compacting operation in progress")
}
if s.compactionIndex == 0 {
atomic.StoreInt32(&s.compacting, 0)
return xerrors.Errorf("splitstore hasn't compacted yet; health check is not meaningful")
}
// check if we are actually closing first
if err := s.checkClosing(); err != nil {
atomic.StoreInt32(&s.compacting, 0)
return err
}
curTs := s.chain.GetHeaviestTipSet()
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
log.Info("checking splitstore health")
start := time.Now()
err := s.doCheck(curTs)
if err != nil {
log.Errorf("error checking splitstore health: %s", err)
return
}
log.Infow("health check done", "took", time.Since(start))
}()
return nil
}
func (s *SplitStore) doCheck(curTs *types.TipSet) error {
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary
outputPath := filepath.Join(s.path, "check.txt")
output, err := os.OpenFile(outputPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return xerrors.Errorf("error opening check output file %s: %w", outputPath, err)
}
defer output.Close() //nolint:errcheck
write := func(format string, args ...interface{}) {
_, err := fmt.Fprintf(output, format+"\n", args...)
if err != nil {
log.Warnf("error writing check output: %s", err)
}
}
ts, _ := time.Now().MarshalText()
write("---------------------------------------------")
write("start check at %s", ts)
write("current epoch: %d", currentEpoch)
write("boundary epoch: %d", boundaryEpoch)
write("compaction index: %d", s.compactionIndex)
write("--")
var coldCnt, missingCnt int64
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
has, err := s.hot.Has(c)
if err != nil {
return xerrors.Errorf("error checking hotstore: %w", err)
}
if has {
return nil
}
has, err = s.cold.Has(c)
if err != nil {
return xerrors.Errorf("error checking coldstore: %w", err)
}
if has {
coldCnt++
write("cold object reference: %s", c)
} else {
missingCnt++
write("missing object reference: %s", c)
return errStopWalk
}
return nil
})
if err != nil {
err = xerrors.Errorf("error walking chain: %w", err)
write("ERROR: %s", err)
return err
}
log.Infow("check done", "cold", coldCnt, "missing", missingCnt)
write("--")
write("cold: %d missing: %d", coldCnt, missingCnt)
write("DONE")
return nil
}
// provides some basic information about the splitstore
func (s *SplitStore) Info() map[string]interface{} {
info := make(map[string]interface{})
info["base epoch"] = s.baseEpoch
info["warmup epoch"] = s.warmupEpoch
info["compactions"] = s.compactionIndex
sizer, ok := s.hot.(bstore.BlockstoreSize)
if ok {
size, err := sizer.Size()
if err != nil {
log.Warnf("error getting hotstore size: %s", err)
} else {
info["hotstore size"] = size
}
}
return info
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -60,6 +60,7 @@ func main() {
actorCmd,
minerTypesCmd,
minerMultisigsCmd,
splitstoreCmd,
}
app := &cli.App{

View File

@ -0,0 +1,310 @@
package main
import (
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"github.com/dgraph-io/badger/v2"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"go.uber.org/zap"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo"
)
var splitstoreCmd = &cli.Command{
Name: "splitstore",
Description: "splitstore utilities",
Subcommands: []*cli.Command{
splitstoreRollbackCmd,
splitstoreCheckCmd,
splitstoreInfoCmd,
},
}
var splitstoreRollbackCmd = &cli.Command{
Name: "rollback",
Description: "rollbacks a splitstore installation",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.BoolFlag{
Name: "gc-coldstore",
Usage: "compact and garbage collect the coldstore after copying the hotstore",
},
&cli.BoolFlag{
Name: "rewrite-config",
Usage: "rewrite the lotus configuration to disable splitstore",
},
},
Action: func(cctx *cli.Context) error {
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("error opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return xerrors.Errorf("error locking repo: %w", err)
}
defer lr.Close() //nolint:errcheck
cfg, err := lr.Config()
if err != nil {
return xerrors.Errorf("error getting config: %w", err)
}
fncfg, ok := cfg.(*config.FullNode)
if !ok {
return xerrors.Errorf("wrong config type: %T", cfg)
}
if !fncfg.Chainstore.EnableSplitstore {
return xerrors.Errorf("splitstore is not enabled")
}
fmt.Println("copying hotstore to coldstore...")
err = copyHotstoreToColdstore(lr, cctx.Bool("gc-coldstore"))
if err != nil {
return xerrors.Errorf("error copying hotstore to coldstore: %w", err)
}
fmt.Println("deleting splitstore directory...")
err = deleteSplitstoreDir(lr)
if err != nil {
return xerrors.Errorf("error deleting splitstore directory: %w", err)
}
fmt.Println("deleting splitstore keys from metadata datastore...")
err = deleteSplitstoreKeys(lr)
if err != nil {
return xerrors.Errorf("error deleting splitstore keys: %w", err)
}
if cctx.Bool("rewrite-config") {
fmt.Println("disabling splitstore in config...")
err = lr.SetConfig(func(cfg interface{}) {
cfg.(*config.FullNode).Chainstore.EnableSplitstore = false
})
if err != nil {
return xerrors.Errorf("error disabling splitstore in config: %w", err)
}
}
fmt.Println("splitstore has been rolled back.")
return nil
},
}
func copyHotstoreToColdstore(lr repo.LockedRepo, gcColdstore bool) error {
repoPath := lr.Path()
dataPath := filepath.Join(repoPath, "datastore")
coldPath := filepath.Join(dataPath, "chain")
hotPath := filepath.Join(dataPath, "splitstore", "hot.badger")
blog := &badgerLogger{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
coldOpts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, coldPath, false)
if err != nil {
return xerrors.Errorf("error getting coldstore badger options: %w", err)
}
coldOpts.SyncWrites = false
coldOpts.Logger = blog
hotOpts, err := repo.BadgerBlockstoreOptions(repo.HotBlockstore, hotPath, true)
if err != nil {
return xerrors.Errorf("error getting hotstore badger options: %w", err)
}
hotOpts.Logger = blog
cold, err := badger.Open(coldOpts.Options)
if err != nil {
return xerrors.Errorf("error opening coldstore: %w", err)
}
defer cold.Close() //nolint
hot, err := badger.Open(hotOpts.Options)
if err != nil {
return xerrors.Errorf("error opening hotstore: %w", err)
}
defer hot.Close() //nolint
rd, wr := io.Pipe()
g := new(errgroup.Group)
g.Go(func() error {
bwr := bufio.NewWriterSize(wr, 64<<20)
_, err := hot.Backup(bwr, 0)
if err != nil {
_ = wr.CloseWithError(err)
return err
}
err = bwr.Flush()
if err != nil {
_ = wr.CloseWithError(err)
return err
}
return wr.Close()
})
g.Go(func() error {
err := cold.Load(rd, 1024)
if err != nil {
return err
}
return cold.Sync()
})
err = g.Wait()
if err != nil {
return err
}
// compact + gc the coldstore if so requested
if gcColdstore {
fmt.Println("compacting coldstore...")
nworkers := runtime.NumCPU()
if nworkers < 2 {
nworkers = 2
}
err = cold.Flatten(nworkers)
if err != nil {
return xerrors.Errorf("error compacting coldstore: %w", err)
}
fmt.Println("garbage collecting coldstore...")
for err == nil {
err = cold.RunValueLogGC(0.0625)
}
if err != badger.ErrNoRewrite {
return xerrors.Errorf("error garbage collecting coldstore: %w", err)
}
}
return nil
}
func deleteSplitstoreDir(lr repo.LockedRepo) error {
path, err := lr.SplitstorePath()
if err != nil {
return xerrors.Errorf("error getting splitstore path: %w", err)
}
return os.RemoveAll(path)
}
func deleteSplitstoreKeys(lr repo.LockedRepo) error {
ds, err := lr.Datastore(context.TODO(), "/metadata")
if err != nil {
return xerrors.Errorf("error opening datastore: %w", err)
}
if closer, ok := ds.(io.Closer); ok {
defer closer.Close() //nolint
}
var keys []datastore.Key
res, err := ds.Query(query.Query{Prefix: "/splitstore"})
if err != nil {
return xerrors.Errorf("error querying datastore for splitstore keys: %w", err)
}
for r := range res.Next() {
if r.Error != nil {
return xerrors.Errorf("datastore query error: %w", r.Error)
}
keys = append(keys, datastore.NewKey(r.Key))
}
for _, k := range keys {
fmt.Printf("deleting %s from datastore...\n", k)
err = ds.Delete(k)
if err != nil {
return xerrors.Errorf("error deleting key %s from datastore: %w", k, err)
}
}
return nil
}
// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger
skip2 *zap.SugaredLogger
}
func (b *badgerLogger) Warningf(format string, args ...interface{}) {}
func (b *badgerLogger) Infof(format string, args ...interface{}) {}
func (b *badgerLogger) Debugf(format string, args ...interface{}) {}
var splitstoreCheckCmd = &cli.Command{
Name: "check",
Description: "runs a healthcheck on a splitstore installation",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
return api.ChainCheckBlockstore(ctx)
},
}
var splitstoreInfoCmd = &cli.Command{
Name: "info",
Description: "prints some basic splitstore information",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
info, err := api.ChainBlockstoreInfo(ctx)
if err != nil {
return err
}
for k, v := range info {
fmt.Print(k)
fmt.Print(": ")
fmt.Println(v)
}
return nil
},
}

View File

@ -11,6 +11,8 @@
* [Beacon](#Beacon)
* [BeaconGetEntry](#BeaconGetEntry)
* [Chain](#Chain)
* [ChainBlockstoreInfo](#ChainBlockstoreInfo)
* [ChainCheckBlockstore](#ChainCheckBlockstore)
* [ChainDeleteObj](#ChainDeleteObj)
* [ChainExport](#ChainExport)
* [ChainGetBlock](#ChainGetBlock)
@ -350,6 +352,32 @@ The Chain method group contains methods for interacting with the
blockchain, but that do not require any form of state computation.
### ChainBlockstoreInfo
ChainBlockstoreInfo returns some basic information about the blockstore
Perms: read
Inputs: `null`
Response:
```json
{
"abc": 123
}
```
### ChainCheckBlockstore
ChainCheckBlockstore performs an (asynchronous) health check on the chain/state blockstore
if supported by the underlying implementation.
Perms: admin
Inputs: `null`
Response: `{}`
### ChainDeleteObj
ChainDeleteObj deletes node referenced by the given CID

View File

@ -83,6 +83,9 @@ type ChainAPI struct {
// expose externally. In the future, this will be segregated into two
// blockstores.
ExposedBlockstore dtypes.ExposedBlockstore
// BaseBlockstore is the underlying blockstore
BaseBlockstore dtypes.BaseBlockstore
}
func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
@ -644,3 +647,21 @@ func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipo
return out, nil
}
func (a *ChainAPI) ChainCheckBlockstore(ctx context.Context) error {
checker, ok := a.BaseBlockstore.(interface{ Check() error })
if !ok {
return xerrors.Errorf("underlying blockstore does not support health checks")
}
return checker.Check()
}
func (a *ChainAPI) ChainBlockstoreInfo(ctx context.Context) (map[string]interface{}, error) {
info, ok := a.BaseBlockstore.(interface{ Info() map[string]interface{} })
if !ok {
return nil, xerrors.Errorf("underlying blockstore does not provide info")
}
return info.Info(), nil
}