Storage Manager refactor

This commit is contained in:
Łukasz Magiera 2020-03-03 23:19:22 +01:00
parent 3abb59a550
commit a0dbb6bdd6
27 changed files with 459 additions and 584 deletions

View File

@ -1,14 +1,13 @@
package api package api
import ( import (
"bytes"
"context" "context"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/storage/sealmgr"
) )
// alias because cbor-gen doesn't like non-alias types // alias because cbor-gen doesn't like non-alias types
@ -136,8 +135,8 @@ type SectorInfo struct {
CommR *cid.Cid CommR *cid.Cid
Proof []byte Proof []byte
Deals []abi.DealID Deals []abi.DealID
Ticket sealmgr.SealTicket Ticket SealTicket
Seed sealmgr.SealSeed Seed SealSeed
Retries uint64 Retries uint64
LastErr string LastErr string
@ -154,3 +153,21 @@ type SealedRef struct {
type SealedRefs struct { type SealedRefs struct {
Refs []SealedRef Refs []SealedRef
} }
type SealTicket struct {
Value abi.SealRandomness
Epoch abi.ChainEpoch
}
type SealSeed struct {
Value abi.InteractiveSealRandomness
Epoch abi.ChainEpoch
}
func (st *SealTicket) Equals(ost *SealTicket) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
func (st *SealSeed) Equals(ost *SealSeed) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}

View File

@ -6,18 +6,12 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
@ -38,7 +32,6 @@ func main() {
preSealCmd, preSealCmd,
aggregateManifestsCmd, aggregateManifestsCmd,
aggregateSectorDirsCmd,
} }
app := &cli.App{ app := &cli.App{
@ -174,149 +167,6 @@ var aggregateManifestsCmd = &cli.Command{
}, },
} }
var aggregateSectorDirsCmd = &cli.Command{
Name: "aggregate-sector-dirs",
Usage: "aggregate a set of preseal manifests into a single file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "miner",
Usage: "Specify address of miner to aggregate sectorbuilders for",
},
&cli.StringFlag{
Name: "dest",
Usage: "specify directory to create aggregate sector store in",
},
&cli.Uint64Flag{
Name: "sector-size",
Usage: "specify size of sectors to aggregate",
Value: 32 * 1024 * 1024 * 1024,
},
},
Action: func(cctx *cli.Context) error {
if cctx.String("miner") == "" {
return fmt.Errorf("must specify miner address with --miner")
}
if cctx.String("dest") == "" {
return fmt.Errorf("must specify dest directory with --dest")
}
maddr, err := address.NewFromString(cctx.String("miner"))
if err != nil {
return err
}
destdir, err := homedir.Expand(cctx.String("dest"))
if err != nil {
return err
}
if err := os.MkdirAll(destdir, 0755); err != nil {
return err
}
agmds, err := badger.NewDatastore(filepath.Join(destdir, "badger"), nil)
if err != nil {
return err
}
defer agmds.Close()
ssize := abi.SectorSize(cctx.Uint64("sector-size"))
ppt, spt, err := lapi.ProofTypeFromSectorSize(abi.SectorSize(cctx.Uint64("sector-size")))
if err != nil {
return err
}
agsb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
Paths: sectorbuilder.SimplePath(destdir),
WorkerThreads: 2,
}, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return err
}
var aggrGenMiner genesis.Miner
var highestSectorID abi.SectorNumber
for _, dir := range cctx.Args().Slice() {
dir, err := homedir.Expand(dir)
if err != nil {
return xerrors.Errorf("failed to expand %q: %w", dir, err)
}
st, err := os.Stat(dir)
if err != nil {
return err
}
if !st.IsDir() {
return fmt.Errorf("%q was not a directory", dir)
}
fi, err := os.Open(filepath.Join(dir, "pre-seal-"+maddr.String()+".json"))
if err != nil {
return err
}
var genmm map[string]genesis.Miner
if err := json.NewDecoder(fi).Decode(&genmm); err != nil {
return err
}
genm, ok := genmm[maddr.String()]
if !ok {
return xerrors.Errorf("input data did not have our miner in it (%s)", maddr)
}
if genm.SectorSize != ssize {
return xerrors.Errorf("sector size mismatch in %q (%d != %d)", dir)
}
for _, s := range genm.Sectors {
if s.SectorID > highestSectorID {
highestSectorID = s.SectorID
}
}
aggrGenMiner = mergeGenMiners(aggrGenMiner, genm)
opts := badger.DefaultOptions
opts.ReadOnly = true
mds, err := badger.NewDatastore(filepath.Join(dir, "badger"), &opts)
if err != nil {
return err
}
defer mds.Close()
sb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
Paths: sectorbuilder.SimplePath(dir),
WorkerThreads: 2,
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return err
}
if err := agsb.ImportFrom(sb, false); err != nil {
return xerrors.Errorf("importing sectors from %q failed: %w", dir, err)
}
}
if err := agsb.SetLastSectorNum(highestSectorID); err != nil {
return err
}
if err := seed.WriteGenesisMiner(maddr, destdir, &aggrGenMiner, nil); err != nil {
return err
}
return nil
},
}
func mergeGenMiners(a, b genesis.Miner) genesis.Miner { func mergeGenMiners(a, b genesis.Miner) genesis.Miner {
if a.SectorSize != b.SectorSize { if a.SectorSize != b.SectorSize {
panic("sector sizes mismatch") panic("sector sizes mismatch")

View File

@ -11,25 +11,23 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/google/uuid"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet" "github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/node/config"
) )
var log = logging.Logger("preseal") var log = logging.Logger("preseal")
@ -49,9 +47,6 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
Miner: maddr, Miner: maddr,
SealProofType: spt, SealProofType: spt,
PoStProofType: ppt, PoStProofType: ppt,
FallbackLastNum: offset,
Paths: sectorbuilder.SimplePath(sbroot),
WorkerThreads: 2,
} }
if err := os.MkdirAll(sbroot, 0775); err != nil { if err := os.MkdirAll(sbroot, 0775); err != nil {
@ -63,7 +58,13 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, err return nil, nil, err
} }
sb, err := sectorbuilder.New(cfg, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) sbfs := &fs.Basic{
Miner: maddr,
NextID: offset,
Root: sbroot,
}
sb, err := sectorbuilder.New(sbfs, cfg)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -75,7 +76,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
var sealedSectors []*genesis.PreSeal var sealedSectors []*genesis.PreSeal
for i := 0; i < sectors; i++ { for i := 0; i < sectors; i++ {
sid, err := sb.AcquireSectorNumber() sid, err := sbfs.AcquireSectorNumber()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -90,12 +91,17 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
fmt.Printf("sector-id: %d, piece info: %v\n", sid, pi) fmt.Printf("sector-id: %d, piece info: %v\n", sid, pi)
scid, ucid, err := sb.SealPreCommit(context.TODO(), sid, ticket, []abi.PieceInfo{pi}) in2, err := sb.SealPreCommit1(context.TODO(), sid, ticket, []abi.PieceInfo{pi})
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("commit: %w", err) return nil, nil, xerrors.Errorf("commit: %w", err)
} }
if err := sb.TrimCache(context.TODO(), sid); err != nil { scid, ucid, err := sb.SealPreCommit2(context.TODO(), sid, in2)
if err != nil {
return nil, nil, xerrors.Errorf("commit: %w", err)
}
if err := sb.FinalizeSector(context.TODO(), sid); err != nil {
return nil, nil, xerrors.Errorf("trim cache: %w", err) return nil, nil, xerrors.Errorf("trim cache: %w", err)
} }
@ -138,6 +144,22 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, xerrors.Errorf("closing datastore: %w", err) return nil, nil, xerrors.Errorf("closing datastore: %w", err)
} }
{
b, err := json.Marshal(&config.StorageMeta{
ID: uuid.New().String(),
Weight: 0, // read-only
CanCommit: false,
CanStore: false,
})
if err != nil {
return nil, nil, xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(sbroot, "storage.json"), b, 0644); err != nil {
return nil, nil, xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(sbroot, "storage.json"), err)
}
}
return miner, &minerAddr.KeyInfo, nil return miner, &minerAddr.KeyInfo, nil
} }
@ -172,19 +194,6 @@ func WriteGenesisMiner(maddr address.Address, sbroot string, gm *genesis.Miner,
return nil return nil
} }
func commDCID(commd []byte) cid.Cid {
d, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.IDENTITY,
MhLength: len(commd),
}.Sum(commd)
if err != nil {
panic(err)
}
return d
}
func createDeals(m *genesis.Miner, k *wallet.Key, maddr address.Address, ssize abi.SectorSize) error { func createDeals(m *genesis.Miner, k *wallet.Key, maddr address.Address, ssize abi.SectorSize) error {
for _, sector := range m.Sectors { for _, sector := range m.Sectors {
proposal := &market.DealProposal{ proposal := &market.DealProposal{

View File

@ -72,7 +72,8 @@ var infoCmd = &cli.Command{
float64(10000*uint64(len(faults))/secCounts.Pset)/100.) float64(10000*uint64(len(faults))/secCounts.Pset)/100.)
} }
// TODO: indicate whether the post worker is in use panic("todo")
/*// TODO: indicate whether the post worker is in use
wstat, err := nodeApi.WorkerStats(ctx) wstat, err := nodeApi.WorkerStats(ctx)
if err != nil { if err != nil {
return err return err
@ -86,7 +87,7 @@ var infoCmd = &cli.Command{
fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait) fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait)
fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait) fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait)
fmt.Printf("\tCommit: %d\n", wstat.CommitWait) fmt.Printf("\tCommit: %d\n", wstat.CommitWait)
fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait) fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)*/
ps, err := api.StateMinerPostState(ctx, maddr, types.EmptyTSK) ps, err := api.StateMinerPostState(ctx, maddr, types.EmptyTSK)
if err != nil { if err != nil {

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"strconv" "strconv"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
@ -19,11 +18,8 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
@ -38,11 +34,11 @@ import (
"github.com/filecoin-project/lotus/genesis" "github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing" "github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
) )
var initCmd = &cli.Command{ var initCmd = &cli.Command{
@ -77,7 +73,7 @@ var initCmd = &cli.Command{
Usage: "specify sector size to use", Usage: "specify sector size to use",
Value: uint64(build.SectorSizes[0]), Value: uint64(build.SectorSizes[0]),
}, },
&cli.StringFlag{ &cli.StringSliceFlag{
Name: "pre-sealed-sectors", Name: "pre-sealed-sectors",
Usage: "specify set of presealed sectors for starting as a genesis miner", Usage: "specify set of presealed sectors for starting as a genesis miner",
}, },
@ -159,60 +155,33 @@ var initCmd = &cli.Command{
return err return err
} }
if pssb := cctx.String("pre-sealed-sectors"); pssb != "" { if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
pssb, err := homedir.Expand(pssb) log.Infof("Setting up storage config with presealed sector", pssb)
if err != nil {
return err
}
log.Infof("moving pre-sealed-sectors from %s into newly created storage miner repo", pssb)
lr, err := r.Lock(repo.StorageMiner) lr, err := r.Lock(repo.StorageMiner)
if err != nil { if err != nil {
return err return err
} }
mds, err := lr.Datastore("/metadata") sc, err := lr.GetStorage()
if err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
for _, psp := range pssb {
psp, err := homedir.Expand(psp)
if err != nil { if err != nil {
return err return err
} }
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{
bopts := badger.DefaultOptions Path: psp,
bopts.ReadOnly = true })
oldmds, err := badger.NewDatastore(filepath.Join(pssb, "badger"), &bopts)
if err != nil {
return err
} }
ppt, spt, err := lapi.ProofTypeFromSectorSize(ssize) if err := lr.SetStorage(sc); err != nil {
if err != nil { return xerrors.Errorf("set storage config: %w", err)
return err
} }
oldsb, err := sectorbuilder.New(&sectorbuilder.Config{ panic("persist last sector id somehow")
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: 2,
Paths: sectorbuilder.SimplePath(pssb),
}, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
}
nsb, err := sectorbuilder.New(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: 2,
Paths: sectorbuilder.SimplePath(lr.Path()),
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up sectorbuilder: %w", err)
}
if err := nsb.ImportFrom(oldsb, symlink); err != nil {
return err
}
if err := lr.Close(); err != nil {
return xerrors.Errorf("unlocking repo after preseal migration: %w", err)
}
} }
if err := storageMinerInit(ctx, cctx, api, r); err != nil { if err := storageMinerInit(ctx, cctx, api, r); err != nil {
@ -374,30 +343,8 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err return err
} }
c, err := lr.Config() smgr := advmgr.New(lr)
if err != nil { epp := storage.NewElectionPoStProver(smgr)
return err
}
cfg, ok := c.(*config.StorageMiner)
if !ok {
return xerrors.Errorf("invalid config from repo, got: %T", c)
}
scfg := sectorbuilder.SimplePath(lr.Path())
if len(cfg.SectorBuilder.Storage) > 0 {
scfg = cfg.SectorBuilder.Storage
}
sbcfg, err := modules.SectorBuilderConfig(scfg, 2, false, false)(mds, api)
if err != nil {
return xerrors.Errorf("getting genesis miner sector builder config: %w", err)
}
sb, err := sectorbuilder.New(sbcfg, mds)
if err != nil {
return xerrors.Errorf("failed to set up sectorbuilder for genesis mining: %w", err)
}
epp := storage.NewElectionPoStProver(sb)
m := miner.NewMiner(api, epp) m := miner.NewMiner(api, epp)
{ {

View File

@ -6,15 +6,15 @@ Build the Lotus Binaries in debug mode, This enables the use of 1024 byte sector
make debug make debug
``` ```
Download the 1024 byte parameters: Download the 2048 byte parameters:
```sh ```sh
./lotus fetch-params --proving-params 1024 ./lotus fetch-params --proving-params 2048
``` ```
Pre-seal some sectors: Pre-seal some sectors:
```sh ```sh
./lotus-seed pre-seal --sector-size 1024 --num-sectors 2 ./lotus-seed pre-seal --sector-size 2048 --num-sectors 2
``` ```
Create the genesis block and start up the first node: Create the genesis block and start up the first node:
@ -34,7 +34,7 @@ Then, in another console, import the genesis miner key:
Set up the genesis miner: Set up the genesis miner:
```sh ```sh
./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=1024 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t0101.json --nosync ./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=2048 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t0101.json --nosync
``` ```
Now, finally, start up the miner: Now, finally, start up the miner:

View File

@ -12,18 +12,19 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealmgr"
) )
type retrievalProviderNode struct { type retrievalProviderNode struct {
miner *storage.Miner miner *storage.Miner
sb sectorbuilder.Interface sealer sealmgr.Manager
full api.FullNode full api.FullNode
} }
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the // NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node // Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode { func NewRetrievalProviderNode(miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sb, full} return &retrievalProviderNode{miner, sealer, full}
} }
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) { func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uint64, offset uint64, length uint64) (io.ReadCloser, error) {
@ -31,7 +32,7 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uin
if err != nil { if err != nil {
return nil, err return nil, err
} }
return rpn.sb.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD) return rpn.sealer.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD)
} }
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount) (abi.TokenAmount, error) { func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount) (abi.TokenAmount, error) {

View File

@ -253,7 +253,6 @@ func Online() Option {
// Storage miner // Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner }, ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(sealing.TicketFn), modules.SealTicketGen), Override(new(sealing.TicketFn), modules.SealTicketGen),
Override(new(*storage.Miner), modules.StorageMiner), Override(new(*storage.Miner), modules.StorageMiner),
@ -346,30 +345,13 @@ func ConfigFullNode(c interface{}) Option {
) )
} }
func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { func ConfigStorageMiner(c interface{}) Option {
cfg, ok := c.(*config.StorageMiner) cfg, ok := c.(*config.StorageMiner)
if !ok { if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
} }
scfg := sectorbuilder.SimplePath(lr.Path()) return Options(ConfigCommon(&cfg.Common))
if cfg.SectorBuilder.Path == "" {
if len(cfg.SectorBuilder.Storage) > 0 {
scfg = cfg.SectorBuilder.Storage
}
} else {
scfg = sectorbuilder.SimplePath(cfg.SectorBuilder.Path)
log.Warn("LEGACY SectorBuilder.Path FOUND IN CONFIG. Please use the new storage config")
}
return Options(
ConfigCommon(&cfg.Common),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(scfg,
cfg.SectorBuilder.WorkerCount,
cfg.SectorBuilder.DisableLocalPreCommit,
cfg.SectorBuilder.DisableLocalCommit)),
)
} }
func Repo(r repo.Repo) Option { func Repo(r repo.Repo) Option {
@ -387,7 +369,7 @@ func Repo(r repo.Repo) Option {
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)), ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c, lr)), ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
Override(new(dtypes.MetadataDS), modules.Datastore), Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore), Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),

View File

@ -3,8 +3,6 @@ package config
import ( import (
"encoding" "encoding"
"time" "time"
"github.com/filecoin-project/go-sectorbuilder/fs"
) )
// Common is common config between full node and miner // Common is common config between full node and miner
@ -25,7 +23,7 @@ type FullNode struct {
type StorageMiner struct { type StorageMiner struct {
Common Common
SectorBuilder SectorBuilder Storage Storage
} }
// API contains configs for API endpoint // API contains configs for API endpoint
@ -54,14 +52,8 @@ type Metrics struct {
} }
// // Storage Miner // // Storage Miner
type Storage struct {
type SectorBuilder struct {
Path string // TODO: remove // FORK (-ish)
Storage []fs.PathConfig
WorkerCount uint
DisableLocalPreCommit bool
DisableLocalCommit bool
} }
func defCommon() Common { func defCommon() Common {
@ -95,9 +87,7 @@ func DefaultStorageMiner() *StorageMiner {
cfg := &StorageMiner{ cfg := &StorageMiner{
Common: defCommon(), Common: defCommon(),
SectorBuilder: SectorBuilder{ Storage: Storage{},
WorkerCount: 5,
},
} }
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
return cfg return cfg

68
node/config/storage.go Normal file
View File

@ -0,0 +1,68 @@
package config
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"golang.org/x/xerrors"
)
type LocalPath struct {
Path string
}
// .lotusstorage/storage.json
type StorageConfig struct {
StoragePaths []LocalPath
}
// [path]/metadata.json
type StorageMeta struct {
ID string
Weight int // 0 = readonly
CanCommit bool
CanStore bool
}
func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) {
file, err := os.Open(path)
switch {
case os.IsNotExist(err):
if def == nil {
return nil, xerrors.Errorf("couldn't load storage config: %w", err)
}
return def, nil
case err != nil:
return nil, err
}
defer file.Close() //nolint:errcheck // The file is RO
return StorageFromReader(file, *def)
}
func StorageFromReader(reader io.Reader, def StorageConfig) (*StorageConfig, error) {
cfg := def
err := json.NewDecoder(reader).Decode(&cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
func WriteStorageFile(path string, config StorageConfig) error {
b, err := json.Marshal(config)
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(path, b, 0644); err != nil {
return xerrors.Errorf("persisting storage config (%s): %w", path, err)
}
return nil
}

View File

@ -3,25 +3,18 @@ package impl
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"io"
"mime"
"net/http" "net/http"
"os"
"strconv" "strconv"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/gorilla/mux"
files "github.com/ipfs/go-ipfs-files"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/gorilla/mux"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
@ -31,7 +24,7 @@ type StorageMinerAPI struct {
CommonAPI CommonAPI
SectorBuilderConfig *sectorbuilder.Config SectorBuilderConfig *sectorbuilder.Config
SectorBuilder sectorbuilder.Interface //SectorBuilder sectorbuilder.Interface
SectorBlocks *sectorblocks.SectorBlocks SectorBlocks *sectorblocks.SectorBlocks
Miner *storage.Miner Miner *storage.Miner
@ -57,7 +50,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
} }
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64) id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil { if err != nil {
@ -98,11 +92,12 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
if _, err := io.Copy(w, rd); err != nil { if _, err := io.Copy(w, rd); err != nil {
log.Error(err) log.Error(err)
return return
} }*/
} }
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64) id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil { if err != nil {
@ -159,13 +154,13 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques
w.WriteHeader(200) w.WriteHeader(200)
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength) log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)*/
} }
/*
func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) {
stat := sm.SectorBuilder.WorkerStats() stat := sm.SectorBuilder.WorkerStats()
return stat, nil return stat, nil
} }*/
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) { func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
return sm.SectorBuilderConfig.Miner, nil return sm.SectorBuilderConfig.Miner, nil
@ -252,7 +247,7 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error { func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return sm.Miner.ForceSectorState(ctx, id, state) return sm.Miner.ForceSectorState(ctx, id, state)
} }
/*
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx, cfg) return sm.SectorBuilder.AddWorker(ctx, cfg)
} }
@ -260,5 +255,5 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.Wo
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return sm.SectorBuilder.TaskDone(ctx, task, res) return sm.SectorBuilder.TaskDone(ctx, task, res)
} }
*/
var _ api.StorageMiner = &StorageMinerAPI{} var _ api.StorageMiner = &StorageMinerAPI{}

View File

@ -2,11 +2,8 @@ package modules
import ( import (
"context" "context"
"math"
"reflect" "reflect"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync" dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore" piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
@ -20,7 +17,6 @@ import (
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
paramfetch "github.com/filecoin-project/go-paramfetch" paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
@ -37,10 +33,12 @@ import (
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
"github.com/mitchellh/go-homedir"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
@ -75,8 +73,7 @@ func GetParams(sbc *sectorbuilder.Config) error {
return nil return nil
} }
func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { func SectorBuilderConfig(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) {
return func(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds) minerAddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
@ -87,17 +84,6 @@ func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, noc
return nil, err return nil, err
} }
for i := range storage {
storage[i].Path, err = homedir.Expand(storage[i].Path)
if err != nil {
return nil, err
}
}
if threads > math.MaxUint8 {
return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8)
}
ppt, spt, err := api.ProofTypeFromSectorSize(ssize) ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil { if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err) return nil, xerrors.Errorf("bad sector size: %w", err)
@ -107,19 +93,12 @@ func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, noc
Miner: minerAddr, Miner: minerAddr,
SealProofType: spt, SealProofType: spt,
PoStProofType: ppt, PoStProofType: ppt,
WorkerThreads: uint8(threads),
NoPreCommit: noprecommit,
NoCommit: nocommit,
Paths: storage,
} }
return sb, nil return sb, nil
} }
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) { func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err
@ -132,9 +111,9 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
return nil, err return nil, err
} }
fps := storage.NewFPoStScheduler(api, sb, maddr, worker) fps := storage.NewFPoStScheduler(api, sealer, maddr, worker)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sb, tktFn) sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, tktFn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -271,15 +250,6 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode,
return m, nil return m, nil
} }
func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) {
sb, err := sectorbuilder.New(cfg, namespace.Wrap(ds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return nil, err
}
return sb, nil
}
func SealTicketGen(fapi api.FullNode) sealing.TicketFn { func SealTicketGen(fapi api.FullNode) sealing.TicketFn {
return func(ctx context.Context) (*api.SealTicket, error) { return func(ctx context.Context) (*api.SealTicket, error) {
ts, err := fapi.ChainHead(ctx) ts, err := fapi.ChainHead(ctx)
@ -333,8 +303,8 @@ func StorageProvider(ctx helpers.MetricsCtx, fapi api.FullNode, h host.Host, ds
} }
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore // RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) { func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full) adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
address, err := minerAddrFromDS(ds) address, err := minerAddrFromDS(ds)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -28,6 +28,7 @@ const (
fsAPI = "api" fsAPI = "api"
fsAPIToken = "token" fsAPIToken = "token"
fsConfig = "config.toml" fsConfig = "config.toml"
fsStorageConfig = "storage.json"
fsDatastore = "datastore" fsDatastore = "datastore"
fsLock = "repo.lock" fsLock = "repo.lock"
fsKeystore = "keystore" fsKeystore = "keystore"
@ -276,6 +277,18 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) {
return config.FromFile(fsr.join(fsConfig), defConfForType(fsr.repoType)) return config.FromFile(fsr.join(fsConfig), defConfForType(fsr.repoType))
} }
func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil)
if err != nil {
return config.StorageConfig{}, err
}
return *c, nil
}
func (fsr *fsLockedRepo) SetStorage(c config.StorageConfig) error {
return config.WriteStorageFile(fsr.join(fsStorageConfig), c)
}
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error { func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
if err := fsr.stillValid(); err != nil { if err := fsr.stillValid(); err != nil {
return err return err

View File

@ -7,6 +7,7 @@ import (
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
) )
var ( var (
@ -37,6 +38,9 @@ type LockedRepo interface {
// Returns config in this repo // Returns config in this repo
Config() (interface{}, error) Config() (interface{}, error)
GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error
// SetAPIEndpoint sets the endpoint of the current API // SetAPIEndpoint sets the endpoint of the current API
// so it can be read by API clients // so it can be read by API clients
SetAPIEndpoint(multiaddr.Multiaddr) error SetAPIEndpoint(multiaddr.Multiaddr) error

View File

@ -12,6 +12,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
) )
type MemRepo struct { type MemRepo struct {
@ -38,6 +39,14 @@ type lockedMemRepo struct {
token *byte token *byte
} }
func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) {
panic("implement me")
}
func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error {
panic("implement me")
}
func (lmem *lockedMemRepo) Path() string { func (lmem *lockedMemRepo) Path() string {
t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-") t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
if err != nil { if err != nil {
@ -169,6 +178,10 @@ func (lmem *lockedMemRepo) Config() (interface{}, error) {
return lmem.mem.configF(lmem.t), nil return lmem.mem.configF(lmem.t), nil
} }
func (lmem *lockedMemRepo) Storage() (config.StorageConfig, error) {
panic("implement me")
}
func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error { func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
if err := lmem.checkToken(); err != nil { if err := lmem.checkToken(); err != nil {
return err return err

View File

@ -90,7 +90,9 @@ func (s *FPoStScheduler) declareFaults(ctx context.Context, fc uint64, params *m
} }
func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi []abi.SectorNumber) ([]abi.SectorNumber, error) { func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi []abi.SectorNumber) ([]abi.SectorNumber, error) {
faults := s.sb.Scrub(ssi) //faults := s.sb.Scrub(ssi)
log.Warnf("Stub checkFaults")
var faults []struct{SectorNum abi.SectorNumber; Err error}
declaredFaults := map[abi.SectorNumber]struct{}{} declaredFaults := map[abi.SectorNumber]struct{}{}

View File

@ -22,7 +22,7 @@ const StartConfidence = 4 // TODO: config
type FPoStScheduler struct { type FPoStScheduler struct {
api storageMinerApi api storageMinerApi
sb sectorbuilder.Interface sb sectorbuilder.Prover
actor address.Address actor address.Address
worker address.Address worker address.Address
@ -37,7 +37,7 @@ type FPoStScheduler struct {
failLk sync.Mutex failLk sync.Mutex
} }
func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Interface, actor address.Address, worker address.Address) *FPoStScheduler { func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Prover, actor address.Address, worker address.Address) *FPoStScheduler {
return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker} return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker}
} }

View File

@ -13,6 +13,8 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -32,7 +34,7 @@ var log = logging.Logger("storageminer")
type Miner struct { type Miner struct {
api storageMinerApi api storageMinerApi
h host.Host h host.Host
sb sectorbuilder.Interface sealer sealmgr.Manager
ds datastore.Batching ds datastore.Batching
tktFn sealing.TicketFn tktFn sealing.TicketFn
@ -71,11 +73,11 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
} }
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) { func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*Miner, error) {
m := &Miner{ m := &Miner{
api: api, api: api,
h: h, h: h,
sb: sb, sealer: sealer,
ds: ds, ds: ds,
tktFn: tktFn, tktFn: tktFn,
@ -92,7 +94,7 @@ func (m *Miner) Run(ctx context.Context) error {
} }
evts := events.NewEvents(ctx, m.api) evts := events.NewEvents(ctx, m.api)
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn) m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sealer, m.tktFn)
go m.sealing.Run(ctx) go m.sealing.Run(ctx)
@ -119,10 +121,10 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
} }
type SectorBuilderEpp struct { type SectorBuilderEpp struct {
sb sectorbuilder.Interface prover sectorbuilder.Prover
} }
func NewElectionPoStProver(sb sectorbuilder.Interface) *SectorBuilderEpp { func NewElectionPoStProver(sb sectorbuilder.Prover) *SectorBuilderEpp {
return &SectorBuilderEpp{sb} return &SectorBuilderEpp{sb}
} }
@ -132,7 +134,7 @@ func (epp *SectorBuilderEpp) GenerateCandidates(ctx context.Context, ssi []abi.S
start := time.Now() start := time.Now()
var faults []abi.SectorNumber // TODO var faults []abi.SectorNumber // TODO
cds, err := epp.sb.GenerateEPostCandidates(ssi, rand, faults) cds, err := epp.prover.GenerateEPostCandidates(ssi, rand, faults)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to generate candidates: %w", err) return nil, xerrors.Errorf("failed to generate candidates: %w", err)
} }
@ -152,7 +154,7 @@ func (epp *SectorBuilderEpp) ComputeProof(ctx context.Context, ssi []abi.SectorI
} }
start := time.Now() start := time.Now()
proof, err := epp.sb.ComputeElectionPoSt(ssi, rand, owins) proof, err := epp.prover.ComputeElectionPoSt(ssi, rand, owins)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -25,7 +25,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e
out := make([]Piece, len(sizes)) out := make([]Piece, len(sizes))
for i, size := range sizes { for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes) ppi, err := m.sealer.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
if err != nil { if err != nil {
return nil, xerrors.Errorf("add piece: %w", err) return nil, xerrors.Errorf("add piece: %w", err)
} }
@ -47,15 +47,15 @@ func (m *Sealing) PledgeSector() error {
// this, as we run everything here async, and it's cancelled when the // this, as we run everything here async, and it's cancelled when the
// command exits // command exits
size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
rt, _, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) rt, _, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return return
} }
sid, err := m.sb.AcquireSectorNumber() sid, err := m.sealer.NewSector()
if err != nil { if err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return return

View File

@ -12,7 +12,6 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
@ -22,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine" "github.com/filecoin-project/lotus/lib/statemachine"
"github.com/filecoin-project/lotus/storage/sealmgr"
) )
const SectorStorePrefix = "/sectors" const SectorStorePrefix = "/sectors"
@ -65,19 +65,19 @@ type Sealing struct {
maddr address.Address maddr address.Address
worker address.Address worker address.Address
sb sectorbuilder.Interface sealer sealmgr.Manager
sectors *statemachine.StateGroup sectors *statemachine.StateGroup
tktFn TicketFn tktFn TicketFn
} }
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing { func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing {
s := &Sealing{ s := &Sealing{
api: api, api: api,
events: events, events: events,
maddr: maddr, maddr: maddr,
worker: worker, worker: worker,
sb: sb, sealer: sealer,
tktFn: tktFn, tktFn: tktFn,
} }
@ -104,7 +104,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece") return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
} }
sid, err := m.sb.AcquireSectorNumber() // TODO: Put more than one thing in a sector sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector
if err != nil { if err != nil {
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err) return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
} }
@ -116,12 +116,12 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error { func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error {
log.Infof("Seal piece for deal %d", dealID) log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{}) ppi, err := m.sealer.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{})
if err != nil { if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err) return xerrors.Errorf("adding piece to sector: %w", err)
} }
_, rt, err := api.ProofTypeFromSectorSize(m.sb.SectorSize()) _, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil { if err != nil {
return xerrors.Errorf("bad sector size: %w", err) return xerrors.Errorf("bad sector size: %w", err)
} }

View File

@ -5,7 +5,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto" "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -26,7 +25,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
allocated += piece.Size allocated += piece.Size
} }
ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded() ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
if allocated > ubytes { if allocated > ubytes {
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes) return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
@ -70,7 +69,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)}) return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
} }
sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos()) pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o)
if err != nil { if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
} }
@ -180,7 +184,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD) log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD)
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD) c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in)
if err != nil { if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
} }
@ -241,16 +250,9 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality // TODO: Maybe wait for some finality
if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
if !xerrors.Is(err, fs.ErrNoSuitablePath) {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
} }
log.Warnf("finalize sector: %v", err)
}
if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)})
}
return ctx.Send(SectorFinalized{}) return ctx.Send(SectorFinalized{})
} }

View File

@ -1,9 +1,10 @@
package sealing package sealing
import ( import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
) )
type Piece struct { type Piece struct {

View File

@ -0,0 +1,102 @@
package advmgr
import (
"context"
"io"
"sync"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error
}
type Path struct {
ID string
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
type Worker interface {
sealmgr.Worker
Paths() []Path
}
type Manager struct {
workers []sealmgr.Worker
localLk sync.RWMutex
localStorage LocalStorage
}
func (m Manager) SectorSize() abi.SectorSize {
panic("implement me")
}
func (m Manager) NewSector() (abi.SectorNumber, error) {
panic("implement me")
}
func (m Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m Manager) AddPiece(context.Context, abi.UnpaddedPieceSize, abi.SectorNumber, io.Reader, []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
panic("implement me")
}
func (m Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
panic("implement me")
}
func (m Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
panic("implement me")
}
func (m Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
panic("implement me")
}
func (m Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
panic("implement me")
}
func (m Manager) FinalizeSector(context.Context, abi.SectorNumber) error {
panic("implement me")
}
func (m Manager) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) {
panic("implement me")
}
func (m Manager) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) {
panic("implement me")
}
func (m Manager) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
panic("implement me")
}
func New(ls LocalStorage) *Manager {
return &Manager{
workers: nil,
localStorage: ls,
}
}
var _ sealmgr.Manager = &Manager{}

View File

@ -26,15 +26,13 @@ type Simple struct {
worker Worker worker Worker
} }
func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) { func (s *Simple) SectorSize() abi.SectorSize {
mid, err := address.IDFromAddress(maddr) panic("implement me")
if err != nil {
return nil, xerrors.Errorf("get miner id: %w", err)
} }
func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
w := &LocalWorker{ w := &LocalWorker{
sealer: sb, sb,
mid: abi.ActorID(mid),
} }
return &Simple{ return &Simple{
@ -44,49 +42,71 @@ func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb
}, nil }, nil
} }
func (s *Simple) NewSector() (SectorInfo, error) { func (s *Simple) NewSector() (abi.SectorNumber, error) {
n, err := s.sc.Next() n, err := s.sc.Next()
if err != nil { if err != nil {
return SectorInfo{}, xerrors.Errorf("acquire sector number: %w", err) return 0, xerrors.Errorf("acquire sector number: %w", err)
} }
mid, err := address.IDFromAddress(s.maddr) return abi.SectorNumber(n), nil
if err != nil {
return SectorInfo{}, xerrors.Errorf("get miner id: %w", err)
} }
return SectorInfo{ func (s *Simple) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sectorNum abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
ID: abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(n),
},
}, nil
}
func (s *Simple) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) {
s.rateLimiter.Lock() s.rateLimiter.Lock()
defer s.rateLimiter.Unlock() defer s.rateLimiter.Unlock()
return s.worker.AddPiece(ctx, si, sz, r) return s.worker.AddPiece(ctx, sz, sectorNum, r, existingPieces)
} }
func (s *Simple) RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) { func (s *Simple) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
s.rateLimiter.Lock() s.rateLimiter.Lock()
defer s.rateLimiter.Unlock() defer s.rateLimiter.Unlock()
return s.worker.Run(ctx, task, si) return s.worker.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (s *Simple) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (s *Simple) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (s *Simple) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit2(ctx, sectorNum, phase1Out)
}
func (s *Simple) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.FinalizeSector(ctx, sectorNum)
} }
func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) { func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) {
return s.worker.(*LocalWorker).sealer.GenerateEPostCandidates(sectorInfo, challengeSeed, faults) return s.worker.GenerateEPostCandidates(sectorInfo, challengeSeed, faults)
} }
func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) { func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) {
return s.worker.(*LocalWorker).sealer.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults) return s.worker.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults)
} }
func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) { func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.(*LocalWorker).sealer.ComputeElectionPoSt(sectorInfo, challengeSeed, winners) return s.worker.ComputeElectionPoSt(sectorInfo, challengeSeed, winners)
}
func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("todo")
} }
var _ Manager = &Simple{} var _ Manager = &Simple{}

View File

@ -1,80 +1,32 @@
package sealmgr package sealmgr
import ( import (
"bytes"
"context" "context"
"io" "io"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
) )
type SealTicket struct {
Value abi.SealRandomness
Epoch abi.ChainEpoch
}
type SealSeed struct {
Value abi.InteractiveSealRandomness
Epoch abi.ChainEpoch
}
func (st *SealTicket) Equals(ost *SealTicket) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
func (st *SealSeed) Equals(ost *SealSeed) bool {
return bytes.Equal(st.Value, ost.Value) && st.Epoch == ost.Epoch
}
// SectorInfo holds all sector-related metadata
type SectorInfo struct {
ID abi.SectorID
Pieces []abi.PieceInfo
Ticket SealTicket
Seed SealSeed
PreCommit1Out []byte
Sealed *cid.Cid
Unsealed *cid.Cid
CommitInput []byte
Proof []byte
}
func (si SectorInfo) PieceSizes() []abi.UnpaddedPieceSize {
out := make([]abi.UnpaddedPieceSize, len(si.Pieces))
for i := range out {
out[i] = si.Pieces[i].Size.Unpadded()
}
return nil
}
type Worker interface { type Worker interface {
AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error) sectorbuilder.Sealer
Run(context.Context, TaskType, SectorInfo) (SectorInfo, error) sectorbuilder.Prover
} }
type Manager interface { type Manager interface {
SectorSize() abi.SectorSize
// NewSector allocates staging area for data // NewSector allocates staging area for data
NewSector() (SectorInfo, error) // Storage manager forwards proof-related calls
NewSector() (abi.SectorNumber, error)
// AddPiece appends the piece to the specified sector. Returns PieceCID, and // TODO: Can[Pre]Commit[1,2]
// mutated sector info // TODO: Scrub() []Faults
//
// Note: The passed reader can support other transfer mechanisms, making
// it possible to move the data between data transfer module and workers
AddPiece(context.Context, SectorInfo, abi.UnpaddedPieceSize, io.Reader) (cid.Cid, SectorInfo, error)
RunSeal(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) // TODO: Separate iface
ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
// Storage manager forwards proving calls sectorbuilder.Sealer
sectorbuilder.Prover sectorbuilder.Prover
} }

View File

@ -1,74 +1,11 @@
package sealmgr package sealmgr
import ( import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder" "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
) )
type LocalWorker struct { type LocalWorker struct {
sealer sectorbuilder.Basic sectorbuilder.Basic
mid abi.ActorID
}
func (w *LocalWorker) Run(ctx context.Context, task TaskType, si SectorInfo) (SectorInfo, error) {
if si.ID.Miner != w.mid {
return si, xerrors.Errorf("received a task with wrong actor id; worker for %d, task for %d", w.mid, si.ID.Miner)
}
switch task {
case TTPreCommit1:
pco, err := w.sealer.SealPreCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Pieces)
if err != nil {
return si, xerrors.Errorf("calling sealer: %w", err)
}
si.PreCommit1Out = pco
case TTPreCommit2:
sealed, unsealed, err := w.sealer.SealPreCommit2(ctx, si.ID.Number, si.PreCommit1Out)
if err != nil {
return si, xerrors.Errorf("calling sealer (precommit2): %w", err)
}
si.Sealed = &sealed
si.Unsealed = &unsealed
// We also call Commit1 here as it only grabs some inputs for the snark,
// which is very fast (<1s), and it doesn't really make sense to have a separate
// task type for it
c2in, err := w.sealer.SealCommit1(ctx, si.ID.Number, si.Ticket.Value, si.Seed.Value, si.Pieces, *si.Sealed, *si.Unsealed)
if err != nil {
return si, xerrors.Errorf("calling sealer (commit1): %w", err)
}
si.CommitInput = c2in
case TTCommit2:
proof, err := w.sealer.SealCommit2(ctx, si.ID.Number, si.CommitInput)
if err != nil {
return SectorInfo{}, xerrors.Errorf("calling sealer: %w", err)
}
si.Proof = proof
default:
return si, xerrors.Errorf("unknown task type '%s'", task)
}
return si, nil
}
func (w *LocalWorker) AddPiece(ctx context.Context, si SectorInfo, sz abi.UnpaddedPieceSize, r io.Reader) (cid.Cid, SectorInfo, error) {
pi, err := w.sealer.AddPiece(ctx, sz, si.ID.Number, r, si.PieceSizes())
if err != nil {
return cid.Cid{}, SectorInfo{}, xerrors.Errorf("addPiece on local worker: %w", err)
}
si.Pieces = append(si.Pieces, pi)
return pi.PieceCID, si, nil
} }
var _ Worker = &LocalWorker{} var _ Worker = &LocalWorker{}

View File

@ -9,7 +9,6 @@ import (
"sync" "sync"
"github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-padreader"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
@ -51,16 +50,14 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) {
type SectorBlocks struct { type SectorBlocks struct {
*storage.Miner *storage.Miner
sb sectorbuilder.Interface
keys datastore.Batching keys datastore.Batching
keyLk sync.Mutex keyLk sync.Mutex
} }
func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilder.Interface) *SectorBlocks { func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
sbc := &SectorBlocks{ sbc := &SectorBlocks{
Miner: miner, Miner: miner,
sb: sb,
keys: namespace.Wrap(ds, dsPrefix), keys: namespace.Wrap(ds, dsPrefix),
} }