implement sector dir aggregator

This commit is contained in:
whyrusleeping 2019-12-08 21:06:40 +01:00
parent 2dc12e8bb3
commit ce9be69d91
2 changed files with 182 additions and 4 deletions

View File

@ -3,17 +3,21 @@ package main
import (
"fmt"
"os"
"path/filepath"
"encoding/json"
badger "github.com/ipfs/go-ds-badger"
logging "github.com/ipfs/go-log"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/cmd/lotus-seed/seed"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
var log = logging.Logger("lotus-seed")
@ -26,6 +30,7 @@ func main() {
local := []*cli.Command{
preSealCmd,
aggregateManifestsCmd,
aggregateSectorDirsCmd,
}
app := &cli.App{
@ -138,6 +143,134 @@ var aggregateManifestsCmd = &cli.Command{
},
}
var aggregateSectorDirsCmd = &cli.Command{
Name: "aggregate-sector-dirs",
Usage: "aggregate a set of preseal manifests into a single file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "miner",
Usage: "Specify address of miner to aggregate sectorbuilders for",
},
&cli.StringFlag{
Name: "dest",
Usage: "specify directory to create aggregate sector store in",
},
&cli.Uint64Flag{
Name: "sector-size",
Usage: "specify size of sectors to aggregate",
Value: 32 * 1024 * 1024 * 1024,
},
},
Action: func(cctx *cli.Context) error {
if cctx.String("miner") == "" {
return fmt.Errorf("must specify miner address with --miner")
}
if cctx.String("dest") == "" {
return fmt.Errorf("must specify dest directory with --dest")
}
maddr, err := address.NewFromString(cctx.String("miner"))
if err != nil {
return err
}
destdir, err := homedir.Expand(cctx.String("dest"))
if err != nil {
return err
}
if err := os.MkdirAll(cctx.String("dest"), 0755); err != nil {
return err
}
agmds, err := badger.NewDatastore(filepath.Join(destdir, "badger"), nil)
if err != nil {
return err
}
defer agmds.Close()
ssize := cctx.Uint64("sector-size")
agsb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SectorSize: ssize,
CacheDir: filepath.Join(destdir, "cache"),
SealedDir: filepath.Join(destdir, "sealed"),
StagedDir: filepath.Join(destdir, "staging"),
UnsealedDir: filepath.Join(destdir, "unsealed"),
WorkerThreads: 2,
}, agmds)
if err != nil {
return err
}
var highestSectorID uint64
for _, dir := range cctx.Args().Slice() {
dir, err := homedir.Expand(dir)
if err != nil {
return xerrors.Errorf("failed to expand %q: %w", dir, err)
}
st, err := os.Stat(dir)
if err != nil {
return err
}
if !st.IsDir() {
return fmt.Errorf("%q was not a directory", dir)
}
fi, err := os.Open(filepath.Join(dir, "pre-seal-"+maddr.String()+".json"))
if err != nil {
return err
}
var genm genesis.GenesisMiner
if err := json.NewDecoder(fi).Decode(&genm); err != nil {
return err
}
if genm.SectorSize != ssize {
return xerrors.Errorf("sector size mismatch in %q", dir)
}
for _, s := range genm.Sectors {
if s.SectorID > highestSectorID {
highestSectorID = s.SectorID
}
}
mds, err := badger.NewDatastore(filepath.Join(dir, "badger"), nil)
if err != nil {
return err
}
defer mds.Close()
sb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SectorSize: genm.SectorSize,
CacheDir: filepath.Join(dir, "cache"),
SealedDir: filepath.Join(dir, "sealed"),
StagedDir: filepath.Join(dir, "staging"),
UnsealedDir: filepath.Join(dir, "unsealed"),
WorkerThreads: 2,
}, mds)
if err != nil {
return err
}
if err := agsb.ImportFrom(sb); err != nil {
return xerrors.Errorf("importing sectors from %q failed: %w", dir, err)
}
}
if err := agsb.SetLastSectorID(highestSectorID); err != nil {
return err
}
return nil
},
}
func mergeGenMiners(a, b genesis.GenesisMiner) genesis.GenesisMiner {
if a.SectorSize != b.SectorSize {
panic("sector sizes mismatch")

View File

@ -688,15 +688,15 @@ func fallbackPostChallengeCount(sectors uint64) uint64 {
}
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder) error {
if err := moveAllFiles(osb.cacheDir, sb.cacheDir); err != nil {
if err := copyAllFiles(osb.cacheDir, sb.cacheDir); err != nil {
return err
}
if err := moveAllFiles(osb.sealedDir, sb.sealedDir); err != nil {
if err := copyAllFiles(osb.sealedDir, sb.sealedDir); err != nil {
return err
}
if err := moveAllFiles(osb.stagedDir, sb.stagedDir); err != nil {
if err := copyAllFiles(osb.stagedDir, sb.stagedDir); err != nil {
return err
}
@ -714,7 +714,16 @@ func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder) error {
return nil
}
func moveAllFiles(from, to string) error {
func (sb *SectorBuilder) SetLastSectorID(id uint64) error {
if err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))); err != nil {
return err
}
sb.lastID = id
return nil
}
func copyAllFiles(from, to string) error {
dir, err := os.Open(from)
if err != nil {
return err
@ -732,3 +741,39 @@ func moveAllFiles(from, to string) error {
return nil
}
func copyFile(from, to string) error {
st, err := os.Stat(to)
if err != nil {
if !os.IsNotExist(err) {
return xerrors.Errorf("stat of target file returned unexpected error: %w", err)
}
}
if st.IsDir() {
return xerrors.Errorf("copying directories not handled")
}
if st != nil {
log.Warn("destination file %q already exists! skipping copy...", to)
return nil
}
dst, err := os.Create(to)
if err != nil {
return xerrors.Errorf("failed to create target file: %w", err)
}
defer dst.Close()
src, err := os.Open(from)
if err != nil {
return xerrors.Errorf("failed to open source file: %w", err)
}
defer src.Close()
if _, err := io.Copy(dst, src); err != nil {
return xerrors.Errorf("copy failed: %w", err)
}
return nil
}