diff --git a/api/api_storage.go b/api/api_storage.go index 0988d3bce..e8712b3b4 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -1,14 +1,13 @@ package api import ( + "bytes" "context" "github.com/ipfs/go-cid" "github.com/filecoin-project/go-address" "github.com/filecoin-project/specs-actors/actors/abi" - - "github.com/filecoin-project/lotus/storage/sealmgr" ) // alias because cbor-gen doesn't like non-alias types @@ -136,8 +135,8 @@ type SectorInfo struct { CommR *cid.Cid Proof []byte Deals []abi.DealID - Ticket sealmgr.SealTicket - Seed sealmgr.SealSeed + Ticket SealTicket + Seed SealSeed Retries uint64 LastErr string @@ -154,3 +153,21 @@ type SealedRef struct { type SealedRefs struct { Refs []SealedRef } + +type SealTicket struct { + Value abi.SealRandomness + Epoch abi.ChainEpoch +} + +type SealSeed struct { + Value abi.InteractiveSealRandomness + Epoch abi.ChainEpoch +} + +func (st *SealTicket) Equals(ost *SealTicket) bool { + return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch +} + +func (st *SealSeed) Equals(ost *SealSeed) bool { + return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch +} \ No newline at end of file diff --git a/cmd/lotus-seed/main.go b/cmd/lotus-seed/main.go index 2096ac0ff..2daa83325 100644 --- a/cmd/lotus-seed/main.go +++ b/cmd/lotus-seed/main.go @@ -6,18 +6,12 @@ import ( "fmt" "io/ioutil" "os" - "path/filepath" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - badger "github.com/ipfs/go-ds-badger2" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" - "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" @@ -38,7 +32,6 @@ func main() { preSealCmd, aggregateManifestsCmd, - aggregateSectorDirsCmd, } app := &cli.App{ @@ -174,149 +167,6 @@ var aggregateManifestsCmd = &cli.Command{ }, } -var aggregateSectorDirsCmd = &cli.Command{ - Name: "aggregate-sector-dirs", - Usage: "aggregate a set of preseal manifests into a single file", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "miner", - Usage: "Specify address of miner to aggregate sectorbuilders for", - }, - &cli.StringFlag{ - Name: "dest", - Usage: "specify directory to create aggregate sector store in", - }, - &cli.Uint64Flag{ - Name: "sector-size", - Usage: "specify size of sectors to aggregate", - Value: 32 * 1024 * 1024 * 1024, - }, - }, - Action: func(cctx *cli.Context) error { - if cctx.String("miner") == "" { - return fmt.Errorf("must specify miner address with --miner") - } - if cctx.String("dest") == "" { - return fmt.Errorf("must specify dest directory with --dest") - } - - maddr, err := address.NewFromString(cctx.String("miner")) - if err != nil { - return err - } - - destdir, err := homedir.Expand(cctx.String("dest")) - if err != nil { - return err - } - - if err := os.MkdirAll(destdir, 0755); err != nil { - return err - } - - agmds, err := badger.NewDatastore(filepath.Join(destdir, "badger"), nil) - if err != nil { - return err - } - defer agmds.Close() - - ssize := abi.SectorSize(cctx.Uint64("sector-size")) - - ppt, spt, err := lapi.ProofTypeFromSectorSize(abi.SectorSize(cctx.Uint64("sector-size"))) - if err != nil { - return err - } - - agsb, err := sectorbuilder.New(§orbuilder.Config{ - Miner: maddr, - SealProofType: spt, - PoStProofType: ppt, - Paths: sectorbuilder.SimplePath(destdir), - WorkerThreads: 2, - }, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - return err - } - - var aggrGenMiner genesis.Miner - var highestSectorID abi.SectorNumber - for _, dir := range cctx.Args().Slice() { - dir, err := homedir.Expand(dir) - if err != nil { - return xerrors.Errorf("failed to expand %q: %w", dir, err) - } - - st, err := os.Stat(dir) - if err != nil { - return err - } - if !st.IsDir() { - return fmt.Errorf("%q was not a directory", dir) - } - - fi, err := os.Open(filepath.Join(dir, "pre-seal-"+maddr.String()+".json")) - if err != nil { - return err - } - - var genmm map[string]genesis.Miner - if err := json.NewDecoder(fi).Decode(&genmm); err != nil { - return err - } - - genm, ok := genmm[maddr.String()] - if !ok { - return xerrors.Errorf("input data did not have our miner in it (%s)", maddr) - } - - if genm.SectorSize != ssize { - return xerrors.Errorf("sector size mismatch in %q (%d != %d)", dir) - } - - for _, s := range genm.Sectors { - if s.SectorID > highestSectorID { - highestSectorID = s.SectorID - } - } - - aggrGenMiner = mergeGenMiners(aggrGenMiner, genm) - - opts := badger.DefaultOptions - opts.ReadOnly = true - mds, err := badger.NewDatastore(filepath.Join(dir, "badger"), &opts) - if err != nil { - return err - } - defer mds.Close() - - sb, err := sectorbuilder.New(§orbuilder.Config{ - Miner: maddr, - SealProofType: spt, - PoStProofType: ppt, - Paths: sectorbuilder.SimplePath(dir), - WorkerThreads: 2, - }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - return err - } - - if err := agsb.ImportFrom(sb, false); err != nil { - return xerrors.Errorf("importing sectors from %q failed: %w", dir, err) - } - } - - if err := agsb.SetLastSectorNum(highestSectorID); err != nil { - return err - } - - if err := seed.WriteGenesisMiner(maddr, destdir, &aggrGenMiner, nil); err != nil { - return err - } - - return nil - }, -} - func mergeGenMiners(a, b genesis.Miner) genesis.Miner { if a.SectorSize != b.SectorSize { panic("sector sizes mismatch") diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index 8af65e3bb..b35d1cbc2 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -11,25 +11,23 @@ import ( "os" "path/filepath" + "github.com/google/uuid" + badger "github.com/ipfs/go-ds-badger2" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - badger "github.com/ipfs/go-ds-badger2" - logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multihash" - "golang.org/x/xerrors" - - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" - - "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/genesis" + "github.com/filecoin-project/lotus/node/config" ) var log = logging.Logger("preseal") @@ -46,12 +44,9 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum } cfg := §orbuilder.Config{ - Miner: maddr, - SealProofType: spt, - PoStProofType: ppt, - FallbackLastNum: offset, - Paths: sectorbuilder.SimplePath(sbroot), - WorkerThreads: 2, + Miner: maddr, + SealProofType: spt, + PoStProofType: ppt, } if err := os.MkdirAll(sbroot, 0775); err != nil { @@ -63,7 +58,13 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum return nil, nil, err } - sb, err := sectorbuilder.New(cfg, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) + sbfs := &fs.Basic{ + Miner: maddr, + NextID: offset, + Root: sbroot, + } + + sb, err := sectorbuilder.New(sbfs, cfg) if err != nil { return nil, nil, err } @@ -75,7 +76,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum var sealedSectors []*genesis.PreSeal for i := 0; i < sectors; i++ { - sid, err := sb.AcquireSectorNumber() + sid, err := sbfs.AcquireSectorNumber() if err != nil { return nil, nil, err } @@ -90,12 +91,17 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum fmt.Printf("sector-id: %d, piece info: %v\n", sid, pi) - scid, ucid, err := sb.SealPreCommit(context.TODO(), sid, ticket, []abi.PieceInfo{pi}) + in2, err := sb.SealPreCommit1(context.TODO(), sid, ticket, []abi.PieceInfo{pi}) if err != nil { return nil, nil, xerrors.Errorf("commit: %w", err) } - if err := sb.TrimCache(context.TODO(), sid); err != nil { + scid, ucid, err := sb.SealPreCommit2(context.TODO(), sid, in2) + if err != nil { + return nil, nil, xerrors.Errorf("commit: %w", err) + } + + if err := sb.FinalizeSector(context.TODO(), sid); err != nil { return nil, nil, xerrors.Errorf("trim cache: %w", err) } @@ -138,6 +144,22 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum return nil, nil, xerrors.Errorf("closing datastore: %w", err) } + { + b, err := json.Marshal(&config.StorageMeta{ + ID: uuid.New().String(), + Weight: 0, // read-only + CanCommit: false, + CanStore: false, + }) + if err != nil { + return nil, nil, xerrors.Errorf("marshaling storage config: %w", err) + } + + if err := ioutil.WriteFile(filepath.Join(sbroot, "storage.json"), b, 0644); err != nil { + return nil, nil, xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(sbroot, "storage.json"), err) + } + } + return miner, &minerAddr.KeyInfo, nil } @@ -172,19 +194,6 @@ func WriteGenesisMiner(maddr address.Address, sbroot string, gm *genesis.Miner, return nil } -func commDCID(commd []byte) cid.Cid { - d, err := cid.Prefix{ - Version: 1, - Codec: cid.Raw, - MhType: multihash.IDENTITY, - MhLength: len(commd), - }.Sum(commd) - if err != nil { - panic(err) - } - return d -} - func createDeals(m *genesis.Miner, k *wallet.Key, maddr address.Address, ssize abi.SectorSize) error { for _, sector := range m.Sectors { proposal := &market.DealProposal{ diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 711234529..d3f13ad2c 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -72,7 +72,8 @@ var infoCmd = &cli.Command{ float64(10000*uint64(len(faults))/secCounts.Pset)/100.) } - // TODO: indicate whether the post worker is in use + panic("todo") + /*// TODO: indicate whether the post worker is in use wstat, err := nodeApi.WorkerStats(ctx) if err != nil { return err @@ -86,7 +87,7 @@ var infoCmd = &cli.Command{ fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait) fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait) fmt.Printf("\tCommit: %d\n", wstat.CommitWait) - fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait) + fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)*/ ps, err := api.StateMinerPostState(ctx, maddr, types.EmptyTSK) if err != nil { diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index c1e24717b..bfd26572b 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -7,7 +7,6 @@ import ( "fmt" "io/ioutil" "os" - "path/filepath" "strconv" "github.com/filecoin-project/specs-actors/actors/builtin" @@ -19,11 +18,8 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" paramfetch "github.com/filecoin-project/go-paramfetch" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "github.com/mitchellh/go-homedir" @@ -38,11 +34,11 @@ import ( "github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sealing" + "github.com/filecoin-project/lotus/storage/sealmgr/advmgr" ) var initCmd = &cli.Command{ @@ -77,7 +73,7 @@ var initCmd = &cli.Command{ Usage: "specify sector size to use", Value: uint64(build.SectorSizes[0]), }, - &cli.StringFlag{ + &cli.StringSliceFlag{ Name: "pre-sealed-sectors", Usage: "specify set of presealed sectors for starting as a genesis miner", }, @@ -159,60 +155,33 @@ var initCmd = &cli.Command{ return err } - if pssb := cctx.String("pre-sealed-sectors"); pssb != "" { - pssb, err := homedir.Expand(pssb) - if err != nil { - return err - } + if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 { + log.Infof("Setting up storage config with presealed sector", pssb) - log.Infof("moving pre-sealed-sectors from %s into newly created storage miner repo", pssb) lr, err := r.Lock(repo.StorageMiner) if err != nil { return err } - mds, err := lr.Datastore("/metadata") + sc, err := lr.GetStorage() if err != nil { - return err + return xerrors.Errorf("get storage config: %w", err) } - bopts := badger.DefaultOptions - bopts.ReadOnly = true - oldmds, err := badger.NewDatastore(filepath.Join(pssb, "badger"), &bopts) - if err != nil { - return err + for _, psp := range pssb { + psp, err := homedir.Expand(psp) + if err != nil { + return err + } + sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{ + Path: psp, + }) } - ppt, spt, err := lapi.ProofTypeFromSectorSize(ssize) - if err != nil { - return err + if err := lr.SetStorage(sc); err != nil { + return xerrors.Errorf("set storage config: %w", err) } - oldsb, err := sectorbuilder.New(§orbuilder.Config{ - SealProofType: spt, - PoStProofType: ppt, - WorkerThreads: 2, - Paths: sectorbuilder.SimplePath(pssb), - }, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err) - } - - nsb, err := sectorbuilder.New(§orbuilder.Config{ - SealProofType: spt, - PoStProofType: ppt, - WorkerThreads: 2, - Paths: sectorbuilder.SimplePath(lr.Path()), - }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - return xerrors.Errorf("failed to open up sectorbuilder: %w", err) - } - - if err := nsb.ImportFrom(oldsb, symlink); err != nil { - return err - } - if err := lr.Close(); err != nil { - return xerrors.Errorf("unlocking repo after preseal migration: %w", err) - } + panic("persist last sector id somehow") } if err := storageMinerInit(ctx, cctx, api, r); err != nil { @@ -374,30 +343,8 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - c, err := lr.Config() - if err != nil { - return err - } - - cfg, ok := c.(*config.StorageMiner) - if !ok { - return xerrors.Errorf("invalid config from repo, got: %T", c) - } - - scfg := sectorbuilder.SimplePath(lr.Path()) - if len(cfg.SectorBuilder.Storage) > 0 { - scfg = cfg.SectorBuilder.Storage - } - - sbcfg, err := modules.SectorBuilderConfig(scfg, 2, false, false)(mds, api) - if err != nil { - return xerrors.Errorf("getting genesis miner sector builder config: %w", err) - } - sb, err := sectorbuilder.New(sbcfg, mds) - if err != nil { - return xerrors.Errorf("failed to set up sectorbuilder for genesis mining: %w", err) - } - epp := storage.NewElectionPoStProver(sb) + smgr := advmgr.New(lr) + epp := storage.NewElectionPoStProver(smgr) m := miner.NewMiner(api, epp) { diff --git a/documentation/en/local-dev-net.md b/documentation/en/local-dev-net.md index 2bf2b35c3..b992d9bff 100644 --- a/documentation/en/local-dev-net.md +++ b/documentation/en/local-dev-net.md @@ -6,15 +6,15 @@ Build the Lotus Binaries in debug mode, This enables the use of 1024 byte sector make debug ``` -Download the 1024 byte parameters: +Download the 2048 byte parameters: ```sh -./lotus fetch-params --proving-params 1024 +./lotus fetch-params --proving-params 2048 ``` Pre-seal some sectors: ```sh -./lotus-seed pre-seal --sector-size 1024 --num-sectors 2 +./lotus-seed pre-seal --sector-size 2048 --num-sectors 2 ``` Create the genesis block and start up the first node: @@ -34,7 +34,7 @@ Then, in another console, import the genesis miner key: Set up the genesis miner: ```sh -./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=1024 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t0101.json --nosync +./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=2048 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t0101.json --nosync ``` Now, finally, start up the miner: diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index ec7505b66..a586cf01d 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -12,18 +12,19 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sealmgr" ) type retrievalProviderNode struct { - miner *storage.Miner - sb sectorbuilder.Interface - full api.FullNode + miner *storage.Miner + sealer sealmgr.Manager + full api.FullNode } // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // Lotus Node -func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode { - return &retrievalProviderNode{miner, sb, full} +func NewRetrievalProviderNode(miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode) retrievalmarket.RetrievalProviderNode { + return &retrievalProviderNode{miner, sealer, full} } func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) { @@ -31,7 +32,7 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uin if err != nil { return nil, err } - return rpn.sb.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD) + return rpn.sealer.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD) } func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount) (abi.TokenAmount, error) { diff --git a/node/builder.go b/node/builder.go index 6d153ba10..4a17ef7bf 100644 --- a/node/builder.go +++ b/node/builder.go @@ -253,7 +253,6 @@ func Online() Option { // Storage miner ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, - Override(new(sectorbuilder.Interface), modules.SectorBuilder), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(sealing.TicketFn), modules.SealTicketGen), Override(new(*storage.Miner), modules.StorageMiner), @@ -346,30 +345,13 @@ func ConfigFullNode(c interface{}) Option { ) } -func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { +func ConfigStorageMiner(c interface{}) Option { cfg, ok := c.(*config.StorageMiner) if !ok { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } - scfg := sectorbuilder.SimplePath(lr.Path()) - if cfg.SectorBuilder.Path == "" { - if len(cfg.SectorBuilder.Storage) > 0 { - scfg = cfg.SectorBuilder.Storage - } - } else { - scfg = sectorbuilder.SimplePath(cfg.SectorBuilder.Path) - log.Warn("LEGACY SectorBuilder.Path FOUND IN CONFIG. Please use the new storage config") - } - - return Options( - ConfigCommon(&cfg.Common), - - Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(scfg, - cfg.SectorBuilder.WorkerCount, - cfg.SectorBuilder.DisableLocalPreCommit, - cfg.SectorBuilder.DisableLocalCommit)), - ) + return Options(ConfigCommon(&cfg.Common)) } func Repo(r repo.Repo) Option { @@ -387,7 +369,7 @@ func Repo(r repo.Repo) Option { Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing ApplyIf(isType(repo.FullNode), ConfigFullNode(c)), - ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c, lr)), + ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)), Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), diff --git a/node/config/def.go b/node/config/def.go index 43007b684..dafef3f4d 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -3,8 +3,6 @@ package config import ( "encoding" "time" - - "github.com/filecoin-project/go-sectorbuilder/fs" ) // Common is common config between full node and miner @@ -25,7 +23,7 @@ type FullNode struct { type StorageMiner struct { Common - SectorBuilder SectorBuilder + Storage Storage } // API contains configs for API endpoint @@ -54,14 +52,8 @@ type Metrics struct { } // // Storage Miner +type Storage struct { -type SectorBuilder struct { - Path string // TODO: remove // FORK (-ish) - Storage []fs.PathConfig - WorkerCount uint - - DisableLocalPreCommit bool - DisableLocalCommit bool } func defCommon() Common { @@ -95,9 +87,7 @@ func DefaultStorageMiner() *StorageMiner { cfg := &StorageMiner{ Common: defCommon(), - SectorBuilder: SectorBuilder{ - WorkerCount: 5, - }, + Storage: Storage{}, } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" return cfg diff --git a/node/config/storage.go b/node/config/storage.go new file mode 100644 index 000000000..57bf80bff --- /dev/null +++ b/node/config/storage.go @@ -0,0 +1,68 @@ +package config + +import ( + "encoding/json" + "io" + "io/ioutil" + "os" + + "golang.org/x/xerrors" +) + +type LocalPath struct { + Path string +} + +// .lotusstorage/storage.json +type StorageConfig struct { + StoragePaths []LocalPath +} + +// [path]/metadata.json +type StorageMeta struct { + ID string + Weight int // 0 = readonly + + CanCommit bool + CanStore bool +} + +func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) { + file, err := os.Open(path) + switch { + case os.IsNotExist(err): + if def == nil { + return nil, xerrors.Errorf("couldn't load storage config: %w", err) + } + return def, nil + case err != nil: + return nil, err + } + + defer file.Close() //nolint:errcheck // The file is RO + return StorageFromReader(file, *def) +} + +func StorageFromReader(reader io.Reader, def StorageConfig) (*StorageConfig, error) { + cfg := def + err := json.NewDecoder(reader).Decode(&cfg) + if err != nil { + return nil, err + } + + return &cfg, nil +} + +func WriteStorageFile(path string, config StorageConfig) error { + b, err := json.Marshal(config) + if err != nil { + return xerrors.Errorf("marshaling storage config: %w", err) + } + + if err := ioutil.WriteFile(path, b, 0644); err != nil { + return xerrors.Errorf("persisting storage config (%s): %w", path, err) + } + + return nil +} + diff --git a/node/impl/storminer.go b/node/impl/storminer.go index e115c089e..e16510d34 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,25 +3,18 @@ package impl import ( "context" "encoding/json" - "io" - "mime" "net/http" - "os" "strconv" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/abi" - "github.com/gorilla/mux" - files "github.com/ipfs/go-ipfs-files" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/go-sectorbuilder/fs" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/gorilla/mux" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/apistruct" - "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" @@ -31,7 +24,7 @@ type StorageMinerAPI struct { CommonAPI SectorBuilderConfig *sectorbuilder.Config - SectorBuilder sectorbuilder.Interface + //SectorBuilder sectorbuilder.Interface SectorBlocks *sectorblocks.SectorBlocks Miner *storage.Miner @@ -57,7 +50,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { } func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) + panic("todo") +/* vars := mux.Vars(r) id, err := strconv.ParseUint(vars["id"], 10, 64) if err != nil { @@ -98,11 +92,12 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques if _, err := io.Copy(w, rd); err != nil { log.Error(err) return - } + }*/ } func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) + panic("todo") +/* vars := mux.Vars(r) id, err := strconv.ParseUint(vars["id"], 10, 64) if err != nil { @@ -159,13 +154,13 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques w.WriteHeader(200) - log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength) + log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)*/ } - +/* func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { stat := sm.SectorBuilder.WorkerStats() return stat, nil -} +}*/ func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) { return sm.SectorBuilderConfig.Miner, nil @@ -252,7 +247,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error { return sm.Miner.ForceSectorState(ctx, id, state) } - +/* func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { return sm.SectorBuilder.AddWorker(ctx, cfg) } @@ -260,5 +255,5 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.Wo func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { return sm.SectorBuilder.TaskDone(ctx, task, res) } - +*/ var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 81b98f00b..896d954c7 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -2,11 +2,8 @@ package modules import ( "context" - "math" "reflect" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/go-address" dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" piecefilestore "github.com/filecoin-project/go-fil-markets/filestore" @@ -20,7 +17,6 @@ import ( smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -37,10 +33,12 @@ import ( "github.com/ipfs/go-merkledag" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" - "github.com/mitchellh/go-homedir" "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/gen" @@ -75,51 +73,32 @@ func GetParams(sbc *sectorbuilder.Config) error { return nil } -func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { - return func(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) { - minerAddr, err := minerAddrFromDS(ds) - if err != nil { - return nil, err - } - - ssize, err := fnapi.StateMinerSectorSize(context.TODO(), minerAddr, types.EmptyTSK) - if err != nil { - return nil, err - } - - for i := range storage { - storage[i].Path, err = homedir.Expand(storage[i].Path) - if err != nil { - return nil, err - } - } - - if threads > math.MaxUint8 { - return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8) - } - - ppt, spt, err := api.ProofTypeFromSectorSize(ssize) - if err != nil { - return nil, xerrors.Errorf("bad sector size: %w", err) - } - - sb := §orbuilder.Config{ - Miner: minerAddr, - SealProofType: spt, - PoStProofType: ppt, - - WorkerThreads: uint8(threads), - NoPreCommit: noprecommit, - NoCommit: nocommit, - - Paths: storage, - } - - return sb, nil +func SectorBuilderConfig(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) { + minerAddr, err := minerAddrFromDS(ds) + if err != nil { + return nil, err } + + ssize, err := fnapi.StateMinerSectorSize(context.TODO(), minerAddr, types.EmptyTSK) + if err != nil { + return nil, err + } + + ppt, spt, err := api.ProofTypeFromSectorSize(ssize) + if err != nil { + return nil, xerrors.Errorf("bad sector size: %w", err) + } + + sb := §orbuilder.Config{ + Miner: minerAddr, + SealProofType: spt, + PoStProofType: ppt, + } + + return sb, nil } -func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) { +func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err @@ -132,9 +111,9 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h return nil, err } - fps := storage.NewFPoStScheduler(api, sb, maddr, worker) + fps := storage.NewFPoStScheduler(api, sealer, maddr, worker) - sm, err := storage.NewMiner(api, maddr, worker, h, ds, sb, tktFn) + sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, tktFn) if err != nil { return nil, err } @@ -271,15 +250,6 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode, return m, nil } -func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) { - sb, err := sectorbuilder.New(cfg, namespace.Wrap(ds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - return nil, err - } - - return sb, nil -} - func SealTicketGen(fapi api.FullNode) sealing.TicketFn { return func(ctx context.Context) (*api.SealTicket, error) { ts, err := fapi.ChainHead(ctx) @@ -333,8 +303,8 @@ func StorageProvider(ctx helpers.MetricsCtx, fapi api.FullNode, h host.Host, ds } // RetrievalProvider creates a new retrieval provider attached to the provider blockstore -func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { - adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full) +func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { + adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) address, err := minerAddrFromDS(ds) if err != nil { return nil, err diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index ceab72f0a..881acad63 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -28,6 +28,7 @@ const ( fsAPI = "api" fsAPIToken = "token" fsConfig = "config.toml" + fsStorageConfig = "storage.json" fsDatastore = "datastore" fsLock = "repo.lock" fsKeystore = "keystore" @@ -87,7 +88,7 @@ func (fsr *FsRepo) Exists() (bool, error) { func (fsr *FsRepo) Init(t RepoType) error { exist, err := fsr.Exists() - if err != nil { + if err != nil{ return err } if exist { @@ -276,6 +277,18 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) { return config.FromFile(fsr.join(fsConfig), defConfForType(fsr.repoType)) } +func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) { + c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil) + if err != nil { + return config.StorageConfig{}, err + } + return *c, nil +} + +func (fsr *fsLockedRepo) SetStorage(c config.StorageConfig) error { + return config.WriteStorageFile(fsr.join(fsStorageConfig), c) +} + func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error { if err := fsr.stillValid(); err != nil { return err diff --git a/node/repo/interface.go b/node/repo/interface.go index b4c2b0950..58d6a7fdd 100644 --- a/node/repo/interface.go +++ b/node/repo/interface.go @@ -7,6 +7,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/config" ) var ( @@ -37,6 +38,9 @@ type LockedRepo interface { // Returns config in this repo Config() (interface{}, error) + GetStorage() (config.StorageConfig, error) + SetStorage(config.StorageConfig) error + // SetAPIEndpoint sets the endpoint of the current API // so it can be read by API clients SetAPIEndpoint(multiaddr.Multiaddr) error diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 6cc54ddb9..d1b491831 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -12,6 +12,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/config" ) type MemRepo struct { @@ -38,6 +39,14 @@ type lockedMemRepo struct { token *byte } +func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) { + panic("implement me") +} + +func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error { + panic("implement me") +} + func (lmem *lockedMemRepo) Path() string { t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-") if err != nil { @@ -169,6 +178,10 @@ func (lmem *lockedMemRepo) Config() (interface{}, error) { return lmem.mem.configF(lmem.t), nil } +func (lmem *lockedMemRepo) Storage() (config.StorageConfig, error) { + panic("implement me") +} + func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error { if err := lmem.checkToken(); err != nil { return err diff --git a/storage/fpost_run.go b/storage/fpost_run.go index 1ac99a3e7..5110d00b8 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -90,7 +90,9 @@ func (s *FPoStScheduler) declareFaults(ctx context.Context, fc uint64, params *m } func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi []abi.SectorNumber) ([]abi.SectorNumber, error) { - faults := s.sb.Scrub(ssi) + //faults := s.sb.Scrub(ssi) + log.Warnf("Stub checkFaults") + var faults []struct{SectorNum abi.SectorNumber; Err error} declaredFaults := map[abi.SectorNumber]struct{}{} diff --git a/storage/fpost_sched.go b/storage/fpost_sched.go index 27eb7eb11..506092015 100644 --- a/storage/fpost_sched.go +++ b/storage/fpost_sched.go @@ -22,7 +22,7 @@ const StartConfidence = 4 // TODO: config type FPoStScheduler struct { api storageMinerApi - sb sectorbuilder.Interface + sb sectorbuilder.Prover actor address.Address worker address.Address @@ -37,7 +37,7 @@ type FPoStScheduler struct { failLk sync.Mutex } -func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Interface, actor address.Address, worker address.Address) *FPoStScheduler { +func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Prover, actor address.Address, worker address.Address) *FPoStScheduler { return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker} } diff --git a/storage/miner.go b/storage/miner.go index 41088908d..32ef250bb 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -13,6 +13,8 @@ import ( "golang.org/x/xerrors" ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -30,11 +32,11 @@ import ( var log = logging.Logger("storageminer") type Miner struct { - api storageMinerApi - h host.Host - sb sectorbuilder.Interface - ds datastore.Batching - tktFn sealing.TicketFn + api storageMinerApi + h host.Host + sealer sealmgr.Manager + ds datastore.Batching + tktFn sealing.TicketFn maddr address.Address worker address.Address @@ -71,13 +73,13 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) { +func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*Miner, error) { m := &Miner{ - api: api, - h: h, - sb: sb, - ds: ds, - tktFn: tktFn, + api: api, + h: h, + sealer: sealer, + ds: ds, + tktFn: tktFn, maddr: maddr, worker: worker, @@ -92,7 +94,7 @@ func (m *Miner) Run(ctx context.Context) error { } evts := events.NewEvents(ctx, m.api) - m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn) + m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sealer, m.tktFn) go m.sealing.Run(ctx) @@ -119,10 +121,10 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error { } type SectorBuilderEpp struct { - sb sectorbuilder.Interface + prover sectorbuilder.Prover } -func NewElectionPoStProver(sb sectorbuilder.Interface) *SectorBuilderEpp { +func NewElectionPoStProver(sb sectorbuilder.Prover) *SectorBuilderEpp { return &SectorBuilderEpp{sb} } @@ -132,7 +134,7 @@ func (epp *SectorBuilderEpp) GenerateCandidates(ctx context.Context, ssi []abi.S start := time.Now() var faults []abi.SectorNumber // TODO - cds, err := epp.sb.GenerateEPostCandidates(ssi, rand, faults) + cds, err := epp.prover.GenerateEPostCandidates(ssi, rand, faults) if err != nil { return nil, xerrors.Errorf("failed to generate candidates: %w", err) } @@ -152,7 +154,7 @@ func (epp *SectorBuilderEpp) ComputeProof(ctx context.Context, ssi []abi.SectorI } start := time.Now() - proof, err := epp.sb.ComputeElectionPoSt(ssi, rand, owins) + proof, err := epp.prover.ComputeElectionPoSt(ssi, rand, owins) if err != nil { return nil, err } diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 660a2604a..42d1bc80a 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -25,7 +25,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) + ppi, err := m.sealer.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) if err != nil { return nil, xerrors.Errorf("add piece: %w", err) } @@ -47,15 +47,15 @@ func (m *Sealing) PledgeSector() error { // this, as we run everything here async, and it's cancelled when the // command exits - size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() + size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() - rt, _, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) + rt, _, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { log.Error(err) return } - sid, err := m.sb.AcquireSectorNumber() + sid, err := m.sealer.NewSector() if err != nil { log.Errorf("%+v", err) return diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index cd2f6ff5f..a7d11586a 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -12,7 +12,6 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-padreader" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/crypto" @@ -22,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/statemachine" + "github.com/filecoin-project/lotus/storage/sealmgr" ) const SectorStorePrefix = "/sectors" @@ -65,19 +65,19 @@ type Sealing struct { maddr address.Address worker address.Address - sb sectorbuilder.Interface + sealer sealmgr.Manager sectors *statemachine.StateGroup tktFn TicketFn } -func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing { +func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing { s := &Sealing{ api: api, events: events, maddr: maddr, worker: worker, - sb: sb, + sealer: sealer, tktFn: tktFn, } @@ -104,7 +104,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") } - sid, err := m.sb.AcquireSectorNumber() // TODO: Put more than one thing in a sector + sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector if err != nil { return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) } @@ -116,12 +116,12 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) + ppi, err := m.sealer.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } - _, rt, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) + _, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) } diff --git a/storage/sealing/states.go b/storage/sealing/states.go index b3f033a9d..d7966a373 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -5,7 +5,6 @@ import ( "github.com/filecoin-project/specs-actors/actors/crypto" - "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -26,7 +25,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err allocated += piece.Size } - ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() + ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded() if allocated > ubytes { return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) @@ -70,7 +69,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) } - sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) + pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) + if err != nil { + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) + } + + sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o) if err != nil { return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) } @@ -180,7 +184,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD) - proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) + c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) + if err != nil { + return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) + } + + proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in) if err != nil { return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) } @@ -241,15 +250,8 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: Maybe wait for some finality - if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { - if !xerrors.Is(err, fs.ErrNoSuitablePath) { - return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) - } - log.Warnf("finalize sector: %v", err) - } - - if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil { - return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)}) + if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } return ctx.Send(SectorFinalized{}) diff --git a/storage/sealing/types.go b/storage/sealing/types.go index b25a43ef8..4e9e0dccc 100644 --- a/storage/sealing/types.go +++ b/storage/sealing/types.go @@ -1,9 +1,10 @@ package sealing import ( - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/api" ) type Piece struct { diff --git a/storage/sealmgr/advmgr/lotus_storage_manager.go b/storage/sealmgr/advmgr/lotus_storage_manager.go new file mode 100644 index 000000000..bd6407a0e --- /dev/null +++ b/storage/sealmgr/advmgr/lotus_storage_manager.go @@ -0,0 +1,102 @@ +package advmgr + +import ( + "context" + "io" + "sync" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/specs-actors/actors/abi" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/sealmgr" +) + +type LocalStorage interface { + GetStorage() (config.StorageConfig, error) + SetStorage(config.StorageConfig) error +} + +type Path struct { + ID string + Weight uint64 + + LocalPath string + + CanSeal bool + CanStore bool +} + +type Worker interface { + sealmgr.Worker + + Paths() []Path +} + +type Manager struct { + workers []sealmgr.Worker + + localLk sync.RWMutex + localStorage LocalStorage +} + +func (m Manager) SectorSize() abi.SectorSize { + panic("implement me") +} + +func (m Manager) NewSector() (abi.SectorNumber, error) { + panic("implement me") +} + +func (m Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) { + panic("implement me") +} + +func (m Manager) AddPiece(context.Context, abi.UnpaddedPieceSize, abi.SectorNumber, io.Reader, []abi.UnpaddedPieceSize) (abi.PieceInfo, error) { + panic("implement me") +} + +func (m Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) { + panic("implement me") +} + +func (m Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + panic("implement me") +} + +func (m Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) { + panic("implement me") +} + +func (m Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) { + panic("implement me") +} + +func (m Manager) FinalizeSector(context.Context, abi.SectorNumber) error { + panic("implement me") +} + +func (m Manager) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) { + panic("implement me") +} + +func (m Manager) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) { + panic("implement me") +} + +func (m Manager) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { + panic("implement me") +} + +func New(ls LocalStorage) *Manager { + return &Manager{ + workers: nil, + + localStorage: ls, + } +} + +var _ sealmgr.Manager = &Manager{} diff --git a/storage/sealmgr/simple.go b/storage/sealmgr/simple.go index 7d8f13298..4d0292e65 100644 --- a/storage/sealmgr/simple.go +++ b/storage/sealmgr/simple.go @@ -26,15 +26,13 @@ type Simple struct { worker Worker } -func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) { - mid, err := address.IDFromAddress(maddr) - if err != nil { - return nil, xerrors.Errorf("get miner id: %w", err) - } +func (s *Simple) SectorSize() abi.SectorSize { + panic("implement me") +} +func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) { w := &LocalWorker{ - sealer: sb, - mid: abi.ActorID(mid), + sb, } return &Simple{ @@ -44,49 +42,71 @@ func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb }, nil } -func (s *Simple) NewSector() (SectorInfo, error) { +func (s *Simple) NewSector() (abi.SectorNumber, error) { n, err := s.sc.Next() if err != nil { - return SectorInfo{}, xerrors.Errorf("acquire sector number: %w", err) + return 0, xerrors.Errorf("acquire sector number: %w", err) } - mid, err := address.IDFromAddress(s.maddr) - if err != nil { - return SectorInfo{}, xerrors.Errorf("get miner id: %w", err) - } - - return SectorInfo{ - ID: abi.SectorID{ - Miner: abi.ActorID(mid), - Number: abi.SectorNumber(n), - }, - }, nil + return abi.SectorNumber(n), nil } -func (s *Simple) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) { +func (s *Simple) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sectorNum abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) { s.rateLimiter.Lock() defer s.rateLimiter.Unlock() - return s.worker.AddPiece(ctx, si, sz, r) + return s.worker.AddPiece(ctx, sz, sectorNum, r, existingPieces) } -func (s *Simple) RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) { +func (s *Simple) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) { s.rateLimiter.Lock() defer s.rateLimiter.Unlock() - return s.worker.Run(ctx, task, si) + return s.worker.SealPreCommit1(ctx, sectorNum, ticket, pieces) +} + +func (s *Simple) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) { + s.rateLimiter.Lock() + defer s.rateLimiter.Unlock() + + return s.worker.SealPreCommit2(ctx, sectorNum, phase1Out) +} + +func (s *Simple) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) { + s.rateLimiter.Lock() + defer s.rateLimiter.Unlock() + + return s.worker.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID) +} + +func (s *Simple) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) { + s.rateLimiter.Lock() + defer s.rateLimiter.Unlock() + + return s.worker.SealCommit2(ctx, sectorNum, phase1Out) +} + +func (s *Simple) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error { + s.rateLimiter.Lock() + defer s.rateLimiter.Unlock() + + return s.worker.FinalizeSector(ctx, sectorNum) } func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) { - return s.worker.(*LocalWorker).sealer.GenerateEPostCandidates(sectorInfo, challengeSeed, faults) + return s.worker.GenerateEPostCandidates(sectorInfo, challengeSeed, faults) } func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) { - return s.worker.(*LocalWorker).sealer.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults) + return s.worker.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults) } func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { - return s.worker.(*LocalWorker).sealer.ComputeElectionPoSt(sectorInfo, challengeSeed, winners) + return s.worker.ComputeElectionPoSt(sectorInfo, challengeSeed, winners) +} + +func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) { + panic("todo") } var _ Manager = &Simple{} diff --git a/storage/sealmgr/types.go b/storage/sealmgr/types.go index a81346a97..52da12c71 100644 --- a/storage/sealmgr/types.go +++ b/storage/sealmgr/types.go @@ -1,80 +1,32 @@ package sealmgr import ( - "bytes" "context" "io" - "github.com/ipfs/go-cid" - "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/ipfs/go-cid" ) - -type SealTicket struct { - Value abi.SealRandomness - Epoch abi.ChainEpoch -} - -type SealSeed struct { - Value abi.InteractiveSealRandomness - Epoch abi.ChainEpoch -} - -func (st *SealTicket) Equals(ost *SealTicket) bool { - return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch -} - -func (st *SealSeed) Equals(ost *SealSeed) bool { - return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch -} - -// SectorInfo holds all sector-related metadata -type SectorInfo struct { - ID abi.SectorID - - Pieces []abi.PieceInfo - - Ticket SealTicket - Seed SealSeed - - PreCommit1Out []byte - - Sealed *cid.Cid - Unsealed *cid.Cid - - CommitInput []byte - Proof []byte -} - -func (si SectorInfo) PieceSizes() []abi.UnpaddedPieceSize { - out := make([]abi.UnpaddedPieceSize, len(si.Pieces)) - for i := range out { - out[i] = si.Pieces[i].Size.Unpadded() - } - - return nil -} - type Worker interface { - AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error) - Run(context.Context, TaskType, SectorInfo) (SectorInfo, error) + sectorbuilder.Sealer + sectorbuilder.Prover } type Manager interface { + SectorSize() abi.SectorSize + // NewSector allocates staging area for data - NewSector() (SectorInfo, error) + // Storage manager forwards proof-related calls + NewSector() (abi.SectorNumber, error) - // AddPiece appends the piece to the specified sector. Returns PieceCID, and - // mutated sector info - // - // Note: The passed reader can support other transfer mechanisms, making - // it possible to move the data between data transfer module and workers - AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error) + // TODO: Can[Pre]Commit[1,2] + // TODO: Scrub() []Faults - RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) + // TODO: Separate iface + ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) - // Storage manager forwards proving calls + sectorbuilder.Sealer sectorbuilder.Prover } diff --git a/storage/sealmgr/worker_local.go b/storage/sealmgr/worker_local.go index 05435967b..67636766a 100644 --- a/storage/sealmgr/worker_local.go +++ b/storage/sealmgr/worker_local.go @@ -1,74 +1,11 @@ package sealmgr import ( - "context" - "io" - - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-sectorbuilder" - "github.com/filecoin-project/specs-actors/actors/abi" ) type LocalWorker struct { - sealer sectorbuilder.Basic - mid abi.ActorID -} - -func (w *LocalWorker) Run(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) { - if si.ID.Miner != w.mid { - return si, xerrors.Errorf("received a task with wrong actor id; worker for %d, task for %d", w.mid, si.ID.Miner) - } - - switch task { - case TTPreCommit1: - pco, err := w.sealer.SealPreCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Pieces) - if err != nil { - return si, xerrors.Errorf("calling sealer: %w", err) - } - si.PreCommit1Out = pco - case TTPreCommit2: - sealed, unsealed, err := w.sealer.SealPreCommit2(ctx, si.ID.Number, si.PreCommit1Out) - if err != nil { - return si, xerrors.Errorf("calling sealer (precommit2): %w", err) - } - - si.Sealed = &sealed - si.Unsealed = &unsealed - - // We also call Commit1 here as it only grabs some inputs for the snark, - // which is very fast (<1s), and it doesn't really make sense to have a separate - // task type for it - - c2in, err := w.sealer.SealCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Seed.Value, si.Pieces, *si.Sealed, *si.Unsealed) - if err != nil { - return si, xerrors.Errorf("calling sealer (commit1): %w", err) - } - - si.CommitInput = c2in - case TTCommit2: - proof, err := w.sealer.SealCommit2(ctx, si.ID.Number, si.CommitInput) - if err != nil { - return SectorInfo{}, xerrors.Errorf("calling sealer: %w", err) - } - - si.Proof = proof - default: - return si, xerrors.Errorf("unknown task type '%s'", task) - } - - return si, nil -} - -func (w *LocalWorker) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) { - pi, err := w.sealer.AddPiece(ctx, sz, si.ID.Number, r, si.PieceSizes()) - if err != nil { - return cid.Cid{}, SectorInfo{}, xerrors.Errorf("addPiece on local worker: %w", err) - } - - si.Pieces = append(si.Pieces, pi) - return pi.PieceCID, si, nil + sectorbuilder.Basic } var _ Worker = &LocalWorker{} diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index f485e2860..b8561fffa 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/filecoin-project/go-padreader" - sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/specs-actors/actors/abi" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -51,16 +50,14 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) { type SectorBlocks struct { *storage.Miner - sb sectorbuilder.Interface keys datastore.Batching keyLk sync.Mutex } -func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilder.Interface) *SectorBlocks { +func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks { sbc := &SectorBlocks{ Miner: miner, - sb: sb, keys: namespace.Wrap(ds, dsPrefix), }