migrate repo.Datastore(/chain) to repo.Blockstore().

This commit is contained in:
Raúl Kripalani 2020-11-01 13:03:21 +00:00
parent 9437136f84
commit 099c4b5e1d
7 changed files with 132 additions and 110 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"sync/atomic" "sync/atomic"
"time" "time"
@ -138,12 +139,20 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
} }
bds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to get blocks datastore: %w", err) return nil, err
} }
bs := mybs{blockstore.NewBlockstore(bds)} defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
bs = mybs{bs}
ks, err := lr.KeyStore() ks, err := lr.KeyStore()
if err != nil { if err != nil {

View File

@ -3,6 +3,7 @@ package store_test
import ( import (
"bytes" "bytes"
"context" "context"
"io"
"testing" "testing"
datastore "github.com/ipfs/go-datastore" datastore "github.com/ipfs/go-datastore"
@ -51,18 +52,24 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err) b.Fatal(err)
} }
bds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
b.Logf("WARN: failed to close blockstore: %s", err)
}
}
}()
mds, err := lr.Datastore("/metadata") mds, err := lr.Datastore("/metadata")
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
bs := blockstore.NewBlockstore(bds)
cs := store.NewChainStore(bs, mds, nil, nil) cs := store.NewChainStore(bs, mds, nil, nil)
b.ResetTimer() b.ResetTimer()

View File

@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"strconv" "strconv"
"github.com/filecoin-project/lotus/chain/gen/genesis" "github.com/filecoin-project/lotus/chain/gen/genesis"
@ -10,6 +11,7 @@ import (
_init "github.com/filecoin-project/lotus/chain/actors/builtin/init" _init "github.com/filecoin-project/lotus/chain/actors/builtin/init"
"github.com/docker/go-units" "github.com/docker/go-units"
"github.com/filecoin-project/lotus/chain/actors/builtin" "github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/actors/builtin/multisig" "github.com/filecoin-project/lotus/chain/actors/builtin/multisig"
"github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/actors/builtin/power"
@ -24,6 +26,7 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/state"
@ -33,7 +36,6 @@ import (
"github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -168,18 +170,24 @@ var chainBalanceStateCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
ds, err := lkrepo.Datastore("/chain") bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open blockstore: %w", err)
} }
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lkrepo.Datastore("/metadata") mds, err := lkrepo.Datastore("/metadata")
if err != nil { if err != nil {
return err return err
} }
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs) cst := cbor.NewCborStore(bs)
@ -382,18 +390,24 @@ var chainPledgeCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
ds, err := lkrepo.Datastore("/chain") bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return xerrors.Errorf("failed to open blockstore: %w", err)
} }
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lkrepo.Datastore("/metadata") mds, err := lkrepo.Datastore("/metadata")
if err != nil { if err != nil {
return err return err
} }
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs) cst := cbor.NewCborStore(bs)

View File

@ -3,16 +3,17 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"os" "os"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -71,18 +72,24 @@ var exportChainCmd = &cli.Command{
defer fi.Close() //nolint:errcheck defer fi.Close() //nolint:errcheck
ds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open blockstore: %w", err)
} }
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lr.Datastore("/metadata") mds, err := lr.Datastore("/metadata")
if err != nil { if err != nil {
return err return err
} }
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, nil, nil) cs := store.NewChainStore(bs, mds, nil, nil)
if err := cs.Load(); err != nil { if err := cs.Load(); err != nil {
return err return err

View File

@ -12,7 +12,6 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
) )
@ -45,12 +44,18 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err) return xerrors.Errorf("opening the car file: %w", err)
} }
ds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return err
} }
bs := blockstore.NewBlockstore(ds) defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
cr, err := car.NewCarReader(f) cr, err := car.NewCarReader(f)
if err != nil { if err != nil {
@ -65,7 +70,7 @@ var importCarCmd = &cli.Command{
return err return err
} }
fmt.Println() fmt.Println()
return ds.Close() return nil
default: default:
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return err return err
@ -108,12 +113,18 @@ var importObjectCmd = &cli.Command{
} }
defer lr.Close() //nolint:errcheck defer lr.Close() //nolint:errcheck
ds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open blockstore: %w", err)
} }
bs := blockstore.NewBlockstore(ds) defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
c, err := cid.Decode(cctx.Args().Get(0)) c, err := cid.Decode(cctx.Args().Get(0))
if err != nil { if err != nil {

View File

@ -3,20 +3,19 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/bbloom"
"github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/lib/blockstore" badgerbs "github.com/filecoin-project/lotus/lib/blockstore/badger"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/ipfs/bbloom"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
) )
type cidSet interface { type cidSet interface {
@ -132,12 +131,25 @@ var stateTreePruneCmd = &cli.Command{
defer lkrepo.Close() //nolint:errcheck defer lkrepo.Close() //nolint:errcheck
ds, err := lkrepo.Datastore("/chain") bs, err := lkrepo.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to open blockstore: %w", err)
} }
defer ds.Close() //nolint:errcheck defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
// After migrating to native blockstores, this has been made
// database-specific.
badgbs, ok := bs.(*badgerbs.Blockstore)
if !ok {
return fmt.Errorf("only badger blockstores are supported")
}
mds, err := lkrepo.Datastore("/metadata") mds, err := lkrepo.Datastore("/metadata")
if err != nil { if err != nil {
@ -145,23 +157,18 @@ var stateTreePruneCmd = &cli.Command{
} }
defer mds.Close() //nolint:errcheck defer mds.Close() //nolint:errcheck
const DiscardRatio = 0.2
if cctx.Bool("only-ds-gc") { if cctx.Bool("only-ds-gc") {
gcds, ok := ds.(datastore.GCDatastore) fmt.Println("running datastore gc....")
if ok { for i := 0; i < cctx.Int("gc-count"); i++ {
fmt.Println("running datastore gc....") if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
for i := 0; i < cctx.Int("gc-count"); i++ { return xerrors.Errorf("datastore GC failed: %w", err)
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
} }
fmt.Println("gc complete!")
return nil
} }
return fmt.Errorf("datastore doesnt support gc") fmt.Println("gc complete!")
return nil
} }
bs := blockstore.NewBlockstore(ds)
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil) cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
if err := cs.Load(); err != nil { if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err) return fmt.Errorf("loading chainstore: %w", err)
@ -199,63 +206,30 @@ var stateTreePruneCmd = &cli.Command{
return nil return nil
} }
var b datastore.Batch b := badgbs.DB.NewWriteBatch()
var batchCount int defer b.Cancel()
markForRemoval := func(c cid.Cid) error { markForRemoval := func(c cid.Cid) error {
if b == nil { return b.Delete(badgbs.PrefixedKey(c))
nb, err := ds.Batch()
if err != nil {
return fmt.Errorf("opening batch: %w", err)
}
b = nb
}
batchCount++
if err := b.Delete(dshelp.MultihashToDsKey(c.Hash())); err != nil {
return err
}
if batchCount > 100 {
if err := b.Commit(); err != nil {
return xerrors.Errorf("failed to commit batch deletes: %w", err)
}
b = nil
batchCount = 0
}
return nil
} }
res, err := ds.Query(query.Query{KeysOnly: true}) keys, err := bs.AllKeysChan(context.Background())
if err != nil { if err != nil {
return xerrors.Errorf("failed to query datastore: %w", err) return xerrors.Errorf("failed to query blockstore: %w", err)
} }
dupTo := cctx.Int("delete-up-to") dupTo := cctx.Int("delete-up-to")
var deleteCount int var deleteCount int
var goodHits int var goodHits int
for { for k := range keys {
v, ok := res.NextSync() if goodSet.HasRaw(k.Bytes()) {
if !ok {
break
}
bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(v.Key[len("/blocks"):]))
if err != nil {
return xerrors.Errorf("failed to parse key: %w", err)
}
if goodSet.HasRaw(bk) {
goodHits++ goodHits++
continue continue
} }
nc := cid.NewCidV1(cid.Raw, bk) if err := markForRemoval(k); err != nil {
return fmt.Errorf("failed to remove cid %s: %w", k, err)
deleteCount++
if err := markForRemoval(nc); err != nil {
return fmt.Errorf("failed to remove cid %s: %w", nc, err)
} }
if deleteCount%20 == 0 { if deleteCount%20 == 0 {
@ -267,22 +241,17 @@ var stateTreePruneCmd = &cli.Command{
} }
} }
if b != nil { if err := b.Flush(); err != nil {
if err := b.Commit(); err != nil { return xerrors.Errorf("failed to flush final batch delete: %w", err)
return xerrors.Errorf("failed to commit final batch delete: %w", err)
}
} }
gcds, ok := ds.(datastore.GCDatastore) fmt.Println("running datastore gc....")
if ok { for i := 0; i < cctx.Int("gc-count"); i++ {
fmt.Println("running datastore gc....") if err := badgbs.DB.RunValueLogGC(DiscardRatio); err != nil {
for i := 0; i < cctx.Int("gc-count"); i++ { return xerrors.Errorf("datastore GC failed: %w", err)
if err := gcds.CollectGarbage(); err != nil {
return xerrors.Errorf("datastore GC failed: %w", err)
}
} }
fmt.Println("gc complete!")
} }
fmt.Println("gc complete!")
return nil return nil
}, },

View File

@ -35,7 +35,6 @@ import (
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
@ -399,18 +398,24 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
} }
defer lr.Close() //nolint:errcheck defer lr.Close() //nolint:errcheck
ds, err := lr.Datastore("/chain") bs, err := lr.Blockstore(repo.BlockstoreChain)
if err != nil { if err != nil {
return err return xerrors.Errorf("failed to open blockstore: %w", err)
} }
defer func() {
if c, ok := bs.(io.Closer); ok {
if err := c.Close(); err != nil {
log.Warnf("failed to close blockstore: %s", err)
}
}
}()
mds, err := lr.Datastore("/metadata") mds, err := lr.Datastore("/metadata")
if err != nil { if err != nil {
return err return err
} }
bs := blockstore.NewBlockstore(ds)
j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents()) j, err := journal.OpenFSJournal(lr, journal.EnvDisabledEvents())
if err != nil { if err != nil {
return xerrors.Errorf("failed to open journal: %w", err) return xerrors.Errorf("failed to open journal: %w", err)