Use new specs-storage interface

This commit is contained in:
Łukasz Magiera 2020-03-17 21:19:52 +01:00
parent de1c984b48
commit d8ce5078d9
23 changed files with 283 additions and 247 deletions

View File

@ -204,11 +204,11 @@ type WorkerStruct struct {
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"`
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) `perm:"admin"`
SealCommit2 func(context.Context, abi.SectorNumber, storage.Commit1Out) (storage.Proof, error) `perm:"admin"`
FinalizeSector func(context.Context, abi.SectorNumber) error `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
SealCommit2 func(context.Context, abi.SectorID, storage.Commit1Out) (storage.Proof, error) `perm:"admin"`
FinalizeSector func(context.Context, abi.SectorID) error `perm:"admin"`
}
}
@ -714,24 +714,24 @@ func (w *WorkerStruct) Paths(ctx context.Context) ([]stores.StoragePath, error)
return w.Internal.Paths(ctx)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sectorNum, ticket, pieces)
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
}
func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, p1o storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
return w.Internal.SealPreCommit2(ctx, sectorNum, p1o)
func (w *WorkerStruct) SealPreCommit2(ctx context.Context, sector abi.SectorID, p1o storage.PreCommit1Out) (storage.SectorCids, error) {
return w.Internal.SealPreCommit2(ctx, sector, p1o)
}
func (w *WorkerStruct) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (storage.Commit1Out, error) {
return w.Internal.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
func (w *WorkerStruct) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
return w.Internal.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}
func (w *WorkerStruct) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, c1o storage.Commit1Out) (storage.Proof, error) {
return w.Internal.SealCommit2(ctx, sectorNum, c1o)
func (w *WorkerStruct) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
return w.Internal.SealCommit2(ctx, sector, c1o)
}
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
return w.Internal.FinalizeSector(ctx, sectorNum)
func (w *WorkerStruct) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
return w.Internal.FinalizeSector(ctx, sector)
}
var _ api.Common = &CommonStruct{}

View File

@ -42,8 +42,12 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, err
}
mid, err := address.IDFromAddress(maddr)
if err != nil {
return nil, nil, err
}
cfg := &sectorbuilder.Config{
Miner: maddr,
SealProofType: spt,
PoStProofType: ppt,
}
@ -55,7 +59,6 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
next := offset
sbfs := &fs.Basic{
Miner: maddr,
Root: sbroot,
}
@ -71,7 +74,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
var sealedSectors []*genesis.PreSeal
for i := 0; i < sectors; i++ {
sid := next
sid := abi.SectorID{Miner: abi.ActorID(mid), Number: next}
next++
pi, err := sb.AddPiece(context.TODO(), sid, nil, abi.PaddedPieceSize(ssize).Unpadded(), rand.Reader)
@ -89,7 +92,7 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, xerrors.Errorf("commit: %w", err)
}
scid, ucid, err := sb.SealPreCommit2(context.TODO(), sid, in2)
cids, err := sb.SealPreCommit2(context.TODO(), sid, in2)
if err != nil {
return nil, nil, xerrors.Errorf("commit: %w", err)
}
@ -98,11 +101,11 @@ func PreSeal(maddr address.Address, pt abi.RegisteredProof, offset abi.SectorNum
return nil, nil, xerrors.Errorf("trim cache: %w", err)
}
log.Warn("PreCommitOutput: ", sid, scid, ucid)
log.Warn("PreCommitOutput: ", sid, cids.Sealed, cids.Unsealed)
sealedSectors = append(sealedSectors, &genesis.PreSeal{
CommR: scid,
CommD: ucid,
SectorID: sid,
CommR: cids.Sealed,
CommD: cids.Unsealed,
SectorID: sid.Number,
ProofType: pt,
})
}

View File

@ -7,24 +7,12 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
miner2 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
crypto2 "github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/google/uuid"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
@ -32,6 +20,17 @@ import (
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
miner2 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
crypto2 "github.com/filecoin-project/specs-actors/actors/crypto"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
@ -45,6 +44,7 @@ import (
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
)
var initCmd = &cli.Command{
@ -392,15 +392,19 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}
mid, err := address.IDFromAddress(a)
if err != nil {
return xerrors.Errorf("getting id address: %w", err)
}
smgr, err := advmgr.New(lr, stores.NewIndex(), &sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: a,
}, nil, nil)
}, nil)
if err != nil {
return err
}
epp := storage.NewElectionPoStProver(smgr)
epp := storage.NewElectionPoStProver(smgr, abi.ActorID(mid))
m := miner.NewMiner(api, epp)
{

4
go.mod
View File

@ -22,11 +22,11 @@ require (
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200314022627-38af9db49ba2
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200317165603-bd9e7cb04d81
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9
github.com/filecoin-project/go-statestore v0.1.0
github.com/filecoin-project/specs-actors v0.0.0-20200311215506-e95895452888
github.com/filecoin-project/specs-storage v0.0.0-20200303233430-1a5a408f7513
github.com/filecoin-project/specs-storage v0.0.0-20200317133846-063ba163b217
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/google/uuid v1.1.1

4
go.sum
View File

@ -126,6 +126,8 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/g
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200314022627-38af9db49ba2 h1:4RjDynwobd/UYlZUprRg/GMEsMP6fAfVRTXgFs4XNfo=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200314022627-38af9db49ba2/go.mod h1:NcE+iL0bbYnamGmYQgCPVGbSaf8VF2/CLra/61B3I3I=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200317165603-bd9e7cb04d81 h1:W5yekTpVTUiB86rSDiZo6rTI3lrLKrsrdY0tx/IqgJA=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200317165603-bd9e7cb04d81/go.mod h1:3c3MEU9GHLlau37+MmefFNunTo9sVEKfjaJuHBgksdY=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9 h1:k9qVR9ItcziSB2rxtlkN/MDWNlbsI6yzec+zjUatLW0=
github.com/filecoin-project/go-statemachine v0.0.0-20200226041606-2074af6d51d9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
@ -138,6 +140,8 @@ github.com/filecoin-project/specs-actors v0.0.0-20200311215506-e95895452888 h1:V
github.com/filecoin-project/specs-actors v0.0.0-20200311215506-e95895452888/go.mod h1:5WngRgTN5Eo4+0SjCBqLzEr2l6Mj45DrP2606gBhqI0=
github.com/filecoin-project/specs-storage v0.0.0-20200303233430-1a5a408f7513 h1:okBx3lPomwDxlPmRvyP078BwivDfdxNUlpCDhDD0ia8=
github.com/filecoin-project/specs-storage v0.0.0-20200303233430-1a5a408f7513/go.mod h1:sC2Ck2l1G8hXI5Do/3sp0yxbMRMnukbFwP9KF1CRFLw=
github.com/filecoin-project/specs-storage v0.0.0-20200317133846-063ba163b217 h1:doPA79fSLg5TnY2rJhXs5dIZHP3IoCcIiCLKFGfgrY8=
github.com/filecoin-project/specs-storage v0.0.0-20200317133846-063ba163b217/go.mod h1:dUmzHS7izOD6HW3/JpzFrjxnptxbsHXBlO8puK2UzBk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 h1:EzDjxMg43q1tA2c0MV3tNbaontnHLplHyFF6M5KiVP0=

View File

@ -38,7 +38,17 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID uin
if err != nil {
return nil, err
}
return rpn.sealer.ReadPieceFromSealedSector(ctx, abi.SectorNumber(sectorID), sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD)
mid, err := address.IDFromAddress(rpn.miner.Address())
if err != nil {
panic(err)
}
sid := abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(sectorID),
}
return rpn.sealer.ReadPieceFromSealedSector(ctx, sid, sectorbuilder.UnpaddedByteIndex(offset), abi.UnpaddedPieceSize(length), si.Ticket.Value, *si.CommD)
}
func (rpn *retrievalProviderNode) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount) (abi.TokenAmount, error) {

View File

@ -262,7 +262,7 @@ func Online() Option {
Override(new(*stores.Index), stores.NewIndex()),
Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
Override(new(advmgr.SectorIDCounter), modules.SectorIDCounter),
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
Override(new(*advmgr.Manager), advmgr.New),
Override(new(sealmgr.Manager), From(new(*advmgr.Manager))),

View File

@ -59,7 +59,7 @@ func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerSta
}*/
func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error) {
return sm.SectorBuilderConfig.Miner, nil
return sm.Miner.Address(), nil
}
func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (abi.SectorSize, error) {

View File

@ -49,7 +49,6 @@ import (
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sealing"
"github.com/filecoin-project/lotus/storage/sealmgr"
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
)
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
@ -91,7 +90,6 @@ func SectorBuilderConfig(ds dtypes.MetadataDS, fnapi lapi.FullNode) (*sectorbuil
}
sb := &sectorbuilder.Config{
Miner: minerAddr,
SealProofType: spt,
PoStProofType: ppt,
}
@ -108,7 +106,7 @@ func (s *sidsc) Next() (abi.SectorNumber, error) {
return abi.SectorNumber(i), err
}
func SectorIDCounter(ds dtypes.MetadataDS) advmgr.SectorIDCounter {
func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
sc := storedcounter.New(ds, datastore.NewKey("/storage/nextid"))
return &sidsc{sc}
}

View File

@ -3,6 +3,7 @@ package storage
import (
"bytes"
"context"
"github.com/filecoin-project/go-address"
"time"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -187,19 +188,24 @@ func (s *FPoStScheduler) runPost(ctx context.Context, eps abi.ChainEpoch, ts *ty
"sectors", len(ssi),
"faults", len(faults))
scandidates, proof, err := s.sb.GenerateFallbackPoSt(ssi, abi.PoStRandomness(rand), faults)
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}
postOut, err := s.sb.GenerateFallbackPoSt(ctx, abi.ActorID(mid), ssi, abi.PoStRandomness(rand), faults)
if err != nil {
return nil, xerrors.Errorf("running post failed: %w", err)
}
if len(scandidates) == 0 {
if len(postOut.PoStInputs) == 0 {
return nil, xerrors.Errorf("received zero candidates back from generate fallback post")
}
// TODO: until we figure out how fallback post is really supposed to work,
// let's just pass a single candidate...
scandidates = scandidates[:1]
proof = proof[:1]
scandidates := postOut.PoStInputs[:1]
proof := postOut.Proof[:1]
elapsed := time.Since(tsStart)
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)

View File

@ -121,10 +121,11 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
type SectorBuilderEpp struct {
prover storage.Prover
miner abi.ActorID
}
func NewElectionPoStProver(sb storage.Prover) *SectorBuilderEpp {
return &SectorBuilderEpp{sb}
func NewElectionPoStProver(sb storage.Prover, miner abi.ActorID) *SectorBuilderEpp {
return &SectorBuilderEpp{sb, miner}
}
var _ gen.ElectionPoStProver = (*SectorBuilderEpp)(nil)
@ -133,7 +134,7 @@ func (epp *SectorBuilderEpp) GenerateCandidates(ctx context.Context, ssi []abi.S
start := time.Now()
var faults []abi.SectorNumber // TODO
cds, err := epp.prover.GenerateEPostCandidates(ssi, rand, faults)
cds, err := epp.prover.GenerateEPostCandidates(ctx, epp.miner, ssi, rand, faults)
if err != nil {
return nil, xerrors.Errorf("failed to generate candidates: %w", err)
}
@ -153,7 +154,7 @@ func (epp *SectorBuilderEpp) ComputeProof(ctx context.Context, ssi []abi.SectorI
}
start := time.Now()
proof, err := epp.prover.ComputeElectionPoSt(ssi, rand, owins)
proof, err := epp.prover.ComputeElectionPoSt(ctx, epp.miner, ssi, rand, owins)
if err != nil {
return nil, err
}

View File

@ -24,7 +24,7 @@ import (
var log = logging.Logger("sbmock")
type SBMock struct {
sectors map[abi.SectorNumber]*sectorState
sectors map[abi.SectorID]*sectorState
sectorSize abi.SectorSize
nextSectorID abi.SectorNumber
rateLimit chan struct{}
@ -42,7 +42,7 @@ func NewMockSectorBuilder(threads int, ssize abi.SectorSize) *SBMock {
}
return &SBMock{
sectors: make(map[abi.SectorNumber]*sectorState),
sectors: make(map[abi.SectorID]*sectorState),
sectorSize: ssize,
nextSectorID: 5,
rateLimit: make(chan struct{}, threads),
@ -74,7 +74,11 @@ func (sb *SBMock) RateLimit() func() {
}
}
func (sb *SBMock) AddPiece(ctx context.Context, sectorId abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
func (sb *SBMock) NewSector(ctx context.Context, sector abi.SectorID) error {
return nil
}
func (sb *SBMock) AddPiece(ctx context.Context, sectorId abi.SectorID, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorId, size, sb.proofType)
sb.lk.Lock()
ss, ok := sb.sectors[sectorId]
@ -114,11 +118,7 @@ func (sb *SBMock) AcquireSectorNumber() (abi.SectorNumber, error) {
return id, nil
}
func (sb *SBMock) GenerateFallbackPoSt([]abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, []abi.PoStProof, error) {
panic("NYI")
}
func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
@ -173,7 +173,7 @@ func (sb *SBMock) SealPreCommit1(ctx context.Context, sid abi.SectorNumber, tick
return cc, nil
}
func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorNumber, phase1Out storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
db := []byte(string(phase1Out))
db[0] ^= 'd'
@ -186,10 +186,13 @@ func (sb *SBMock) SealPreCommit2(ctx context.Context, sid abi.SectorNumber, phas
commR := commcid.DataCommitmentV1ToCID(commr)
return commR, d, nil
return storage.SectorCids{
Unsealed: d,
Sealed: commR,
}, nil
}
func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCid cid.Cid, unsealed cid.Cid) (output storage.Commit1Out, err error) {
func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
sb.lk.Lock()
ss, ok := sb.sectors[sid]
sb.lk.Unlock()
@ -211,16 +214,16 @@ func (sb *SBMock) SealCommit1(ctx context.Context, sid abi.SectorNumber, ticket
var out [32]byte
for i := range out {
out[i] = unsealed.Bytes()[i] + sealedCid.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid&0xff)
out[i] = cids.Unsealed.Bytes()[i] + cids.Sealed.Bytes()[31-i] - ticket[i]*seed[i] ^ byte(sid.Number&0xff)
}
return out[:], nil
}
func (sb *SBMock) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
func (sb *SBMock) SealCommit2(ctx context.Context, sid abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
var out [32]byte
for i := range out {
out[i] = phase1Out[i] ^ byte(sectorNum&0xff)
out[i] = phase1Out[i] ^ byte(sid.Number&0xff)
}
return out[:], nil
@ -228,7 +231,7 @@ func (sb *SBMock) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, p
// Test Instrumentation Methods
func (sb *SBMock) FailSector(sid abi.SectorNumber) error {
func (sb *SBMock) FailSector(sid abi.SectorID) error {
sb.lk.Lock()
defer sb.lk.Unlock()
ss, ok := sb.sectors[sid]
@ -256,11 +259,15 @@ func AddOpFinish(ctx context.Context) (context.Context, func()) {
}
}
func (sb *SBMock) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
func (sb *SBMock) GenerateFallbackPoSt(context.Context, abi.ActorID, []abi.SectorInfo, abi.PoStRandomness, []abi.SectorNumber) (storage.FallbackPostOut, error) {
panic("implement me")
}
func (sb *SBMock) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
func (sb *SBMock) ComputeElectionPoSt(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
panic("implement me")
}
func (sb *SBMock) GenerateEPostCandidates(ctx context.Context, mid abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
if len(faults) > 0 {
panic("todo")
}
@ -280,7 +287,7 @@ func (sb *SBMock) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challenge
Candidate: abi.PoStCandidate{
SectorID: abi.SectorID{
Number: abi.SectorNumber((int(start) + i) % len(sectorInfo)),
Miner: 1125125, //TODO
Miner: mid,
},
PartialTicket: abi.PartialTicket(challengeSeed),
},
@ -290,14 +297,14 @@ func (sb *SBMock) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challenge
return out, nil
}
func (sb *SBMock) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorNumber, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
func (sb *SBMock) ReadPieceFromSealedSector(ctx context.Context, sectorID abi.SectorID, offset sectorbuilder.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, commD cid.Cid) (io.ReadCloser, error) {
if len(sb.sectors[sectorID].pieces) > 1 {
panic("implme")
}
return ioutil.NopCloser(io.LimitReader(bytes.NewReader(sb.sectors[sectorID].pieces[0].Bytes()[offset:]), int64(size))), nil
}
func (sb *SBMock) StageFakeData() (abi.SectorNumber, []abi.PieceInfo, error) {
func (sb *SBMock) StageFakeData(mid abi.ActorID) (abi.SectorNumber, []abi.PieceInfo, error) {
usize := abi.PaddedPieceSize(sb.sectorSize).Unpadded()
sid, err := sb.AcquireSectorNumber()
if err != nil {
@ -307,7 +314,10 @@ func (sb *SBMock) StageFakeData() (abi.SectorNumber, []abi.PieceInfo, error) {
buf := make([]byte, usize)
rand.Read(buf)
pi, err := sb.AddPiece(context.TODO(), sid, nil, usize, bytes.NewReader(buf))
pi, err := sb.AddPiece(context.TODO(), abi.SectorID{
Miner: mid,
Number: sid,
}, nil, usize, bytes.NewReader(buf))
if err != nil {
return 0, nil, err
}
@ -315,7 +325,7 @@ func (sb *SBMock) StageFakeData() (abi.SectorNumber, []abi.PieceInfo, error) {
return sid, []abi.PieceInfo{pi}, nil
}
func (sb *SBMock) FinalizeSector(context.Context, abi.SectorNumber) error {
func (sb *SBMock) FinalizeSector(context.Context, abi.SectorID) error {
return nil
}

View File

@ -2,6 +2,7 @@ package storage
import (
"context"
"github.com/filecoin-project/go-address"
"io"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -12,6 +13,10 @@ import (
// TODO: refactor this to be direct somehow
func (m *Miner) Address() address.Address {
return m.sealing.Address()
}
func (m *Miner) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.SectorNumber, offset uint64, err error) {
return m.sealing.AllocatePiece(size)
}

View File

@ -16,7 +16,7 @@ func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
return io.LimitReader(&nullreader.Reader{}, int64(size))
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorNumber, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]Piece, error) {
if len(sizes) == 0 {
return nil, nil
}
@ -55,13 +55,18 @@ func (m *Sealing) PledgeSector() error {
return
}
sid, err := m.sealer.NewSector()
sid, err := m.sc.Next()
if err != nil {
log.Errorf("%+v", err)
return
}
err = m.sealer.NewSector(ctx, m.minerSector(sid))
if err != nil {
log.Errorf("%+v", err)
return
}
pieces, err := m.pledgeSector(ctx, sid, []abi.UnpaddedPieceSize{}, size)
pieces, err := m.pledgeSector(ctx, m.minerSector(sid), []abi.UnpaddedPieceSize{}, size)
if err != nil {
log.Errorf("%+v", err)
return

View File

@ -30,6 +30,10 @@ var log = logging.Logger("sectors")
type TicketFn func(context.Context) (*api.SealTicket, error)
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type sealingApi interface { // TODO: trim down
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
@ -68,6 +72,7 @@ type Sealing struct {
sealer sealmgr.Manager
sectors *statemachine.StateGroup
tktFn TicketFn
sc SectorIDCounter
}
func New(api sealingApi, events *events.Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sealmgr.Manager, tktFn TicketFn) *Sealing {
@ -104,9 +109,14 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")
}
sid, err := m.sealer.NewSector() // TODO: Put more than one thing in a sector
sid, err := m.sc.Next()
if err != nil {
return 0, 0, xerrors.Errorf("acquiring sector ID: %w", err)
return 0, 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(context.TODO(), m.minerSector(sid)) // TODO: Put more than one thing in a sector
if err != nil {
return 0, 0, xerrors.Errorf("initializing sector: %w", err)
}
// offset hard-coded to 0 since we only put one thing in a sector for now
@ -116,7 +126,7 @@ func (m *Sealing) AllocatePiece(size abi.UnpaddedPieceSize) (sectorID abi.Sector
func (m *Sealing) SealPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, sectorID abi.SectorNumber, dealID abi.DealID) error {
log.Infof("Seal piece for deal %d", dealID)
ppi, err := m.sealer.AddPiece(ctx, sectorID, []abi.UnpaddedPieceSize{}, size, r)
ppi, err := m.sealer.AddPiece(ctx, m.minerSector(sectorID), []abi.UnpaddedPieceSize{}, size, r)
if err != nil {
return xerrors.Errorf("adding piece to sector: %w", err)
}
@ -144,3 +154,19 @@ func (m *Sealing) newSector(sid abi.SectorNumber, rt abi.RegisteredProof, pieces
sectorType: rt,
})
}
func (m *Sealing) minerSector(num abi.SectorNumber) abi.SectorID {
mid, err := address.IDFromAddress(m.maddr)
if err != nil {
panic(err)
}
return abi.SectorID{
Number: num,
Miner: abi.ActorID(mid),
}
}
func (m *Sealing) Address() address.Address {
return m.maddr
}

View File

@ -2,6 +2,7 @@ package sealing
import (
"context"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/specs-actors/actors/crypto"
@ -40,7 +41,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
}
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
pieces, err := m.pledgeSector(ctx.Context(), m.minerSector(sector.SectorID), sector.existingPieces(), fillerSizes...)
if err != nil {
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
}
@ -69,19 +70,19 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
}
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), sector.SectorID, ticket.Value, sector.pieceInfos())
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
sealed, unsealed, err := m.sealer.SealPreCommit2(ctx.Context(), sector.SectorID, pc1o)
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
}
return ctx.Send(SectorSealed{
commD: unsealed,
commR: sealed,
commD: cids.Unsealed,
commR: cids.Sealed,
ticket: *ticket,
})
}
@ -184,12 +185,16 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.Value, sector.Ticket.Epoch, sector.Seed.Value, sector.Seed.Epoch, sector.pieceInfos(), sector.CommR, sector.CommD)
c2in, err := m.sealer.SealCommit1(ctx.Context(), sector.SectorID, sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), *sector.CommR, *sector.CommD)
cids := storage.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.Ticket.Value, sector.Seed.Value, sector.pieceInfos(), cids)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
proof, err := m.sealer.SealCommit2(ctx.Context(), sector.SectorID, c2in)
proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorID), c2in)
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
@ -250,7 +255,7 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo)
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: Maybe wait for some finality
if err := m.sealer.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorID)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}

View File

@ -4,9 +4,6 @@ import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -35,7 +32,6 @@ func NewLocalWorker(ma address.Address, spt abi.RegisteredProof, store stores.St
scfg: &sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: ma,
},
storage: store,
localStore: local,
@ -47,17 +43,7 @@ type localWorkerPathProvider struct {
w *LocalWorker
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
mid, err := address.IDFromAddress(l.w.scfg.Miner)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("get miner ID: %w", err)
}
sector := abi.SectorID{
Miner: abi.ActorID(mid),
Number: id,
}
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
paths, storageIDs, done, err := l.w.storage.AcquireSector(ctx, sector, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
@ -84,58 +70,68 @@ func (l *LocalWorker) sb() (sectorbuilder.Basic, error) {
return sectorbuilder.New(&localWorkerPathProvider{w: l}, l.scfg)
}
func (l *LocalWorker) AddPiece(ctx context.Context, sn abi.SectorNumber, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
sb, err := l.sb()
if err != nil {
return abi.PieceInfo{}, err
}
return sb.AddPiece(ctx, sn, epcs, sz, r)
}
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
sb, err := l.sb()
if err != nil {
return cid.Undef, cid.Undef, err
}
return sb.SealPreCommit2(ctx, sectorNum, phase1Out)
}
func (l *LocalWorker) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output storage2.Commit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
}
func (l *LocalWorker) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit2(ctx, sectorNum, phase1Out)
}
func (l *LocalWorker) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.FinalizeSector(ctx, sectorNum)
return sb.NewSector(ctx, sector)
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
sb, err := l.sb()
if err != nil {
return abi.PieceInfo{}, err
}
return sb.AddPiece(ctx, sector, epcs, sz, r)
}
func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealPreCommit1(ctx, sector, ticket, pieces)
}
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
sb, err := l.sb()
if err != nil {
return storage2.SectorCids{}, err
}
return sb.SealPreCommit2(ctx, sector, phase1Out)
}
func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}
func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
sb, err := l.sb()
if err != nil {
return nil, err
}
return sb.SealCommit2(ctx, sector, phase1Out)
}
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
sb, err := l.sb()
if err != nil {
return err
}
return sb.FinalizeSector(ctx, sector)
}
func (l *LocalWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) {

View File

@ -10,8 +10,6 @@ import (
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
@ -26,10 +24,6 @@ var log = logging.Logger("advmgr")
type URLs []string
type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type Worker interface {
sectorbuilder.Sealer
@ -40,7 +34,6 @@ type Worker interface {
type Manager struct {
workers []Worker
scfg *sectorbuilder.Config
sc SectorIDCounter
ls stores.LocalStorage
storage *stores.Local
@ -49,7 +42,7 @@ type Manager struct {
storage2.Prover
}
func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, sc SectorIDCounter, urls URLs) (*Manager, error) {
func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, urls URLs) (*Manager, error) {
stor, err := stores.NewLocal(ls)
if err != nil {
return nil, err
@ -59,12 +52,7 @@ func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, sc
log.Errorf("Declaring local storage failed: %+v")
}
mid, err := address.IDFromAddress(cfg.Miner)
if err != nil {
return nil, xerrors.Errorf("getting miner id: %w", err)
}
prover, err := sectorbuilder.New(&readonlyProvider{stor: stor, miner: abi.ActorID(mid)}, cfg)
prover, err := sectorbuilder.New(&readonlyProvider{stor: stor}, cfg)
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
@ -74,7 +62,6 @@ func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, sc
&LocalWorker{scfg: cfg, storage: stor},
},
scfg: cfg,
sc: sc,
ls: ls,
storage: stor,
@ -113,11 +100,7 @@ func (m *Manager) SectorSize() abi.SectorSize {
return sz
}
func (m *Manager) NewSector() (abi.SectorNumber, error) {
return m.sc.Next()
}
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
@ -166,7 +149,12 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []stores.Stor
return workers, paths
}
func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
log.Warnf("stub NewSector")
return nil
}
func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
// TODO: consider multiple paths vs workers when initially allocating
var best []stores.StorageMeta
@ -174,7 +162,7 @@ func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPie
if len(existingPieces) == 0 { // new
best, err = m.storage.FindBestAllocStorage(sectorbuilder.FTUnsealed, true)
} else { // append to existing
best, err = m.storage.FindSector(m.minerID(), sn, sectorbuilder.FTUnsealed)
best, err = m.storage.FindSector(sector, sectorbuilder.FTUnsealed)
}
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
@ -188,10 +176,10 @@ func (m *Manager) AddPiece(ctx context.Context, sn abi.SectorNumber, existingPie
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].AddPiece(ctx, sn, existingPieces, sz, r)
return candidateWorkers[0].AddPiece(ctx, sector, existingPieces, sz, r)
}
func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
// TODO: also consider where the unsealed data sits
best, err := m.storage.FindBestAllocStorage(sectorbuilder.FTCache|sectorbuilder.FTSealed, true)
@ -203,26 +191,26 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit1(ctx, sectorNum, ticket, pieces)
return candidateWorkers[0].SealPreCommit1(ctx, sector, ticket, pieces)
}
func (m *Manager) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error) {
// TODO: allow workers to fetch the sectors
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("finding path for sector sealing: %w", err)
return storage2.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealPreCommit2(ctx, sectorNum, phase1Out)
return candidateWorkers[0].SealPreCommit2(ctx, sector, phase1Out)
}
func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output storage2.Commit1Out, err error) {
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed)
func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error) {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed)
if err != nil {
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
@ -231,10 +219,10 @@ func (m *Manager) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, t
// TODO: select(candidateWorkers, ...)
// TODO: remove the sectorbuilder abstraction, pass path directly
return candidateWorkers[0].SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
return candidateWorkers[0].SealCommit1(ctx, sector, ticket, seed, pieces, cids)
}
func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error) {
for _, worker := range m.workers {
tt, err := worker.TaskTypes(context.TODO())
if err != nil {
@ -245,14 +233,14 @@ func (m *Manager) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, p
continue
}
return worker.SealCommit2(ctx, sectorNum, phase1Out)
return worker.SealCommit2(ctx, sector, phase1Out)
}
return nil, xerrors.New("no worker found")
}
func (m *Manager) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
best, err := m.storage.FindSector(m.minerID(), sectorNum, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
best, err := m.storage.FindSector(sector, sectorbuilder.FTCache|sectorbuilder.FTSealed|sectorbuilder.FTUnsealed)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
@ -260,15 +248,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector
// TODO: Move the sector to long-term storage
return candidateWorkers[0].FinalizeSector(ctx, sectorNum)
}
func (m *Manager) minerID() abi.ActorID {
mid, err := address.IDFromAddress(m.scfg.Miner)
if err != nil {
panic(err)
}
return abi.ActorID(mid)
return candidateWorkers[0].FinalizeSector(ctx, sector)
}
var _ sealmgr.Manager = &Manager{}

View File

@ -16,7 +16,11 @@ type remote struct {
api.WorkerApi
}
func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
func (r *remote) NewSector(ctx context.Context, sector abi.SectorID) error {
return xerrors.New("unsupported")
}
func (r *remote) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{}, xerrors.New("unsupported")
}

View File

@ -11,19 +11,15 @@ import (
)
type readonlyProvider struct {
miner abi.ActorID
stor *stores.Local
}
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if allocate != 0 {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
}
p, _, done, err := l.stor.AcquireSector(ctx, abi.SectorID{
Miner: l.miner,
Number: id,
}, existing, allocate, sealing)
p, _, done, err := l.stor.AcquireSector(ctx, id, existing, allocate, sealing)
return p, done, err
}

View File

@ -5,14 +5,11 @@ import (
"io"
"sync"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
)
type LocalWorker struct {
@ -24,7 +21,6 @@ var _ Worker = &LocalWorker{}
// Simple implements a very basic storage manager which has one local worker,
// running one thing locally
type Simple struct {
sc *storedcounter.StoredCounter
maddr address.Address
rateLimiter sync.Mutex
@ -39,82 +35,76 @@ func (s *Simple) SectorSize() abi.SectorSize {
return s.worker.(sszgetter).SectorSize()
}
func NewSimpleManager(sc *storedcounter.StoredCounter, maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
func NewSimpleManager(maddr address.Address, sb sectorbuilder.Basic) (*Simple, error) {
w := &LocalWorker{
sb,
}
return &Simple{
sc: sc,
maddr: maddr,
worker: w,
}, nil
}
func (s *Simple) NewSector() (abi.SectorNumber, error) {
n, err := s.sc.Next()
if err != nil {
return 0, xerrors.Errorf("acquire sector number: %w", err)
}
return abi.SectorNumber(n), nil
func (s *Simple) NewSector(ctx context.Context, id abi.SectorID) error {
return s.worker.NewSector(ctx, id)
}
func (s *Simple) AddPiece(ctx context.Context, sectorNum abi.SectorNumber, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r storage.Data) (abi.PieceInfo, error) {
func (s *Simple) AddPiece(ctx context.Context, id abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r storage.Data) (abi.PieceInfo, error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.AddPiece(ctx, sectorNum, existingPieces, sz, r)
return s.worker.AddPiece(ctx, id, existingPieces, sz, r)
}
func (s *Simple) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
func (s *Simple) SealPreCommit1(ctx context.Context, id abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit1(ctx, sectorNum, ticket, pieces)
return s.worker.SealPreCommit1(ctx, id, ticket, pieces)
}
func (s *Simple) SealPreCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) {
func (s *Simple) SealPreCommit2(ctx context.Context, id abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealPreCommit2(ctx, sectorNum, phase1Out)
return s.worker.SealPreCommit2(ctx, id, phase1Out)
}
func (s *Simple) SealCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, sealedCID cid.Cid, unsealedCID cid.Cid) (output storage.Commit1Out, err error) {
func (s *Simple) SealCommit1(ctx context.Context, id abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit1(ctx, sectorNum, ticket, seed, pieces, sealedCID, unsealedCID)
return s.worker.SealCommit1(ctx, id, ticket, seed, pieces, cids)
}
func (s *Simple) SealCommit2(ctx context.Context, sectorNum abi.SectorNumber, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
func (s *Simple) SealCommit2(ctx context.Context, id abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.SealCommit2(ctx, sectorNum, phase1Out)
return s.worker.SealCommit2(ctx, id, phase1Out)
}
func (s *Simple) FinalizeSector(ctx context.Context, sectorNum abi.SectorNumber) error {
func (s *Simple) FinalizeSector(ctx context.Context, id abi.SectorID) error {
s.rateLimiter.Lock()
defer s.rateLimiter.Unlock()
return s.worker.FinalizeSector(ctx, sectorNum)
return s.worker.FinalizeSector(ctx, id)
}
func (s *Simple) GenerateEPostCandidates(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
return s.worker.GenerateEPostCandidates(sectorInfo, challengeSeed, faults)
func (s *Simple) GenerateEPostCandidates(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, error) {
return s.worker.GenerateEPostCandidates(ctx, miner, sectorInfo, challengeSeed, faults)
}
func (s *Simple) GenerateFallbackPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) ([]storage.PoStCandidateWithTicket, []abi.PoStProof, error) {
return s.worker.GenerateFallbackPoSt(sectorInfo, challengeSeed, faults)
func (s *Simple) GenerateFallbackPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, faults []abi.SectorNumber) (storage.FallbackPostOut, error) {
return s.worker.GenerateFallbackPoSt(ctx, miner, sectorInfo, challengeSeed, faults)
}
func (s *Simple) ComputeElectionPoSt(sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.ComputeElectionPoSt(sectorInfo, challengeSeed, winners)
func (s *Simple) ComputeElectionPoSt(ctx context.Context, miner abi.ActorID, sectorInfo []abi.SectorInfo, challengeSeed abi.PoStRandomness, winners []abi.PoStCandidate) ([]abi.PoStProof, error) {
return s.worker.ComputeElectionPoSt(ctx, miner, sectorInfo, challengeSeed, winners)
}
func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
func (s *Simple) ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error) {
panic("todo")
}

View File

@ -235,14 +235,11 @@ func (st *Local) FindBestAllocStorage(allocate sectorbuilder.SectorFileType, sea
return out, nil
}
func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
func (st *Local) FindSector(id abi.SectorID, typ sectorbuilder.SectorFileType) ([]StorageMeta, error) {
var out []StorageMeta
for _, p := range st.paths {
p.lk.Lock()
t := p.sectors[abi.SectorID{
Miner: mid,
Number: sn,
}]
t := p.sectors[id]
if t|typ == 0 {
continue
}
@ -250,7 +247,7 @@ func (st *Local) FindSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbuil
out = append(out, p.meta)
}
if len(out) == 0 {
return nil, xerrors.Errorf("sector %s/s-t0%d-%d not found", typ, mid, sn)
return nil, xerrors.Errorf("sector %s/s-t0%d-%d not found", typ, id.Miner, id.Number)
}
return out, nil

View File

@ -19,15 +19,11 @@ type Worker interface {
type Manager interface {
SectorSize() abi.SectorSize
// NewSector allocates staging area for data
// Storage manager forwards proof-related calls
NewSector() (abi.SectorNumber, error)
// TODO: Can[Pre]Commit[1,2]
// TODO: Scrub() []Faults
// TODO: Separate iface
ReadPieceFromSealedSector(context.Context, abi.SectorNumber, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
ReadPieceFromSealedSector(context.Context, abi.SectorID, sectorbuilder.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (io.ReadCloser, error)
sectorbuilder.Sealer
storage.Prover