Merge pull request #4133 from filecoin-project/feat/backup

Miner backup/restore commands
This commit is contained in:
Łukasz Magiera 2020-10-06 02:05:03 +02:00 committed by GitHub
commit 0e2f697217
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1174 additions and 10 deletions

View File

@ -484,6 +484,12 @@ type FullNode interface {
PaychVoucherAdd(context.Context, address.Address, *paych.SignedVoucher, []byte, types.BigInt) (types.BigInt, error)
PaychVoucherList(context.Context, address.Address) ([]*paych.SignedVoucher, error)
PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error)
// CreateBackup creates node backup onder the specified file name. The
// method requires that the lotus daemon is running with the
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error
}
type FileRef struct {

View File

@ -101,6 +101,12 @@ type StorageMiner interface {
PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error)
PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error)
PiecesGetCIDInfo(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error)
// CreateBackup creates node backup onder the specified file name. The
// method requires that the lotus-miner is running with the
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error
}
type SealRes struct {

View File

@ -243,6 +243,8 @@ type FullNodeStruct struct {
PaychVoucherCreate func(context.Context, address.Address, big.Int, uint64) (*api.VoucherCreateResult, error) `perm:"sign"`
PaychVoucherList func(context.Context, address.Address) ([]*paych.SignedVoucher, error) `perm:"write"`
PaychVoucherSubmit func(context.Context, address.Address, *paych.SignedVoucher, []byte, []byte) (cid.Cid, error) `perm:"sign"`
CreateBackup func(ctx context.Context, fpath string) error `perm:"admin"`
}
}
@ -323,6 +325,8 @@ type StorageMinerStruct struct {
PiecesListCidInfos func(ctx context.Context) ([]cid.Cid, error) `perm:"read"`
PiecesGetPieceInfo func(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) `perm:"read"`
PiecesGetCIDInfo func(ctx context.Context, payloadCid cid.Cid) (*piecestore.CIDInfo, error) `perm:"read"`
CreateBackup func(ctx context.Context, fpath string) error `perm:"admin"`
}
}
@ -1041,6 +1045,10 @@ func (c *FullNodeStruct) PaychVoucherSubmit(ctx context.Context, ch address.Addr
return c.Internal.PaychVoucherSubmit(ctx, ch, sv, secret, proof)
}
func (c *FullNodeStruct) CreateBackup(ctx context.Context, fpath string) error {
return c.Internal.CreateBackup(ctx, fpath)
}
// StorageMinerStruct
func (c *StorageMinerStruct) ActorAddress(ctx context.Context) (address.Address, error) {
@ -1281,6 +1289,10 @@ func (c *StorageMinerStruct) PiecesGetCIDInfo(ctx context.Context, payloadCid ci
return c.Internal.PiecesGetCIDInfo(ctx, payloadCid)
}
func (c *StorageMinerStruct) CreateBackup(ctx context.Context, fpath string) error {
return c.Internal.CreateBackup(ctx, fpath)
}
// WorkerStruct
func (w *WorkerStruct) Version(ctx context.Context) (build.Version, error) {

125
cli/backup.go Normal file
View File

@ -0,0 +1,125 @@
package cli
import (
"context"
"fmt"
"os"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/repo"
)
type BackupAPI interface {
CreateBackup(ctx context.Context, fpath string) error
}
type BackupApiFn func(ctx *cli.Context) (BackupAPI, jsonrpc.ClientCloser, error)
func BackupCmd(repoFlag string, rt repo.RepoType, getApi BackupApiFn) *cli.Command {
var offlineBackup = func(cctx *cli.Context) error {
logging.SetLogLevel("badger", "ERROR") // nolint:errcheck
repoPath := cctx.String(repoFlag)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
return xerrors.Errorf("repo at '%s' is not initialized", cctx.String(repoFlag))
}
lr, err := r.LockRO(rt)
if err != nil {
return xerrors.Errorf("locking repo: %w", err)
}
defer lr.Close() // nolint:errcheck
mds, err := lr.Datastore("/metadata")
if err != nil {
return xerrors.Errorf("getting metadata datastore: %w", err)
}
bds := backupds.Wrap(mds)
fpath, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding file path: %w", err)
}
out, err := os.OpenFile(fpath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return xerrors.Errorf("opening backup file %s: %w", fpath, err)
}
if err := bds.Backup(out); err != nil {
if cerr := out.Close(); cerr != nil {
log.Errorw("error closing backup file while handling backup error", "closeErr", cerr, "backupErr", err)
}
return xerrors.Errorf("backup error: %w", err)
}
if err := out.Close(); err != nil {
return xerrors.Errorf("closing backup file: %w", err)
}
return nil
}
var onlineBackup = func(cctx *cli.Context) error {
api, closer, err := getApi(cctx)
if err != nil {
return xerrors.Errorf("getting api: %w (if the node isn't running you can use the --offline flag)", err)
}
defer closer()
err = api.CreateBackup(ReqContext(cctx), cctx.Args().First())
if err != nil {
return err
}
fmt.Println("Success")
return nil
}
return &cli.Command{
Name: "backup",
Usage: "Create node metadata backup",
Description: `The backup command writes a copy of node metadata under the specified path
Online backups:
For security reasons, the daemon must be have LOTUS_BACKUP_BASE_PATH env var set
to a path where backup files are supposed to be saved, and the path specified in
this command must be within this base path`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "offline",
Usage: "create backup without the node running",
},
},
ArgsUsage: "[backup file path]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
if cctx.Bool("offline") {
return offlineBackup(cctx)
}
return onlineBackup(cctx)
},
}
}

290
cmd/lotus-shed/datastore.go Normal file
View File

@ -0,0 +1,290 @@
package main
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/docker/go-units"
"github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
"github.com/polydawn/refmt/cbor"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/repo"
)
var datastoreCmd = &cli.Command{
Name: "datastore",
Description: "access node datastores directly",
Subcommands: []*cli.Command{
datastoreBackupCmd,
datastoreListCmd,
datastoreGetCmd,
},
}
var datastoreListCmd = &cli.Command{
Name: "list",
Description: "list datastore keys",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "repo-type",
Usage: "node type (1 - full, 2 - storage, 3 - worker)",
Value: 1,
},
&cli.BoolFlag{
Name: "top-level",
Usage: "only print top-level keys",
},
&cli.StringFlag{
Name: "get-enc",
Usage: "print values [esc/hex/cbor]",
},
},
ArgsUsage: "[namespace prefix]",
Action: func(cctx *cli.Context) error {
logging.SetLogLevel("badger", "ERROR") // nolint:errcheck
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
ds, err := lr.Datastore(datastore.NewKey(cctx.Args().First()).String())
if err != nil {
return err
}
genc := cctx.String("get-enc")
q, err := ds.Query(dsq.Query{
Prefix: datastore.NewKey(cctx.Args().Get(1)).String(),
KeysOnly: genc == "",
})
if err != nil {
return xerrors.Errorf("datastore query: %w", err)
}
defer q.Close() //nolint:errcheck
printKv := kvPrinter(cctx.Bool("top-level"), genc)
for res := range q.Next() {
if err := printKv(res.Key, res.Value); err != nil {
return err
}
}
return nil
},
}
var datastoreGetCmd = &cli.Command{
Name: "get",
Description: "list datastore keys",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "repo-type",
Usage: "node type (1 - full, 2 - storage, 3 - worker)",
Value: 1,
},
&cli.StringFlag{
Name: "enc",
Usage: "encoding (esc/hex/cbor)",
Value: "esc",
},
},
ArgsUsage: "[namespace key]",
Action: func(cctx *cli.Context) error {
logging.SetLogLevel("badger", "ERROR") // nolint:errchec
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type")))
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
ds, err := lr.Datastore(datastore.NewKey(cctx.Args().First()).String())
if err != nil {
return err
}
val, err := ds.Get(datastore.NewKey(cctx.Args().Get(1)))
if err != nil {
return xerrors.Errorf("get: %w", err)
}
return printVal(cctx.String("enc"), val)
},
}
var datastoreBackupCmd = &cli.Command{
Name: "backup",
Description: "manage datastore backups",
Subcommands: []*cli.Command{
datastoreBackupStatCmd,
datastoreBackupListCmd,
},
}
var datastoreBackupStatCmd = &cli.Command{
Name: "stat",
Description: "validate and print info about datastore backup",
ArgsUsage: "[file]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
f, err := os.Open(cctx.Args().First())
if err != nil {
return xerrors.Errorf("opening backup file: %w", err)
}
defer f.Close() // nolint:errcheck
var keys, kbytes, vbytes uint64
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
keys++
kbytes += uint64(len(key.String()))
vbytes += uint64(len(value))
return nil
})
if err != nil {
return err
}
fmt.Println("Keys: ", keys)
fmt.Println("Key bytes: ", units.BytesSize(float64(kbytes)))
fmt.Println("Value bytes: ", units.BytesSize(float64(vbytes)))
return err
},
}
var datastoreBackupListCmd = &cli.Command{
Name: "list",
Description: "list data in a backup",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "top-level",
Usage: "only print top-level keys",
},
&cli.StringFlag{
Name: "get-enc",
Usage: "print values [esc/hex/cbor]",
},
},
ArgsUsage: "[file]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
f, err := os.Open(cctx.Args().First())
if err != nil {
return xerrors.Errorf("opening backup file: %w", err)
}
defer f.Close() // nolint:errcheck
printKv := kvPrinter(cctx.Bool("top-level"), cctx.String("get-enc"))
err = backupds.ReadBackup(f, func(key datastore.Key, value []byte) error {
return printKv(key.String(), value)
})
if err != nil {
return err
}
return err
},
}
func kvPrinter(toplevel bool, genc string) func(sk string, value []byte) error {
seen := map[string]struct{}{}
return func(s string, value []byte) error {
if toplevel {
k := datastore.NewKey(datastore.NewKey(s).List()[0])
if k.Type() != "" {
s = k.Type()
} else {
s = k.String()
}
_, has := seen[s]
if has {
return nil
}
seen[s] = struct{}{}
}
s = fmt.Sprintf("%q", s)
s = strings.Trim(s, "\"")
fmt.Println(s)
if genc != "" {
fmt.Print("\t")
if err := printVal(genc, value); err != nil {
return err
}
}
return nil
}
}
func printVal(enc string, val []byte) error {
switch enc {
case "esc":
s := fmt.Sprintf("%q", string(val))
s = strings.Trim(s, "\"")
fmt.Println(s)
case "hex":
fmt.Printf("%x\n", val)
case "cbor":
var out interface{}
if err := cbor.Unmarshal(cbor.DecodeOptions{}, val, &out); err != nil {
return xerrors.Errorf("unmarshaling cbor: %w", err)
}
s, err := json.Marshal(&out)
if err != nil {
return xerrors.Errorf("remarshaling as json: %w", err)
}
fmt.Println(string(s))
default:
return xerrors.New("unknown encoding")
}
return nil
}

View File

@ -40,6 +40,7 @@ func main() {
serveDealStatsCmd,
syncCmd,
stateTreePruneCmd,
datastoreCmd,
}
app := &cli.App{

View File

@ -0,0 +1,14 @@
package main
import (
"github.com/urfave/cli/v2"
"github.com/filecoin-project/go-jsonrpc"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/node/repo"
)
var backupCmd = lcli.BackupCmd(FlagMinerRepo, repo.StorageMiner, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
return lcli.GetStorageMinerAPI(cctx)
})

View File

@ -115,6 +115,9 @@ var initCmd = &cli.Command{
Usage: "select which address to send actor creation message from",
},
},
Subcommands: []*cli.Command{
initRestoreCmd,
},
Action: func(cctx *cli.Context) error {
log.Info("Initializing lotus miner")

View File

@ -0,0 +1,274 @@
package main
import (
"encoding/json"
"io/ioutil"
"os"
"github.com/docker/go-units"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1"
"github.com/filecoin-project/go-address"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/repo"
)
var initRestoreCmd = &cli.Command{
Name: "restore",
Usage: "Initialize a lotus miner repo from a backup",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "nosync",
Usage: "don't check full-node sync status",
},
&cli.StringFlag{
Name: "config",
Usage: "config file (config.toml)",
},
&cli.StringFlag{
Name: "storage-config",
Usage: "storage paths config (storage.json)",
},
},
ArgsUsage: "[backupFile]",
Action: func(cctx *cli.Context) error {
log.Info("Initializing lotus miner using a backup")
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
log.Info("Trying to connect to full node RPC")
api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
if err != nil {
return err
}
defer closer()
log.Info("Checking full node version")
ctx := lcli.ReqContext(cctx)
v, err := api.Version(ctx)
if err != nil {
return err
}
if !v.APIVersion.EqMajorMinor(build.FullAPIVersion) {
return xerrors.Errorf("Remote API version didn't match (expected %s, remote %s)", build.FullAPIVersion, v.APIVersion)
}
if !cctx.Bool("nosync") {
if err := lcli.SyncWait(ctx, api); err != nil {
return xerrors.Errorf("sync wait: %w", err)
}
}
bf, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expand backup file path: %w", err)
}
st, err := os.Stat(bf)
if err != nil {
return xerrors.Errorf("stat backup file (%s): %w", bf, err)
}
f, err := os.Open(bf)
if err != nil {
return xerrors.Errorf("opening backup file: %w", err)
}
defer f.Close() // nolint:errcheck
log.Info("Checking if repo exists")
repoPath := cctx.String(FlagMinerRepo)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if ok {
return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagMinerRepo))
}
log.Info("Initializing repo")
if err := r.Init(repo.StorageMiner); err != nil {
return err
}
lr, err := r.Lock(repo.StorageMiner)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
if cctx.IsSet("config") {
log.Info("Restoring config")
cf, err := homedir.Expand(cctx.String("config"))
if err != nil {
return xerrors.Errorf("expanding config path: %w", err)
}
_, err = os.Stat(cf)
if err != nil {
return xerrors.Errorf("stat config file (%s): %w", cf, err)
}
var cerr error
err = lr.SetConfig(func(raw interface{}) {
rcfg, ok := raw.(*config.StorageMiner)
if !ok {
cerr = xerrors.New("expected miner config")
return
}
ff, err := config.FromFile(cf, rcfg)
if err != nil {
cerr = xerrors.Errorf("loading config: %w", err)
return
}
*rcfg = *ff.(*config.StorageMiner)
})
if cerr != nil {
return cerr
}
if err != nil {
return xerrors.Errorf("setting config: %w", err)
}
} else {
log.Warn("--config NOT SET, WILL USE DEFAULT VALUES")
}
if cctx.IsSet("storage-config") {
log.Info("Restoring storage path config")
cf, err := homedir.Expand(cctx.String("storage-config"))
if err != nil {
return xerrors.Errorf("expanding storage config path: %w", err)
}
cfb, err := ioutil.ReadFile(cf)
if err != nil {
return xerrors.Errorf("reading storage config: %w", err)
}
var cerr error
err = lr.SetStorage(func(scfg *stores.StorageConfig) {
cerr = json.Unmarshal(cfb, scfg)
})
if cerr != nil {
return xerrors.Errorf("unmarshalling storage config: %w", cerr)
}
if err != nil {
return xerrors.Errorf("setting storage config: %w", err)
}
} else {
log.Warn("--storage-config NOT SET. NO SECTOR PATHS WILL BE CONFIGURED")
}
log.Info("Restoring metadata backup")
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
bar := pb.New64(st.Size())
br := bar.NewProxyReader(f)
bar.ShowTimeLeft = true
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
bar.Start()
err = backupds.RestoreInto(br, mds)
bar.Finish()
if err != nil {
return xerrors.Errorf("restoring metadata: %w", err)
}
log.Info("Checking actor metadata")
abytes, err := mds.Get(datastore.NewKey("miner-address"))
if err != nil {
return xerrors.Errorf("getting actor address from metadata datastore: %w", err)
}
maddr, err := address.NewFromBytes(abytes)
if err != nil {
return xerrors.Errorf("parsing actor address: %w", err)
}
log.Info("ACTOR ADDRESS: ", maddr.String())
mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}
log.Info("SECTOR SIZE: ", units.BytesSize(float64(mi.SectorSize)))
wk, err := api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("resolving worker key: %w", err)
}
has, err := api.WalletHas(ctx, wk)
if err != nil {
return xerrors.Errorf("checking worker address: %w", err)
}
if !has {
return xerrors.Errorf("worker address %s for miner actor %s not present in full node wallet", mi.Worker, maddr)
}
log.Info("Checking proof parameters")
if err := paramfetch.GetParams(ctx, build.ParametersJSON(), uint64(mi.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
log.Info("Initializing libp2p identity")
p2pSk, err := makeHostKey(lr)
if err != nil {
return xerrors.Errorf("make host key: %w", err)
}
peerid, err := peer.IDFromPrivateKey(p2pSk)
if err != nil {
return xerrors.Errorf("peer ID from private key: %w", err)
}
log.Info("Configuring miner actor")
if err := configureStorageMiner(ctx, api, maddr, peerid, big.Zero()); err != nil {
return err
}
return nil
},
}

View File

@ -35,6 +35,7 @@ func main() {
runCmd,
stopCmd,
configCmd,
backupCmd,
lcli.WithCategory("chain", actorCmd),
lcli.WithCategory("chain", infoCmd),
lcli.WithCategory("market", storageDealsCmd),

14
cmd/lotus/backup.go Normal file
View File

@ -0,0 +1,14 @@
package main
import (
"github.com/urfave/cli/v2"
"github.com/filecoin-project/go-jsonrpc"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/node/repo"
)
var backupCmd = lcli.BackupCmd("repo", repo.FullNode, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
return lcli.GetFullNodeAPI(cctx)
})

View File

@ -22,6 +22,7 @@ func main() {
local := []*cli.Command{
DaemonCmd,
backupCmd,
}
if AdvanceBlockCmd != nil {
local = append(local, AdvanceBlockCmd)

View File

@ -50,6 +50,8 @@
* [ClientRetrieveTryRestartInsufficientFunds](#ClientRetrieveTryRestartInsufficientFunds)
* [ClientRetrieveWithEvents](#ClientRetrieveWithEvents)
* [ClientStartDeal](#ClientStartDeal)
* [Create](#Create)
* [CreateBackup](#CreateBackup)
* [Gas](#Gas)
* [GasEstimateFeeCap](#GasEstimateFeeCap)
* [GasEstimateGasLimit](#GasEstimateGasLimit)
@ -1283,6 +1285,27 @@ Inputs:
Response: `null`
## Create
### CreateBackup
CreateBackup creates node backup onder the specified file name. The
method requires that the lotus daemon is running with the
LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
the path specified when calling CreateBackup is within the base path
Perms: admin
Inputs:
```json
[
"string value"
]
```
Response: `{}`
## Gas

1
go.mod
View File

@ -110,6 +110,7 @@ require (
github.com/multiformats/go-multibase v0.0.3
github.com/multiformats/go-multihash v0.0.14
github.com/opentracing/opentracing-go v1.2.0
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
github.com/raulk/clock v1.1.0
github.com/stretchr/testify v1.6.1
github.com/supranational/blst v0.1.1

189
lib/backupds/datastore.go Normal file
View File

@ -0,0 +1,189 @@
package backupds
import (
"crypto/sha256"
"io"
"sync"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
var log = logging.Logger("backupds")
type Datastore struct {
child datastore.Batching
backupLk sync.RWMutex
}
func Wrap(child datastore.Batching) *Datastore {
return &Datastore{
child: child,
}
}
// Writes a datastore dump into the provided writer as
// [array(*) of [key, value] tuples, checksum]
func (d *Datastore) Backup(out io.Writer) error {
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, out, cbg.MajArray, 2); err != nil {
return xerrors.Errorf("writing tuple header: %w", err)
}
hasher := sha256.New()
hout := io.MultiWriter(hasher, out)
// write KVs
{
// write indefinite length array header
if _, err := hout.Write([]byte{0x9f}); err != nil {
return xerrors.Errorf("writing header: %w", err)
}
d.backupLk.Lock()
defer d.backupLk.Unlock()
log.Info("Starting datastore backup")
defer log.Info("Datastore backup done")
qr, err := d.child.Query(query.Query{})
if err != nil {
return xerrors.Errorf("query: %w", err)
}
defer func() {
if err := qr.Close(); err != nil {
log.Errorf("query close error: %+v", err)
return
}
}()
for result := range qr.Next() {
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajArray, 2); err != nil {
return xerrors.Errorf("writing tuple header: %w", err)
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len([]byte(result.Key)))); err != nil {
return xerrors.Errorf("writing key header: %w", err)
}
if _, err := hout.Write([]byte(result.Key)[:]); err != nil {
return xerrors.Errorf("writing key: %w", err)
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(result.Value))); err != nil {
return xerrors.Errorf("writing value header: %w", err)
}
if _, err := hout.Write(result.Value[:]); err != nil {
return xerrors.Errorf("writing value: %w", err)
}
}
// array break
if _, err := hout.Write([]byte{0xff}); err != nil {
return xerrors.Errorf("writing array 'break': %w", err)
}
}
// Write the checksum
{
sum := hasher.Sum(nil)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, hout, cbg.MajByteString, uint64(len(sum))); err != nil {
return xerrors.Errorf("writing checksum header: %w", err)
}
if _, err := hout.Write(sum[:]); err != nil {
return xerrors.Errorf("writing checksum: %w", err)
}
}
return nil
}
// proxy
func (d *Datastore) Get(key datastore.Key) (value []byte, err error) {
return d.child.Get(key)
}
func (d *Datastore) Has(key datastore.Key) (exists bool, err error) {
return d.child.Has(key)
}
func (d *Datastore) GetSize(key datastore.Key) (size int, err error) {
return d.child.GetSize(key)
}
func (d *Datastore) Query(q query.Query) (query.Results, error) {
return d.child.Query(q)
}
func (d *Datastore) Put(key datastore.Key, value []byte) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Put(key, value)
}
func (d *Datastore) Delete(key datastore.Key) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Delete(key)
}
func (d *Datastore) Sync(prefix datastore.Key) error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Sync(prefix)
}
func (d *Datastore) Close() error {
d.backupLk.RLock()
defer d.backupLk.RUnlock()
return d.child.Close()
}
func (d *Datastore) Batch() (datastore.Batch, error) {
b, err := d.child.Batch()
if err != nil {
return nil, err
}
return &bbatch{
b: b,
rlk: d.backupLk.RLocker(),
}, nil
}
type bbatch struct {
b datastore.Batch
rlk sync.Locker
}
func (b *bbatch) Put(key datastore.Key, value []byte) error {
return b.b.Put(key, value)
}
func (b *bbatch) Delete(key datastore.Key) error {
return b.b.Delete(key)
}
func (b *bbatch) Commit() error {
b.rlk.Lock()
defer b.rlk.Unlock()
return b.b.Commit()
}
var _ datastore.Batch = &bbatch{}
var _ datastore.Batching = &Datastore{}

100
lib/backupds/read.go Normal file
View File

@ -0,0 +1,100 @@
package backupds
import (
"bytes"
"crypto/sha256"
"io"
"github.com/ipfs/go-datastore"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
func ReadBackup(r io.Reader, cb func(key datastore.Key, value []byte) error) error {
scratch := make([]byte, 9)
if _, err := r.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err)
}
if scratch[0] != 0x82 {
return xerrors.Errorf("expected array(2) header byte 0x82, got %x", scratch[0])
}
hasher := sha256.New()
hr := io.TeeReader(r, hasher)
if _, err := hr.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading array header: %w", err)
}
if scratch[0] != 0x9f {
return xerrors.Errorf("expected indefinite length array header byte 0x9f, got %x", scratch[0])
}
for {
if _, err := hr.Read(scratch[:1]); err != nil {
return xerrors.Errorf("reading tuple header: %w", err)
}
if scratch[0] == 0xff {
break
}
if scratch[0] != 0x82 {
return xerrors.Errorf("expected array(2) header 0x82, got %x", scratch[0])
}
keyb, err := cbg.ReadByteArray(hr, 1<<40)
if err != nil {
return xerrors.Errorf("reading key: %w", err)
}
key := datastore.NewKey(string(keyb))
value, err := cbg.ReadByteArray(hr, 1<<40)
if err != nil {
return xerrors.Errorf("reading value: %w", err)
}
if err := cb(key, value); err != nil {
return err
}
}
sum := hasher.Sum(nil)
expSum, err := cbg.ReadByteArray(r, 32)
if err != nil {
return xerrors.Errorf("reading expected checksum: %w", err)
}
if !bytes.Equal(sum, expSum) {
return xerrors.Errorf("checksum didn't match; expected %x, got %x", expSum, sum)
}
return nil
}
func RestoreInto(r io.Reader, dest datastore.Batching) error {
batch, err := dest.Batch()
if err != nil {
return xerrors.Errorf("creating batch: %w", err)
}
err = ReadBackup(r, func(key datastore.Key, value []byte) error {
if err := batch.Put(key, value); err != nil {
return xerrors.Errorf("put key: %w", err)
}
return nil
})
if err != nil {
return xerrors.Errorf("reading backup: %w", err)
}
if err := batch.Commit(); err != nil {
return xerrors.Errorf("committing batch: %w", err)
}
return nil
}

67
node/impl/backup.go Normal file
View File

@ -0,0 +1,67 @@
package impl
import (
"os"
"path/filepath"
"strings"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func backup(mds dtypes.MetadataDS, fpath string) error {
bb, ok := os.LookupEnv("LOTUS_BACKUP_BASE_PATH")
if !ok {
return xerrors.Errorf("LOTUS_BACKUP_BASE_PATH env var not set")
}
bds, ok := mds.(*backupds.Datastore)
if !ok {
return xerrors.Errorf("expected a backup datastore")
}
bb, err := homedir.Expand(bb)
if err != nil {
return xerrors.Errorf("expanding base path: %w", err)
}
bb, err = filepath.Abs(bb)
if err != nil {
return xerrors.Errorf("getting absolute base path: %w", err)
}
fpath, err = homedir.Expand(fpath)
if err != nil {
return xerrors.Errorf("expanding file path: %w", err)
}
fpath, err = filepath.Abs(fpath)
if err != nil {
return xerrors.Errorf("getting absolute file path: %w", err)
}
if !strings.HasPrefix(fpath, bb) {
return xerrors.Errorf("backup file name (%s) must be inside base path (%s)", fpath, bb)
}
out, err := os.OpenFile(fpath, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return xerrors.Errorf("open %s: %w", fpath, err)
}
if err := bds.Backup(out); err != nil {
if cerr := out.Close(); cerr != nil {
log.Errorw("error closing backup file while handling backup error", "closeErr", cerr, "backupErr", err)
}
return xerrors.Errorf("backup error: %w", err)
}
if err := out.Close(); err != nil {
return xerrors.Errorf("closing backup file: %w", err)
}
return nil
}

View File

@ -1,6 +1,8 @@
package impl
import (
"context"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/lotus/api"
@ -9,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/impl/market"
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
var log = logging.Logger("node")
@ -26,6 +29,12 @@ type FullNodeAPI struct {
full.WalletAPI
full.SyncAPI
full.BeaconAPI
DS dtypes.MetadataDS
}
func (n *FullNodeAPI) CreateBackup(ctx context.Context, fpath string) error {
return backup(n.DS, fpath)
}
var _ api.FullNode = &FullNodeAPI{}

View File

@ -8,18 +8,18 @@ import (
"strconv"
"time"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/big"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/host"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/piecestore"
retrievalmarket "github.com/filecoin-project/go-fil-markets/retrievalmarket"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -56,6 +56,8 @@ type StorageMinerAPI struct {
DataTransfer dtypes.ProviderDataTransfer
Host host.Host
DS dtypes.MetadataDS
ConsiderOnlineStorageDealsConfigFunc dtypes.ConsiderOnlineStorageDealsConfigFunc
SetConsiderOnlineStorageDealsConfigFunc dtypes.SetConsiderOnlineStorageDealsConfigFunc
ConsiderOnlineRetrievalDealsConfigFunc dtypes.ConsiderOnlineRetrievalDealsConfigFunc
@ -516,4 +518,8 @@ func (sm *StorageMinerAPI) PiecesGetCIDInfo(ctx context.Context, payloadCid cid.
return &ci, nil
}
func (sm *StorageMinerAPI) CreateBackup(ctx context.Context, fpath string) error {
return backup(sm.DS, fpath)
}
var _ api.StorageMiner = &StorageMinerAPI{}

View File

@ -6,6 +6,7 @@ import (
"go.uber.org/fx"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
@ -27,5 +28,10 @@ func KeyStore(lr repo.LockedRepo) (types.KeyStore, error) {
}
func Datastore(r repo.LockedRepo) (dtypes.MetadataDS, error) {
return r.Datastore("/metadata")
mds, err := r.Datastore("/metadata")
if err != nil {
return nil, err
}
return backupds.Wrap(mds), nil
}

View File

@ -226,11 +226,23 @@ func (fsr *FsRepo) Lock(repoType RepoType) (LockedRepo, error) {
}, nil
}
// Like Lock, except datastores will work in read-only mode
func (fsr *FsRepo) LockRO(repoType RepoType) (LockedRepo, error) {
lr, err := fsr.Lock(repoType)
if err != nil {
return nil, err
}
lr.(*fsLockedRepo).readonly = true
return lr, nil
}
type fsLockedRepo struct {
path string
configPath string
repoType RepoType
closer io.Closer
readonly bool
ds map[string]datastore.Batching
dsErr error

View File

@ -14,7 +14,7 @@ import (
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
)
type dsCtor func(path string) (datastore.Batching, error)
type dsCtor func(path string, readonly bool) (datastore.Batching, error)
var fsDatastores = map[string]dsCtor{
"chain": chainBadgerDs,
@ -26,9 +26,10 @@ var fsDatastores = map[string]dsCtor{
"client": badgerDs, // client specific
}
func chainBadgerDs(path string) (datastore.Batching, error) {
func chainBadgerDs(path string, readonly bool) (datastore.Batching, error) {
opts := badger.DefaultOptions
opts.GcInterval = 0 // disable GC for chain datastore
opts.ReadOnly = readonly
opts.Options = dgbadger.DefaultOptions("").WithTruncate(true).
WithValueThreshold(1 << 10)
@ -36,23 +37,26 @@ func chainBadgerDs(path string) (datastore.Batching, error) {
return badger.NewDatastore(path, &opts)
}
func badgerDs(path string) (datastore.Batching, error) {
func badgerDs(path string, readonly bool) (datastore.Batching, error) {
opts := badger.DefaultOptions
opts.ReadOnly = readonly
opts.Options = dgbadger.DefaultOptions("").WithTruncate(true).
WithValueThreshold(1 << 10)
return badger.NewDatastore(path, &opts)
}
func levelDs(path string) (datastore.Batching, error) {
func levelDs(path string, readonly bool) (datastore.Batching, error) {
return levelds.NewDatastore(path, &levelds.Options{
Compression: ldbopts.NoCompression,
NoSync: false,
Strict: ldbopts.StrictAll,
ReadOnly: readonly,
})
}
func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error) {
func (fsr *fsLockedRepo) openDatastores(readonly bool) (map[string]datastore.Batching, error) {
if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil {
return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err)
}
@ -63,7 +67,7 @@ func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error)
prefix := datastore.NewKey(p)
// TODO: optimization: don't init datastores we don't need
ds, err := ctor(fsr.join(filepath.Join(fsDatastore, p)))
ds, err := ctor(fsr.join(filepath.Join(fsDatastore, p)), readonly)
if err != nil {
return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err)
}
@ -78,7 +82,7 @@ func (fsr *fsLockedRepo) openDatastores() (map[string]datastore.Batching, error)
func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) {
fsr.dsOnce.Do(func() {
fsr.ds, fsr.dsErr = fsr.openDatastores()
fsr.ds, fsr.dsErr = fsr.openDatastores(fsr.readonly)
})
if fsr.dsErr != nil {