Merge pull request #1305 from filecoin-project/feat/4stage-seal

Begin work on integrating new sectorbuilder interfaces
This commit is contained in:
Łukasz Magiera 2020-03-06 00:59:42 +01:00 committed by GitHub
commit 0ef6c756b6
45 changed files with 1655 additions and 767 deletions

View File

@ -4,12 +4,12 @@ import (
"bytes"
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/chain/types"
)
@ -107,13 +107,16 @@ type StorageMiner interface {
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
WorkerStats(context.Context) (sectorbuilder.WorkerStats, error)
/*WorkerStats(context.Context) (sealsched.WorkerStats, error)*/
/*// WorkerQueue registers a remote worker
WorkerQueue(context.Context, WorkerCfg) (<-chan WorkerTask, error)
// WorkerQueue registers a remote worker
WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error)
WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error
*/
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
@ -121,6 +124,15 @@ type StorageMiner interface {
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
StorageAddLocal(ctx context.Context, path string) error
}
type SealRes struct {
Err string
GoErr error `json:"-"`
Proof []byte
}
type SectorLog struct {

View File

@ -9,7 +9,6 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -172,15 +171,17 @@ type StorageMinerStruct struct {
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"write"`
WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
/* WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"`
WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm
WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"`
*/
SetPrice func(context.Context, types.BigInt) error `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
}
}
@ -610,17 +611,17 @@ func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNum
return c.Internal.SectorsUpdate(ctx, id, state)
}
func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.WorkerStats, error) {
/*func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sealsched.WorkerStats, error) {
return c.Internal.WorkerStats(ctx)
}
}*/
func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
/*func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return c.Internal.WorkerQueue(ctx, cfg)
}
func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return c.Internal.WorkerDone(ctx, task, res)
}
}*/
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
return c.Internal.MarketImportDealData(ctx, propcid, path)
@ -646,6 +647,10 @@ func (c *StorageMinerStruct) DealsList(ctx context.Context) ([]storagemarket.Sto
return c.Internal.DealsList(ctx)
}
func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) error {
return c.Internal.StorageAddLocal(ctx, path)
}
var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{}

View File

@ -2,6 +2,7 @@ package chain
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"sync"
@ -722,8 +723,9 @@ func (syncer *Syncer) VerifyElectionPoStProof(ctx context.Context, h *types.Bloc
// TODO: why do we need this here?
challengeCount := sectorbuilder.ElectionPostChallengeCount(uint64(len(sectorInfo)), 0)
hvrf := sha256.Sum256(h.EPostProof.PostRand)
pvi := abi.PoStVerifyInfo{
Randomness: h.EPostProof.PostRand,
Randomness: hvrf[:],
Candidates: candidates,
Proofs: h.EPostProof.Proofs,
EligibleSectors: sectorInfo,
@ -737,6 +739,7 @@ func (syncer *Syncer) VerifyElectionPoStProof(ctx context.Context, h *types.Bloc
}
if !ok {
log.Errorf("invalid election post (%x; %v)", pvi.Randomness, candidates)
return xerrors.Errorf("election post was invalid")
}

View File

@ -1,6 +1,6 @@
package types
import(
import (
"encoding/base64"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"

View File

@ -14,8 +14,8 @@ import (
"github.com/docker/go-units"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
@ -44,11 +45,19 @@ type BenchResults struct {
}
type SealingResult struct {
AddPiece time.Duration
PreCommit time.Duration
Commit time.Duration
Verify time.Duration
Unseal time.Duration
AddPiece time.Duration
PreCommit1 time.Duration
PreCommit2 time.Duration
Commit1 time.Duration
Commit2 time.Duration
Verify time.Duration
Unseal time.Duration
}
type Commit2In struct {
SectorNum int64
Phase1Out []byte
SectorSize uint64
}
func main() {
@ -62,6 +71,9 @@ func main() {
Name: "lotus-bench",
Usage: "Benchmark performance of lotus on your hardware",
Version: build.UserVersion,
Commands: []*cli.Command{
proveCmd,
},
Flags: []cli.Flag{
&cli.StringFlag{
Name: "storage-dir",
@ -70,7 +82,7 @@ func main() {
},
&cli.StringFlag{
Name: "sector-size",
Value: "1GiB",
Value: "512MiB",
Usage: "size of the sectors in bytes, i.e. 32GiB",
},
&cli.BoolFlag{
@ -80,7 +92,7 @@ func main() {
&cli.StringFlag{
Name: "miner-addr",
Usage: "pass miner address (only necessary if using existing sectorbuilder)",
Value: "t0101",
Value: "t01000",
},
&cli.StringFlag{
Name: "benchmark-existing-sectorbuilder",
@ -94,6 +106,10 @@ func main() {
Name: "skip-unseal",
Usage: "skip the unseal portion of the benchmark",
},
&cli.StringFlag{
Name: "save-commit2-input",
Usage: "Save commit2 input to a file",
},
},
Action: func(c *cli.Context) error {
if c.Bool("no-gpu") {
@ -146,13 +162,10 @@ func main() {
return err
}
mds := datastore.NewMapDatastore()
cfg := &sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: 2,
Paths: sectorbuilder.SimplePath(sbdir),
}
if robench == "" {
@ -164,7 +177,14 @@ func main() {
if err := paramfetch.GetParams(build.ParametersJson(), uint64(sectorSize)); err != nil {
return xerrors.Errorf("getting params: %w", err)
}
sb, err := sectorbuilder.New(cfg, mds)
sbfs := &fs.Basic{
Miner: maddr,
NextID: 1,
Root: sbdir,
}
sb, err := sectorbuilder.New(sbfs, cfg)
if err != nil {
return err
}
@ -195,17 +215,25 @@ func main() {
trand := sha256.Sum256([]byte(c.String("ticket-preimage")))
ticket := abi.SealRandomness(trand[:])
log.Info("Running replication...")
log.Info("Running replication(1)...")
pieces := []abi.PieceInfo{pi}
commR, commD, err := sb.SealPreCommit(context.TODO(), i, ticket, pieces)
pc1o, err := sb.SealPreCommit1(context.TODO(), i, ticket, pieces)
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
precommit := time.Now()
precommit1 := time.Now()
log.Info("Running replication(2)...")
commR, commD, err := sb.SealPreCommit2(context.TODO(), i, pc1o)
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
precommit2 := time.Now()
sealedSectors = append(sealedSectors, abi.SectorInfo{
RegisteredProof: ppt,
RegisteredProof: spt,
SectorNumber: i,
SealedCID: commR,
})
@ -215,20 +243,46 @@ func main() {
Value: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 255},
}
log.Info("Generating PoRep for sector")
proof, err := sb.SealCommit(context.TODO(), i, ticket, seed.Value, pieces, commR, commD)
log.Info("Generating PoRep for sector (1)")
c1o, err := sb.SealCommit1(context.TODO(), i, ticket, seed.Value, pieces, commR, commD)
if err != nil {
return err
}
sealcommit := time.Now()
sealcommit1 := time.Now()
log.Info("Generating PoRep for sector (2)")
if c.String("save-commit2-input") != "" {
c2in := Commit2In{
SectorNum: int64(i),
Phase1Out: c1o,
SectorSize: uint64(sectorSize),
}
b, err := json.Marshal(&c2in)
if err != nil {
return err
}
if err := ioutil.WriteFile(c.String("save-commit2-input"), b, 0664); err != nil {
log.Warnf("%+v", err)
}
}
proof, err := sb.SealCommit2(context.TODO(), i, c1o)
if err != nil {
return err
}
sealcommit2 := time.Now()
svi := abi.SealVerifyInfo{
SectorID: abi.SectorID{Miner: abi.ActorID(mid), Number: i},
SectorID: abi.SectorID{Miner: mid, Number: i},
OnChain: abi.OnChainSealVerifyInfo{
SealedCID: commR,
InteractiveEpoch: seed.Epoch,
RegisteredProof: ppt,
RegisteredProof: spt,
Proof: proof,
DealIDs: nil,
SectorNumber: i,
@ -263,11 +317,13 @@ func main() {
unseal := time.Now()
sealTimings = append(sealTimings, SealingResult{
AddPiece: addpiece.Sub(start),
PreCommit: precommit.Sub(addpiece),
Commit: sealcommit.Sub(precommit),
Verify: verifySeal.Sub(sealcommit),
Unseal: unseal.Sub(verifySeal),
AddPiece: addpiece.Sub(start),
PreCommit1: precommit1.Sub(addpiece),
PreCommit2: precommit2.Sub(precommit1),
Commit1: sealcommit1.Sub(precommit2),
Commit2: sealcommit2.Sub(sealcommit1),
Verify: verifySeal.Sub(sealcommit2),
Unseal: unseal.Sub(verifySeal),
})
}
@ -312,6 +368,7 @@ func main() {
var candidates []abi.PoStCandidate
for _, c := range fcandidates {
c.Candidate.RegisteredProof = ppt
candidates = append(candidates, c.Candidate)
}
@ -340,7 +397,7 @@ func main() {
Candidates: candidates[:1],
Proofs: proof1,
EligibleSectors: sealedSectors,
Prover: abi.ActorID(mid),
Prover: mid,
ChallengeCount: ccount,
}
ok, err := sectorbuilder.ProofVerifier.VerifyElectionPost(context.TODO(), pvi1)
@ -358,7 +415,7 @@ func main() {
Candidates: candidates[:1],
Proofs: proof2,
EligibleSectors: sealedSectors,
Prover: abi.ActorID(mid),
Prover: mid,
ChallengeCount: ccount,
}
@ -390,11 +447,13 @@ func main() {
fmt.Println(string(data))
} else {
fmt.Printf("results (%d)\n", sectorSize)
fmt.Printf("----\nresults (v23) (%d)\n", sectorSize)
if robench == "" {
fmt.Printf("seal: addPiece: %s (%s)\n", bo.SealingResults[0].AddPiece, bps(bo.SectorSize, bo.SealingResults[0].AddPiece)) // TODO: average across multiple sealings
fmt.Printf("seal: preCommit: %s (%s)\n", bo.SealingResults[0].PreCommit, bps(bo.SectorSize, bo.SealingResults[0].PreCommit))
fmt.Printf("seal: commit: %s (%s)\n", bo.SealingResults[0].Commit, bps(bo.SectorSize, bo.SealingResults[0].Commit))
fmt.Printf("seal: preCommit phase 1: %s (%s)\n", bo.SealingResults[0].PreCommit1, bps(bo.SectorSize, bo.SealingResults[0].PreCommit1))
fmt.Printf("seal: preCommit phase 2: %s (%s)\n", bo.SealingResults[0].PreCommit2, bps(bo.SectorSize, bo.SealingResults[0].PreCommit2))
fmt.Printf("seal: commit phase 1: %s (%s)\n", bo.SealingResults[0].Commit1, bps(bo.SectorSize, bo.SealingResults[0].Commit1))
fmt.Printf("seal: commit phase 2: %s (%s)\n", bo.SealingResults[0].Commit2, bps(bo.SectorSize, bo.SealingResults[0].Commit2))
fmt.Printf("seal: verify: %s\n", bo.SealingResults[0].Verify)
if !c.Bool("skip-unseal") {
fmt.Printf("unseal: %s (%s)\n", bo.SealingResults[0].Unseal, bps(bo.SectorSize, bo.SealingResults[0].Unseal))
@ -416,6 +475,78 @@ func main() {
}
}
var proveCmd = &cli.Command{
Name: "prove",
Usage: "Benchmark a proof computation",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "no-gpu",
Usage: "disable gpu usage for the benchmark run",
},
},
Action: func(c *cli.Context) error {
if c.Bool("no-gpu") {
os.Setenv("BELLMAN_NO_GPU", "1")
}
if !c.Args().Present() {
return xerrors.Errorf("Usage: lotus-bench prove [input.json]")
}
inb, err := ioutil.ReadFile(c.Args().First())
if err != nil {
return xerrors.Errorf("reading input file: %w", err)
}
var c2in Commit2In
if err := json.Unmarshal(inb, &c2in); err != nil {
return xerrors.Errorf("unmarshalling input file: %w", err)
}
if err := paramfetch.GetParams(build.ParametersJson(), c2in.SectorSize); err != nil {
return xerrors.Errorf("getting params: %w", err)
}
maddr, err := address.NewFromString(c.String("miner-addr"))
if err != nil {
return err
}
ppt, spt, err := lapi.ProofTypeFromSectorSize(abi.SectorSize(c2in.SectorSize))
if err != nil {
return err
}
cfg := &sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
}
sb, err := sectorbuilder.New(nil, cfg)
if err != nil {
return err
}
start := time.Now()
proof, err := sb.SealCommit2(context.TODO(), abi.SectorNumber(c2in.SectorNum), c2in.Phase1Out)
if err != nil {
return err
}
sealCommit2 := time.Now()
fmt.Printf("proof: %x\n", proof)
fmt.Printf("----\nresults (v23) (%d)\n", c2in.SectorSize)
dur := sealCommit2.Sub(start)
fmt.Printf("seal: commit phase 2: %s (%s)\n", dur, bps(abi.SectorSize(c2in.SectorSize), dur))
return nil
},
}
func bps(data abi.SectorSize, d time.Duration) string {
bdata := new(big.Int).SetUint64(uint64(data))
bdata = bdata.Mul(bdata, big.NewInt(time.Second.Nanoseconds()))

View File

@ -2,21 +2,11 @@ package main
import (
"os"
"sync"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/mitchellh/go-homedir"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo"
)
@ -85,91 +75,91 @@ var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus worker",
Action: func(cctx *cli.Context) error {
if !cctx.Bool("enable-gpu-proving") {
os.Setenv("BELLMAN_NO_GPU", "true")
}
/* if !cctx.Bool("enable-gpu-proving") {
os.Setenv("BELLMAN_NO_GPU", "true")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return xerrors.Errorf("getting miner api: %w", err)
}
defer closer()
ctx := lcli.ReqContext(cctx)
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return xerrors.Errorf("getting miner api: %w", err)
}
defer closer()
ctx := lcli.ReqContext(cctx)
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
_, storageAddr, err := manet.DialArgs(ainfo.Addr)
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
_, storageAddr, err := manet.DialArgs(ainfo.Addr)
r, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
r, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
v, err := nodeApi.Version(ctx)
if err != nil {
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
go func() {
<-ctx.Done()
log.Warn("Shutting down..")
}()
limiter := &limits{
workLimit: make(chan struct{}, workers),
transferLimit: make(chan struct{}, transfers),
}
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
}
ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: act,
WorkerThreads: workers,
Paths: sectorbuilder.SimplePath(r),
})
if err != nil {
return err
}
nQueues := workers + transfers
var wg sync.WaitGroup
wg.Add(nQueues)
for i := 0; i < nQueues; i++ {
go func() {
defer wg.Done()
if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil {
log.Warnf("%+v", err)
return
}
<-ctx.Done()
log.Warn("Shutting down..")
}()
}
wg.Wait()
limiter := &limits{
workLimit: make(chan struct{}, workers),
transferLimit: make(chan struct{}, transfers),
}
act, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := nodeApi.ActorSectorSize(ctx, act)
if err != nil {
return err
}
if err := paramfetch.GetParams(build.ParametersJson(), uint64(ssize)); err != nil {
return xerrors.Errorf("get params: %w", err)
}
/*ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
return err
}
/*sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: act,
WorkerThreads: workers,
Paths: sectorbuilder.SimplePath(r),
})
if err != nil {
return err
}
nQueues := workers + transfers
var wg sync.WaitGroup
wg.Add(nQueues)
for i := 0; i < nQueues; i++ {
go func() {
defer wg.Done()
/* if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil {
log.Warnf("%+v", err)
return
}
}()
}
wg.Wait()*/
return nil
},
}

View File

@ -1,5 +1,6 @@
package main
/*
import (
"context"
"net/http"
@ -128,3 +129,4 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
func errRes(err error) sectorbuilder.SealRes {
return sectorbuilder.SealRes{Err: err.Error(), GoErr: err}
}
*/

View File

@ -1,5 +1,6 @@
package main
/*
import (
"fmt"
"io"
@ -174,3 +175,4 @@ func (w *worker) fetchSector(sectorID abi.SectorNumber, typ sectorbuilder.Worker
}
return nil
}
*/

View File

@ -6,18 +6,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
@ -38,7 +32,6 @@ func main() {
preSealCmd,
aggregateManifestsCmd,
aggregateSectorDirsCmd,
}
app := &cli.App{
@ -174,149 +167,6 @@ var aggregateManifestsCmd = &cli.Command{
},
}
var aggregateSectorDirsCmd = &cli.Command{
Name: "aggregate-sector-dirs",
Usage: "aggregate a set of preseal manifests into a single file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "miner",
Usage: "Specify address of miner to aggregate sectorbuilders for",
},
&cli.StringFlag{
Name: "dest",
Usage: "specify directory to create aggregate sector store in",
},
&cli.Uint64Flag{
Name: "sector-size",
Usage: "specify size of sectors to aggregate",
Value: 32 * 1024 * 1024 * 1024,
},
},
Action: func(cctx *cli.Context) error {
if cctx.String("miner") == "" {
return fmt.Errorf("must specify miner address with --miner")
}
if cctx.String("dest") == "" {
return fmt.Errorf("must specify dest directory with --dest")
}
maddr, err := address.NewFromString(cctx.String("miner"))
if err != nil {
return err
}
destdir, err := homedir.Expand(cctx.String("dest"))
if err != nil {
return err
}
if err := os.MkdirAll(destdir, 0755); err != nil {
return err
}
agmds, err := badger.NewDatastore(filepath.Join(destdir, "badger"), nil)
if err != nil {
return err
}
defer agmds.Close()
ssize := abi.SectorSize(cctx.Uint64("sector-size"))
ppt, spt, err := lapi.ProofTypeFromSectorSize(abi.SectorSize(cctx.Uint64("sector-size")))
if err != nil {
return err
}
agsb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
Paths: sectorbuilder.SimplePath(destdir),
WorkerThreads: 2,
}, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return err
}
var aggrGenMiner genesis.Miner
var highestSectorID abi.SectorNumber
for _, dir := range cctx.Args().Slice() {
dir, err := homedir.Expand(dir)
if err != nil {
return xerrors.Errorf("failed to expand %q: %w", dir, err)
}
st, err := os.Stat(dir)
if err != nil {
return err
}
if !st.IsDir() {
return fmt.Errorf("%q was not a directory", dir)
}
fi, err := os.Open(filepath.Join(dir, "pre-seal-"+maddr.String()+".json"))
if err != nil {
return err
}
var genmm map[string]genesis.Miner
if err := json.NewDecoder(fi).Decode(&genmm); err != nil {
return err
}
genm, ok := genmm[maddr.String()]
if !ok {
return xerrors.Errorf("input data did not have our miner in it (%s)", maddr)
}
if genm.SectorSize != ssize {
return xerrors.Errorf("sector size mismatch in %q (%d != %d)", dir)
}
for _, s := range genm.Sectors {
if s.SectorID > highestSectorID {
highestSectorID = s.SectorID
}
}
aggrGenMiner = mergeGenMiners(aggrGenMiner, genm)
opts := badger.DefaultOptions
opts.ReadOnly = true
mds, err := badger.NewDatastore(filepath.Join(dir, "badger"), &opts)
if err != nil {
return err
}
defer mds.Close()
sb, err := sectorbuilder.New(&sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
Paths: sectorbuilder.SimplePath(dir),
WorkerThreads: 2,
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return err
}
if err := agsb.ImportFrom(sb, false); err != nil {
return xerrors.Errorf("importing sectors from %q failed: %w", dir, err)
}
}
if err := agsb.SetLastSectorNum(highestSectorID); err != nil {
return err
}
if err := seed.WriteGenesisMiner(maddr, destdir, &aggrGenMiner, nil); err != nil {
return err
}
return nil
},
}
func mergeGenMiners(a, b genesis.Miner) genesis.Miner {
if a.SectorSize != b.SectorSize {
panic("sector sizes mismatch")

View File

@ -11,25 +11,22 @@ import (
"os"
"path/filepath"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multihash"
"golang.org/x/xerrors"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/node/config"
)
var log = logging.Logger("preseal")
@ -46,24 +43,22 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
}
cfg := &sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
FallbackLastNum: offset,
Paths: sectorbuilder.SimplePath(sbroot),
WorkerThreads: 2,
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
}
if err := os.MkdirAll(sbroot, 0775); err != nil {
return nil, nil, err
}
mds, err := badger.NewDatastore(filepath.Join(sbroot, "badger"), nil)
if err != nil {
return nil, nil, err
sbfs := &fs.Basic{
Miner: maddr,
NextID: offset,
Root: sbroot,
}
sb, err := sectorbuilder.New(cfg, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
sb, err := sectorbuilder.New(sbfs, cfg)
if err != nil {
return nil, nil, err
}
@ -75,7 +70,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
var sealedSectors []*genesis.PreSeal
for i := 0; i < sectors; i++ {
sid, err := sb.AcquireSectorNumber()
sid, err := sbfs.AcquireSectorNumber()
if err != nil {
return nil, nil, err
}
@ -90,12 +85,17 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
fmt.Printf("sector-id: %d, piece info: %v\n", sid, pi)
scid, ucid, err := sb.SealPreCommit(context.TODO(), sid, ticket, []abi.PieceInfo{pi})
in2, err := sb.SealPreCommit1(context.TODO(), sid, ticket, []abi.PieceInfo{pi})
if err != nil {
return nil, nil, xerrors.Errorf("commit: %w", err)
}
if err := sb.TrimCache(context.TODO(), sid); err != nil {
scid, ucid, err := sb.SealPreCommit2(context.TODO(), sid, in2)
if err != nil {
return nil, nil, xerrors.Errorf("commit: %w", err)
}
if err := sb.FinalizeSector(context.TODO(), sid); err != nil {
return nil, nil, xerrors.Errorf("trim cache: %w", err)
}
@ -134,8 +134,20 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, xerrors.Errorf("creating deals: %w", err)
}
if err := mds.Close(); err != nil {
return nil, nil, xerrors.Errorf("closing datastore: %w", err)
{
b, err := json.MarshalIndent(&config.StorageMeta{
ID: uuid.New().String(),
Weight: 0, // read-only
CanSeal: false,
CanStore: false,
}, "", " ")
if err != nil {
return nil, nil, xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(sbroot, "sectorstore.json"), b, 0644); err != nil {
return nil, nil, xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(sbroot, "storage.json"), err)
}
}
return miner, &minerAddr.KeyInfo, nil
@ -172,19 +184,6 @@ func WriteGenesisMiner(maddr address.Address, sbroot string, gm *genesis.Miner,
return nil
}
func commDCID(commd []byte) cid.Cid {
d, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.IDENTITY,
MhLength: len(commd),
}.Sum(commd)
if err != nil {
panic(err)
}
return d
}
func createDeals(m *genesis.Miner, k *wallet.Key, maddr address.Address, ssize abi.SectorSize) error {
for _, sector := range m.Sectors {
proposal := &market.DealProposal{

View File

@ -72,7 +72,7 @@ var infoCmd = &cli.Command{
float64(10000*uint64(len(faults))/secCounts.Pset)/100.)
}
// TODO: indicate whether the post worker is in use
/*// TODO: indicate whether the post worker is in use
wstat, err := nodeApi.WorkerStats(ctx)
if err != nil {
return err
@ -86,7 +86,7 @@ var infoCmd = &cli.Command{
fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait)
fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait)
fmt.Printf("\tCommit: %d\n", wstat.CommitWait)
fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)
fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait)*/
ps, err := api.StateMinerPostState(ctx, maddr, types.EmptyTSK)
if err != nil {

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
@ -10,20 +11,19 @@ import (
"path/filepath"
"strconv"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
miner2 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
crypto2 "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/google/uuid"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/mitchellh/go-homedir"
@ -38,11 +38,11 @@ import (
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
)
var initCmd = &cli.Command{
@ -77,7 +77,7 @@ var initCmd = &cli.Command{
Usage: "specify sector size to use",
Value: uint64(build.SectorSizes[0]),
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "pre-sealed-sectors",
Usage: "specify set of presealed sectors for starting as a genesis miner",
},
@ -93,6 +93,10 @@ var initCmd = &cli.Command{
Name: "symlink-imported-sectors",
Usage: "attempt to symlink to presealed sectors instead of copying them into place",
},
&cli.BoolFlag{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
},
Action: func(cctx *cli.Context) error {
log.Info("Initializing lotus storage miner")
@ -159,63 +163,58 @@ var initCmd = &cli.Command{
return err
}
if pssb := cctx.String("pre-sealed-sectors"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
}
log.Infof("moving pre-sealed-sectors from %s into newly created storage miner repo", pssb)
{
lr, err := r.Lock(repo.StorageMiner)
if err != nil {
return err
}
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
var sc config.StorageConfig
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
log.Infof("Setting up storage config with presealed sector", pssb)
for _, psp := range pssb {
psp, err := homedir.Expand(psp)
if err != nil {
return err
}
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{
Path: psp,
})
}
}
bopts := badger.DefaultOptions
bopts.ReadOnly = true
oldmds, err := badger.NewDatastore(filepath.Join(pssb, "badger"), &bopts)
if err != nil {
return err
if !cctx.Bool("no-local-storage") {
b, err := json.MarshalIndent(&config.StorageMeta{
ID: uuid.New().String(),
Weight: 10,
CanSeal: true,
CanStore: true,
}, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(lr.Path(), "sectorstore.json"), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "storage.json"), err)
}
}
ppt, spt, err := lapi.ProofTypeFromSectorSize(ssize)
if err != nil {
return err
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{
Path: lr.Path(),
})
if err := lr.SetStorage(sc); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}
oldsb, err := sectorbuilder.New(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: 2,
Paths: sectorbuilder.SimplePath(pssb),
}, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err)
}
nsb, err := sectorbuilder.New(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: 2,
Paths: sectorbuilder.SimplePath(lr.Path()),
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return xerrors.Errorf("failed to open up sectorbuilder: %w", err)
}
if err := nsb.ImportFrom(oldsb, symlink); err != nil {
return err
}
if err := lr.Close(); err != nil {
return xerrors.Errorf("unlocking repo after preseal migration: %w", err)
return err
}
}
if err := storageMinerInit(ctx, cctx, api, r); err != nil {
if err := storageMinerInit(ctx, cctx, api, r, ssize); err != nil {
log.Errorf("Failed to initialize lotus-storage-miner: %+v", err)
path, err := homedir.Expand(repoPath)
if err != nil {
@ -251,6 +250,7 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
return xerrors.Errorf("unmarshaling preseal metadata: %w", err)
}
maxSectorID := abi.SectorNumber(0)
for _, sector := range meta.Sectors {
sectorKey := datastore.NewKey(sealing.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))
@ -289,6 +289,10 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
return err
}
if sector.SectorID > maxSectorID {
maxSectorID = sector.SectorID
}
/* // TODO: Import deals into market
pnd, err := cborutil.AsIpld(sector.Deal)
if err != nil {
@ -319,7 +323,9 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, metadata string,
}*/
}
return nil
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, uint64(maxSectorID+1))
return mds.Put(datastore.NewKey("/storage/nextid"), buf[:size])
}
func findMarketDealID(ctx context.Context, api lapi.FullNode, deal market.DealProposal) (abi.DealID, error) {
@ -341,7 +347,7 @@ func findMarketDealID(ctx context.Context, api lapi.FullNode, deal market.DealPr
return 0, xerrors.New("deal not found")
}
func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, r repo.Repo) error {
func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, r repo.Repo, ssize abi.SectorSize) error {
lr, err := r.Lock(repo.StorageMiner)
if err != nil {
return err
@ -377,30 +383,20 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}
c, err := lr.Config()
ppt, spt, err := lapi.ProofTypeFromSectorSize(ssize)
if err != nil {
return err
}
cfg, ok := c.(*config.StorageMiner)
if !ok {
return xerrors.Errorf("invalid config from repo, got: %T", c)
}
scfg := sectorbuilder.SimplePath(lr.Path())
if len(cfg.SectorBuilder.Storage) > 0 {
scfg = cfg.SectorBuilder.Storage
}
sbcfg, err := modules.SectorBuilderConfig(scfg, 2, false, false)(mds, api)
smgr, err := advmgr.New(lr, &sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: a,
}, nil)
if err != nil {
return xerrors.Errorf("getting genesis miner sector builder config: %w", err)
return err
}
sb, err := sectorbuilder.New(sbcfg, mds)
if err != nil {
return xerrors.Errorf("failed to set up sectorbuilder for genesis mining: %w", err)
}
epp := storage.NewElectionPoStProver(sb)
epp := storage.NewElectionPoStProver(smgr)
m := miner.NewMiner(api, epp)
{

View File

@ -6,15 +6,15 @@ Build the Lotus Binaries in debug mode, This enables the use of 1024 byte sector
make debug
```
Download the 1024 byte parameters:
Download the 2048 byte parameters:
```sh
./lotus fetch-params --proving-params 1024
./lotus fetch-params --proving-params 2048
```
Pre-seal some sectors:
```sh
./lotus-seed pre-seal --sector-size 1024 --num-sectors 2
./lotus-seed pre-seal --sector-size 2048 --num-sectors 2
```
Create the genesis block and start up the first node:
@ -34,7 +34,7 @@ Then, in another console, import the genesis miner key:
Set up the genesis miner:
```sh
./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=1024 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t0101.json --nosync
./lotus-storage-miner init --genesis-miner --actor=t01000 --sector-size=2048 --pre-sealed-sectors=~/.genesis-sectors --pre-sealed-metadata=~/.genesis-sectors/pre-seal-t01000.json --nosync
```
Now, finally, start up the miner:

5
go.mod
View File

@ -11,7 +11,7 @@ require (
github.com/coreos/go-systemd/v22 v22.0.0
github.com/docker/go-units v0.4.0
github.com/filecoin-project/chain-validation v0.0.6-0.20200305212458-670d41260fd7
github.com/filecoin-project/filecoin-ffi v0.0.0-20200226205820-4da0bccccefb
github.com/filecoin-project/filecoin-ffi v0.0.0-20200226231125-fc253ccb5294
github.com/filecoin-project/go-address v0.0.2-0.20200218010043-eb9bb40ed5be
github.com/filecoin-project/go-amt-ipld/v2 v2.0.1-0.20200131012142-05d80eeccc5e
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
@ -21,7 +21,7 @@ require (
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200228181617-f00e2c4cc050
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200304050010-2cfac00a93e7
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/specs-actors v0.0.0-20200305205312-53bb01da9aeb
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
@ -105,7 +105,6 @@ require (
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools v0.0.0-20200108195415-316d2f248479 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
gopkg.in/cheggaaa/pb.v1 v1.0.28
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
gotest.tools v2.2.0+incompatible
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect

4
go.sum
View File

@ -120,8 +120,8 @@ github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.m
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200228181617-f00e2c4cc050 h1:mUr2IegjC5TumVe7opY7CuXS/Ud4VCmDLidJDFeBNJQ=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200228181617-f00e2c4cc050/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200304050010-2cfac00a93e7 h1:pGvvPZgIfBXAda+OGc943zFcM8Wt4GG9UVymVeyfx6c=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200304050010-2cfac00a93e7/go.mod h1:gEQJVRVqQX8Vx02IosTdC2UA4l2MgHfG3POJEI2GIpc=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=

View File

@ -480,12 +480,16 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.closeChans()
c.incoming = make(chan io.Reader) // listen again for responses
go func() {
if c.connFactory == nil { // likely the server side, don't try to reconnect
return
}
var conn *websocket.Conn
for conn == nil {
time.Sleep(c.reconnectInterval)
var err error
if conn, err = c.connFactory(); err != nil {
log.Debugw("websocket connection retried failed", "error", err)
log.Debugw("websocket connection retry failed", "error", err)
}
}

View File

@ -13,18 +13,19 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type retrievalProviderNode struct {
miner *storage.Miner
sb sectorbuilder.Interface
full api.FullNode
miner *storage.Miner
sealer sealmgr.Manager
full api.FullNode
}
// NewRetrievalProviderNode returns a new node adapter for a retrieval provider that talks to the
// Lotus Node
func NewRetrievalProviderNode(miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sb, full}
func NewRetrievalProviderNode(miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode) retrievalmarket.RetrievalProviderNode {
return &retrievalProviderNode{miner, sealer, full}
}
func (rpn *retrievalProviderNode) GetMinerWorker(ctx context.Context, miner address.Address) (address.Address, error) {
@ -37,7 +38,7 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uin
if err != nil {
return nil, err
}
return rpn.sb.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD)
return rpn.sealer.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD)
}
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount) (abi.TokenAmount, error) {

View File

@ -56,6 +56,8 @@ import (
"github.com/filecoin-project/lotus/paychmgr"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
@ -253,7 +255,14 @@ func Online() Option {
// Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(sectorbuilder.Interface), modules.SectorBuilder),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(advmgr.LocalStorage), From(new(repo.LockedRepo))),
Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter),
Override(new(*advmgr.Manager), advmgr.New),
Override(new(sealmgr.Manager), From(new(*advmgr.Manager))),
Override(new(sectorbuilder.Prover), From(new(sealmgr.Manager))),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(sealing.TicketFn), modules.SealTicketGen),
Override(new(*storage.Miner), modules.StorageMiner),
@ -346,30 +355,13 @@ func ConfigFullNode(c interface{}) Option {
)
}
func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option {
func ConfigStorageMiner(c interface{}) Option {
cfg, ok := c.(*config.StorageMiner)
if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
}
scfg := sectorbuilder.SimplePath(lr.Path())
if cfg.SectorBuilder.Path == "" {
if len(cfg.SectorBuilder.Storage) > 0 {
scfg = cfg.SectorBuilder.Storage
}
} else {
scfg = sectorbuilder.SimplePath(cfg.SectorBuilder.Path)
log.Warn("LEGACY SectorBuilder.Path FOUND IN CONFIG. Please use the new storage config")
}
return Options(
ConfigCommon(&cfg.Common),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(scfg,
cfg.SectorBuilder.WorkerCount,
cfg.SectorBuilder.DisableLocalPreCommit,
cfg.SectorBuilder.DisableLocalCommit)),
)
return Options(ConfigCommon(&cfg.Common))
}
func Repo(r repo.Repo) Option {
@ -387,7 +379,7 @@ func Repo(r repo.Repo) Option {
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c, lr)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),

View File

@ -3,8 +3,6 @@ package config
import (
"encoding"
"time"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
// Common is common config between full node and miner
@ -25,7 +23,7 @@ type FullNode struct {
type StorageMiner struct {
Common
SectorBuilder SectorBuilder
Storage Storage
}
// API contains configs for API endpoint
@ -54,14 +52,7 @@ type Metrics struct {
}
// // Storage Miner
type SectorBuilder struct {
Path string // TODO: remove // FORK (-ish)
Storage []fs.PathConfig
WorkerCount uint
DisableLocalPreCommit bool
DisableLocalCommit bool
type Storage struct {
}
func defCommon() Common {
@ -95,9 +86,7 @@ func DefaultStorageMiner() *StorageMiner {
cfg := &StorageMiner{
Common: defCommon(),
SectorBuilder: SectorBuilder{
WorkerCount: 5,
},
Storage: Storage{},
}
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
return cfg

67
node/config/storage.go Normal file
View File

@ -0,0 +1,67 @@
package config
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"golang.org/x/xerrors"
)
type LocalPath struct {
Path string
}
// .lotusstorage/storage.json
type StorageConfig struct {
StoragePaths []LocalPath
}
// [path]/metadata.json
type StorageMeta struct {
ID string
Weight uint64 // 0 = readonly
CanSeal bool
CanStore bool
}
func StorageFromFile(path string, def *StorageConfig) (*StorageConfig, error) {
file, err := os.Open(path)
switch {
case os.IsNotExist(err):
if def == nil {
return nil, xerrors.Errorf("couldn't load storage config: %w", err)
}
return def, nil
case err != nil:
return nil, err
}
defer file.Close() //nolint:errcheck // The file is RO
return StorageFromReader(file)
}
func StorageFromReader(reader io.Reader) (*StorageConfig, error) {
var cfg StorageConfig
err := json.NewDecoder(reader).Decode(&cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
func WriteStorageFile(path string, config StorageConfig) error {
b, err := json.MarshalIndent(config, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(path, b, 0644); err != nil {
return xerrors.Errorf("persisting storage config (%s): %w", path, err)
}
return nil
}

View File

@ -3,30 +3,25 @@ package impl
import (
"context"
"encoding/json"
"io"
"mime"
"net/http"
"os"
"strconv"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
@ -34,13 +29,14 @@ type StorageMinerAPI struct {
CommonAPI
SectorBuilderConfig *sectorbuilder.Config
SectorBuilder sectorbuilder.Interface
SectorBlocks *sectorblocks.SectorBlocks
//SectorBuilder sectorbuilder.Interface
SectorBlocks *sectorblocks.SectorBlocks
StorageProvider storagemarket.StorageProvider
Miner *storage.Miner
BlockMiner *miner.Miner
Full api.FullNode
StorageMgr *advmgr.Manager `optional:"true"`
}
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
@ -61,115 +57,118 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
}
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
stat, err := os.Stat(string(path))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(string(path))
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil {
log.Error(err)
return
}
}
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
// This is going to get better with worker-to-worker transfers
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
if err != fs.ErrNotFound {
log.Error(err)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
path, err = sm.SectorBuilder.AllocSectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id), true)
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
if err := os.RemoveAll(string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
switch mediatype {
case "application/x-tar":
if err := tarutil.ExtractTar(r.Body, string(path)); err != nil {
stat, err := os.Stat(string(path))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
default:
if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil {
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(string(path))
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
w.WriteHeader(200)
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil {
log.Error(err)
return
}*/
}
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
// This is going to get better with worker-to-worker transfers
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
if err != fs.ErrNotFound {
log.Error(err)
w.WriteHeader(500)
return
}
path, err = sm.SectorBuilder.AllocSectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id), true)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
if err := os.RemoveAll(string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
switch mediatype {
case "application/x-tar":
if err := tarutil.ExtractTar(r.Body, string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
default:
if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
w.WriteHeader(200)
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)*/
}
/*
func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) {
stat := sm.SectorBuilder.WorkerStats()
return stat, nil
}
}*/
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
return sm.SectorBuilderConfig.Miner, nil
@ -257,6 +256,7 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumbe
return sm.Miner.ForceSectorState(ctx, id, state)
}
/*
func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
return sm.SectorBuilder.AddWorker(ctx, cfg)
}
@ -264,6 +264,7 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.Wo
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return sm.SectorBuilder.TaskDone(ctx, task, res)
}
*/
func (sm *StorageMinerAPI) MarketImportDealData(ctx context.Context, propCid cid.Cid, path string) error {
fi, err := os.Open(path)
@ -301,4 +302,12 @@ func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fn
return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi)
}
func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error {
if sm.StorageMgr == nil {
return xerrors.Errorf("no storage manager")
}
return sm.StorageMgr.AddLocalStorage(path)
}
var _ api.StorageMiner = &StorageMinerAPI{}

View File

@ -71,7 +71,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool) {
ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := ps.Subscribe(build.BlocksTopic)
msgsub, err := ps.Subscribe(build.MessagesTopic)
if err != nil {
panic(err)
}

View File

@ -2,11 +2,8 @@ package modules
import (
"context"
"math"
"reflect"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
@ -18,9 +15,9 @@ import (
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/storedcounter"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -37,10 +34,13 @@ import (
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/mitchellh/go-homedir"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/gen"
@ -75,51 +75,46 @@ func GetParams(sbc *sectorbuilder.Config) error {
return nil
}
func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) {
return func(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
ssize, err := fnapi.StateMinerSectorSize(context.TODO(), minerAddr, types.EmptyTSK)
if err != nil {
return nil, err
}
for i := range storage {
storage[i].Path, err = homedir.Expand(storage[i].Path)
if err != nil {
return nil, err
}
}
if threads > math.MaxUint8 {
return nil, xerrors.Errorf("too many sectorbuilder threads specified: %d, max allowed: %d", threads, math.MaxUint8)
}
ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
sb := &sectorbuilder.Config{
Miner: minerAddr,
SealProofType: spt,
PoStProofType: ppt,
WorkerThreads: uint8(threads),
NoPreCommit: noprecommit,
NoCommit: nocommit,
Paths: storage,
}
return sb, nil
func SectorBuilderConfig(ds dtypes.MetadataDS, fnapi api.FullNode) (*sectorbuilder.Config, error) {
minerAddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
}
ssize, err := fnapi.StateMinerSectorSize(context.TODO(), minerAddr, types.EmptyTSK)
if err != nil {
return nil, err
}
ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
return nil, xerrors.Errorf("bad sector size: %w", err)
}
sb := &sectorbuilder.Config{
Miner: minerAddr,
SealProofType: spt,
PoStProofType: ppt,
}
return sb, nil
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*storage.Miner, error) {
type sidsc struct {
sc *storedcounter.StoredCounter
}
func (s *sidsc) Next() (abi.SectorNumber, error) {
i, err := s.sc.Next()
return abi.SectorNumber(i), err
}
func SectorIDCounter(ds dtypes.MetadataDS) advmgr.SectorIDCounter {
sc := storedcounter.New(ds, datastore.NewKey("/storage/nextid"))
return &sidsc{sc}
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
@ -132,9 +127,9 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h
return nil, err
}
fps := storage.NewFPoStScheduler(api, sb, maddr, worker)
fps := storage.NewFPoStScheduler(api, sealer, maddr, worker)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sb, tktFn)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, tktFn)
if err != nil {
return nil, err
}
@ -271,15 +266,6 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode,
return m, nil
}
func SectorBuilder(cfg *sectorbuilder.Config, ds dtypes.MetadataDS) (*sectorbuilder.SectorBuilder, error) {
sb, err := sectorbuilder.New(cfg, namespace.Wrap(ds, datastore.NewKey("/sectorbuilder")))
if err != nil {
return nil, err
}
return sb, nil
}
func SealTicketGen(fapi api.FullNode) sealing.TicketFn {
return func(ctx context.Context) (*api.SealTicket, error) {
ts, err := fapi.ChainHead(ctx)
@ -333,8 +319,8 @@ func StorageProvider(ctx helpers.MetricsCtx, fapi api.FullNode, h host.Host, ds
}
// RetrievalProvider creates a new retrieval provider attached to the provider blockstore
func RetrievalProvider(h host.Host, miner *storage.Miner, sb sectorbuilder.Interface, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sb, full)
func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sealmgr.Manager, full api.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, ibs dtypes.StagingBlockstore) (retrievalmarket.RetrievalProvider, error) {
adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full)
address, err := minerAddrFromDS(ds)
if err != nil {
return nil, err

View File

@ -6,13 +6,11 @@ import (
"crypto/rand"
"io/ioutil"
"net/http/httptest"
"path/filepath"
"testing"
"time"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger2"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
@ -39,11 +37,12 @@ import (
"github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/modules"
modtest "github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sbmock"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
)
func init() {
@ -240,40 +239,24 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te
f := fulls[full]
if _, err := f.FullNode.WalletImport(ctx, &keys[i].KeyInfo); err != nil {
return nil, nil
t.Fatal(err)
}
if err := f.FullNode.WalletSetDefault(ctx, keys[i].Address); err != nil {
return nil, nil
t.Fatal(err)
}
genMiner := maddrs[i]
wa := genms[i].Worker
storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options())
sma := storers[i].StorageMiner.(*impl.StorageMinerAPI)
psd := presealDirs[i]
mds, err := badger.NewDatastore(filepath.Join(psd, "badger"), nil)
if err != nil {
t.Fatal(err)
}
osb, err := sectorbuilder.New(&sectorbuilder.Config{
SealProofType: abi.RegisteredProof_StackedDRG2KiBSeal,
PoStProofType: abi.RegisteredProof_StackedDRG2KiBPoSt,
WorkerThreads: 2,
Miner: genMiner,
Paths: sectorbuilder.SimplePath(psd),
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
if err != nil {
t.Fatal(err)
}
if err := sma.SectorBuilder.(*sectorbuilder.SectorBuilder).ImportFrom(osb, false); err != nil {
if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil {
t.Fatal(err)
}
/*
sma := storers[i].StorageMiner.(*impl.StorageMinerAPI)
psd := presealDirs[i]
*/
}
if err := mn.LinkAll(); err != nil {
@ -394,7 +377,10 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t
wa := genms[i].Worker
storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options(
node.Override(new(sectorbuilder.Interface), sbmock.NewMockSectorBuilder(5, build.SectorSizes[0])),
node.Override(new(sealmgr.Manager), func() (sealmgr.Manager, error) {
return sealmgr.NewSimpleManager(storedcounter.New(datastore.NewMapDatastore(), datastore.NewKey("/potato")), genMiner, sbmock.NewMockSectorBuilder(5, build.SectorSizes[0]))
}),
node.Unset(new(*advmgr.Manager)),
))
}

View File

@ -84,6 +84,15 @@ func Unset(typ interface{}) Option {
}
}
// From(*T) -> func(t T) T {return t}
func From(typ interface{}) interface{} {
rt := []reflect.Type{reflect.TypeOf(typ).Elem()}
ft := reflect.FuncOf(rt, rt, false)
return reflect.MakeFunc(ft, func(args []reflect.Value) (results []reflect.Value) {
return args
}).Interface()
}
// from go-ipfs
// as casts input constructor to a given interface (if a value is given, it
// wraps it into a constructor).

View File

@ -25,12 +25,13 @@ import (
)
const (
fsAPI = "api"
fsAPIToken = "token"
fsConfig = "config.toml"
fsDatastore = "datastore"
fsLock = "repo.lock"
fsKeystore = "keystore"
fsAPI = "api"
fsAPIToken = "token"
fsConfig = "config.toml"
fsStorageConfig = "storage.json"
fsDatastore = "datastore"
fsLock = "repo.lock"
fsKeystore = "keystore"
)
type RepoType int
@ -276,6 +277,18 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) {
return config.FromFile(fsr.join(fsConfig), defConfForType(fsr.repoType))
}
func (fsr *fsLockedRepo) GetStorage() (config.StorageConfig, error) {
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), nil)
if err != nil {
return config.StorageConfig{}, err
}
return *c, nil
}
func (fsr *fsLockedRepo) SetStorage(c config.StorageConfig) error {
return config.WriteStorageFile(fsr.join(fsStorageConfig), c)
}
func (fsr *fsLockedRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
if err := fsr.stillValid(); err != nil {
return err

View File

@ -7,6 +7,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
)
var (
@ -37,6 +38,9 @@ type LockedRepo interface {
// Returns config in this repo
Config() (interface{}, error)
GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error
// SetAPIEndpoint sets the endpoint of the current API
// so it can be read by API clients
SetAPIEndpoint(multiaddr.Multiaddr) error

View File

@ -1,10 +1,13 @@
package repo
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dssync "github.com/ipfs/go-datastore/sync"
@ -12,6 +15,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
)
type MemRepo struct {
@ -38,15 +42,53 @@ type lockedMemRepo struct {
token *byte
}
func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) {
return config.StorageConfig{StoragePaths: []config.LocalPath{
{Path: lmem.Path()},
}}, nil
}
func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error {
panic("implement me")
}
func (lmem *lockedMemRepo) Path() string {
lmem.Lock()
defer lmem.Unlock()
if lmem.tempDir != "" {
return lmem.tempDir
}
t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
if err != nil {
panic(err) // only used in tests, probably fine
}
lmem.Lock()
if lmem.t == StorageMiner {
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), config.StorageConfig{
StoragePaths: []config.LocalPath{
{Path: t},
}}); err != nil {
panic(err)
}
b, err := json.MarshalIndent(&config.StorageMeta{
ID: uuid.New().String(),
Weight: 10,
CanSeal: true,
CanStore: true,
}, "", " ")
if err != nil {
panic(err)
}
if err := ioutil.WriteFile(filepath.Join(t, "sectorstore.json"), b, 0644); err != nil {
panic(err)
}
}
lmem.tempDir = t
lmem.Unlock()
return t
}
@ -169,6 +211,10 @@ func (lmem *lockedMemRepo) Config() (interface{}, error) {
return lmem.mem.configF(lmem.t), nil
}
func (lmem *lockedMemRepo) Storage() (config.StorageConfig, error) {
panic("implement me")
}
func (lmem *lockedMemRepo) SetAPIEndpoint(ma multiaddr.Multiaddr) error {
if err := lmem.checkToken(); err != nil {
return err

View File

@ -90,7 +90,12 @@ func (s *FPoStScheduler) declareFaults(ctx context.Context, fc uint64, params *m
}
func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi []abi.SectorNumber) ([]abi.SectorNumber, error) {
faults := s.sb.Scrub(ssi)
//faults := s.sb.Scrub(ssi)
log.Warnf("Stub checkFaults")
var faults []struct {
SectorNum abi.SectorNumber
Err error
}
declaredFaults := map[abi.SectorNumber]struct{}{}

View File

@ -22,7 +22,7 @@ const StartConfidence = 4 // TODO: config
type FPoStScheduler struct {
api storageMinerApi
sb sectorbuilder.Interface
sb sectorbuilder.Prover
actor address.Address
worker address.Address
@ -37,7 +37,7 @@ type FPoStScheduler struct {
failLk sync.Mutex
}
func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Interface, actor address.Address, worker address.Address) *FPoStScheduler {
func NewFPoStScheduler(api storageMinerApi, sb sectorbuilder.Prover, actor address.Address, worker address.Address) *FPoStScheduler {
return &FPoStScheduler{api: api, sb: sb, actor: actor, worker: worker}
}

View File

@ -13,6 +13,8 @@ import (
"golang.org/x/xerrors"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -30,11 +32,11 @@ import (
var log = logging.Logger("storageminer")
type Miner struct {
api storageMinerApi
h host.Host
sb sectorbuilder.Interface
ds datastore.Batching
tktFn sealing.TicketFn
api storageMinerApi
h host.Host
sealer sealmgr.Manager
ds datastore.Batching
tktFn sealing.TicketFn
maddr address.Address
worker address.Address
@ -71,13 +73,13 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sb sectorbuilder.Interface, tktFn sealing.TicketFn) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sealmgr.Manager, tktFn sealing.TicketFn) (*Miner, error) {
m := &Miner{
api: api,
h: h,
sb: sb,
ds: ds,
tktFn: tktFn,
api: api,
h: h,
sealer: sealer,
ds: ds,
tktFn: tktFn,
maddr: maddr,
worker: worker,
@ -92,7 +94,7 @@ func (m *Miner) Run(ctx context.Context) error {
}
evts := events.NewEvents(ctx, m.api)
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sealer, m.tktFn)
go m.sealing.Run(ctx)
@ -119,10 +121,10 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
}
type SectorBuilderEpp struct {
sb sectorbuilder.Interface
prover sectorbuilder.Prover
}
func NewElectionPoStProver(sb sectorbuilder.Interface) *SectorBuilderEpp {
func NewElectionPoStProver(sb sectorbuilder.Prover) *SectorBuilderEpp {
return &SectorBuilderEpp{sb}
}
@ -132,7 +134,7 @@ func (epp *SectorBuilderEpp) GenerateCandidates(ctx context.Context, ssi []abi.S
start := time.Now()
var faults []abi.SectorNumber // TODO
cds, err := epp.sb.GenerateEPostCandidates(ssi, rand, faults)
cds, err := epp.prover.GenerateEPostCandidates(ssi, rand, faults)
if err != nil {
return nil, xerrors.Errorf("failed to generate candidates: %w", err)
}
@ -152,7 +154,7 @@ func (epp *SectorBuilderEpp) ComputeProof(ctx context.Context, ssi []abi.SectorI
}
start := time.Now()
proof, err := epp.sb.ComputeElectionPoSt(ssi, rand, owins)
proof, err := epp.prover.ComputeElectionPoSt(ssi, rand, owins)
if err != nil {
return nil, err
}

View File

@ -10,15 +10,16 @@ import (
"math/rand"
"sync"
ffi "github.com/filecoin-project/filecoin-ffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
ffi "github.com/filecoin-project/filecoin-ffi"
)
var log = logging.Logger("sbmock")
@ -114,41 +115,16 @@ func (sb *SBMock) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil
}
func (sb *SBMock) Scrub([]abi.SectorNumber) []*sectorbuilder.Fault {
sb.lk.Lock()
mcopy := make(map[abi.SectorNumber]*sectorState)
for k, v := range sb.sectors {
mcopy[k] = v
}
sb.lk.Unlock()
var out []*sectorbuilder.Fault
for sid, ss := range mcopy {
ss.lk.Lock()
if ss.failed {
out = append(out, &sectorbuilder.Fault{
SectorNum: sid,
Err: fmt.Errorf("mock sector failed"),
})
}
ss.lk.Unlock()
}
return out
}
func (sb *SBMock) GenerateFallbackPoSt([]abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) {
panic("NYI")
}
func (sb *SBMock) SealPreCommit(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (cid.Cid, cid.Cid, error) {
log.Warn("Seal PreCommit", sid)
func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
if !ok {
return cid.Undef, cid.Undef, xerrors.Errorf("no sector with id %d in sectorbuilder", sid)
return nil, xerrors.Errorf("no sector with id %d in sectorbuilder", sid)
}
ss.lk.Lock()
@ -164,11 +140,11 @@ func (sb *SBMock) SealPreCommit(ctx context.Context, sid abi.SectorNumber, ticke
}
if sum != ussize {
return cid.Undef, cid.Undef, xerrors.Errorf("aggregated piece sizes don't match up: %d != %d", sum, ussize)
return nil, xerrors.Errorf("aggregated piece sizes don't match up: %d != %d", sum, ussize)
}
if ss.state != statePacking {
return cid.Undef, cid.Undef, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state")
return nil, xerrors.Errorf("cannot call pre-seal on sector not in 'packing' state")
}
opFinishWait(ctx)
@ -185,31 +161,36 @@ func (sb *SBMock) SealPreCommit(ctx context.Context, sid abi.SectorNumber, ticke
commd, err := MockVerifier.GenerateDataCommitment(abi.PaddedPieceSize(sb.sectorSize), pis)
if err != nil {
return cid.Undef, cid.Undef, err
return nil, err
}
commR := commRfromD(commd)
return commR, commd, nil
}
// so we can 'verify' that the commR comes from the commD
func commRfromD(commD cid.Cid) cid.Cid {
cc, _, err := commcid.CIDToCommitment(commD)
cc, _, err := commcid.CIDToCommitment(commd)
if err != nil {
panic(err)
}
commr := make([]byte, 32)
for i := range cc {
commr[32-(i+1)] = cc[i]
}
cc[0] ^= 'd'
return commcid.DataCommitmentV1ToCID(commr)
return cc, nil
}
func (sb *SBMock) SealCommit(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCid cid.Cid, unsealed cid.Cid) ([]byte, error) {
log.Warn("Seal Commit!", sid, sealedCid, unsealed)
func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
db := []byte(string(phase1Out))
db[0] ^= 'd'
d := commcid.DataCommitmentV1ToCID(db)
commr := make([]byte, 32)
for i := range db {
commr[32-(i+1)] = db[i]
}
commR := commcid.DataCommitmentV1ToCID(commr)
return commR, d, nil
}
func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCid cid.Cid, unsealed cid.Cid) (output []byte, err error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
@ -231,30 +212,19 @@ func (sb *SBMock) SealCommit(ctx context.Context, sid abi.SectorNumber, ticket a
var out [32]byte
for i := range out {
out[i] = unsealed.Bytes()[i] + sealedCid.Bytes()[31-i] - ticket[i]*seed[i]
out[i] = unsealed.Bytes()[i] + sealedCid.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid&0xff)
}
return out[:], nil
}
func (sb *SBMock) GetPath(string, string) (string, error) {
panic("nyi")
}
func (sb *SBMock) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
var out [32]byte
for i := range out {
out[i] = phase1Out[i] ^ byte(sectorNum&0xff)
}
func (sb *SBMock) CanCommit(sectorID abi.SectorNumber) (bool, error) {
return true, nil
}
func (sb *SBMock) WorkerStats() sectorbuilder.WorkerStats {
panic("nyi")
}
func (sb *SBMock) AddWorker(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) {
panic("nyi")
}
func (sb *SBMock) TaskDone(context.Context, uint64, sectorbuilder.SealRes) error {
panic("nyi")
return out[:], nil
}
// Test Instrumentation Methods
@ -350,22 +320,6 @@ func (sb *SBMock) FinalizeSector(context.Context, abi.SectorNumber) error {
return nil
}
func (sb *SBMock) DropStaged(context.Context, abi.SectorNumber) error {
return nil
}
func (sb *SBMock) SectorPath(typ fs.DataType, sectorID abi.SectorNumber) (fs.SectorPath, error) {
panic("implement me")
}
func (sb *SBMock) AllocSectorPath(typ fs.DataType, sectorID abi.SectorNumber, cache bool) (fs.SectorPath, error) {
panic("implement me")
}
func (sb *SBMock) ReleaseSector(fs.DataType, fs.SectorPath) {
panic("implement me")
}
func (m mockVerif) VerifyElectionPost(ctx context.Context, pvi abi.PoStVerifyInfo) (bool, error) {
panic("implement me")
}
@ -402,4 +356,4 @@ func (m mockVerif) GenerateDataCommitment(ssize abi.PaddedPieceSize, pieces []ab
var MockVerifier = mockVerif{}
var _ sectorbuilder.Verifier = MockVerifier
var _ sectorbuilder.Interface = &SBMock{}
var _ sectorbuilder.Basic = &SBMock{}

View File

@ -20,7 +20,7 @@ func TestOpFinish(t *testing.T) {
finished := make(chan struct{})
go func() {
_, _, err := sb.SealPreCommit(ctx, sid, abi.SealRandomness{}, pieces)
_, err := sb.SealPreCommit1(ctx, sid, abi.SealRandomness{}, pieces)
if err != nil {
t.Error(err)
return

View File

@ -25,7 +25,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, e
out := make([]Piece, len(sizes))
for i, size := range sizes {
ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
ppi, err := m.sealer.AddPiece(ctx, size, sectorID, m.pledgeReader(size), existingPieceSizes)
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}
@ -47,15 +47,15 @@ func (m *Sealing) PledgeSector() error {
// this, as we run everything here async, and it's cancelled when the
// command exits
size := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
size := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
rt, _, err := api.ProofTypeFromSectorSize(m.sb.SectorSize())
_, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
log.Error(err)
return
}
sid, err := m.sb.AcquireSectorNumber()
sid, err := m.sealer.NewSector()
if err != nil {
log.Errorf("%+v", err)
return

View File

@ -12,7 +12,6 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -22,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/statemachine"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
const SectorStorePrefix = "/sectors"
@ -65,19 +65,19 @@ type Sealing struct {
maddr address.Address
worker address.Address
sb sectorbuilder.Interface
sealer sealmgr.Manager
sectors *statemachine.StateGroup
tktFn TicketFn
}
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sb sectorbuilder.Interface, tktFn TicketFn) *Sealing {
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing {
s := &Sealing{
api: api,
events: events,
maddr: maddr,
worker: worker,
sb: sb,
sealer: sealer,
tktFn: tktFn,
}
@ -104,7 +104,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sid, err := m.sb.AcquireSectorNumber() // TODO: Put more than one thing in a sector
sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector
if err != nil {
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
}
@ -116,12 +116,12 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error {
log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{})
ppi, err := m.sealer.AddPiece(ctx, size, sectorID, r, []abi.UnpaddedPieceSize{})
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
}
_, rt, err := api.ProofTypeFromSectorSize(m.sb.SectorSize())
_, rt, err := api.ProofTypeFromSectorSize(m.sealer.SectorSize())
if err != nil {
return xerrors.Errorf("bad sector size: %w", err)
}

View File

@ -5,7 +5,6 @@ import (
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
@ -26,7 +25,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
allocated += piece.Size
}
ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
if allocated > ubytes {
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
@ -70,7 +69,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
@ -180,7 +184,12 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD)
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
@ -241,15 +250,8 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
if !xerrors.Is(err, fs.ErrNoSuitablePath) {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
log.Warnf("finalize sector: %v", err)
}
if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)})
if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
return ctx.Send(SectorFinalized{})

View File

@ -1,9 +1,10 @@
package sealing
import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
)
type Piece struct {

View File

@ -0,0 +1,114 @@
package advmgr
import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type localWorker struct {
scfg *sectorbuilder.Config
storage *storage
}
type localWorkerPathProvider struct {
w *localWorker
}
func (l *localWorkerPathProvider) AcquireSectorNumber() (abi.SectorNumber, error) {
return 0, xerrors.Errorf("unsupported")
}
func (l *localWorkerPathProvider) FinalizeSector(abi.SectorNumber) error {
return xerrors.Errorf("unsupported")
}
func (l *localWorkerPathProvider) AcquireSector(id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
mid, err := address.IDFromAddress(l.w.scfg.Miner)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err)
}
return l.w.storage.acquireSector(abi.ActorID(mid), id, existing, allocate, sealing)
}
func (l *localWorker) sb() (sectorbuilder.Basic, error) {
return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg)
}
func (l *localWorker) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi.SectorNumber, r io.Reader, epcs []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
sb, err := l.sb()
if err != nil {
return abi.PieceInfo{}, err
}
return sb.AddPiece(ctx, sz, sn, r, epcs)
}
func (l *localWorker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (l *localWorker) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
sb, err := l.sb()
if err != nil {
return cid.Undef, cid.Undef, err
}
return sb.SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (l *localWorker) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (l *localWorker) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit2(ctx, sectorNum, phase1Out)
}
func (l *localWorker) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.FinalizeSector(ctx, sectorNum)
}
func (l *localWorker) TaskTypes() map[sealmgr.TaskType]struct{} {
return map[sealmgr.TaskType]struct{}{
sealmgr.TTAddPiece: {},
sealmgr.TTPreCommit1: {},
sealmgr.TTPreCommit2: {},
sealmgr.TTCommit2: {},
}
}
func (l *localWorker) Paths() []Path {
return l.storage.local()
}
var _ Worker = &localWorker{}

View File

@ -0,0 +1,254 @@
package advmgr
import (
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(config.StorageConfig) error
}
type Path struct {
ID string
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
type Worker interface {
sectorbuilder.Sealer
TaskTypes() map[sealmgr.TaskType]struct{}
Paths() []Path
}
type Manager struct {
workers []Worker
scfg *sectorbuilder.Config
sc SectorIDCounter
storage *storage
sectorbuilder.Prover
}
func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manager, error) {
stor := &storage{
localStorage: ls,
}
if err := stor.open(); err != nil {
return nil, err
}
mid, err := address.IDFromAddress(cfg.Miner)
if err != nil {
return nil, xerrors.Errorf("getting miner id: %w", err)
}
prover, err := sectorbuilder.New(&readonlyProvider{stor: stor, miner: abi.ActorID(mid)}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
m := &Manager{
workers: []Worker{
&localWorker{scfg: cfg, storage: stor},
},
scfg: cfg,
sc: sc,
storage: stor,
Prover: prover,
}
return m, nil
}
func (m *Manager) AddLocalStorage(path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := m.storage.openPath(path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
sc, err := m.storage.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
return nil
}
func (m *Manager) SectorSize() abi.SectorSize {
sz, _ := m.scfg.SealProofType.SectorSize()
return sz
}
func (m *Manager) NewSector() (abi.SectorNumber, error) {
return m.sc.Next()
}
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.StorageMeta) ([]Worker, map[int]config.StorageMeta) {
var workers []Worker
paths := map[int]config.StorageMeta{}
for i, worker := range m.workers {
if _, ok := worker.TaskTypes()[task]; !ok {
continue
}
// check if the worker has access to the path we selected
var st *config.StorageMeta
for _, p := range worker.Paths() {
for _, m := range inPaths {
if p.ID == m.ID {
if st != nil && st.Weight > p.Weight {
continue
}
p := m // copy
st = &p
}
}
}
if st == nil {
continue
}
paths[i] = *st
workers = append(workers, worker)
}
return workers, paths
}
func (m *Manager) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sn abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []config.StorageMeta
var err error
if len(existingPieces) == 0 { // new
best, err = m.storage.findBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.findSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTAddPiece, best)
if len(candidateWorkers) == 0 {
return abi.PieceInfo{}, xerrors.New("no worker found")
}
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].AddPiece(ctx, sz, sn, r, existingPieces)
}
func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
// TODO: also consider where the unsealed data sits
best, err := m.storage.findBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit1, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
for _, worker := range m.workers {
if _, ok := worker.TaskTypes()[sealmgr.TTCommit2]; !ok {
continue
}
return worker.SealCommit2(ctx, sectorNum, phase1Out)
}
return nil, xerrors.New("no worker found")
}
func (m *Manager) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
best, err := m.storage.findSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector
// TODO: Move the sector to long-term storage
return candidateWorkers[0].FinalizeSector(ctx, sectorNum)
}
func (m *Manager) minerID() abi.ActorID {
mid, err := address.IDFromAddress(m.scfg.Miner)
if err != nil {
panic(err)
}
return abi.ActorID(mid)
}
var _ sealmgr.Manager = &Manager{}

View File

@ -0,0 +1,28 @@
package advmgr
import (
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/xerrors"
)
type readonlyProvider struct {
miner abi.ActorID
stor *storage
}
func (l *readonlyProvider) AcquireSectorNumber() (abi.SectorNumber, error) {
return 0, xerrors.New("read-only provider")
}
func (l *readonlyProvider) FinalizeSector(abi.SectorNumber) error {
return xerrors.New("read-only provider")
}
func (l *readonlyProvider) AcquireSector(id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if allocate != 0 {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
}
return l.stor.acquireSector(l.miner, id, existing, allocate, sealing)
}

View File

@ -0,0 +1,272 @@
package advmgr
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/node/config"
)
const metaFile = "sectorstore.json"
var pathTypes = []sectorbuilder.SectorFileType{sectorbuilder.FTUnsealed, sectorbuilder.FTSealed, sectorbuilder.FTCache}
type storage struct {
localLk sync.RWMutex
localStorage LocalStorage
paths []path
}
type path struct {
meta config.StorageMeta
local string
sectors map[abi.SectorID]sectorbuilder.SectorFileType
}
func (st *storage) openPath(p string) error {
mb, err := ioutil.ReadFile(filepath.Join(p, metaFile))
if err != nil {
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
}
var meta config.StorageMeta
if err := json.Unmarshal(mb, &meta); err != nil {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
}
// TODO: Check existing / dedupe
out := path{
meta: meta,
local: p,
sectors: map[abi.SectorID]sectorbuilder.SectorFileType{},
}
for _, t := range pathTypes {
ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(filepath.Join(p, t.String()), 0755); err != nil {
return xerrors.Errorf("mkdir '%s': %w", filepath.Join(p, t.String()), err)
}
continue
}
return xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err)
}
for _, ent := range ents {
sid, err := parseSectorID(ent.Name())
if err != nil {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
}
out.sectors[sid] |= t
}
}
st.paths = append(st.paths, out)
return nil
}
func (st *storage) open() error {
st.localLk.Lock()
defer st.localLk.Unlock()
cfg, err := st.localStorage.GetStorage()
if err != nil {
return xerrors.Errorf("getting local storage config: %w", err)
}
if len(cfg.StoragePaths) == 0 {
return xerrors.New("no local storage paths configured")
}
for _, path := range cfg.StoragePaths {
err := st.openPath(path.Path)
if err != nil {
return xerrors.Errorf("opening path %s: %w", path.Path, err)
}
}
return nil
}
func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
st.localLk.RLock()
var out sectorbuilder.SectorPaths
for _, fileType := range pathTypes {
if fileType&existing == 0 {
continue
}
for _, p := range st.paths {
s, ok := p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}]
if !ok {
continue
}
if s&fileType == 0 {
continue
}
spath := filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
switch fileType {
case sectorbuilder.FTUnsealed:
out.Unsealed = spath
case sectorbuilder.FTSealed:
out.Sealed = spath
case sectorbuilder.FTCache:
out.Cache = spath
}
existing ^= fileType
}
}
for _, fileType := range pathTypes {
if fileType&allocate == 0 {
continue
}
var best string
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
continue
}
if !sealing && !p.meta.CanStore {
continue
}
p.sectors[abi.SectorID{
Miner: mid,
Number: id,
}] |= fileType
// TODO: Check free space
// TODO: Calc weights
best = filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
break // todo: the first path won't always be the best
}
if best == "" {
st.localLk.RUnlock()
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
}
switch fileType {
case sectorbuilder.FTUnsealed:
out.Unsealed = best
case sectorbuilder.FTSealed:
out.Sealed = best
case sectorbuilder.FTCache:
out.Cache = best
}
allocate ^= fileType
}
return out, st.localLk.RUnlock, nil
}
func (st *storage) findBestAllocStorage(allocate sectorbuilder.SectorFileType, sealing bool) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
if sealing && !p.meta.CanSeal {
continue
}
if !sealing && !p.meta.CanStore {
continue
}
// TODO: filter out of space
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.New("no good path found")
}
// todo: sort by some kind of preference
return out, nil
}
func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]config.StorageMeta, error) {
var out []config.StorageMeta
for _, p := range st.paths {
t := p.sectors[abi.SectorID{
Miner: mid,
Number: sn,
}]
if t|typ == 0 {
continue
}
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.Errorf("sector %s/s-t0%d-%d not found", typ, mid, sn)
}
return out, nil
}
func (st *storage) local() []Path {
var out []Path
for _, p := range st.paths {
if p.local == "" {
continue
}
out = append(out, Path{
ID: p.meta.ID,
Weight: p.meta.Weight,
LocalPath: p.local,
CanSeal: p.meta.CanSeal,
CanStore: p.meta.CanStore,
})
}
return out
}
func parseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID
read, err := fmt.Sscanf(baseName, "s-t0%d-%d", &mid, &n)
if err != nil {
return abi.SectorID{}, xerrors.Errorf(": %w", err)
}
if read != 2 {
return abi.SectorID{}, xerrors.Errorf("parseSectorID expected to scan 2 values, got %d", read)
}
return abi.SectorID{
Miner: mid,
Number: n,
}, nil
}

122
storage/sealmgr/simple.go Normal file
View File

@ -0,0 +1,122 @@
package sealmgr
import (
"context"
"io"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
ffi "github.com/filecoin-project/filecoin-ffi"
)
type LocalWorker struct {
sectorbuilder.Basic
}
var _ Worker = &LocalWorker{}
// Simple implements a very basic storage manager which has one local worker,
// running one thing locally
type Simple struct {
sc *storedcounter.StoredCounter
maddr address.Address
rateLimiter sync.Mutex
worker Worker
}
type sszgetter interface {
SectorSize() abi.SectorSize
}
func (s *Simple) SectorSize() abi.SectorSize {
return s.worker.(sszgetter).SectorSize()
}
func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
w := &LocalWorker{
sb,
}
return &Simple{
sc: sc,
maddr: maddr,
worker: w,
}, nil
}
func (s *Simple) NewSector() (abi.SectorNumber, error) {
n, err := s.sc.Next()
if err != nil {
return 0, xerrors.Errorf("acquire sector number: %w", err)
}
return abi.SectorNumber(n), nil
}
func (s *Simple) AddPiece(ctx context.Context, sz abi.UnpaddedPieceSize, sectorNum abi.SectorNumber, r io.Reader, existingPieces []abi.UnpaddedPieceSize) (abi.PieceInfo, error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.AddPiece(ctx, sz, sectorNum, r, existingPieces)
}
func (s *Simple) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out []byte, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (s *Simple) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (s *Simple) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output []byte, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (s *Simple) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out []byte) (proof []byte, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit2(ctx, sectorNum, phase1Out)
}
func (s *Simple) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.FinalizeSector(ctx, sectorNum)
}
func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, error) {
return s.worker.GenerateEPostCandidates(sectorInfo, challengeSeed, faults)
}
func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]ffi.PoStCandidateWithTicket, []abi.PoStProof, error) {
return s.worker.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults)
}
func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.ComputeElectionPoSt(sectorInfo, challengeSeed, winners)
}
func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("todo")
}
var _ Manager = &Simple{}

10
storage/sealmgr/task.go Normal file
View File

@ -0,0 +1,10 @@
package sealmgr
type TaskType string
const (
TTAddPiece TaskType = "seal/v0/addpiece"
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2" // Commit1 is called here too
TTCommit2 TaskType = "seal/v0/commit/2"
)

32
storage/sealmgr/types.go Normal file
View File

@ -0,0 +1,32 @@
package sealmgr
import (
"context"
"io"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
)
type Worker interface {
sectorbuilder.Sealer
sectorbuilder.Prover
}
type Manager interface {
SectorSize() abi.SectorSize
// NewSector allocates staging area for data
// Storage manager forwards proof-related calls
NewSector() (abi.SectorNumber, error)
// TODO: Can[Pre]Commit[1,2]
// TODO: Scrub() []Faults
// TODO: Separate iface
ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
sectorbuilder.Sealer
sectorbuilder.Prover
}

View File

@ -9,7 +9,6 @@ import (
"sync"
"github.com/filecoin-project/go-padreader"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
@ -51,16 +50,14 @@ func DsKeyToDealID(key datastore.Key) (uint64, error) {
type SectorBlocks struct {
*storage.Miner
sb sectorbuilder.Interface
keys datastore.Batching
keyLk sync.Mutex
}
func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS, sb sectorbuilder.Interface) *SectorBlocks {
func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
sbc := &SectorBlocks{
Miner: miner,
sb: sb,
keys: namespace.Wrap(ds, dsPrefix),
}