Merge pull request #796 from filecoin-project/feat/seed-aggregation
implement lotus-seed aggregation commands
This commit is contained in:
commit
17c112acc0
@ -1,15 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"encoding/json"
|
||||
|
||||
badger "github.com/ipfs/go-ds-badger"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"golang.org/x/xerrors"
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
|
||||
"github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
)
|
||||
|
||||
var log = logging.Logger("lotus-seed")
|
||||
@ -21,6 +29,8 @@ func main() {
|
||||
|
||||
local := []*cli.Command{
|
||||
preSealCmd,
|
||||
aggregateManifestsCmd,
|
||||
aggregateSectorDirsCmd,
|
||||
}
|
||||
|
||||
app := &cli.App{
|
||||
@ -92,3 +102,192 @@ var preSealCmd = &cli.Command{
|
||||
return seed.WriteGenesisMiner(maddr, sbroot, gm)
|
||||
},
|
||||
}
|
||||
|
||||
var aggregateManifestsCmd = &cli.Command{
|
||||
Name: "aggregate-manifests",
|
||||
Usage: "aggregate a set of preseal manifests into a single file",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
var inputs []map[string]genesis.GenesisMiner
|
||||
for _, infi := range cctx.Args().Slice() {
|
||||
fi, err := os.Open(infi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fi.Close()
|
||||
var val map[string]genesis.GenesisMiner
|
||||
if err := json.NewDecoder(fi).Decode(&val); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
inputs = append(inputs, val)
|
||||
}
|
||||
|
||||
output := make(map[string]genesis.GenesisMiner)
|
||||
for _, in := range inputs {
|
||||
for maddr, val := range in {
|
||||
if gm, ok := output[maddr]; ok {
|
||||
output[maddr] = mergeGenMiners(gm, val)
|
||||
} else {
|
||||
output[maddr] = val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
blob, err := json.MarshalIndent(output, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println(string(blob))
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var aggregateSectorDirsCmd = &cli.Command{
|
||||
Name: "aggregate-sector-dirs",
|
||||
Usage: "aggregate a set of preseal manifests into a single file",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "miner",
|
||||
Usage: "Specify address of miner to aggregate sectorbuilders for",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "dest",
|
||||
Usage: "specify directory to create aggregate sector store in",
|
||||
},
|
||||
&cli.Uint64Flag{
|
||||
Name: "sector-size",
|
||||
Usage: "specify size of sectors to aggregate",
|
||||
Value: 32 * 1024 * 1024 * 1024,
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if cctx.String("miner") == "" {
|
||||
return fmt.Errorf("must specify miner address with --miner")
|
||||
}
|
||||
if cctx.String("dest") == "" {
|
||||
return fmt.Errorf("must specify dest directory with --dest")
|
||||
}
|
||||
|
||||
maddr, err := address.NewFromString(cctx.String("miner"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
destdir, err := homedir.Expand(cctx.String("dest"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(cctx.String("dest"), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
agmds, err := badger.NewDatastore(filepath.Join(destdir, "badger"), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer agmds.Close()
|
||||
|
||||
ssize := cctx.Uint64("sector-size")
|
||||
|
||||
agsb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
Miner: maddr,
|
||||
SectorSize: ssize,
|
||||
CacheDir: filepath.Join(destdir, "cache"),
|
||||
SealedDir: filepath.Join(destdir, "sealed"),
|
||||
StagedDir: filepath.Join(destdir, "staging"),
|
||||
UnsealedDir: filepath.Join(destdir, "unsealed"),
|
||||
WorkerThreads: 2,
|
||||
}, agmds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var highestSectorID uint64
|
||||
for _, dir := range cctx.Args().Slice() {
|
||||
dir, err := homedir.Expand(dir)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to expand %q: %w", dir, err)
|
||||
}
|
||||
|
||||
st, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !st.IsDir() {
|
||||
return fmt.Errorf("%q was not a directory", dir)
|
||||
}
|
||||
|
||||
fi, err := os.Open(filepath.Join(dir, "pre-seal-"+maddr.String()+".json"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var genmm map[string]genesis.GenesisMiner
|
||||
if err := json.NewDecoder(fi).Decode(&genmm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
genm, ok := genmm[maddr.String()]
|
||||
if !ok {
|
||||
return xerrors.Errorf("input data did not have our miner in it (%s)", maddr)
|
||||
}
|
||||
|
||||
if genm.SectorSize != ssize {
|
||||
return xerrors.Errorf("sector size mismatch in %q (%d != %d)", dir)
|
||||
}
|
||||
|
||||
for _, s := range genm.Sectors {
|
||||
if s.SectorID > highestSectorID {
|
||||
highestSectorID = s.SectorID
|
||||
}
|
||||
}
|
||||
|
||||
opts := badger.DefaultOptions
|
||||
opts.ReadOnly = true
|
||||
mds, err := badger.NewDatastore(filepath.Join(dir, "badger"), &opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mds.Close()
|
||||
|
||||
sb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
Miner: maddr,
|
||||
SectorSize: genm.SectorSize,
|
||||
CacheDir: filepath.Join(dir, "cache"),
|
||||
SealedDir: filepath.Join(dir, "sealed"),
|
||||
StagedDir: filepath.Join(dir, "staging"),
|
||||
UnsealedDir: filepath.Join(dir, "unsealed"),
|
||||
WorkerThreads: 2,
|
||||
}, mds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := agsb.ImportFrom(sb); err != nil {
|
||||
return xerrors.Errorf("importing sectors from %q failed: %w", dir, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := agsb.SetLastSectorID(highestSectorID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
func mergeGenMiners(a, b genesis.GenesisMiner) genesis.GenesisMiner {
|
||||
if a.SectorSize != b.SectorSize {
|
||||
panic("sector sizes mismatch")
|
||||
}
|
||||
|
||||
return genesis.GenesisMiner{
|
||||
Owner: a.Owner,
|
||||
Worker: a.Worker,
|
||||
SectorSize: a.SectorSize,
|
||||
Key: a.Key,
|
||||
Sectors: append(a.Sectors, b.Sectors...),
|
||||
}
|
||||
}
|
||||
|
1
go.mod
1
go.mod
@ -80,6 +80,7 @@ require (
|
||||
github.com/onsi/ginkgo v1.9.0 // indirect
|
||||
github.com/onsi/gomega v1.6.0 // indirect
|
||||
github.com/opentracing/opentracing-go v1.1.0
|
||||
github.com/otiai10/copy v1.0.2
|
||||
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
|
||||
github.com/smartystreets/assertions v1.0.1 // indirect
|
||||
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect
|
||||
|
6
go.sum
6
go.sum
@ -510,6 +510,12 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
|
||||
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
|
||||
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
|
||||
github.com/otiai10/copy v1.0.2 h1:DDNipYy6RkIkjMwy+AWzgKiNTyj2RUI9yEMeETEpVyc=
|
||||
github.com/otiai10/copy v1.0.2/go.mod h1:c7RpqBkwMom4bYTSkLSym4VSJz/XtncWRAj/J4PEIMY=
|
||||
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95 h1:+OLn68pqasWca0z5ryit9KGfp3sUsW4Lqg32iRMJyzs=
|
||||
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE=
|
||||
github.com/otiai10/mint v1.3.0 h1:Ady6MKVezQwHBkGzLFbrsywyp09Ah7rkmfjV3Bcr5uc=
|
||||
github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -12,6 +11,7 @@ import (
|
||||
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log"
|
||||
dcopy "github.com/otiai10/copy"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
@ -688,15 +688,15 @@ func fallbackPostChallengeCount(sectors uint64) uint64 {
|
||||
}
|
||||
|
||||
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder) error {
|
||||
if err := moveAllFiles(osb.cacheDir, sb.cacheDir); err != nil {
|
||||
if err := dcopy.Copy(osb.cacheDir, sb.cacheDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := moveAllFiles(osb.sealedDir, sb.sealedDir); err != nil {
|
||||
if err := dcopy.Copy(osb.sealedDir, sb.sealedDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := moveAllFiles(osb.stagedDir, sb.stagedDir); err != nil {
|
||||
if err := dcopy.Copy(osb.stagedDir, sb.stagedDir); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -714,21 +714,11 @@ func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func moveAllFiles(from, to string) error {
|
||||
dir, err := os.Open(from)
|
||||
if err != nil {
|
||||
func (sb *SectorBuilder) SetLastSectorID(id uint64) error {
|
||||
if err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
names, err := dir.Readdirnames(0)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to list items in dir: %w", err)
|
||||
}
|
||||
for _, n := range names {
|
||||
if err := os.Rename(filepath.Join(from, n), filepath.Join(to, n)); err != nil {
|
||||
return xerrors.Errorf("moving file failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
sb.lastID = id
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user